PySpark Pandas_Udf ()

Pyspark Pandas Udf



يمكن تحويل PySpark DataFrame باستخدام وظيفة pandas_udf (). إنها وظيفة يحددها المستخدم ويتم تطبيقها على PySpark DataFrame مع السهم. يمكننا إجراء العمليات الموجهة باستخدام pandas_udf (). يمكن تنفيذه عن طريق تمرير هذه الوظيفة كديكور. دعنا نتعمق في هذا الدليل لمعرفة البنية والمعلمات والأمثلة المختلفة.

موضوع المحتويات:

إذا كنت تريد أن تعرف عن تثبيت PySpark DataFrame والوحدة النمطية ، فانتقل إلى هذا شرط .







Pyspark.sql.functions.pandas_udf ()

يتوفر pandas_udf () في وحدة وظائف sql في PySpark والتي يمكن استيرادها باستخدام الكلمة الأساسية 'من'. يتم استخدامه لأداء العمليات الموجهة على PySpark DataFrame لدينا. يتم تنفيذ هذه الوظيفة مثل المصمم بتمرير ثلاث معلمات. بعد ذلك ، يمكننا إنشاء دالة معرّفة من قِبل المستخدم تُرجع البيانات بتنسيق متجه (مثل استخدامنا سلسلة / NumPy لهذا الغرض) باستخدام سهم. ضمن هذه الوظيفة ، يمكننا إرجاع النتيجة.



البنية والنحو:



أولاً ، دعونا نلقي نظرة على بنية وصياغة هذه الوظيفة:

pandas_udf (نوع البيانات)
def function_name (عملية) -> convert_format:
بيان العودة

هنا ، function_name هو اسم الوظيفة المحددة لدينا. يحدد نوع البيانات نوع البيانات التي يتم إرجاعها بواسطة هذه الوظيفة. يمكننا إرجاع النتيجة باستخدام الكلمة الأساسية 'عودة'. يتم تنفيذ جميع العمليات داخل الوظيفة مع تعيين السهم.





Pandas_udf (الوظيفة ونوع الإرجاع)

  1. المعلمة الأولى هي الوظيفة المعرفة من قبل المستخدم والتي يتم تمريرها إليها.
  2. يتم استخدام المعلمة الثانية لتحديد نوع بيانات الإرجاع من الوظيفة.

بيانات:

في هذا الدليل بأكمله ، نستخدم PySpark DataFrame واحدًا فقط للتوضيح. يتم تطبيق جميع الوظائف المعرفة من قبل المستخدم التي نحددها على PySpark DataFrame. تأكد من إنشاء DataFrame هذا في بيئتك أولاً بعد تثبيت PySpark.



استيراد pyspark

من pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName ( 'تلميح Linux' ) .getOrCreate ()

من pyspark.sql.functions استيراد pandas_udf

من pyspark.sql.types import *

استيراد الباندا كباندا

# تفاصيل الخضار

الخضار = [{ 'يكتب' : 'الخضروات' و 'اسم' : 'طماطم' و 'locate_country' : 'الولايات المتحدة الأمريكية' و 'كمية' : 800 } ،

{ 'يكتب' : 'فاكهة' و 'اسم' : 'موز' و 'locate_country' : 'الصين' و 'كمية' : عشرين } ،

{ 'يكتب' : 'الخضروات' و 'اسم' : 'طماطم' و 'locate_country' : 'الولايات المتحدة الأمريكية' و 'كمية' : 800 } ،

{ 'يكتب' : 'الخضروات' و 'اسم' : 'مانجو' و 'locate_country' : 'اليابان' و 'كمية' : 0 } ،

{ 'يكتب' : 'فاكهة' و 'اسم' : 'ليمون' و 'locate_country' : 'الهند' و 'كمية' : 1700 } ،

{ 'يكتب' : 'الخضروات' و 'اسم' : 'طماطم' و 'locate_country' : 'الولايات المتحدة الأمريكية' و 'كمية' : 1200 } ،

{ 'يكتب' : 'الخضروات' و 'اسم' : 'مانجو' و 'locate_country' : 'اليابان' و 'كمية' : 0 } ،

{ 'يكتب' : 'فاكهة' و 'اسم' : 'ليمون' و 'locate_country' : 'الهند' و 'كمية' : 0 }

]

