قراءة PySpark JSON ()

Qra T Pyspark Json



أثناء العمل مع PySpark DataFrames ، يجب تخزينه في PySpark DataFrame إذا كنت تريد معالجة بيانات JSON. بعد التخزين في DataFrame ، يمكننا تطبيق العمليات والأساليب المختلفة على البيانات. أيضًا ، هناك العديد من المزايا إذا قمنا بتحويل JSON إلى PySpark DataFrame لأنه بسيط ويمكننا تحويل / تقسيم البيانات بطريقة أبسط.

موضوع المحتويات:

قراءة JSON في PySpark DataFrame باستخدام Pandas.read_json ()







قراءة JSON إلى PySpark DataFrame باستخدام Spark.read.json ()



قراءة JSON إلى PySpark DataFrame باستخدام PySpark SQL



في هذا البرنامج التعليمي ، سننظر في كيفية قراءة JSON في PySpark DataFrame باستخدام pandas.read_json () و spark.read.json () و spark.sql. في جميع السيناريوهات ، سننظر في الأمثلة المختلفة من خلال النظر في تنسيقات JSON المختلفة.





قم بتثبيت مكتبة PySpark قبل تنفيذ الأمثلة التالية.

نقطة تثبيت pyspark

بعد التثبيت الناجح ، يمكنك رؤية الإخراج على النحو التالي:



قراءة JSON في PySpark DataFrame باستخدام Pandas.read_json ()

في PySpark ، يتم استخدام طريقة createDataFrame () لإنشاء DataFrame مباشرة. هنا ، نحتاج فقط إلى تمرير ملف / مسار JSON إلى ملف JSON من خلال طريقة pandas.read_json (). تأخذ طريقة read_json () هذه اسم الملف / المسار المتاح في وحدة Pandas. هذا هو السبب في أنه من الضروري استيراد واستخدام وحدة Pandas.

بناء الجملة:

spark_app.createDataFrame (pandas.read_json ( 'file_name.json' ))

مثال:

لننشئ ملف JSON باسم 'student_skill.json' يحتوي على سجلين. هنا ، المفاتيح / الأعمدة هي 'الطالب 1' و 'الطالب 2'. الصفوف هي الاسم والعمر والمهارة 1 والمهارة 2.

استيراد pyspark

استيراد الباندا

من pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName ( 'تلميح Linux' ) .getOrCreate ()

# استخدام pandas.read_json ()

filter_skills = linuxhint_spark_app.createDataFrame (pandas.read_json ( 'student_skill.json' ))

filter_skills.show ()

انتاج:

يمكننا أن نرى أن بيانات JSON يتم تحويلها إلى PySpark DataFrame بأعمدة وصفوف محددة.

2. قراءة JSON إلى PySpark DataFrame باستخدام Spark.read.json ()

() read.json هي طريقة مشابهة لـ read_json () في Pandas. هنا ، تأخذ read.json () مسارًا إلى JSON أو مباشرة إلى ملف JSON ، وتحميله مباشرة في PySpark DataFrame. ليست هناك حاجة لاستخدام طريقة createDataFrame () في هذا السيناريو. إذا كنت تريد قراءة ملفات JSON متعددة في وقت واحد ، فنحن بحاجة إلى تمرير قائمة بأسماء ملفات JSON من خلال قائمة مفصولة بفاصلة. يتم تخزين جميع سجلات JSON في DataFrame واحد.

بناء الجملة:

ملف واحد - spark_app.read.json ( 'file_name.json' )

ملفات متعددة - spark_app.read.json ([ 'file1.json' و 'file2.json' ، ...])

السيناريو 1: قراءة JSON التي تحتوي على سطر واحد

إذا كان ملف JSON الخاص بك موجودًا في تنسيقات السجل 1 ، أو السجل 2 ، أو السجل 3 ... (سطر واحد) ، فيمكننا تسميته بتنسيق JSON مع أسطر مفردة. يقوم Spark بمعالجة هذه السجلات وتخزينها في PySpark DataFrame كصفوف. كل سجل عبارة عن صف في PySpark DataFrame.

