การควบคุมการประสานงานในการประมวลผลแบบอะซิงโครนัสของ Python
บทความนี้อธิบายเกี่ยวกับการควบคุมการประสานงานในการประมวลผลแบบอะซิงโครนัสของ Python
คุณจะได้เรียนรู้ทีละขั้นตอน ตั้งแต่พื้นฐานของ asyncio ไปจนถึงรูปแบบการใช้งานจริงที่ใช้บ่อยสำหรับการควบคุมการประสานงาน
YouTube Video
การควบคุมการประสานงานในการประมวลผลแบบอะซิงโครนัสของ Python
ในการประมวลผลแบบอะซิงโครนัส คุณสามารถเรียกใช้งานหลายๆ งานได้พร้อมกันอย่างง่ายดาย อย่างไรก็ตาม ในทางปฏิบัติแล้ว ต้องมีการจัดการที่ซับซ้อนยิ่งขึ้น เช่น การควบคุมจำนวนงานที่ทำพร้อมกัน การประสานงานระหว่างงาน การควบคุมสิทธิ์การใช้ทรัพยากรร่วม การจัดการกระบวนการซิงโครนัสที่หนัก และการล้างข้อมูลหลังการยกเลิก
ที่นี่ เราจะเรียนรู้ทีละขั้นตอนตั้งแต่พื้นฐานของ asyncio ไปจนถึงรูปแบบปฏิบัติที่มักใช้สำหรับการประสานงาน
เกริ่นนำ: พื้นฐาน (async / await และ create_task)
ลองมาดูตัวอย่างโค้ดแบบอะซิงโครนัสที่เรียบง่ายกันก่อน await จะรอที่จุดนั้นจนกว่า coroutine ที่เรียกจะเสร็จสิ้น และ asyncio.create_task จะกำหนดงานให้รันพร้อมกัน
1import asyncio
2
3async def worker(name, delay):
4 # Simulate async work
5 await asyncio.sleep(delay)
6 return f"{name} done after {delay}s"
7
8async def main():
9 # Create two tasks that run concurrently.
10 t1 = asyncio.create_task(worker("A", 1))
11 t2 = asyncio.create_task(worker("B", 2))
12
13 # Await both results (this suspends until both are done).
14 result1 = await t1
15 result2 = await t2
16 print(result1, result2)
17
18if __name__ == "__main__":
19 asyncio.run(main())- โค้ดนี้เป็นรูปแบบตัวอย่างที่สร้างงานอย่างชัดเจน รันแบบขนาน และรับผลลัพธ์ที่ตอนจบด้วย
awaitcreate_taskช่วยให้สามารถรันงานหลายงานพร้อมกันได้
ความแตกต่างระหว่าง asyncio.gather, asyncio.wait และ asyncio.as_completed
เมื่อต้องประมวลผล coroutine พร้อมกันหลายตัว คุณจะเลือกใช้อะไรขึ้นอยู่กับวิธีที่ต้องการรับผลลัพธ์ gather รอให้ทุกงานเสร็จแล้วคืนค่าผลลัพธ์ตามลำดับที่รับมา ในขณะที่ as_completed อนุญาตให้จัดการผลลัพธ์ทันทีที่แต่ละงานเสร็จโดยไม่เรียงลำดับ
1import asyncio
2import random
3
4async def job(i):
5 delay = random.random() * 2
6 await asyncio.sleep(delay)
7 return (i, delay)
8
9async def gather_example():
10 # gather waits for all tasks and returns results in the same order as input
11 results = await asyncio.gather(*(job(i) for i in range(5)))
12 print("gather order:", results)
13
14async def as_completed_example():
15 # as_completed yields results as they finish (useful to process early results)
16 tasks = [asyncio.create_task(job(i)) for i in range(5)]
17 for coro in asyncio.as_completed(tasks):
18 res = await coro
19 print("completed:", res)
20
21async def main():
22 await gather_example()
23 await as_completed_example()
24
25if __name__ == "__main__":
26 asyncio.run(main())- ตามที่เห็นในโค้ดนี้
gatherจะส่งกลับผลลัพธ์ตามลำดับที่รับเข้า เหมาะสำหรับกรณีที่ต้องการคงลำดับผลลัพธ์as_completedจะใช้เมื่อคุณต้องการประมวลผลผลลัพธ์ทันทีที่แต่ละอันเสร็จ
การควบคุมการทำงานพร้อมกัน: จำกัดจำนวนงานที่รันพร้อมกันด้วย asyncio.Semaphore
เมื่อมีข้อจำกัดเช่นจำนวนครั้งของ API หรือจำนวนการเชื่อมต่อ DB คุณสามารถควบคุมงานที่รันพร้อมกันได้ด้วย Semaphore
1import asyncio
2import random
3
4sem = asyncio.Semaphore(3) # allow up to 3 concurrent workers
5
6async def limited_worker(i):
7 async with sem:
8 # Only 3 tasks can be inside this block concurrently
9 delay = random.random() * 2
10 await asyncio.sleep(delay)
11 print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14 tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15 await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18 asyncio.run(main())- โดยใช้
Semaphoreร่วมกับasync withคุณสามารถจำกัดจำนวนงานที่ทำพร้อมกันได้อย่างง่ายดาย วิธีนี้มีประสิทธิภาพเมื่อมีข้อจำกัดจากภายนอก
การควบคุมการใช้ทรัพยากรร่วมแบบ Exclusive: asyncio.Lock
Lock ใช้เพื่อป้องกันการอัปเดตข้อมูลร่วมกันพร้อมกัน asyncio.Lock เป็นตัวควบคุมที่ใช้สำหรับงานอะซิงโครนัสโดยเฉพาะ
1import asyncio
2
3counter = 0
4lock = asyncio.Lock()
5
6async def incrementer(n_times):
7 global counter
8 for _ in range(n_times):
9 # Acquire lock to update shared counter safely
10 async with lock:
11 tmp = counter
12 await asyncio.sleep(0) # yield control to increase race likelihood
13 counter = tmp + 1
14
15async def main():
16 tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17 await asyncio.gather(*tasks)
18 print("final counter:", counter)
19
20if __name__ == "__main__":
21 asyncio.run(main())- ถ้ามีงานหลายงานอัปเดตตัวแปรร่วมกัน เช่น global
counterอาจเกิดความขัดแย้งได้ โดยการใช้Lockครอบขณะดำเนินการ คุณสามารถรักษาความสอดคล้องของข้อมูลได้
การประสานงานระหว่างงาน: asyncio.Event
Event ใช้เมื่อมีงานหนึ่งแจ้งสถานะว่าเสร็จ และให้งานอื่นๆ รอรับสัญญาณนี้ นี่เป็นวิธีที่ง่ายสำหรับให้งานต่างๆ แชร์สัญญาณและประสานงานกัน
1import asyncio
2
3event = asyncio.Event()
4
5async def waiter(name):
6 print(f"{name} is waiting for event")
7 await event.wait()
8 print(f"{name} resumed after event set")
9
10async def setter():
11 print("setter will sleep and then set the event")
12 await asyncio.sleep(1)
13 event.set()
14 print("event set by setter")
15
16async def main():
17 tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18 await asyncio.create_task(setter())
19 await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22 asyncio.run(main())Eventมีตัวแปร Boolean และเมื่อเรียกset()ทุกงานที่รอจะถูกรันต่อ เหมาะกับการซิงโครไนซ์แบบง่ายๆ
รูปแบบ Producer-Consumer: asyncio.Queue
ด้วยการใช้ Queue ฝ่ายผลิต (producers - สร้างข้อมูล) และฝ่ายบริโภค (consumers - ประมวลผลข้อมูล) สามารถประสานงานกันได้อย่างราบรื่นและแบบอะซิงโครนัส และเมื่อคิวเต็ม ฝ่ายผลิตจะรอโดยอัตโนมัติ ซึ่งเป็นการสร้าง backpressure เพื่อป้องกันการผลิตเกิน
1import asyncio
2import random
3
4async def producer(q, n_items):
5 for i in range(n_items):
6 await asyncio.sleep(random.random() * 0.5)
7 item = f"item-{i}"
8 await q.put(item)
9 print("produced", item)
10 # signal consumers to stop
11 await q.put(None)
12
13async def consumer(q, name):
14 while True:
15 item = await q.get()
16 if item is None:
17 # put sentinel back for other consumers and break
18 await q.put(None)
19 break
20 await asyncio.sleep(random.random() * 1)
21 print(name, "consumed", item)
22 q.task_done()
23
24async def main():
25 q = asyncio.Queue(maxsize=5) # bounded queue to apply backpressure
26 prod = asyncio.create_task(producer(q, 10))
27 cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28 await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31 asyncio.run(main())Queueช่วยประสานงานฝ่ายผลิตและฝ่ายบริโภคในรูปแบบอะซิงโครนัส หากกำหนดmaxsizeเมื่อต้องการเพิ่มข้อมูลเข้าQueueที่เต็มแล้ว ฝ่ายผลิตจะต้องรอ จึงป้องกันการสร้างข้อมูลเกิน
การจัดการกับการดำเนินการที่บล็อกแบบ Synchronous: run_in_executor
สำหรับกระบวนการที่ใช้ CPU หนัก หรือใช้ไลบรารีที่ไม่รองรับ async ให้ใช้ run_in_executor เพื่อกระจายงานไปยัง thread หรือ process อื่น วิธีนี้ช่วยให้ event loop หลักไม่หยุด และงานอะซิงโครนัสอื่นๆ สามารถทำงานต่อได้อย่างราบรื่น
1import asyncio
2import time
3
4def blocking_io(x):
5 # simulate blocking I/O or CPU-bound work
6 time.sleep(2)
7 return x * x
8
9async def main():
10 loop = asyncio.get_running_loop()
11 # run blocking_io in default thread pool
12 result = await loop.run_in_executor(None, blocking_io, 3)
13 print("blocking result:", result)
14
15if __name__ == "__main__":
16 asyncio.run(main())- การเรียกฟังก์ชันแบบซิงโครนัสโดยตรงจะบล็อก event loop ด้วย
run_in_executorโค้ดจะไปรันใน thread อื่น ทำให้งานอะซิงโครนัสสามารถทำต่อได้พร้อมๆ กัน
ตัวอย่าง: การเรียก API ที่จำกัดจำนวน (ผสมผสาน Semaphore + run_in_executor)
ตัวอย่างต่อไปนี้เป็นกรณีที่ต้องเรียก API แบบจำกัดจำนวนครั้ง และต้องประมวลผลข้อมูลผลลัพธ์อย่างหนัก การผสมผสาน Semaphore และ run_in_executor ทำให้สามารถประมวลผลได้อย่างปลอดภัยและมีประสิทธิภาพ
1import asyncio
2import time
3import random
4
5sem = asyncio.Semaphore(5)
6
7def heavy_sync_processing(data):
8 # simulate heavy CPU-bound work
9 time.sleep(1)
10 return f"processed-{data}"
11
12async def api_call(i):
13 await asyncio.sleep(random.random() * 0.5) # simulate network latency
14 return f"data-{i}"
15
16async def worker(i):
17 async with sem:
18 data = await api_call(i)
19 # offload CPU-bound work to threadpool
20 loop = asyncio.get_running_loop()
21 result = await loop.run_in_executor(None, heavy_sync_processing, data)
22 print(result)
23
24async def main():
25 tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26 await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29 asyncio.run(main())- เราใช้
Semaphoreเพื่อจำกัดจำนวนการเรียก API พร้อมกัน และแบ่งงานประมวลผลข้อมูลหนักไปยัง thread pool การแยกขั้นตอนเน็ตเวิร์กและงานที่ใช้ CPU จะช่วยเพิ่มประสิทธิภาพ
การยกเลิกงานและการล้างข้อมูล
เมื่อมีงานถูกยกเลิก การจัดการ finally และ asyncio.CancelledError อย่างเหมาะสมเป็นสิ่งสำคัญมาก เพื่อให้แน่ใจว่าไฟล์และการเชื่อมต่อถูกปิด และสถานะค้างถูกจัดการอย่างเหมาะสม ทำให้แอปพลิเคชันคงความสอดคล้อง
1import asyncio
2
3async def long_running():
4 try:
5 print("started long_running")
6 await asyncio.sleep(10)
7 print("finished long_running")
8 except asyncio.CancelledError:
9 print("long_running was cancelled, cleaning up")
10 # perform cleanup here
11 raise
12
13async def main():
14 task = asyncio.create_task(long_running())
15 await asyncio.sleep(1)
16 task.cancel()
17 try:
18 await task
19 except asyncio.CancelledError:
20 print("task cancelled in main")
21
22if __name__ == "__main__":
23 asyncio.run(main())- การยกเลิกจะถูกส่งมาเป็นข้อยกเว้น (
CancelledError) คุณจึงควรทำการล้างข้อมูลในบล็อกexceptและส่งข้อยกเว้นนี้ต่อหากจำเป็น
ประเด็นสำคัญสำหรับการออกแบบเชิงปฏิบัติ
ต่อไปนี้คือข้อเสนอแนะสำคัญสำหรับออกแบบการประมวลผลแบบอะซิงโครนัส
-
ควบคุมจำนวนงานที่ประมวลผลพร้อมกันอย่างชัดเจน เมื่อมีข้อจำกัดทรัพยากร เช่น API หรือฐานข้อมูล คุณสามารถจำกัดจำนวนงานที่ทำพร้อมกันได้ด้วย
Semaphore -
จัดการทรัพยากรร่วมอย่างปลอดภัย หากต้องอัปเดตสถานะจากหลายงานควรใช้
Lockลดการใช้สถานะร่วม และออกแบบให้ใช้ข้อมูลที่เปลี่ยนแปลงไม่ได้ จะยิ่งปลอดภัย -
เลือกรูปแบบการรับผลลัพธ์ หากต้องการประมวลผลทันทีที่งานเสร็จ ให้ใช้
asyncio.as_completed; ถ้าต้องการผลลัพธ์เรียงตามลำดับ ให้ใช้gather -
แยกงานซิงโครนัสหนักออกจากงานหลัก สำหรับงานที่ใช้ CPU หนัก หรือเรียกไลบรารีซิงโครนัส ให้ใช้
run_in_executorหรือProcessPoolExecutorเพื่อไม่ให้ event loop ถูกบล็อก -
วางแผนสำหรับการยกเลิกและข้อผิดพลาด เขียนกลไกจัดการข้อผิดพลาดให้เหมาะสม เพื่อให้แน่ใจว่าทรัพยากรถูกล้างแม้งานจะถูกยกเลิกกะทันหัน
-
ให้ง่ายต่อการทดสอบ แยกส่วนที่มีผลข้างเคียงเช่น I/O, เวลา, การสุ่ม เพื่อให้ง่ายต่อการแทนที่และทดสอบโค้ดอะซิงโครนัส
สรุป
asyncio ทรงพลังมาก แต่ถ้ามัวแต่เน้น “รันงานพร้อมกัน” เพียงอย่างเดียว อาจเจอปัญหาอย่างเช่น การแย่งชิงทรัพยากรร่วม ปัญหาข้อจำกัดของทรัพยากร หรือ event loop ถูกบล็อก การผสมผสานการใช้ Semaphore, Lock, Event, Queue, run_in_executor และกลยุทธ์การยกเลิกที่เหมาะสม จะช่วยออกแบบแอปพลิเคชันอะซิงโครนัสที่ปลอดภัยและมีประสิทธิภาพ ด้วยการใช้กลไกอย่างรูปแบบ producer-consumer, การจำกัดจำนวนงานพร้อมกัน หรือแยกงานอะซิงโครนัสกับงานซิงโครนัสออกจากกัน จะช่วยสร้าง workflow แบบอะซิงโครนัสให้ปลอดภัยและมีประสิทธิภาพยิ่งขึ้น
คุณสามารถติดตามบทความข้างต้นโดยใช้ Visual Studio Code บนช่อง YouTube ของเรา กรุณาตรวจสอบช่อง YouTube ด้วย