كيفية قراءة وكتابة جدول البيانات في PySpark

Kyfyt Qra T Wktabt Jdwl Albyanat Fy Pyspark



تكون معالجة البيانات في PySpark أسرع إذا تم تحميل البيانات في شكل جدول. باستخدام هذا ، باستخدام SQl Expressions ، ستكون المعالجة سريعة. لذا ، فإن تحويل PySpark DataFrame / RDD إلى جدول قبل إرساله للمعالجة هو الأسلوب الأفضل. سنرى اليوم كيفية قراءة بيانات الجدول في PySpark DataFrame ، وكتابة PySpark DataFrame إلى الجدول ، وإدراج DataFrame جديد في الجدول الحالي باستخدام الوظائف المضمنة. دعنا نذهب!

Pyspark.sql.DataFrameWriter.saveAsTable ()

أولاً ، سنرى كيفية كتابة PySpark DataFrame الموجود في الجدول باستخدام وظيفة write.saveAsTable (). يأخذ اسم الجدول والمعلمات الاختيارية الأخرى مثل الأوضاع ، partionBy ، وما إلى ذلك ، لكتابة DataFrame إلى الجدول. يتم تخزينه كملف باركيه.

بناء الجملة:







dataframe_obj.write.saveAsTable (المسار / اسم_الجدول ، الوضع ، التقسيم بواسطة ، ...)
  1. Table_name هو اسم الجدول الذي تم إنشاؤه من dataframe_obj.
  2. يمكننا إلحاق / الكتابة فوق بيانات الجدول باستخدام معلمة الوضع.
  3. يأخذ القسم الأعمدة المفردة / المتعددة لإنشاء أقسام بناءً على القيم الموجودة في هذه الأعمدة المتوفرة.

مثال 1:

قم بإنشاء PySpark DataFrame مع 5 صفوف و 4 أعمدة. اكتب Dataframe هذا إلى جدول يسمى “Agri_Table1”.



استيراد pyspark

من pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName ( 'تلميح Linux' ) .getOrCreate ()

# بيانات الزراعة مع 5 صفوف و 5 أعمدة

agri = [{ 'نوع التربة' : 'أسود' و 'الري_توافر' : 'لا' و 'فدان' : 2500 و 'حالة_التربة' : 'جاف' و
'دولة' : 'الولايات المتحدة الأمريكية' } ،

{ 'نوع التربة' : 'أسود' و 'الري_توافر' : 'نعم' و 'فدان' : 3500 و 'حالة_التربة' : 'مبتل' و
'دولة' : 'الهند' } ،

{ 'نوع التربة' : 'أحمر' و 'الري_توافر' : 'نعم' و 'فدان' : 210 و 'حالة_التربة' : 'جاف' و
'دولة' : 'المملكة المتحدة' } ،

{ 'نوع التربة' : 'آخر' و 'الري_توافر' : 'لا' و 'فدان' : 1000 و 'حالة_التربة' : 'مبتل' و
'دولة' : 'الولايات المتحدة الأمريكية' } ،

{ 'نوع التربة' : 'رمل' و 'الري_توافر' : 'لا' و 'فدان' : 500 و 'حالة_التربة' : 'جاف' و
'دولة' : 'الهند' }]



# إنشاء إطار البيانات من البيانات أعلاه

agri_df = linuxhint_spark_app.createDataFrame (agri)

agri_df.show ()

# اكتب DataFrame أعلاه إلى الجدول.

agri_df.coalesce ( 1 ) .write.saveAsTable ( 'Agri_Table1' )

انتاج:







يمكننا أن نرى أنه تم إنشاء ملف باركيه واحد باستخدام بيانات PySpark السابقة.



المثال الثاني:

ضع في اعتبارك DataFrame السابق واكتب 'Agri_Table2' إلى الجدول بتقسيم السجلات بناءً على القيم الموجودة في عمود 'البلد'.

# اكتب DataFrame أعلاه إلى الجدول مع معامل partitionBy

agri_df.write.saveAsTable ( 'Agri_Table2' ، التقسيم بواسطة = [ 'دولة' ])

انتاج:

هناك ثلاث قيم فريدة في عمود 'البلد' - 'الهند' و 'المملكة المتحدة' و 'الولايات المتحدة الأمريكية'. لذلك ، يتم إنشاء ثلاثة أقسام. كل قسم يحمل ملفات الباركيه.

Pyspark.sql.DataFrameReader.table ()

