מודול `threading` ב-Python
מאמר זה מסביר את מודול threading ב-Python.
YouTube Video
מודול threading ב-Python
מודול threading ב-Python הוא ספרייה סטנדרטית התומכת בתכנות מרובה אשכולות. שימוש בתהליכים מאפשר להריץ מספר תהליכים בו-זמנית, דבר שיכול לשפר את ביצועי התוכניות, במיוחד במקרים הכוללים פעולות חוסמות כמו המתנות I/O. בשל Global Interpreter Lock (GIL) של פייתון, היעילות של ריבוי תהליכים מוגבלת עבור תהליכים התלויים בעומסי CPU, אך היא עובדת ביעילות עבור תהליכים התלויים ב-I/O.
הקטעים הבאים מסבירים את הבסיסים לשימוש במודול threading וכיצד לשלוט באשכולות.
שימוש בסיסי בתהליכים
יצירה והרצה של תהליכים
כדי ליצור תהליך ולבצע עיבוד מקביל, יש להשתמש במחלקת threading.Thread. יש להגדיר את הפונקציה הרצויה לצורך יצירת תהליך ולהריץ את התהליך.
1import threading
2import time
3
4# Function to be executed in a thread
5def worker():
6 print("Worker thread started")
7 time.sleep(2)
8 print("Worker thread finished")
9
10# Create and start the thread
11thread = threading.Thread(target=worker)
12thread.start()
13
14# Processing in the main thread
15print("Main thread continues to run")
16
17# Wait for the thread to finish
18thread.join()
19print("Main thread finished")- בדוגמה זו, פונקציית
workerמתבצעת בתהליך נפרד, בעוד שהתהליך הראשי ממשיך לפעול. על ידי קריאה למתודהjoin(), התהליך הראשי ממתין עד שהתהליך המשני יסתיים.
מתן שמות לתהליכונים
מתן שמות משמעותיים לתהליכונים מקל על רישום יומנים ואיתור באגים. אתה יכול לציין זאת באמצעות הפרמטר name.
1import threading
2import time
3
4# Function to be executed in a thread
5def worker():
6 print("Worker thread started")
7 time.sleep(2)
8 print("Worker thread finished")
9
10t = threading.Thread(
11 target=worker,
12 args=("named-worker", 0.3),
13 name="MyWorkerThread"
14)
15
16t.start()
17
18print("Active threads:", threading.active_count())
19for th in threading.enumerate():
20 print(" -", th.name)
21
22t.join()threading.enumerate()מחזירה רשימה של כל התהליכונים הפעילים, מה שמועיל לניפוי שגיאות ולמעקב מצב.
ירושה של מחלקת Thread
אם ברצונך להתאים אישית את מחלקת הרצת התהליכים, ניתן להגדיר מחלקה חדשה באמצעות ירושה מ-threading.Thread.
1import threading
2import time
3
4# Inherit from the Thread class
5class WorkerThread(threading.Thread):
6 def __init__(self, name, delay, repeat=3):
7 super().__init__(name=name)
8 self.delay = delay
9 self.repeat = repeat
10 self.results = []
11
12 def run(self):
13 for i in range(self.repeat):
14 msg = f"{self.name} step {i+1}"
15 print(msg)
16 self.results.append(msg)
17 time.sleep(self.delay)
18
19# Create and start the threads
20t1 = WorkerThread("Worker-A", delay=0.4, repeat=3)
21t2 = WorkerThread("Worker-B", delay=0.2, repeat=5)
22
23t1.start()
24t2.start()
25
26t1.join()
27t2.join()
28
29print("Results A:", t1.results)
30print("Results B:", t2.results)- בדוגמה זו, שיטת
run()נדרסת כדי להגדיר את התנהגות התהליכון, כך שלכל תהליכון יש נתונים נפרדים משלו. זה מועיל כאשר תהליכונים מבצעים עיבוד מורכב או כאשר אתה רוצה שלכל תהליכון יהיו נתונים עצמאיים משלו.
סנכרון בין תהליכים
כאשר מספר תהליכים ניגשים למשאבים משותפים בו-זמנית, עשויות להתרחש מרוצי נתונים. כדי למנוע זאת, מודול threading מספק מספר מנגנוני סנכרון.
נעילה (Lock)
אובייקט Lock משמש ליישום שליטה בלעדית על משאבים בין תהליכים. כאשר תהליך אחד נועל משאב, תהליכים אחרים לא יכולים לגשת למשאב זה.
1import threading
2
3lock = threading.Lock()
4shared_resource = 0
5
6def worker():
7 global shared_resource
8 with lock: # Acquire the lock
9 local_copy = shared_resource
10 local_copy += 1
11 shared_resource = local_copy
12
13threads = [threading.Thread(target=worker) for _ in range(5)]
14
15for t in threads:
16 t.start()
17
18for t in threads:
19 t.join()
20
21print(f"Final value of shared resource: {shared_resource}")- בדוגמה זו, חמישה תהליכים ניגשים למשאב משותף, אך נעשה שימוש ב-
Lockכדי למנוע ממספר תהליכים לשנות את הנתונים בו-זמנית.
נעילה חוזרת (RLock)
אם תהליכון צריך להשיג נעילה מספר פעמים, השתמש ב־RLock (נעילה חוזרת). זה שימושי לקריאות רקורסיביות או לקריאות של ספריה שעשויות להשיג נעילות במספר קריאות שונות.
1import threading
2
3rlock = threading.RLock()
4shared = []
5
6def outer():
7 with rlock:
8 shared.append("outer")
9 inner()
10
11def inner():
12 with rlock:
13 shared.append("inner")
14
15t = threading.Thread(target=outer)
16t.start()
17t.join()
18print(shared)- עם
RLock, תהליכון יכול להשיג שוב נעילה שכבר ברשותו, וכך מסייע למנוע קיפאון (deadlock) ברכישת נעילות מקוננות.
תנאי (Condition)
Condition משמשת עבור אשכולות להמתין עד שמתקיים תנאי מסוים. כאשר תָּחָרִית (thread) ממלאה תנאי, תוכל לקרוא ל־notify() כדי להודיע לתָּחָרִית אחר, או ל־notify_all() כדי להודיע לכל התָּחָרִיות הממתינות.
להלן דוגמה של יצרן וצרכן המשתמשים ב־Condition.
1import threading
2
3condition = threading.Condition()
4shared_data = []
5
6def producer():
7 with condition:
8 shared_data.append(1)
9 print("Produced an item")
10 condition.notify() # Notify the consumer
11
12def consumer():
13 with condition:
14 condition.wait() # Wait until the condition is met
15
16 item = shared_data.pop(0)
17 print(f"Consumed an item: {item}")
18
19# Create the threads
20producer_thread = threading.Thread(target=producer)
21consumer_thread = threading.Thread(target=consumer)
22
23consumer_thread.start()
24producer_thread.start()
25
26producer_thread.join()
27consumer_thread.join()- קוד זה משתמש ב־
Conditionכך שהיצרן מודיע כאשר נתונים מתווספים, והצרכן ממתין להתראה לפני שליפת הנתונים, וכך מושגת סנכרון.
הפיכת אשכול לשדון (Daemon)
אשכולות שדון (Daemon) נעצרים בכוח כאשר האשכול הראשי מסתיים. בעוד שאשכולות רגילים צריכים להמתין כדי להסתיים, אשכולות שדון (Daemon) מסתיימים אוטומטית.
1import threading
2import time
3
4def worker():
5 while True:
6 print("Working...")
7 time.sleep(1)
8
9# Create a daemon thread
10thread = threading.Thread(target=worker)
11thread.daemon = True # Set as a daemon thread
12
13thread.start()
14
15# Processing in the main thread
16time.sleep(3)
17print("Main thread finished")- בדוגמה זו, אשכול ה
workerהוא שדון (Daemon), ולכן הוא נעצר בכוח כאשר האשכול הראשי מסתיים.
ניהול אשכולות עם ThreadPoolExecutor
בנוסף למודול threading, ניתן להשתמש ב-ThreadPoolExecutor ממודול concurrent.futures לניהול מאגר אשכולות ולהרצת משימות במקביל.
1from concurrent.futures import ThreadPoolExecutor
2import time
3
4def worker(seconds):
5 print(f"Sleeping for {seconds} second(s)")
6 time.sleep(seconds)
7 return f"Finished sleeping for {seconds} second(s)"
8
9with ThreadPoolExecutor(max_workers=3) as executor:
10 futures = [executor.submit(worker, i) for i in range(1, 4)]
11 for future in futures:
12 print(future.result())ThreadPoolExecutorיוצר מאגר אשכולות ומעבד משימות ביעילות. ניתן לקבוע את מספר האשכולות שירוצו במקביל באמצעותmax_workers.
תקשורת אירועים בין אשכולות
באמצעות threading.Event, ניתן להגדיר דגלים בין אשכולות כדי להודיע על התרחשות של אירוע.
1import threading
2import time
3
4event = threading.Event()
5
6def worker():
7 print("Waiting for event to be set")
8 event.wait() # Wait until the event is set
9
10 print("Event received, continuing work")
11
12thread = threading.Thread(target=worker)
13thread.start()
14
15time.sleep(2)
16print("Setting the event")
17event.set() # Set the event and notify the thread- קוד זה מדגים מנגנון שבו תהליכון עובד ממתין לאות
Event, וממשיך לעבד לאחר שהתהליכון הראשי קורא ל־event.set().
טיפול בחריגות וסיום תהליכונים
כאשר מתרחשות חריגות בתהליכונים, הן לא מועברות ישירות לתהליכון הראשי ולכן יש צורך במנגנון לקליטה ושיתוף של חריגות.
1import threading
2import queue
3
4def worker(err_q):
5 try:
6 raise ValueError("Something bad")
7 except Exception as e:
8 err_q.put(e)
9
10q = queue.Queue()
11t = threading.Thread(target=worker, args=(q,))
12t.start()
13t.join()
14if not q.empty():
15 exc = q.get()
16 print("Worker raised:", exc)- על ידי הכנסת חריגות ל־
Queueושליפתן בתָּחָרִית הראשי, תוכל לזהות כשלים בצורה מהימנה. אם אתה משתמש ב־concurrent.futures.ThreadPoolExecutor, החריגות נזרקות מחדש בעת קריאה ל־future.result(), מה שמקל על הטיפול בהן.
ה־GIL (נעילת מפרש גלובלית) והשפעותיו
עקב מנגנון ה־GIL (Global Interpreter Lock) ב־CPython, כמה קוד־בייט של פייתון לא רצים בו־זמנית בתהליך יחיד. עבור משימות עתירות־מעבד, כמו חישובים כבדים, מומלץ להשתמש ב־multiprocessing. לעומת זאת, למשימות תלויות־קלט־פלט כגון קריאת קבצים או תקשורת רשת, threading עובד ביעילות.
סיכום
באמצעות מודול threading של Python, ניתן ליישם תוכניות מרובות אשכולות ולהריץ תהליכים מרובים במקביל. עם מנגנוני סנכרון כמו Lock ו-Condition, ניתן לגשת בבטחה למשאבים משותפים ולבצע סנכרון מורכב. בנוסף, באמצעות שימוש באשכולות שדון (Daemon) או ב-ThreadPoolExecutor, ניהול אשכולות ועיבוד מקבילי יעיל הופך לפשוט יותר.
תוכלו לעקוב אחר המאמר שלמעלה באמצעות Visual Studio Code בערוץ היוטיוב שלנו. נא לבדוק גם את ערוץ היוטיוב.