# إنشاء إطار بيانات السوق من البيانات أعلاه

market_df = linuxhint_spark_app.createDataFrame (نباتي)

market_df.show ()

انتاج:

هنا ، نقوم بإنشاء DataFrame هذا بأربعة أعمدة و 8 صفوف. الآن ، نستخدم pandas_udf () لإنشاء الوظائف المعرفة من قبل المستخدم وتطبيقها على هذه الأعمدة.

Pandas_udf () بأنواع بيانات مختلفة

في هذا السيناريو ، نقوم بإنشاء بعض الوظائف المعرفة من قبل المستخدم باستخدام pandas_udf () وتطبيقها على الأعمدة وعرض النتائج باستخدام طريقة select (). في كل حالة ، نستخدم pandas.Series أثناء قيامنا بالعمليات الموجهة. هذا يعتبر قيم العمود صفيف أحادي البعد ويتم تطبيق العملية على العمود. في المصمم نفسه ، نحدد نوع إرجاع الوظيفة.

مثال 1: Pandas_udf () مع نوع السلسلة

هنا ، نقوم بإنشاء وظيفتين معرفتين من قبل المستخدم مع نوع إرجاع السلسلة لتحويل قيم عمود نوع السلسلة إلى أحرف كبيرة وصغيرة. أخيرًا ، نطبق هذه الوظائف على عمودي 'النوع' و 'locate_country'.

# تحويل عمود الكتابة إلى الأحرف الكبيرة باستخدام pandas_udf

pandas_udf (StringType ())

def type_upper_case (i: panda.Series) -> panda.Series:

إرجاع i.str.upper ()

# تحويل عمود locate_country إلى أحرف صغيرة باستخدام pandas_udf

pandas_udf (StringType ())

def country_lower_case (i: panda.Series) -> panda.Series:

إرجاع i.str.lower ()

# عرض الأعمدة باستخدام select ()

market_df.select ( 'يكتب' ، type_upper_case ( 'يكتب' ) ، 'locate_country' و
country_lower_case ( 'locate_country' )).يعرض()

انتاج:

توضيح:

الوظيفة StringType () متاحة في الوحدة النمطية pyspark.sql.types. لقد قمنا بالفعل باستيراد هذه الوحدة أثناء إنشاء PySpark DataFrame.

  1. أولاً ، تقوم UDF (دالة معرّفة من قبل المستخدم) بإرجاع السلاسل بأحرف كبيرة باستخدام الدالة str.upper (). يتوفر str.upper () في بنية بيانات السلسلة (حيث نقوم بالتحويل إلى سلسلة بسهم داخل الوظيفة) والذي يحول السلسلة المحددة إلى أحرف كبيرة. أخيرًا ، يتم تطبيق هذه الوظيفة على عمود 'النوع' المحدد داخل طريقة select (). في السابق ، كانت جميع السلاسل الموجودة في عمود النوع مكتوبة بأحرف صغيرة. الآن ، تم تغييرها إلى الأحرف الكبيرة.
  2. ثانيًا ، تُرجع UDF السلاسل بأحرف كبيرة باستخدام الدالة str.lower (). يتوفر str.lower () في بنية بيانات السلسلة التي تحول السلسلة المحددة إلى أحرف صغيرة. أخيرًا ، يتم تطبيق هذه الوظيفة على عمود 'النوع' المحدد داخل طريقة select (). في السابق ، كانت جميع السلاسل في عمود النوع مكتوبة بأحرف كبيرة. الآن ، تم تغييرها إلى الأحرف الصغيرة.

مثال 2: Pandas_udf () بنوع صحيح

دعنا ننشئ UDF يحول عمود PySpark DataFrame الصحيح إلى سلسلة Pandas ونضيف 100 إلى كل قيمة. مرر عمود 'الكمية' إلى هذه الوظيفة داخل طريقة select ().

# أضف 100

pandas_udf (نوع صحيح ())

def add_100 (i: panda.Series) -> panda.Series:

العودة أنا + 100

# قم بتمرير عمود الكمية إلى الوظيفة والعرض أعلاه.

market_df.select ( 'كمية' ، add_100 ( 'كمية' )).يعرض()

انتاج:

توضيح:

