وحدة `threading` في بايثون

وحدة `threading` في بايثون

تشرح هذه المقالة وحدة threading في بايثون۔

YouTube Video

وحدة threading في بايثون

وحدة threading في بايثون هي مكتبة قياسية تدعم برمجة تعدد الخيوط۔ استخدام الخيوط يسمح بتشغيل عمليات متعددة في وقت واحد، مما قد يحسن أداء البرامج، خاصة في الحالات التي تتضمن عمليات حجب مثل انتظار إدخال/إخراج البيانات (I/O).۔ نتيجةً لوجود قفل المترجم العالمي (GIL) في بايثون، فإن فعالية التشغيل المتعدد الخيوط تكون محدودة في العمليات المعتمدة على وحدة المعالجة المركزية (CPU-bound)، لكنها تعمل بكفاءة عالية في العمليات المعتمدة على إدخال/إخراج البيانات (I/O-bound).۔

تقدم الأقسام التالية شرحًا لأساسيات استخدام وحدة threading وكيفية التحكم في الخيوط۔

الاستخدام الأساسي للخيوط

إنشاء وتشغيل الخيوط

لإنشاء خيط وتنفيذ معالجة متزامنة، استخدم الفئة threading.Thread.۔ حدد الدالة المستهدفة لإنشاء خيط وتشغيله.۔

 1import threading
 2import time
 3
 4# Function to be executed in a thread
 5def worker():
 6    print("Worker thread started")
 7    time.sleep(2)
 8    print("Worker thread finished")
 9
10# Create and start the thread
11thread = threading.Thread(target=worker)
12thread.start()
13
14# Processing in the main thread
15print("Main thread continues to run")
16
17# Wait for the thread to finish
18thread.join()
19print("Main thread finished")
  • في هذا المثال، يتم تنفيذ دالة worker في خيط مستقل، بينما يستمر الخيط الرئيسي في العمل.۔ عند استدعاء أسلوب join()، ينتظر الخيط الرئيسي حتى ينتهي الخيط الفرعي من التنفيذ.۔

تسمية الخيوط (Threads)

إعطاء الخيوط أسماء ذات معنى يسهل التسجيل (logging) وتصحيح الأخطاء (debugging)۔ يمكنك تحديد ذلك باستخدام الوسيطة name۔

 1import threading
 2import time
 3
 4# Function to be executed in a thread
 5def worker():
 6    print("Worker thread started")
 7    time.sleep(2)
 8    print("Worker thread finished")
 9
10t = threading.Thread(
11    target=worker,
12    args=("named-worker", 0.3),
13    name="MyWorkerThread"
14)
15
16t.start()
17
18print("Active threads:", threading.active_count())
19for th in threading.enumerate():
20    print(" -", th.name)
21
22t.join()
  • تعيد الدالة threading.enumerate() قائمة بالخيوط الحالية، مما يساعد في تصحيح الأخطاء ومراقبة الحالة۔

توريث الفئة Thread

إذا كنت تريد تخصيص الفئة التي تنفذ الخيط، يمكنك تعريف فئة جديدة عن طريق توريث الفئة threading.Thread

 1import threading
 2import time
 3
 4# Inherit from the Thread class
 5class WorkerThread(threading.Thread):
 6    def __init__(self, name, delay, repeat=3):
 7        super().__init__(name=name)
 8        self.delay = delay
 9        self.repeat = repeat
10        self.results = []
11
12    def run(self):
13        for i in range(self.repeat):
14            msg = f"{self.name} step {i+1}"
15            print(msg)
16            self.results.append(msg)
17            time.sleep(self.delay)
18
19# Create and start the threads
20t1 = WorkerThread("Worker-A", delay=0.4, repeat=3)
21t2 = WorkerThread("Worker-B", delay=0.2, repeat=5)
22
23t1.start()
24t2.start()
25
26t1.join()
27t2.join()
28
29print("Results A:", t1.results)
30print("Results B:", t2.results)
  • في هذا المثال، تم تجاوز الطريقة run() لتعريف سلوك الخيط، مما يسمح لكل خيط بالاحتفاظ ببياناته الخاصة۔ يكون ذلك مفيدًا عندما تقوم الخيوط بمعالجة معقدة أو عند الحاجة إلى أن يمتلك كل خيط بيانات مستقلة خاصة به۔