لنقم بتحميل الجدول في PySpark DataFrame باستخدام وظيفة spark.read.table (). يستغرق الأمر معلمة واحدة فقط وهي اسم المسار / الجدول. يقوم بتحميل الجدول مباشرة في PySpark DataFrame ويمكن أيضًا تطبيق جميع وظائف SQL التي يتم تطبيقها على PySpark DataFrame على هذا DataFrame المحمل.

بناء الجملة:

spark_app.read.table (المسار / 'اسم_الجدول')

في هذا السيناريو ، نستخدم الجدول السابق الذي تم إنشاؤه من PySpark DataFrame. تأكد من أنك بحاجة إلى تنفيذ مقتطفات التعليمات البرمجية للسيناريو السابق في بيئتك.

مثال:

قم بتحميل الجدول 'Agri_Table1' في DataFrame المسمى 'Load_data'.

load_data = linuxhint_spark_app.read.table ( 'Agri_Table1' )

تحميل_data.show ()

انتاج:

يمكننا أن نرى أن الجدول تم تحميله في PySpark DataFrame.

تنفيذ استعلامات SQL

الآن ، نقوم بتنفيذ بعض استعلامات SQL على DataFrame المحمل باستخدام وظيفة spark.sql ().

# استخدم الأمر SELECT لعرض جميع الأعمدة من الجدول أعلاه.

linuxhint_spark_app.sql ( 'SELECT * من Agri_Table1' ).يعرض()

# حيث الشرطية

linuxhint_spark_app.sql ( 'SELECT * from Agri_Table1 WHERE Soil_status = 'Dry'' ).يعرض()

linuxhint_spark_app.sql ( 'SELECT * from Agri_Table1 WHERE Acres> 2000' ).يعرض()

انتاج:

  1. يعرض الاستعلام الأول جميع الأعمدة والسجلات من DataFrame.
  2. يعرض الاستعلام الثاني السجلات بناءً على عمود 'Soil_status'. لا يوجد سوى ثلاثة سجلات مع العنصر 'جاف'.
  3. يقوم الاستعلام الأخير بإرجاع سجلين مع 'فدان' أكبر من 2000.

Pyspark.sql.DataFrameWriter.insertInto ()

باستخدام وظيفة insertInto () ، يمكننا إلحاق DataFrame بالجدول الحالي. يمكننا استخدام هذه الوظيفة مع selectExpr () لتحديد أسماء الأعمدة ثم إدراجها في الجدول. تأخذ هذه الوظيفة أيضًا اسم الجدول كمعلمة.

بناء الجملة:

DataFrame_obj.write.insertInto ('Table_name')

في هذا السيناريو ، نستخدم الجدول السابق الذي تم إنشاؤه من PySpark DataFrame. تأكد من أنك بحاجة إلى تنفيذ مقتطفات التعليمات البرمجية للسيناريو السابق في بيئتك.

مثال:

قم بإنشاء DataFrame جديد بسجلين وأدخلهما في جدول Agri_Table1.

استيراد pyspark

من pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName ( 'تلميح Linux' ) .getOrCreate ()

# بيانات الزراعة مع صفين

agri = [{ 'نوع التربة' : 'رمل' و 'الري_توافر' : 'لا' و 'فدان' : 2500 و 'حالة_التربة' : 'جاف' و
'دولة' : 'الولايات المتحدة الأمريكية' } ،

{ 'نوع التربة' : 'رمل' و 'الري_توافر' : 'لا' و 'فدان' : 1200 و 'حالة_التربة' : 'مبتل' و
'دولة' : 'اليابان' }]

# إنشاء إطار البيانات من البيانات أعلاه

agri_df2 = linuxhint_spark_app.createDataFrame (agri)

agri_df2.show ()

# write.insertInto ()

agri_df2.selectExpr ( 'فدان' و 'دولة' و 'Irrigation_availability' و 'نوع التربة' و
'حالة_التربة' ) .write.insertInto ( 'Agri_Table1' )

# عرض Agri_Table النهائي 1

linuxhint_spark_app.sql ( 'SELECT * من Agri_Table1' ).يعرض()

انتاج:

الآن ، العدد الإجمالي للصفوف الموجودة في DataFrame هو 7.

خاتمة

أنت الآن تفهم كيفية كتابة PySpark DataFrame إلى الجدول باستخدام وظيفة write.saveAsTable (). يأخذ اسم الجدول والمعلمات الاختيارية الأخرى. بعد ذلك ، قمنا بتحميل هذا الجدول في PySpark DataFrame باستخدام وظيفة spark.read.table (). يستغرق الأمر معلمة واحدة فقط وهي اسم المسار / الجدول. إذا كنت تريد إلحاق DataFrame الجديد بالجدول الموجود ، فاستخدم الوظيفة insertInto ().