كيفية تنفيذ تدفق البيانات في الوقت الحقيقي في بايثون

Kyfyt Tnfydh Tdfq Albyanat Fy Alwqt Alhqyqy Fy Baythwn



يعد إتقان تنفيذ تدفق البيانات في الوقت الفعلي في Python بمثابة مهارة أساسية في عالم اليوم المرتبط بالبيانات. يستكشف هذا الدليل الخطوات الأساسية والأدوات الأساسية لاستخدام تدفق البيانات في الوقت الفعلي بشكل موثوق في Python. بدءًا من اختيار إطار عمل مناسب مثل Apache Kafka أو Apache Pulsar وحتى كتابة كود Python لاستهلاك البيانات ومعالجتها وتصورها الفعال دون عناء، سنكتسب المهارات اللازمة لإنشاء قنوات بيانات سريعة وفعالة في الوقت الفعلي.

المثال 1: تنفيذ تدفق البيانات في الوقت الفعلي في لغة بايثون

يعد تنفيذ تدفق البيانات في الوقت الفعلي في Python أمرًا بالغ الأهمية في عصرنا الحالي وعالمنا الذي يعتمد على البيانات. في هذا المثال التفصيلي، سنتعرف على عملية بناء نظام تدفق البيانات في الوقت الفعلي باستخدام Apache Kafka وPython في Google Colab.







لتهيئة المثال قبل أن نبدأ بالبرمجة، يعد إنشاء بيئة محددة في Google Colab أمرًا ضروريًا. أول شيء يتعين علينا القيام به هو تثبيت المكتبات اللازمة. نحن نستخدم مكتبة 'kafka-python' لتكامل كافكا.



! نقطة ثَبَّتَ كافكا بيثون


يقوم هذا الأمر بتثبيت مكتبة 'kafka-python' التي توفر وظائف Python والارتباطات الخاصة بـ Apache Kafka. بعد ذلك، نقوم باستيراد المكتبات المطلوبة لمشروعنا. استيراد المكتبات المطلوبة بما في ذلك 'KafkaProducer' و'KafkaConsumer' هي فئات من مكتبة 'kafka-python' التي تسمح لنا بالتفاعل مع وسطاء Kafka. JSON هي مكتبة Python للعمل مع بيانات JSON التي نستخدمها لإجراء تسلسل وإلغاء تسلسل الرسائل.



من استيراد كافكا KafkaProducer، KafkaConsumer
استيراد json


إنشاء منتج كافكا





وهذا مهم لأن منتج كافكا يرسل البيانات إلى موضوع كافكا. في مثالنا، قمنا بإنشاء منتج لإرسال بيانات محاكاة في الوقت الفعلي إلى موضوع يسمى 'موضوع الوقت الحقيقي'.

نقوم بإنشاء مثيل 'KafkaProducer' الذي يحدد عنوان وسيط Kafka على أنه 'localhost:9092'. بعد ذلك، نستخدم وظيفة 'value_serializer'، وهي وظيفة تقوم بتسلسل البيانات قبل إرسالها إلى كافكا. في حالتنا، تقوم دالة lambda بتشفير البيانات بتنسيق JSON بترميز UTF-8. الآن، دعونا نحاكي بعض البيانات في الوقت الفعلي ونرسلها إلى موضوع كافكا.



Producer = كافكاProducer ( bootstrap_servers = 'المضيف المحلي: 9092' ,
value_serializer =lambda v: json.dumps ( في ) .تشفير ( 'utf-8' ) )
# محاكاة البيانات في الوقت الحقيقي
البيانات = { 'معرف_المستشعر' : 1 , 'درجة حرارة' : 25.5 , 'رطوبة' : 60.2 }
# إرسال البيانات إلى الموضوع
منتج.إرسال ( 'موضوع في الوقت الحقيقي' ، بيانات )


في هذه السطور، نحدد قاموس 'البيانات' الذي يمثل بيانات الاستشعار المحاكية. ثم نستخدم طريقة 'الإرسال' لنشر هذه البيانات إلى 'الموضوع في الوقت الفعلي'.