تزامن بين الخيوط

عندما تصل عدة خيوط إلى موارد مشتركة في نفس الوقت، قد تحدث تنافس في البيانات (Data Races).۔ لمنع ذلك، توفر وحدة threading العديد من آليات التزامن۔

القفل (Lock)

يُستخدم كائن Lock لتنفيذ التحكم الحصري في الموارد بين الخيوط.۔ عندما يقوم خيط واحد بقفل مورد ما، لا يمكن للخيوط الأخرى الوصول إلى ذلك المورد.۔

 1import threading
 2
 3lock = threading.Lock()
 4shared_resource = 0
 5
 6def worker():
 7    global shared_resource
 8    with lock:  # Acquire the lock
 9        local_copy = shared_resource
10        local_copy += 1
11        shared_resource = local_copy
12
13threads = [threading.Thread(target=worker) for _ in range(5)]
14
15for t in threads:
16    t.start()
17
18for t in threads:
19    t.join()
20
21print(f"Final value of shared resource: {shared_resource}")
  • في هذا المثال، تصل خمسة خيوط إلى مورد مشترك، لكن يتم استخدام Lock لمنع تعديل البيانات من قبل عدة خيوط بشكل متزامن.۔

القفل القابل لإعادة الدخول (RLock)

إذا كان نفس الخيط بحاجة لاكتساب القفل عدة مرات، استخدم RLock (القفل القابل لإعادة الدخول)۔ يكون ذلك مفيدًا في حالات الاستدعاءات التكرارية أو استدعاءات المكتبات التي قد تكتسب القفل في استدعاءات مختلفة۔

 1import threading
 2
 3rlock = threading.RLock()
 4shared = []
 5
 6def outer():
 7    with rlock:
 8        shared.append("outer")
 9        inner()
10
11def inner():
12    with rlock:
13        shared.append("inner")
14
15t = threading.Thread(target=outer)
16t.start()
17t.join()
18print(shared)
  • مع RLock، يمكن لنفس الخيط إعادة اكتساب القفل الذي يحتفظ به بالفعل، مما يساعد في منع حالات التوقف (deadlock) مع الأقفال المتداخلة۔

الحالة (Condition)

يُستخدم Condition لجعل الخيوط تنتظر حتى يتم تحقيق شرط معين۔ عندما يحقق أحد الخيوط الشرط، يمكنك استدعاء notify() لإعلام خيط آخر، أو notify_all() لإعلام جميع الخيوط المنتظرة۔

فيما يلي مثال لمنتج ومستهلِك يستخدمان Condition۔

 1import threading
 2
 3condition = threading.Condition()
 4shared_data = []
 5
 6def producer():
 7    with condition:
 8        shared_data.append(1)
 9        print("Produced an item")
10        condition.notify()  # Notify the consumer
11
12def consumer():
13    with condition:
14        condition.wait()  # Wait until the condition is met
15
16        item = shared_data.pop(0)
17        print(f"Consumed an item: {item}")
18
19# Create the threads
20producer_thread = threading.Thread(target=producer)
21consumer_thread = threading.Thread(target=consumer)
22
23consumer_thread.start()
24producer_thread.start()
25
26producer_thread.join()
27consumer_thread.join()
  • يستخدم هذا الكود Condition ليقوم المنتج بإعلام المستهلِك عند إضافة بيانات، بينما ينتظر المستهلِك ذلك الإشعار قبل جلب البيانات، لتحقيق التزامن۔

تحويل الخيوط إلى خيوط خدمية (Daemonization)

يتم إنهاء الخيوط الخدمية (Daemon threads) بشكل إجباري عند انتهاء الخيط الرئيسي۔ بينما يجب أن تنتظر الخيوط العادية لإنهاء نفسها، يتم إنهاء الخيوط الخدمية تلقائيًا۔

 1import threading
 2import time
 3
 4def worker():
 5    while True:
 6        print("Working...")
 7        time.sleep(1)
 8
 9# Create a daemon thread
10thread = threading.Thread(target=worker)
11thread.daemon = True  # Set as a daemon thread
12
13thread.start()
14
15# Processing in the main thread
16time.sleep(3)
17print("Main thread finished")
  • في هذا المثال، تم تحويل خيط worker إلى خيط خادي (daemon)، لذلك يتم إنهاؤه بشكل إجباري عند انتهاء الخيط الرئيسي۔