لننشئ ملف JSON باسم 'filter_skills.json' يحتوي على 3 سجلات. اقرأ JSON هذا في PySpark DataFrame.

استيراد pyspark

من pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName ( 'تلميح Linux' ) .getOrCreate ()

# اقرأ filter_skills.json في PySpark DataFrame

filter_skills = linuxhint_spark_app.read.json ( 'filter_skills.json' )

filter_skills.show ()

انتاج:

يمكننا أن نرى أن بيانات JSON يتم تحويلها إلى PySpark DataFrame بسجلات وأسماء أعمدة محددة.

السيناريو 2: قراءة JSON التي لها سطور متعددة

إذا كان ملف JSON الخاص بك يحتوي على أسطر متعددة ، فأنت بحاجة إلى استخدام طريقة read.option (). json () لتمرير المعامل متعدد الأسطر الذي يجب ضبطه على صحيح. هذا يسمح لنا بتحميل JSON الذي يحتوي على أسطر متعددة في PySpark DataFrame.

read.option ( 'متعدد الأسطر' و 'حقيقي' ) .json ( 'file_name.json' )

لننشئ ملف JSON باسم 'multi.json' يحتوي على 3 سجلات. اقرأ JSON هذا في PySpark DataFrame.

استيراد pyspark

من pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName ( 'تلميح Linux' ) .getOrCreate ()

# اقرأ multi.json (وجود عدة أسطر) في PySpark DataFrame

مرشح_المهارات = linuxhint_spark_app.read.option ( 'متعدد الأسطر' و 'حقيقي' ) .json ( 'multi.json' )

filter_skills.show ()

انتاج:

السيناريو 3: قراءة JSON المتعددة

لقد ناقشنا بالفعل في مرحلة البداية من هذا البرنامج التعليمي فيما يتعلق بملفات JSON المتعددة. إذا كنت ترغب في قراءة ملفات JSON متعددة في وقت واحد وتخزينها في PySpark DataFrame واحد ، فنحن بحاجة إلى تمرير قائمة بأسماء الملفات إلى طريقة read.json ().

دعنا ننشئ ملفي JSON باسم 'filter_skills.json' و 'filter_skills2.json' وقم بتحميلهما في PySpark DataFrame.

يحتوي ملف 'filter_skills.json' على ثلاثة سجلات.

يحتفظ ملف 'filter_skill2.json' بسجل واحد فقط.

استيراد pyspark

من pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName ( 'تلميح Linux' ) .getOrCreate ()

# قراءة ملفات مرشح_المهارات و مرشح_ مهارات 2 في وقت واحد في PySpark DataFrame

filter_skills = linuxhint_spark_app.read.json ([ 'filter_skills.json' و 'مرشح_المهارات2.json' ])

filter_skills.show ()

انتاج:

أخيرًا ، يحتفظ DataFrame بأربعة سجلات. تنتمي السجلات الثلاثة الأولى إلى JSON الأول وتنتمي السجلات الأخيرة إلى JSON الثاني.

قراءة JSON إلى PySpark DataFrame باستخدام Spark.read.json ()

() read.json هي طريقة مشابهة لـ read_json () في Pandas. هنا ، تأخذ read.json () مسارًا إلى JSON أو مباشرة إلى ملف JSON وتحميله مباشرة في PySpark DataFrame. ليست هناك حاجة لاستخدام طريقة createDataFrame () في هذا السيناريو. إذا كنت تريد قراءة ملفات JSON متعددة في وقت واحد ، فنحن بحاجة إلى تمرير قائمة بأسماء ملفات JSON من خلال قائمة مفصولة بفاصلة. يتم تخزين جميع سجلات JSON في DataFrame واحد.

بناء الجملة:

ملف واحد - spark_app.read.json ( 'file_name.json' )

ملفات متعددة - spark_app.read.json ([ 'file1.json' و 'file2.json' ، ...])

السيناريو 1: قراءة JSON التي تحتوي على سطر واحد

