Pyspark.sql.DataFrameWriter.saveAsTable ()
أولاً ، سنرى كيفية كتابة PySpark DataFrame الموجود في الجدول باستخدام وظيفة write.saveAsTable (). يأخذ اسم الجدول والمعلمات الاختيارية الأخرى مثل الأوضاع ، partionBy ، وما إلى ذلك ، لكتابة DataFrame إلى الجدول. يتم تخزينه كملف باركيه.
بناء الجملة:
dataframe_obj.write.saveAsTable (المسار / اسم_الجدول ، الوضع ، التقسيم بواسطة ، ...)
- Table_name هو اسم الجدول الذي تم إنشاؤه من dataframe_obj.
- يمكننا إلحاق / الكتابة فوق بيانات الجدول باستخدام معلمة الوضع.
- يأخذ القسم الأعمدة المفردة / المتعددة لإنشاء أقسام بناءً على القيم الموجودة في هذه الأعمدة المتوفرة.
مثال 1:
قم بإنشاء PySpark DataFrame مع 5 صفوف و 4 أعمدة. اكتب Dataframe هذا إلى جدول يسمى “Agri_Table1”.
استيراد pyspark
من pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName ( 'تلميح Linux' ) .getOrCreate ()
# بيانات الزراعة مع 5 صفوف و 5 أعمدة
agri = [{ 'نوع التربة' : 'أسود' و 'الري_توافر' : 'لا' و 'فدان' : 2500 و 'حالة_التربة' : 'جاف' و
'دولة' : 'الولايات المتحدة الأمريكية' } ،
{ 'نوع التربة' : 'أسود' و 'الري_توافر' : 'نعم' و 'فدان' : 3500 و 'حالة_التربة' : 'مبتل' و
'دولة' : 'الهند' } ،
{ 'نوع التربة' : 'أحمر' و 'الري_توافر' : 'نعم' و 'فدان' : 210 و 'حالة_التربة' : 'جاف' و
'دولة' : 'المملكة المتحدة' } ،
{ 'نوع التربة' : 'آخر' و 'الري_توافر' : 'لا' و 'فدان' : 1000 و 'حالة_التربة' : 'مبتل' و
'دولة' : 'الولايات المتحدة الأمريكية' } ،
{ 'نوع التربة' : 'رمل' و 'الري_توافر' : 'لا' و 'فدان' : 500 و 'حالة_التربة' : 'جاف' و
'دولة' : 'الهند' }]
# إنشاء إطار البيانات من البيانات أعلاه
agri_df = linuxhint_spark_app.createDataFrame (agri)
agri_df.show ()
# اكتب DataFrame أعلاه إلى الجدول.
agri_df.coalesce ( 1 ) .write.saveAsTable ( 'Agri_Table1' )
انتاج:
يمكننا أن نرى أنه تم إنشاء ملف باركيه واحد باستخدام بيانات PySpark السابقة.
المثال الثاني:
ضع في اعتبارك DataFrame السابق واكتب 'Agri_Table2' إلى الجدول بتقسيم السجلات بناءً على القيم الموجودة في عمود 'البلد'.
# اكتب DataFrame أعلاه إلى الجدول مع معامل partitionByagri_df.write.saveAsTable ( 'Agri_Table2' ، التقسيم بواسطة = [ 'دولة' ])
انتاج:
هناك ثلاث قيم فريدة في عمود 'البلد' - 'الهند' و 'المملكة المتحدة' و 'الولايات المتحدة الأمريكية'. لذلك ، يتم إنشاء ثلاثة أقسام. كل قسم يحمل ملفات الباركيه.
Pyspark.sql.DataFrameReader.table ()
لنقم بتحميل الجدول في PySpark DataFrame باستخدام وظيفة spark.read.table (). يستغرق الأمر معلمة واحدة فقط وهي اسم المسار / الجدول. يقوم بتحميل الجدول مباشرة في PySpark DataFrame ويمكن أيضًا تطبيق جميع وظائف SQL التي يتم تطبيقها على PySpark DataFrame على هذا DataFrame المحمل.
بناء الجملة:
spark_app.read.table (المسار / 'اسم_الجدول')في هذا السيناريو ، نستخدم الجدول السابق الذي تم إنشاؤه من PySpark DataFrame. تأكد من أنك بحاجة إلى تنفيذ مقتطفات التعليمات البرمجية للسيناريو السابق في بيئتك.
مثال:
قم بتحميل الجدول 'Agri_Table1' في DataFrame المسمى 'Load_data'.
load_data = linuxhint_spark_app.read.table ( 'Agri_Table1' )تحميل_data.show ()
انتاج:
يمكننا أن نرى أن الجدول تم تحميله في PySpark DataFrame.
تنفيذ استعلامات SQL
الآن ، نقوم بتنفيذ بعض استعلامات SQL على DataFrame المحمل باستخدام وظيفة spark.sql ().
# استخدم الأمر SELECT لعرض جميع الأعمدة من الجدول أعلاه.linuxhint_spark_app.sql ( 'SELECT * من Agri_Table1' ).يعرض()
# حيث الشرطية
linuxhint_spark_app.sql ( 'SELECT * from Agri_Table1 WHERE Soil_status = 'Dry'' ).يعرض()
linuxhint_spark_app.sql ( 'SELECT * from Agri_Table1 WHERE Acres> 2000' ).يعرض()
انتاج:
- يعرض الاستعلام الأول جميع الأعمدة والسجلات من DataFrame.
- يعرض الاستعلام الثاني السجلات بناءً على عمود 'Soil_status'. لا يوجد سوى ثلاثة سجلات مع العنصر 'جاف'.
- يقوم الاستعلام الأخير بإرجاع سجلين مع 'فدان' أكبر من 2000.
Pyspark.sql.DataFrameWriter.insertInto ()
باستخدام وظيفة insertInto () ، يمكننا إلحاق DataFrame بالجدول الحالي. يمكننا استخدام هذه الوظيفة مع selectExpr () لتحديد أسماء الأعمدة ثم إدراجها في الجدول. تأخذ هذه الوظيفة أيضًا اسم الجدول كمعلمة.
بناء الجملة:
DataFrame_obj.write.insertInto ('Table_name')في هذا السيناريو ، نستخدم الجدول السابق الذي تم إنشاؤه من PySpark DataFrame. تأكد من أنك بحاجة إلى تنفيذ مقتطفات التعليمات البرمجية للسيناريو السابق في بيئتك.
مثال:
قم بإنشاء DataFrame جديد بسجلين وأدخلهما في جدول Agri_Table1.
استيراد pysparkمن pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName ( 'تلميح Linux' ) .getOrCreate ()
# بيانات الزراعة مع صفين
agri = [{ 'نوع التربة' : 'رمل' و 'الري_توافر' : 'لا' و 'فدان' : 2500 و 'حالة_التربة' : 'جاف' و
'دولة' : 'الولايات المتحدة الأمريكية' } ،
{ 'نوع التربة' : 'رمل' و 'الري_توافر' : 'لا' و 'فدان' : 1200 و 'حالة_التربة' : 'مبتل' و
'دولة' : 'اليابان' }]
# إنشاء إطار البيانات من البيانات أعلاه
agri_df2 = linuxhint_spark_app.createDataFrame (agri)
agri_df2.show ()
# write.insertInto ()
agri_df2.selectExpr ( 'فدان' و 'دولة' و 'Irrigation_availability' و 'نوع التربة' و
'حالة_التربة' ) .write.insertInto ( 'Agri_Table1' )
# عرض Agri_Table النهائي 1
linuxhint_spark_app.sql ( 'SELECT * من Agri_Table1' ).يعرض()
انتاج:
الآن ، العدد الإجمالي للصفوف الموجودة في DataFrame هو 7.
خاتمة
أنت الآن تفهم كيفية كتابة PySpark DataFrame إلى الجدول باستخدام وظيفة write.saveAsTable (). يأخذ اسم الجدول والمعلمات الاختيارية الأخرى. بعد ذلك ، قمنا بتحميل هذا الجدول في PySpark DataFrame باستخدام وظيفة spark.read.table (). يستغرق الأمر معلمة واحدة فقط وهي اسم المسار / الجدول. إذا كنت تريد إلحاق DataFrame الجديد بالجدول الموجود ، فاستخدم الوظيفة insertInto ().