Kiểm soát đồng bộ trong xử lý bất đồng bộ Python
Bài viết này giải thích cách kiểm soát đồng bộ trong xử lý bất đồng bộ Python.
Bạn sẽ học từng bước, từ kiến thức cơ bản về asyncio đến các mẫu thực tế thường được sử dụng để kiểm soát đồng bộ hóa.
YouTube Video
Kiểm soát đồng bộ trong xử lý bất đồng bộ Python
Trong xử lý bất đồng bộ, bạn có thể dễ dàng chạy đồng thời nhiều tác vụ. Tuy nhiên, trong thực tế, cần có các điều chỉnh nâng cao hơn như kiểm soát số lượng đồng thời, phối hợp tác vụ, kiểm soát tài nguyên chia sẻ, xử lý các tiến trình đồng bộ nặng và dọn dẹp sau khi hủy tác vụ.
Tại đây, chúng ta sẽ học từng bước từ những kiến thức cơ bản về asyncio đến những mẫu thực tiễn thường dùng để đồng bộ hóa.
Giới thiệu: Cơ bản (async / await và create_task)
Hãy cùng xem trước một số đoạn mã bất đồng bộ đơn giản. await sẽ dừng tại đó cho đến khi coroutine được gọi hoàn thành, còn asyncio.create_task sẽ tạo một nhiệm vụ để thực thi đồng thời.
1import asyncio
2
3async def worker(name, delay):
4 # Simulate async work
5 await asyncio.sleep(delay)
6 return f"{name} done after {delay}s"
7
8async def main():
9 # Create two tasks that run concurrently.
10 t1 = asyncio.create_task(worker("A", 1))
11 t2 = asyncio.create_task(worker("B", 2))
12
13 # Await both results (this suspends until both are done).
14 result1 = await t1
15 result2 = await t2
16 print(result1, result2)
17
18if __name__ == "__main__":
19 asyncio.run(main())- Đây là mẫu mã phổ biến khi các tác vụ được tạo một cách rõ ràng, chạy song song, và nhận kết quả sau cùng bằng
await.create_taskcho phép thực thi đồng thời các tác vụ.
Sự khác biệt giữa asyncio.gather, asyncio.wait, và asyncio.as_completed
Khi chạy nhiều coroutine đồng thời, bạn sẽ chọn phương thức tùy thuộc vào cách muốn lấy kết quả. gather đợi tất cả hoàn thành và trả kết quả theo thứ tự đầu vào, còn as_completed cho phép xử lý kết quả khi chúng xong, bất kể thứ tự.
1import asyncio
2import random
3
4async def job(i):
5 delay = random.random() * 2
6 await asyncio.sleep(delay)
7 return (i, delay)
8
9async def gather_example():
10 # gather waits for all tasks and returns results in the same order as input
11 results = await asyncio.gather(*(job(i) for i in range(5)))
12 print("gather order:", results)
13
14async def as_completed_example():
15 # as_completed yields results as they finish (useful to process early results)
16 tasks = [asyncio.create_task(job(i)) for i in range(5)]
17 for coro in asyncio.as_completed(tasks):
18 res = await coro
19 print("completed:", res)
20
21async def main():
22 await gather_example()
23 await as_completed_example()
24
25if __name__ == "__main__":
26 asyncio.run(main())- Như trong đoạn mã này,
gathertrả kết quả theo thứ tự đầu vào nên hữu ích khi bạn cần giữ thứ tự.as_completeddùng khi bạn muốn xử lý kết quả ngay khi xong.
Kiểm soát đồng thời: Giới hạn số lần thực thi đồng thời với asyncio.Semaphore
Khi có giới hạn tốc độ API bên ngoài hoặc giới hạn kết nối cơ sở dữ liệu, bạn có thể kiểm soát số lần thực thi đồng thời với Semaphore.
1import asyncio
2import random
3
4sem = asyncio.Semaphore(3) # allow up to 3 concurrent workers
5
6async def limited_worker(i):
7 async with sem:
8 # Only 3 tasks can be inside this block concurrently
9 delay = random.random() * 2
10 await asyncio.sleep(delay)
11 print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14 tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15 await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18 asyncio.run(main())- Bằng cách sử dụng
Semaphorevớiasync with, bạn có thể dễ dàng giới hạn số lần thực thi đồng thời. Cách này rất hiệu quả trong các trường hợp bị giới hạn ngoài.
Kiểm soát độc quyền tài nguyên chia sẻ: asyncio.Lock
Lock dùng để ngăn việc cập nhật đồng thời lên dữ liệu chia sẻ. asyncio.Lock là công cụ kiểm soát truy cập riêng biệt cho các tác vụ bất đồng bộ.
1import asyncio
2
3counter = 0
4lock = asyncio.Lock()
5
6async def incrementer(n_times):
7 global counter
8 for _ in range(n_times):
9 # Acquire lock to update shared counter safely
10 async with lock:
11 tmp = counter
12 await asyncio.sleep(0) # yield control to increase race likelihood
13 counter = tmp + 1
14
15async def main():
16 tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17 await asyncio.gather(*tasks)
18 print("final counter:", counter)
19
20if __name__ == "__main__":
21 asyncio.run(main())- Nếu nhiều tác vụ cùng cập nhật một biến chia sẻ như
countertoàn cục, có thể phát sinh xung đột. Khi bao bọc các thao tác bằngLock, bạn có thể duy trì tính nhất quán.
Phối hợp tác vụ: asyncio.Event
Event dùng khi một tác vụ phát tín hiệu sẵn sàng còn các tác vụ khác chờ tín hiệu này. Đây là cách đơn giản để các tác vụ chia sẻ tín hiệu và đồng bộ với nhau.
1import asyncio
2
3event = asyncio.Event()
4
5async def waiter(name):
6 print(f"{name} is waiting for event")
7 await event.wait()
8 print(f"{name} resumed after event set")
9
10async def setter():
11 print("setter will sleep and then set the event")
12 await asyncio.sleep(1)
13 event.set()
14 print("event set by setter")
15
16async def main():
17 tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18 await asyncio.create_task(setter())
19 await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22 asyncio.run(main())Eventcó cờ boolean, khi gọiset()sẽ tiếp tục tất cả các tác vụ đang chờ. Rất hữu ích cho việc đồng bộ đơn giản.
Mẫu sản xuất-tiêu thụ: asyncio.Queue
Sử dụng Queue, nhà sản xuất (tạo dữ liệu) và nhà tiêu thụ (xử lý dữ liệu) có thể phối hợp mượt mà, bất đồng bộ. Khi hàng đợi đầy, nhà sản xuất sẽ tự động chờ, giúp kiểm soát quá trình sản xuất tự nhiên và tránh dư thừa.
1import asyncio
2import random
3
4async def producer(q, n_items):
5 for i in range(n_items):
6 await asyncio.sleep(random.random() * 0.5)
7 item = f"item-{i}"
8 await q.put(item)
9 print("produced", item)
10 # signal consumers to stop
11 await q.put(None)
12
13async def consumer(q, name):
14 while True:
15 item = await q.get()
16 if item is None:
17 # put sentinel back for other consumers and break
18 await q.put(None)
19 break
20 await asyncio.sleep(random.random() * 1)
21 print(name, "consumed", item)
22 q.task_done()
23
24async def main():
25 q = asyncio.Queue(maxsize=5) # bounded queue to apply backpressure
26 prod = asyncio.create_task(producer(q, 10))
27 cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28 await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31 asyncio.run(main())Queuegiúp phối hợp bất đồng bộ giữa nhà sản xuất và nhà tiêu thụ. Thêm nữa, đặtmaxsizekhiến nhà sản xuất chờ khi hàng đợi đầy, tránh dư thừa.
Xử lý các hoạt động đồng bộ gây chặn: run_in_executor
Đối với xử lý nặng CPU hoặc khi sử dụng thư viện không hỗ trợ async, hãy dùng run_in_executor để chuyển xử lý sang luồng hoặc tiến trình khác. Làm như vậy, vòng lặp sự kiện chính không bị dừng lại, cho phép các tác vụ bất đồng bộ khác chạy mượt mà.
1import asyncio
2import time
3
4def blocking_io(x):
5 # simulate blocking I/O or CPU-bound work
6 time.sleep(2)
7 return x * x
8
9async def main():
10 loop = asyncio.get_running_loop()
11 # run blocking_io in default thread pool
12 result = await loop.run_in_executor(None, blocking_io, 3)
13 print("blocking result:", result)
14
15if __name__ == "__main__":
16 asyncio.run(main())- Gọi trực tiếp hàm đồng bộ sẽ khiến vòng lặp sự kiện bị chặn. Với
run_in_executor, mã được chạy ở luồng riêng và các tác vụ async vẫn tiếp tục chạy song song.
Ví dụ: Gọi API bị giới hạn tốc độ (kết hợp Semaphore + run_in_executor)
Dưới đây là tình huống mẫu, khi việc gọi API bị giới hạn và cần xử lý nặng đối với kết quả. Kết hợp Semaphore và run_in_executor giúp việc xử lý diễn ra an toàn, hiệu quả.
1import asyncio
2import time
3import random
4
5sem = asyncio.Semaphore(5)
6
7def heavy_sync_processing(data):
8 # simulate heavy CPU-bound work
9 time.sleep(1)
10 return f"processed-{data}"
11
12async def api_call(i):
13 await asyncio.sleep(random.random() * 0.5) # simulate network latency
14 return f"data-{i}"
15
16async def worker(i):
17 async with sem:
18 data = await api_call(i)
19 # offload CPU-bound work to threadpool
20 loop = asyncio.get_running_loop()
21 result = await loop.run_in_executor(None, heavy_sync_processing, data)
22 print(result)
23
24async def main():
25 tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26 await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29 asyncio.run(main())- Chúng ta dùng
Semaphoređể giới hạn số lần gọi API đồng thời, còn xử lý nặng dữ liệu sẽ được chuyển sang pool luồng riêng. Tách riêng xử lý mạng và xử lý CPU giúp nâng cao hiệu quả.
Hủy tác vụ và dọn dẹp
Khi tác vụ bị hủy, việc xử lý đúng finally và asyncio.CancelledError là rất quan trọng. Điều này đảm bảo các tệp, kết nối được giải phóng, trạng thái trung gian được xử lý đúng giúp ứng dụng duy trì tính nhất quán.
1import asyncio
2
3async def long_running():
4 try:
5 print("started long_running")
6 await asyncio.sleep(10)
7 print("finished long_running")
8 except asyncio.CancelledError:
9 print("long_running was cancelled, cleaning up")
10 # perform cleanup here
11 raise
12
13async def main():
14 task = asyncio.create_task(long_running())
15 await asyncio.sleep(1)
16 task.cancel()
17 try:
18 await task
19 except asyncio.CancelledError:
20 print("task cancelled in main")
21
22if __name__ == "__main__":
23 asyncio.run(main())- Việc hủy sẽ phát ra ngoại lệ (
CancelledError), vì thế, hãy dọn dẹp trong khốiexceptvà ném lại ngoại lệ nếu cần.
Những điểm chính trong thiết kế thực tiễn
Sau đây là những điểm hữu ích khi thiết kế xử lý bất đồng bộ.
-
Kiểm soát mức độ đồng thời một cách rõ ràng Khi có giới hạn tài nguyên như API hoặc DB, bạn có thể dùng
Semaphoređể giới hạn số lượt thực thi đồng thời. -
Xử lý tài nguyên chia sẻ an toàn Nếu cần cập nhật trạng thái từ nhiều tác vụ, hãy dùng
Lock. Giảm dùng trạng thái chia sẻ và thiết kế dựa trên dữ liệu bất biến giúp an toàn hơn. -
Chọn cách nhận kết quả Nếu bạn muốn xử lý tác vụ ngay khi chúng hoàn thành, dùng
asyncio.as_completed; nếu cần kết quả theo thứ tự, dùnggather. -
Tách rời xử lý đồng bộ nặng Đối với các hàm CPU nặng hoặc đồng bộ, hãy dùng
run_in_executorhoặcProcessPoolExecutorđể không chặn event loop. -
Lên kế hoạch xử lý hủy tác vụ và ngoại lệ Viết mã xử lý ngoại lệ hợp lý để dọn dẹp an toàn dù tác vụ bị hủy giữa chừng.
-
Dễ dàng kiểm thử Trừu tượng hóa các tác động phụ như I/O, thời gian, random để dễ thay thế, giúp kiểm thử mã bất đồng bộ dễ dàng hơn.
Tóm tắt
asyncio rất mạnh mẽ, nhưng nếu chỉ chú ý đến “chạy song song”, bạn có thể gặp phải các vấn đề như tranh chấp tài nguyên, vượt quá giới hạn, hoặc chặn event loop. Bằng cách kết hợp Semaphore, Lock, Event, Queue, run_in_executor cùng xử lý hủy tác vụ đúng cách, bạn có thể thiết kế ứng dụng bất đồng bộ an toàn, hiệu quả. Bằng cách áp dụng các cơ chế như mẫu sản xuất-tiêu thụ, giới hạn đồng thời hoặc tách biệt xử lý bất đồng bộ và chặn, bạn có thể xây dựng luồng công việc bất đồng bộ an toàn, hiệu quả hơn.
Bạn có thể làm theo bài viết trên bằng cách sử dụng Visual Studio Code trên kênh YouTube của chúng tôi. Vui lòng ghé thăm kênh YouTube.