إدارة الخيوط باستخدام ThreadPoolExecutor

بخلاف وحدة threading، يمكنك استخدام ThreadPoolExecutor من وحدة concurrent.futures لإدارة تجمع الخيوط وتنفيذ المهام بالتوازي۔

 1from concurrent.futures import ThreadPoolExecutor
 2import time
 3
 4def worker(seconds):
 5    print(f"Sleeping for {seconds} second(s)")
 6    time.sleep(seconds)
 7    return f"Finished sleeping for {seconds} second(s)"
 8
 9with ThreadPoolExecutor(max_workers=3) as executor:
10    futures = [executor.submit(worker, i) for i in range(1, 4)]
11    for future in futures:
12        print(future.result())
  • يقوم ThreadPoolExecutor بإنشاء تجمع للخيوط ومعالجة المهام بكفاءة۔ حدد عدد الخيوط التي ستعمل بالتوازي باستخدام max_workers۔

التواصل بين الخيوط باستخدام الأحداث (Event Communication)

باستخدام threading.Event، يمكنك تعيين إشارات (Flags) بين الخيوط لإبلاغ الخيوط الأخرى بحدوث حدث معين۔

 1import threading
 2import time
 3
 4event = threading.Event()
 5
 6def worker():
 7    print("Waiting for event to be set")
 8    event.wait()  # Wait until the event is set
 9
10    print("Event received, continuing work")
11
12thread = threading.Thread(target=worker)
13thread.start()
14
15time.sleep(2)
16print("Setting the event")
17event.set()  # Set the event and notify the thread
  • يُظهِر هذا الكود آلية حيث ينتظر خيط العمل إشارة Event ويستأنف المعالجة عندما يستدعي الخيط الرئيسي event.set()۔

معالجة الاستثناءات وإنهاء الخيوط في الخيوط (Threads)

عند حدوث استثناءات في الخيوط، فهي لا تنتقل مباشرة إلى الخيط الرئيسي، لذلك يلزم وجود نمط لالتقاط الاستثناءات ومشاركتها۔

 1import threading
 2import queue
 3
 4def worker(err_q):
 5    try:
 6        raise ValueError("Something bad")
 7    except Exception as e:
 8        err_q.put(e)
 9
10q = queue.Queue()
11t = threading.Thread(target=worker, args=(q,))
12t.start()
13t.join()
14if not q.empty():
15    exc = q.get()
16    print("Worker raised:", exc)
  • من خلال وضع الاستثناءات في Queue واسترجاعها في الخيط الرئيسي، يمكنك اكتشاف حالات الفشل بشكل موثوق۔ إذا استخدمت concurrent.futures.ThreadPoolExecutor، فإن الاستثناءات تُرفع مرة أخرى مع future.result()، مما يسهل التعامل معها۔

قفل المفسر العام (GIL) وتأثيراته

بسبب آلية قفل المفسر العام (GIL) في CPython، لا تعمل تعليمة بايت بايثون متعددة بالفعل في نفس الوقت ضمن نفس العملية۔ بالنسبة للمهام التي تستهلك المعالج بشكل كبير مثل العمليات الحسابية الثقيلة، يُنصح باستخدام multiprocessing۔ من ناحية أخرى، في المهام المرتبطة بإدخال/إخراج مثل قراءة الملفات أو الاتصال بالشبكة، فإن threading يعمل بفعالية۔

الملخص

باستخدام وحدة threading في بايثون، يمكنك تنفيذ برامج متعددة الخيوط وتشغيل عمليات متعددة بالتوازي۔ مع آليات التزامن مثل Lock وCondition، يمكنك الوصول بأمان إلى الموارد المشتركة وإجراء التزامن المعقد۔ بالإضافة إلى ذلك، باستخدام الخيوط الخدمية أو ThreadPoolExecutor، يصبح إدارة الخيوط والمعالجة المتوازية بشكل فعال أسهل۔

يمكنك متابعة المقالة أعلاه باستخدام Visual Studio Code على قناتنا على YouTube.۔ يرجى التحقق من القناة على YouTube أيضًا.۔

YouTube Video