إذا كان ملف JSON الخاص بك في تنسيق record1 ، أو record2 ، أو record3 ... (سطر واحد) ، فيمكننا تسميته بتنسيق JSON مع أسطر مفردة. يقوم Spark بمعالجة هذه السجلات وتخزينها في PySpark DataFrame كصفوف. كل سجل عبارة عن صف في PySpark DataFrame.

لننشئ ملف JSON باسم 'filter_skills.json' يحتوي على 3 سجلات. اقرأ JSON هذا في PySpark DataFrame.

استيراد pyspark

من pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName ( 'تلميح Linux' ) .getOrCreate ()

# اقرأ filter_skills.json في PySpark DataFrame

filter_skills = linuxhint_spark_app.read.json ( 'filter_skills.json' )

filter_skills.show ()

انتاج:

يمكننا أن نرى أن بيانات JSON يتم تحويلها إلى PySpark DataFrame بسجلات وأسماء أعمدة محددة.

قراءة JSON إلى PySpark DataFrame باستخدام PySpark SQL

من الممكن إنشاء عرض مؤقت لبيانات JSON الخاصة بنا باستخدام PySpark SQL. مباشرة ، يمكننا توفير JSON في وقت إنشاء العرض المؤقت. انظر إلى الصيغة التالية. بعد ذلك ، يمكننا استخدام الأمر SELECT لعرض PySpark DataFrame.

بناء الجملة:

spark_app.sql ( 'إنشاء عرض مؤقت VIEW_NAME باستخدام خيارات json (المسار‘ file_name.json ') ' )

هنا ، 'VIEW_NAME' هو عرض بيانات JSON و 'file_name' هو اسم ملف JSON.

مثال 1:

ضع في اعتبارك ملف JSON المستخدم في الأمثلة السابقة - 'مرشح_المهارات.جسون'. حدد جميع الصفوف من DataFrame باستخدام SELECT مع عامل التشغيل '*'. هنا ، * يختار جميع الأعمدة من PySpark DataFrame.

استيراد pyspark

استيراد الباندا

من pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName ( 'تلميح Linux' ) .getOrCreate ()

# استخدام spark.sql لإنشاء عرض من JSON

مرشح_المهارات = linuxhint_spark_app.sql ( 'CREATE TEMPORARY VIEW Candidate_data باستخدام خيارات json (المسار 'filter_skills.json')' )

# استخدم استعلام التحديد لتحديد جميع السجلات من Candidate_data.

linuxhint_spark_app.sql ( 'SELECT * from Candidate_data' ).يعرض()

انتاج:

إجمالي عدد السجلات في PySpark DataFrame (تمت قراءته من JSON) هو 3.

المثال 2:

الآن ، قم بتصفية السجلات في PySpark DataFrame بناءً على عمود العمر. استخدم عامل التشغيل 'أكبر من' في الفئة العمرية للحصول على الصفوف ذات العمر الأكبر من 22.

# استخدم استعلام التحديد لتحديد السجلات مع تقدم العمر> 22.

linuxhint_spark_app.sql ( 'SELECT * from Candidate_data حيث العمر> 22' ).يعرض()

انتاج:

يوجد سجل واحد فقط في PySpark DataFrame بعمر أكبر من 22.

خاتمة

لقد تعلمنا الطرق الثلاث المختلفة لقراءة JSON في PySpark DataFrame. أولاً ، تعلمنا كيفية استخدام طريقة read_json () المتوفرة في وحدة Pandas لقراءة JSON إلى PySpark DataFrame. بعد ذلك ، تعلمنا كيفية قراءة ملفات JSON أحادية / متعددة الأسطر باستخدام طريقة spark.read.json () مع الخيار (). لقراءة ملفات JSON متعددة في وقت واحد ، نحتاج إلى تمرير قائمة بأسماء الملفات إلى هذه الطريقة. باستخدام PySpark SQL ، تتم قراءة ملف JSON في العرض المؤقت ويتم عرض DataFrame باستخدام استعلام SELECT.