بعد ذلك، نريد إنشاء مستهلك كافكا، ويقرأ مستهلك كافكا البيانات من موضوع كافكا. نحن ننشئ مستهلكًا لاستهلاك الرسائل ومعالجتها في 'موضوع الوقت الفعلي'. نقوم بإنشاء مثيل 'KafkaConsumer'، مع تحديد الموضوع الذي نريد استهلاكه، على سبيل المثال، (موضوع في الوقت الفعلي) وعنوان وسيط Kafka. بعد ذلك، فإن 'value_deserializer' هي وظيفة تعمل على إلغاء تسلسل البيانات التي يتم تلقيها من كافكا. في حالتنا، تقوم دالة lambda بفك ترميز البيانات على أنها JSON بتشفير UTF-8.

Consumer = KafkaConsumer ( 'موضوع في الوقت الحقيقي' ,
bootstrap_servers = 'المضيف المحلي: 9092' ,
value_deserializer =لامدا س: json.loads ( x.decode ( 'utf-8' ) ) )


نحن نستخدم حلقة تكرارية لاستهلاك ومعالجة الرسائل من الموضوع بشكل مستمر.

# قراءة ومعالجة البيانات في الوقت الحقيقي
ل رسالة في مستهلك:
البيانات = الرسالة. القيمة
مطبعة ( F 'البيانات المستلمة: {البيانات}' )


نقوم باسترداد قيمة كل رسالة وبيانات الاستشعار المحاكية لدينا داخل الحلقة وطباعتها على وحدة التحكم. يتضمن تشغيل منتج ومستهلك كافكا تشغيل هذا الكود في Google Colab وتنفيذ خلايا الكود بشكل فردي. يرسل المنتج البيانات المحاكاة إلى موضوع كافكا، ويقوم المستهلك بقراءة البيانات المستلمة وطباعتها.


تحليل الإخراج أثناء تشغيل التعليمات البرمجية

سوف نلاحظ البيانات في الوقت الحقيقي التي يتم إنتاجها واستهلاكها. قد يختلف تنسيق البيانات اعتمادًا على المحاكاة لدينا أو مصدر البيانات الفعلي. في هذا المثال التفصيلي، نغطي العملية الكاملة لإعداد نظام تدفق البيانات في الوقت الفعلي باستخدام Apache Kafka وPython في Google Colab. وسنشرح كل سطر من التعليمات البرمجية وأهميته في بناء هذا النظام. يعد تدفق البيانات في الوقت الفعلي قدرة قوية، وهذا المثال بمثابة أساس لتطبيقات العالم الحقيقي الأكثر تعقيدًا.

المثال 2: تنفيذ تدفق البيانات في الوقت الفعلي في لغة بايثون باستخدام بيانات سوق الأوراق المالية

لنقم بمثال فريد آخر لتنفيذ تدفق البيانات في الوقت الفعلي في بايثون باستخدام سيناريو مختلف؛ هذه المرة، سوف نركز على بيانات سوق الأسهم. نقوم بإنشاء نظام تدفق البيانات في الوقت الفعلي الذي يلتقط التغيرات في أسعار الأسهم ويعالجها باستخدام Apache Kafka وPython في Google Colab. كما هو موضح في المثال السابق، نبدأ بتكوين بيئتنا في Google Colab. أولاً نقوم بتثبيت المكتبات المطلوبة:

! نقطة ثَبَّتَ كافكا بيثون yfinance


هنا، نضيف مكتبة 'yfinance' التي تتيح لنا الحصول على بيانات سوق الأوراق المالية في الوقت الحقيقي. بعد ذلك، نقوم باستيراد المكتبات اللازمة. نواصل استخدام فئات 'KafkaProducer' و'KafkaConsumer' من مكتبة 'kafka-python' لتفاعل كافكا. نقوم باستيراد JSON للعمل مع بيانات JSON. نستخدم أيضًا 'yfinance' للحصول على بيانات سوق الأسهم في الوقت الفعلي. نقوم أيضًا باستيراد مكتبة 'الوقت' لإضافة تأخير زمني لمحاكاة التحديثات في الوقت الفعلي.

من استيراد كافكا KafkaProducer، KafkaConsumer
استيراد json
استيراد yfinance مثل yf
يستورد وقت


الآن، نقوم بإنشاء منتج كافكا لبيانات المخزون. يحصل منتج كافكا لدينا على بيانات الأسهم في الوقت الفعلي ويرسلها إلى موضوع كافكا يسمى 'سعر السهم'.

Producer = كافكاProducer ( bootstrap_servers = 'المضيف المحلي: 9092' ,
value_serializer =lambda v: json.dumps ( في ) .تشفير ( 'utf-8' ) )

