وحدة `threading` في بايثون
تشرح هذه المقالة وحدة threading في بايثون۔
YouTube Video
وحدة threading في بايثون
وحدة threading في بايثون هي مكتبة قياسية تدعم برمجة تعدد الخيوط۔ استخدام الخيوط يسمح بتشغيل عمليات متعددة في وقت واحد، مما قد يحسن أداء البرامج، خاصة في الحالات التي تتضمن عمليات حجب مثل انتظار إدخال/إخراج البيانات (I/O).۔ نتيجةً لوجود قفل المترجم العالمي (GIL) في بايثون، فإن فعالية التشغيل المتعدد الخيوط تكون محدودة في العمليات المعتمدة على وحدة المعالجة المركزية (CPU-bound)، لكنها تعمل بكفاءة عالية في العمليات المعتمدة على إدخال/إخراج البيانات (I/O-bound).۔
تقدم الأقسام التالية شرحًا لأساسيات استخدام وحدة threading وكيفية التحكم في الخيوط۔
الاستخدام الأساسي للخيوط
إنشاء وتشغيل الخيوط
لإنشاء خيط وتنفيذ معالجة متزامنة، استخدم الفئة threading.Thread.۔ حدد الدالة المستهدفة لإنشاء خيط وتشغيله.۔
1import threading
2import time
3
4# Function to be executed in a thread
5def worker():
6 print("Worker thread started")
7 time.sleep(2)
8 print("Worker thread finished")
9
10# Create and start the thread
11thread = threading.Thread(target=worker)
12thread.start()
13
14# Processing in the main thread
15print("Main thread continues to run")
16
17# Wait for the thread to finish
18thread.join()
19print("Main thread finished")- في هذا المثال، يتم تنفيذ دالة
workerفي خيط مستقل، بينما يستمر الخيط الرئيسي في العمل.۔ عند استدعاء أسلوبjoin()، ينتظر الخيط الرئيسي حتى ينتهي الخيط الفرعي من التنفيذ.۔
تسمية الخيوط (Threads)
إعطاء الخيوط أسماء ذات معنى يسهل التسجيل (logging) وتصحيح الأخطاء (debugging)۔ يمكنك تحديد ذلك باستخدام الوسيطة name۔
1import threading
2import time
3
4# Function to be executed in a thread
5def worker():
6 print("Worker thread started")
7 time.sleep(2)
8 print("Worker thread finished")
9
10t = threading.Thread(
11 target=worker,
12 args=("named-worker", 0.3),
13 name="MyWorkerThread"
14)
15
16t.start()
17
18print("Active threads:", threading.active_count())
19for th in threading.enumerate():
20 print(" -", th.name)
21
22t.join()- تعيد الدالة
threading.enumerate()قائمة بالخيوط الحالية، مما يساعد في تصحيح الأخطاء ومراقبة الحالة۔
توريث الفئة Thread
إذا كنت تريد تخصيص الفئة التي تنفذ الخيط، يمكنك تعريف فئة جديدة عن طريق توريث الفئة threading.Thread.۔
1import threading
2import time
3
4# Inherit from the Thread class
5class WorkerThread(threading.Thread):
6 def __init__(self, name, delay, repeat=3):
7 super().__init__(name=name)
8 self.delay = delay
9 self.repeat = repeat
10 self.results = []
11
12 def run(self):
13 for i in range(self.repeat):
14 msg = f"{self.name} step {i+1}"
15 print(msg)
16 self.results.append(msg)
17 time.sleep(self.delay)
18
19# Create and start the threads
20t1 = WorkerThread("Worker-A", delay=0.4, repeat=3)
21t2 = WorkerThread("Worker-B", delay=0.2, repeat=5)
22
23t1.start()
24t2.start()
25
26t1.join()
27t2.join()
28
29print("Results A:", t1.results)
30print("Results B:", t2.results)- في هذا المثال، تم تجاوز الطريقة
run()لتعريف سلوك الخيط، مما يسمح لكل خيط بالاحتفاظ ببياناته الخاصة۔ يكون ذلك مفيدًا عندما تقوم الخيوط بمعالجة معقدة أو عند الحاجة إلى أن يمتلك كل خيط بيانات مستقلة خاصة به۔
تزامن بين الخيوط
عندما تصل عدة خيوط إلى موارد مشتركة في نفس الوقت، قد تحدث تنافس في البيانات (Data Races).۔ لمنع ذلك، توفر وحدة threading العديد من آليات التزامن۔
القفل (Lock)
يُستخدم كائن Lock لتنفيذ التحكم الحصري في الموارد بين الخيوط.۔ عندما يقوم خيط واحد بقفل مورد ما، لا يمكن للخيوط الأخرى الوصول إلى ذلك المورد.۔
1import threading
2
3lock = threading.Lock()
4shared_resource = 0
5
6def worker():
7 global shared_resource
8 with lock: # Acquire the lock
9 local_copy = shared_resource
10 local_copy += 1
11 shared_resource = local_copy
12
13threads = [threading.Thread(target=worker) for _ in range(5)]
14
15for t in threads:
16 t.start()
17
18for t in threads:
19 t.join()
20
21print(f"Final value of shared resource: {shared_resource}")- في هذا المثال، تصل خمسة خيوط إلى مورد مشترك، لكن يتم استخدام
Lockلمنع تعديل البيانات من قبل عدة خيوط بشكل متزامن.۔
القفل القابل لإعادة الدخول (RLock)
إذا كان نفس الخيط بحاجة لاكتساب القفل عدة مرات، استخدم RLock (القفل القابل لإعادة الدخول)۔ يكون ذلك مفيدًا في حالات الاستدعاءات التكرارية أو استدعاءات المكتبات التي قد تكتسب القفل في استدعاءات مختلفة۔
1import threading
2
3rlock = threading.RLock()
4shared = []
5
6def outer():
7 with rlock:
8 shared.append("outer")
9 inner()
10
11def inner():
12 with rlock:
13 shared.append("inner")
14
15t = threading.Thread(target=outer)
16t.start()
17t.join()
18print(shared)- مع
RLock، يمكن لنفس الخيط إعادة اكتساب القفل الذي يحتفظ به بالفعل، مما يساعد في منع حالات التوقف (deadlock) مع الأقفال المتداخلة۔
الحالة (Condition)
يُستخدم Condition لجعل الخيوط تنتظر حتى يتم تحقيق شرط معين۔ عندما يحقق أحد الخيوط الشرط، يمكنك استدعاء notify() لإعلام خيط آخر، أو notify_all() لإعلام جميع الخيوط المنتظرة۔
فيما يلي مثال لمنتج ومستهلِك يستخدمان Condition۔
1import threading
2
3condition = threading.Condition()
4shared_data = []
5
6def producer():
7 with condition:
8 shared_data.append(1)
9 print("Produced an item")
10 condition.notify() # Notify the consumer
11
12def consumer():
13 with condition:
14 condition.wait() # Wait until the condition is met
15
16 item = shared_data.pop(0)
17 print(f"Consumed an item: {item}")
18
19# Create the threads
20producer_thread = threading.Thread(target=producer)
21consumer_thread = threading.Thread(target=consumer)
22
23consumer_thread.start()
24producer_thread.start()
25
26producer_thread.join()
27consumer_thread.join()- يستخدم هذا الكود
Conditionليقوم المنتج بإعلام المستهلِك عند إضافة بيانات، بينما ينتظر المستهلِك ذلك الإشعار قبل جلب البيانات، لتحقيق التزامن۔
تحويل الخيوط إلى خيوط خدمية (Daemonization)
يتم إنهاء الخيوط الخدمية (Daemon threads) بشكل إجباري عند انتهاء الخيط الرئيسي۔ بينما يجب أن تنتظر الخيوط العادية لإنهاء نفسها، يتم إنهاء الخيوط الخدمية تلقائيًا۔
1import threading
2import time
3
4def worker():
5 while True:
6 print("Working...")
7 time.sleep(1)
8
9# Create a daemon thread
10thread = threading.Thread(target=worker)
11thread.daemon = True # Set as a daemon thread
12
13thread.start()
14
15# Processing in the main thread
16time.sleep(3)
17print("Main thread finished")- في هذا المثال، تم تحويل خيط
workerإلى خيط خادي (daemon)، لذلك يتم إنهاؤه بشكل إجباري عند انتهاء الخيط الرئيسي۔
إدارة الخيوط باستخدام ThreadPoolExecutor
بخلاف وحدة threading، يمكنك استخدام ThreadPoolExecutor من وحدة concurrent.futures لإدارة تجمع الخيوط وتنفيذ المهام بالتوازي۔
1from concurrent.futures import ThreadPoolExecutor
2import time
3
4def worker(seconds):
5 print(f"Sleeping for {seconds} second(s)")
6 time.sleep(seconds)
7 return f"Finished sleeping for {seconds} second(s)"
8
9with ThreadPoolExecutor(max_workers=3) as executor:
10 futures = [executor.submit(worker, i) for i in range(1, 4)]
11 for future in futures:
12 print(future.result())- يقوم
ThreadPoolExecutorبإنشاء تجمع للخيوط ومعالجة المهام بكفاءة۔ حدد عدد الخيوط التي ستعمل بالتوازي باستخدامmax_workers۔
التواصل بين الخيوط باستخدام الأحداث (Event Communication)
باستخدام threading.Event، يمكنك تعيين إشارات (Flags) بين الخيوط لإبلاغ الخيوط الأخرى بحدوث حدث معين۔
1import threading
2import time
3
4event = threading.Event()
5
6def worker():
7 print("Waiting for event to be set")
8 event.wait() # Wait until the event is set
9
10 print("Event received, continuing work")
11
12thread = threading.Thread(target=worker)
13thread.start()
14
15time.sleep(2)
16print("Setting the event")
17event.set() # Set the event and notify the thread- يُظهِر هذا الكود آلية حيث ينتظر خيط العمل إشارة
Eventويستأنف المعالجة عندما يستدعي الخيط الرئيسيevent.set()۔
معالجة الاستثناءات وإنهاء الخيوط في الخيوط (Threads)
عند حدوث استثناءات في الخيوط، فهي لا تنتقل مباشرة إلى الخيط الرئيسي، لذلك يلزم وجود نمط لالتقاط الاستثناءات ومشاركتها۔
1import threading
2import queue
3
4def worker(err_q):
5 try:
6 raise ValueError("Something bad")
7 except Exception as e:
8 err_q.put(e)
9
10q = queue.Queue()
11t = threading.Thread(target=worker, args=(q,))
12t.start()
13t.join()
14if not q.empty():
15 exc = q.get()
16 print("Worker raised:", exc)- من خلال وضع الاستثناءات في
Queueواسترجاعها في الخيط الرئيسي، يمكنك اكتشاف حالات الفشل بشكل موثوق۔ إذا استخدمتconcurrent.futures.ThreadPoolExecutor، فإن الاستثناءات تُرفع مرة أخرى معfuture.result()، مما يسهل التعامل معها۔
قفل المفسر العام (GIL) وتأثيراته
بسبب آلية قفل المفسر العام (GIL) في CPython، لا تعمل تعليمة بايت بايثون متعددة بالفعل في نفس الوقت ضمن نفس العملية۔ بالنسبة للمهام التي تستهلك المعالج بشكل كبير مثل العمليات الحسابية الثقيلة، يُنصح باستخدام multiprocessing۔ من ناحية أخرى، في المهام المرتبطة بإدخال/إخراج مثل قراءة الملفات أو الاتصال بالشبكة، فإن threading يعمل بفعالية۔
الملخص
باستخدام وحدة threading في بايثون، يمكنك تنفيذ برامج متعددة الخيوط وتشغيل عمليات متعددة بالتوازي۔ مع آليات التزامن مثل Lock وCondition، يمكنك الوصول بأمان إلى الموارد المشتركة وإجراء التزامن المعقد۔ بالإضافة إلى ذلك، باستخدام الخيوط الخدمية أو ThreadPoolExecutor، يصبح إدارة الخيوط والمعالجة المتوازية بشكل فعال أسهل۔
يمكنك متابعة المقالة أعلاه باستخدام Visual Studio Code على قناتنا على YouTube.۔ يرجى التحقق من القناة على YouTube أيضًا.۔