בקרת סנכרון בעיבוד אסינכרוני בפייתון
מאמר זה מסביר את בקרת הסנכרון בעיבוד אסינכרוני בפייתון.
תלמדו שלב אחר שלב, מהבסיסים של asyncio ועד לדפוסים מעשיים הנפוצים לשליטה בסנכרון.
YouTube Video
בקרת סנכרון בעיבוד אסינכרוני בפייתון
בעיבוד אסינכרוני ניתן להריץ בקלות מספר משימות במקביל. אך בפועל נדרשות התאמות מתקדמות יותר, כגון בקרה על מקביליות, תיאום משימות, בקרה בלעדית על משאבים משותפים, טיפול בתהליכים סינכרוניים כבדים וניקוי אחר ביטולים.
כאן נלמד צעד אחר צעד מהבסיס של asyncio ועד דפוסים מעשיים נפוצים לסנכרון.
מבוא: יסודות (async / await ו-create_task)
בואו נבחן תחילה דוגמה לקוד אסינכרוני מינימלי. await ממתין בנקודה זו עד שהקורוטינה המסומנת מסתיימת, ו-asyncio.create_task משבץ משימה לביצוע מקבילי.
1import asyncio
2
3async def worker(name, delay):
4 # Simulate async work
5 await asyncio.sleep(delay)
6 return f"{name} done after {delay}s"
7
8async def main():
9 # Create two tasks that run concurrently.
10 t1 = asyncio.create_task(worker("A", 1))
11 t2 = asyncio.create_task(worker("B", 2))
12
13 # Await both results (this suspends until both are done).
14 result1 = await t1
15 result2 = await t2
16 print(result1, result2)
17
18if __name__ == "__main__":
19 asyncio.run(main())- הקוד הזה הוא דפוס טיפוסי שבו משימות נוצרתות במפורש, רצות במקביל, והתגובות מתקבלות בסוף באמצעות
await.create_taskמאפשר ביצוע מקבילי.
הבדלים בין asyncio.gather, asyncio.wait ו-asyncio.as_completed
בעת הרצת מספר קורוטינות במקביל, תוכל לבחור באיזו מהן להשתמש בהתאם לאופן בו אתה רוצה לקבל את התוצאות. gather ממתין להשלמת כולן ומחזיר את התוצאות לפי סדר הקלט, בעוד ש-as_completed מאפשר לעבד תוצאות מיד כשהן מסתיימות, ללא תלות בסדר.
1import asyncio
2import random
3
4async def job(i):
5 delay = random.random() * 2
6 await asyncio.sleep(delay)
7 return (i, delay)
8
9async def gather_example():
10 # gather waits for all tasks and returns results in the same order as input
11 results = await asyncio.gather(*(job(i) for i in range(5)))
12 print("gather order:", results)
13
14async def as_completed_example():
15 # as_completed yields results as they finish (useful to process early results)
16 tasks = [asyncio.create_task(job(i)) for i in range(5)]
17 for coro in asyncio.as_completed(tasks):
18 res = await coro
19 print("completed:", res)
20
21async def main():
22 await gather_example()
23 await as_completed_example()
24
25if __name__ == "__main__":
26 asyncio.run(main())- כפי שמוצג בקוד זה,
gatherמחזיר את התוצאות בסדר הקלט, מה שהופך אותו לשימושי כאשר רוצים לשמור על הסדר. משתמשים ב-as_completedכאשר רוצים לעבד תוצאות מיד כשהן מסתיימות.
בקרת מקביליות: הגבלת הרצות בו-זמניות עם asyncio.Semaphore
כאשר קיימות מגבלות קצב על API חיצוני או חיבורי DB, ניתן לשלוט בהרצות מקביליות באמצעות Semaphore.
1import asyncio
2import random
3
4sem = asyncio.Semaphore(3) # allow up to 3 concurrent workers
5
6async def limited_worker(i):
7 async with sem:
8 # Only 3 tasks can be inside this block concurrently
9 delay = random.random() * 2
10 await asyncio.sleep(delay)
11 print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14 tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15 await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18 asyncio.run(main())- באמצעות שימוש ב-
Semaphoreיחד עםasync with, ניתן להגביל בקלות את מספר ההרצות בו-זמנית. זה יעיל במצבים עם מגבלות חיצוניות.
בקרה בלעדית על משאבים משותפים: asyncio.Lock
משתמשים ב-Lock כדי למנוע עדכונים בו זמניים לנתונים משותפים. asyncio.Lock הוא פרימיטיב בלעדי לשימוש אסינכרוני.
1import asyncio
2
3counter = 0
4lock = asyncio.Lock()
5
6async def incrementer(n_times):
7 global counter
8 for _ in range(n_times):
9 # Acquire lock to update shared counter safely
10 async with lock:
11 tmp = counter
12 await asyncio.sleep(0) # yield control to increase race likelihood
13 counter = tmp + 1
14
15async def main():
16 tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17 await asyncio.gather(*tasks)
18 print("final counter:", counter)
19
20if __name__ == "__main__":
21 asyncio.run(main())- אם מספר משימות מעדכנות משתנה משותף כמו
counterגלובלי, עלולים להיווצר קונפליקטים. באמצעות הכללת פעולות בתוךLock, ניתן לשמור על עקביות.
תיאום משימות: asyncio.Event
משתמשים ב-Event כאשר משימה אחת מאותת שהיא מוכנה ומשימות אחרות ממתינות לאות הזה. זו דרך פשוטה עבור משימות לשתף אותות ולסנכרן ביניהן.
1import asyncio
2
3event = asyncio.Event()
4
5async def waiter(name):
6 print(f"{name} is waiting for event")
7 await event.wait()
8 print(f"{name} resumed after event set")
9
10async def setter():
11 print("setter will sleep and then set the event")
12 await asyncio.sleep(1)
13 event.set()
14 print("event set by setter")
15
16async def main():
17 tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18 await asyncio.create_task(setter())
19 await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22 asyncio.run(main())- ל-
Eventיש דגל בוליאני, וקריאה ל-set()מחזירה את כל המשימות הממתינות להמשך ריצה. זה שימושי לסנכרון פשוט.
דפוס יצרן-צרכן: asyncio.Queue
באמצעות Queue, יצרנים (שיוצרים נתונים) וצרכנים (המבצעים עיבוד) יכולים לתאם באופן חלק ואסינכרוני. כאשר התור מלא, יצרנים ממתינים אוטומטית, ובכך מיושם בקלות backpressure שמונע ייצור יתר.
1import asyncio
2import random
3
4async def producer(q, n_items):
5 for i in range(n_items):
6 await asyncio.sleep(random.random() * 0.5)
7 item = f"item-{i}"
8 await q.put(item)
9 print("produced", item)
10 # signal consumers to stop
11 await q.put(None)
12
13async def consumer(q, name):
14 while True:
15 item = await q.get()
16 if item is None:
17 # put sentinel back for other consumers and break
18 await q.put(None)
19 break
20 await asyncio.sleep(random.random() * 1)
21 print(name, "consumed", item)
22 q.task_done()
23
24async def main():
25 q = asyncio.Queue(maxsize=5) # bounded queue to apply backpressure
26 prod = asyncio.create_task(producer(q, 10))
27 cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28 await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31 asyncio.run(main())Queueמסייע בתיאום אסינכרוני בין יצרנים וצרכנים. בנוסף, הגדרתmaxsizeגורמת ליצרן להמתין ב-putכאשר התור מלא, וכך נמנע ייצור יתר.
טיפול בפעולות חסימה סינכרוניות: run_in_executor
לעיבוד כבד על המעבד או כאשר ספריה אינה תומכת ב-async, השתמש ב-run_in_executor להעברת העיבוד לשרשור או תהליך אחר. כך נמנעת עצירת הלולאת האירועים הראשית, ומשימות אסינכרוניות אחרות ממשיכות לרוץ בצורה חלקה.
1import asyncio
2import time
3
4def blocking_io(x):
5 # simulate blocking I/O or CPU-bound work
6 time.sleep(2)
7 return x * x
8
9async def main():
10 loop = asyncio.get_running_loop()
11 # run blocking_io in default thread pool
12 result = await loop.run_in_executor(None, blocking_io, 3)
13 print("blocking result:", result)
14
15if __name__ == "__main__":
16 asyncio.run(main())- קריאה ישירה לפונקציות סינכרוניות תחסום את לולאת האירועים. באמצעות
run_in_executor, הקוד רץ בשרשור נפרד ומשימות אסינכרוניות ממשיכות להתקדם במקביל.
דוגמה: קריאות API עם הגבלת קצב (שילוב של Semaphore ו- run_in_executor)
להלן תרחיש דוגמה שבו קריאות API מוגבלות קצב, ומבוצע עיבוד כבד על התוצאות. שילוב של Semaphore ו-run_in_executor מאפשר לעבד בצורה בטוחה ויעילה.
1import asyncio
2import time
3import random
4
5sem = asyncio.Semaphore(5)
6
7def heavy_sync_processing(data):
8 # simulate heavy CPU-bound work
9 time.sleep(1)
10 return f"processed-{data}"
11
12async def api_call(i):
13 await asyncio.sleep(random.random() * 0.5) # simulate network latency
14 return f"data-{i}"
15
16async def worker(i):
17 async with sem:
18 data = await api_call(i)
19 # offload CPU-bound work to threadpool
20 loop = asyncio.get_running_loop()
21 result = await loop.run_in_executor(None, heavy_sync_processing, data)
22 print(result)
23
24async def main():
25 tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26 await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29 asyncio.run(main())- אנו משתמשים ב-
Semaphoreלהגבלת מספר הקריאות המקבילות ל-API, והעיבוד הכבד על הנתונים מועבר ל-thread pool. הפרדת עיבוד רשת ומעבד משפרת את היעילות.
ביטול משימות וניקוי משאבים
כאשר משימה מתבטלת, חשוב מאוד לטפל נכון ב-finally וב-asyncio.CancelledError. כך משוחררים קבצים וחיבורים ומצבים ביניים מטופלים נכון – נשמרת עקביות באפליקציה.
1import asyncio
2
3async def long_running():
4 try:
5 print("started long_running")
6 await asyncio.sleep(10)
7 print("finished long_running")
8 except asyncio.CancelledError:
9 print("long_running was cancelled, cleaning up")
10 # perform cleanup here
11 raise
12
13async def main():
14 task = asyncio.create_task(long_running())
15 await asyncio.sleep(1)
16 task.cancel()
17 try:
18 await task
19 except asyncio.CancelledError:
20 print("task cancelled in main")
21
22if __name__ == "__main__":
23 asyncio.run(main())- ביטול נמסר בתור חריגה (
CancelledError), לכן יש לבצע ניקוי נדרש בבלוק ה-exceptולזרוק מחדש את החריגה במידת הצורך.
נקודות מפתח לעיצוב מעשי
להלן נקודות מעשיות שימושיות לתכנון עיבוד אסינכרוני.
-
שלוט במקביליות במפורש כאשר יש מגבלות משאבים כמו API או מסדי נתונים, ניתן להגביל את מספר ההרצות המקבילות בעזרת
Semaphore. -
טפל במשאבים משותפים בבטחה אם עליך לעדכן מצב ממספר משימות, השתמש ב-
Lock. הקטנת מצב משותף ועיצוב סביב נתונים immutable הופכת את המערכת לבטוחה יותר. -
בחר כיצד לקבל תוצאות אם אתה רוצה לעבד משימות ברגע שהן מסתיימות, השתמש ב-
asyncio.as_completed; אם ברצונך לקבל תוצאות בסדר הקלט, השתמש ב-gather. -
בודד עיבוד סינכרוני כבד לעיבוד כבד על ה-CPU או קריאות לספריות סינכרוניות, השתמש ב-
run_in_executorאוProcessPoolExecutorכדי למנוע חסימה של לולאת האירועים. -
תכנן לטפל בביטולים וחריגות כתוב טיפול נכון בחריגות כדי לנקות משאבים בבטחה גם אם משימה מבוטלת באמצע.
-
הפוך את הבדיקה לקלה הפשט פעולות צד (I/O, זמן, אקראיות) כדי לאפשר החלפה, מה שמקל על בדיקת קוד אסינכרוני.
סיכום
asyncio היא חזקה, אך אם תתמקד רק ב"הרצת דברים במקביל", עלולות להתעורר בעיות כגון התנגשות על משאבים משותפים, חריגה ממגבלות משאב, או חסימת לולאת האירועים. על ידי שילוב של Semaphore, Lock, Event, Queue, run_in_executor וטיפול נכון בביטול, תוכל לעצב יישומים אסינכרוניים בטוחים ויעילים. באמצעות שימוש במנגנונים כגון דפוס יצרן-צרכן, הגבלת מקביליות או הפרדת עיבוד אסינכרוני וחוסם, ניתן לבנות תהליכי עבודה אסינכרוניים בצורה בטוחה ויעילה יותר.
תוכלו לעקוב אחר המאמר שלמעלה באמצעות Visual Studio Code בערוץ היוטיוב שלנו. נא לבדוק גם את ערוץ היוטיוב.