بينما حقيقي:
الأسهم = yf.Ticker ( 'آبل' ) # مثال: أسهم شركة Apple Inc
Stock_data = Stock.history ( فترة = '1د' )
last_price = Stock_data [ 'يغلق' ] .iloc [ - 1 ]
البيانات = { 'رمز' : 'آبل' , 'سعر' : السعر الاخير }
منتج.إرسال ( 'سعر السهم' ، بيانات )
وقت النوم ( 10 ) # محاكاة التحديثات في الوقت الحقيقي كل 10 ثواني


نقوم بإنشاء مثيل 'KafkaProducer' بعنوان وسيط Kafka في هذا الرمز. داخل الحلقة، نستخدم 'yfinance' للحصول على أحدث سعر لسهم شركة Apple Inc. ('AAPL'). ثم نقوم باستخراج آخر سعر إغلاق وإرساله إلى موضوع 'سعر السهم'. في النهاية، نقدم تأخيرًا زمنيًا لمحاكاة التحديثات في الوقت الفعلي كل 10 ثوانٍ.

لنقم بإنشاء مستهلك كافكا لقراءة ومعالجة بيانات أسعار الأسهم من موضوع 'سعر السهم'.

Consumer = KafkaConsumer ( 'سعر السهم' ,
bootstrap_servers = 'المضيف المحلي: 9092' ,
value_deserializer =لامدا س: json.loads ( x.decode ( 'utf-8' ) ) )

ل رسالة في مستهلك:
Stock_data = message.value
مطبعة ( F 'بيانات المخزون المستلمة: {stock_data['symbol']} - السعر: {stock_data['price']}' )


يشبه هذا الرمز إعداد المستهلك في المثال السابق. يقوم بقراءة ومعالجة الرسائل بشكل مستمر من موضوع 'سعر السهم' ويطبع رمز السهم وسعره على وحدة التحكم. نقوم بتنفيذ خلايا التعليمات البرمجية بشكل تسلسلي، على سبيل المثال، واحدة تلو الأخرى في Google Colab لتشغيل المنتج والمستهلك. يحصل المنتج على تحديثات أسعار الأسهم في الوقت الفعلي ويرسلها بينما يقرأ المستهلك هذه البيانات ويعرضها.

! نقطة ثَبَّتَ كافكا بيثون yfinance
من استيراد كافكا KafkaProducer، KafkaConsumer
استيراد json
استيراد yfinance مثل yf
يستورد وقت
Producer = كافكاProducer ( bootstrap_servers = 'المضيف المحلي: 9092' ,
value_serializer =lambda v: json.dumps ( في ) .تشفير ( 'utf-8' ) )

بينما حقيقي:
الأسهم = yf.Ticker ( 'آبل' ) # أسهم شركة أبل
Stock_data = Stock.history ( فترة = '1د' )
last_price = Stock_data [ 'يغلق' ] .iloc [ - 1 ]

البيانات = { 'رمز' : 'آبل' , 'سعر' : السعر الاخير }

منتج.إرسال ( 'سعر السهم' ، بيانات )

وقت النوم ( 10 ) # محاكاة التحديثات في الوقت الحقيقي كل 10 ثواني
Consumer = KafkaConsumer ( 'سعر السهم' ,
bootstrap_servers = 'المضيف المحلي: 9092' ,
value_deserializer =لامدا س: json.loads ( x.decode ( 'utf-8' ) ) )

ل رسالة في مستهلك:
Stock_data = message.value
مطبعة ( F 'بيانات المخزون المستلمة: {stock_data['symbol']} - السعر: {stock_data['price']}' )


في تحليل المخرجات بعد تشغيل التعليمات البرمجية، سنلاحظ إنتاج واستهلاك تحديثات أسعار الأسهم في الوقت الفعلي لشركة Apple Inc.

خاتمة

في هذا المثال الفريد، أظهرنا تنفيذ تدفق البيانات في الوقت الفعلي في Python باستخدام Apache Kafka ومكتبة 'yfinance' لالتقاط بيانات سوق الأوراق المالية ومعالجتها. لقد شرحنا بدقة كل سطر من الكود. يمكن تطبيق تدفق البيانات في الوقت الفعلي على مجالات مختلفة لبناء تطبيقات العالم الحقيقي في مجال التمويل وإنترنت الأشياء والمزيد.