داخل UDF ، نقوم بتكرار جميع القيم وتحويلها إلى سلسلة. بعد ذلك ، نضيف 100 لكل قيمة في السلسلة. أخيرًا ، نقوم بتمرير عمود 'الكمية' إلى هذه الوظيفة ويمكننا أن نرى أنه تمت إضافة 100 إلى جميع القيم.

Pandas_udf () مع أنواع بيانات مختلفة باستخدام Groupby () & Agg ()

دعونا نلقي نظرة على الأمثلة لتمرير UDF إلى الأعمدة المجمعة. هنا ، يتم تجميع قيم العمود أولاً باستخدام وظيفة groupby () ويتم التجميع باستخدام الدالة agg (). نجتاز UDF الخاص بنا داخل هذه الدالة التجميعية.

بناء الجملة:

pyspark_dataframe_object.groupby ( 'grouping_column' ) .agg (UDF
(pyspark_dataframe_object [ 'عمود' ]))

هنا ، يتم تجميع القيم الموجودة في عمود التجميع أولاً. بعد ذلك ، يتم التجميع على كل بيانات مجمعة فيما يتعلق بـ UDF الخاص بنا.

مثال 1: Pandas_udf () بمتوسط ​​إجمالي ()

هنا ، نقوم بإنشاء دالة معرّفة من قبل المستخدم مع تعويم نوع الإرجاع. داخل الدالة ، نحسب المتوسط ​​باستخدام دالة المتوسط ​​(). يتم تمرير هذا UDF إلى عمود 'الكمية' للحصول على متوسط ​​الكمية لكل نوع.

# إرجاع المتوسط ​​/ المتوسط

pandas_udf ( 'يطفو' )

def average_function (i: panda.Series) -> تعويم:

عودة i.mean ()

# قم بتمرير عمود الكمية إلى الوظيفة عن طريق تجميع عمود النوع.

market_df.groupby ( 'يكتب' ) .agg (متوسط_الوظيفة (market_df [ 'كمية' ])).يعرض()

انتاج:

نحن نقوم بالتجميع بناءً على العناصر الموجودة في عمود 'النوع'. يتم تشكيل مجموعتين - 'الفاكهة' و 'الخضار'. لكل مجموعة ، يتم حساب المتوسط ​​وإرجاعه.

مثال 2: Pandas_udf () مع إجمالي الحد الأقصى () والحد الأدنى ()

هنا ، نقوم بإنشاء وظيفتين معرفتين من قبل المستخدم مع نوع الإرجاع الصحيح (int). يُرجع أول UDF القيمة الدنيا ويُرجع UDF الثاني القيمة القصوى.

# pandas_udf التي تُرجع الحد الأدنى للقيمة

pandas_udf ( 'int' )

def min_ (i: panda.Series) -> int:

العودة i.min ()

# pandas_udf التي تُرجع القيمة القصوى

pandas_udf ( 'int' )

def max_ (i: panda.Series) -> int:

إرجاع i.max ()

# مرر عمود الكمية إلى min_ pandas_udf عن طريق تجميع locate_country.

market_df.groupby ( 'locate_country' ) .agg (min_ (market_df [ 'كمية' ])).يعرض()

# مرر عمود الكمية إلى max_ pandas_udf عن طريق تجميع locate_country.

market_df.groupby ( 'locate_country' ) .agg (max_ (market_df [ 'كمية' ])).يعرض()

انتاج:

لإرجاع الحد الأدنى والحد الأقصى للقيم ، نستخدم الدالتين min () و max () في نوع إرجاع UDFs. الآن ، نقوم بتجميع البيانات في عمود 'locate_country'. يتم تشكيل أربع مجموعات ('الصين' ، 'الهند' ، 'اليابان' ، 'الولايات المتحدة الأمريكية'). لكل مجموعة ، نعيد الكمية القصوى. وبالمثل ، نعيد الحد الأدنى للكمية.

خاتمة

في الأساس ، يتم استخدام pandas_udf () لأداء العمليات الموجهة على PySpark DataFrame الخاص بنا. لقد رأينا كيفية إنشاء pandas_udf () وتطبيقه على PySpark DataFrame. لفهم أفضل ، ناقشنا الأمثلة المختلفة من خلال النظر في جميع أنواع البيانات (سلسلة ، عدد عشري ، وعدد صحيح). يمكن استخدام pandas_udf () مع groupby () من خلال دالة agg ().