Kiểm soát đồng bộ trong xử lý bất đồng bộ Python

Kiểm soát đồng bộ trong xử lý bất đồng bộ Python

Bài viết này giải thích cách kiểm soát đồng bộ trong xử lý bất đồng bộ Python.

Bạn sẽ học từng bước, từ kiến thức cơ bản về asyncio đến các mẫu thực tế thường được sử dụng để kiểm soát đồng bộ hóa.

YouTube Video

Kiểm soát đồng bộ trong xử lý bất đồng bộ Python

Trong xử lý bất đồng bộ, bạn có thể dễ dàng chạy đồng thời nhiều tác vụ. Tuy nhiên, trong thực tế, cần có các điều chỉnh nâng cao hơn như kiểm soát số lượng đồng thời, phối hợp tác vụ, kiểm soát tài nguyên chia sẻ, xử lý các tiến trình đồng bộ nặng và dọn dẹp sau khi hủy tác vụ.

Tại đây, chúng ta sẽ học từng bước từ những kiến thức cơ bản về asyncio đến những mẫu thực tiễn thường dùng để đồng bộ hóa.

Giới thiệu: Cơ bản (async / awaitcreate_task)

Hãy cùng xem trước một số đoạn mã bất đồng bộ đơn giản. await sẽ dừng tại đó cho đến khi coroutine được gọi hoàn thành, còn asyncio.create_task sẽ tạo một nhiệm vụ để thực thi đồng thời.

 1import asyncio
 2
 3async def worker(name, delay):
 4    # Simulate async work
 5    await asyncio.sleep(delay)
 6    return f"{name} done after {delay}s"
 7
 8async def main():
 9    # Create two tasks that run concurrently.
10    t1 = asyncio.create_task(worker("A", 1))
11    t2 = asyncio.create_task(worker("B", 2))
12
13    # Await both results (this suspends until both are done).
14    result1 = await t1
15    result2 = await t2
16    print(result1, result2)
17
18if __name__ == "__main__":
19    asyncio.run(main())
  • Đây là mẫu mã phổ biến khi các tác vụ được tạo một cách rõ ràng, chạy song song, và nhận kết quả sau cùng bằng await. create_task cho phép thực thi đồng thời các tác vụ.

Sự khác biệt giữa asyncio.gather, asyncio.wait, và asyncio.as_completed

Khi chạy nhiều coroutine đồng thời, bạn sẽ chọn phương thức tùy thuộc vào cách muốn lấy kết quả. gather đợi tất cả hoàn thành và trả kết quả theo thứ tự đầu vào, còn as_completed cho phép xử lý kết quả khi chúng xong, bất kể thứ tự.

 1import asyncio
 2import random
 3
 4async def job(i):
 5    delay = random.random() * 2
 6    await asyncio.sleep(delay)
 7    return (i, delay)
 8
 9async def gather_example():
10    # gather waits for all tasks and returns results in the same order as input
11    results = await asyncio.gather(*(job(i) for i in range(5)))
12    print("gather order:", results)
13
14async def as_completed_example():
15    # as_completed yields results as they finish (useful to process early results)
16    tasks = [asyncio.create_task(job(i)) for i in range(5)]
17    for coro in asyncio.as_completed(tasks):
18        res = await coro
19        print("completed:", res)
20
21async def main():
22    await gather_example()
23    await as_completed_example()
24
25if __name__ == "__main__":
26    asyncio.run(main())
  • Như trong đoạn mã này, gather trả kết quả theo thứ tự đầu vào nên hữu ích khi bạn cần giữ thứ tự. as_completed dùng khi bạn muốn xử lý kết quả ngay khi xong.

Kiểm soát đồng thời: Giới hạn số lần thực thi đồng thời với asyncio.Semaphore

Khi có giới hạn tốc độ API bên ngoài hoặc giới hạn kết nối cơ sở dữ liệu, bạn có thể kiểm soát số lần thực thi đồng thời với Semaphore.

 1import asyncio
 2import random
 3
 4sem = asyncio.Semaphore(3)  # allow up to 3 concurrent workers
 5
 6async def limited_worker(i):
 7    async with sem:
 8        # Only 3 tasks can be inside this block concurrently
 9        delay = random.random() * 2
10        await asyncio.sleep(delay)
11        print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14    tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15    await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18    asyncio.run(main())
  • Bằng cách sử dụng Semaphore với async with, bạn có thể dễ dàng giới hạn số lần thực thi đồng thời. Cách này rất hiệu quả trong các trường hợp bị giới hạn ngoài.

Kiểm soát độc quyền tài nguyên chia sẻ: asyncio.Lock

Lock dùng để ngăn việc cập nhật đồng thời lên dữ liệu chia sẻ. asyncio.Lock là công cụ kiểm soát truy cập riêng biệt cho các tác vụ bất đồng bộ.

 1import asyncio
 2
 3counter = 0
 4lock = asyncio.Lock()
 5
 6async def incrementer(n_times):
 7    global counter
 8    for _ in range(n_times):
 9        # Acquire lock to update shared counter safely
10        async with lock:
11            tmp = counter
12            await asyncio.sleep(0)  # yield control to increase race likelihood
13            counter = tmp + 1
14
15async def main():
16    tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17    await asyncio.gather(*tasks)
18    print("final counter:", counter)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Nếu nhiều tác vụ cùng cập nhật một biến chia sẻ như counter toàn cục, có thể phát sinh xung đột. Khi bao bọc các thao tác bằng Lock, bạn có thể duy trì tính nhất quán.

Phối hợp tác vụ: asyncio.Event

Event dùng khi một tác vụ phát tín hiệu sẵn sàng còn các tác vụ khác chờ tín hiệu này. Đây là cách đơn giản để các tác vụ chia sẻ tín hiệu và đồng bộ với nhau.

 1import asyncio
 2
 3event = asyncio.Event()
 4
 5async def waiter(name):
 6    print(f"{name} is waiting for event")
 7    await event.wait()
 8    print(f"{name} resumed after event set")
 9
10async def setter():
11    print("setter will sleep and then set the event")
12    await asyncio.sleep(1)
13    event.set()
14    print("event set by setter")
15
16async def main():
17    tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18    await asyncio.create_task(setter())
19    await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22    asyncio.run(main())
  • Event có cờ boolean, khi gọi set() sẽ tiếp tục tất cả các tác vụ đang chờ. Rất hữu ích cho việc đồng bộ đơn giản.

Mẫu sản xuất-tiêu thụ: asyncio.Queue

Sử dụng Queue, nhà sản xuất (tạo dữ liệu) và nhà tiêu thụ (xử lý dữ liệu) có thể phối hợp mượt mà, bất đồng bộ. Khi hàng đợi đầy, nhà sản xuất sẽ tự động chờ, giúp kiểm soát quá trình sản xuất tự nhiên và tránh dư thừa.

 1import asyncio
 2import random
 3
 4async def producer(q, n_items):
 5    for i in range(n_items):
 6        await asyncio.sleep(random.random() * 0.5)
 7        item = f"item-{i}"
 8        await q.put(item)
 9        print("produced", item)
10    # signal consumers to stop
11    await q.put(None)
12
13async def consumer(q, name):
14    while True:
15        item = await q.get()
16        if item is None:
17            # put sentinel back for other consumers and break
18            await q.put(None)
19            break
20        await asyncio.sleep(random.random() * 1)
21        print(name, "consumed", item)
22        q.task_done()
23
24async def main():
25    q = asyncio.Queue(maxsize=5)  # bounded queue to apply backpressure
26    prod = asyncio.create_task(producer(q, 10))
27    cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28    await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31    asyncio.run(main())
  • Queue giúp phối hợp bất đồng bộ giữa nhà sản xuất và nhà tiêu thụ. Thêm nữa, đặt maxsize khiến nhà sản xuất chờ khi hàng đợi đầy, tránh dư thừa.

Xử lý các hoạt động đồng bộ gây chặn: run_in_executor

Đối với xử lý nặng CPU hoặc khi sử dụng thư viện không hỗ trợ async, hãy dùng run_in_executor để chuyển xử lý sang luồng hoặc tiến trình khác. Làm như vậy, vòng lặp sự kiện chính không bị dừng lại, cho phép các tác vụ bất đồng bộ khác chạy mượt mà.

 1import asyncio
 2import time
 3
 4def blocking_io(x):
 5    # simulate blocking I/O or CPU-bound work
 6    time.sleep(2)
 7    return x * x
 8
 9async def main():
10    loop = asyncio.get_running_loop()
11    # run blocking_io in default thread pool
12    result = await loop.run_in_executor(None, blocking_io, 3)
13    print("blocking result:", result)
14
15if __name__ == "__main__":
16    asyncio.run(main())
  • Gọi trực tiếp hàm đồng bộ sẽ khiến vòng lặp sự kiện bị chặn. Với run_in_executor, mã được chạy ở luồng riêng và các tác vụ async vẫn tiếp tục chạy song song.

Ví dụ: Gọi API bị giới hạn tốc độ (kết hợp Semaphore + run_in_executor)

Dưới đây là tình huống mẫu, khi việc gọi API bị giới hạn và cần xử lý nặng đối với kết quả. Kết hợp Semaphorerun_in_executor giúp việc xử lý diễn ra an toàn, hiệu quả.

 1import asyncio
 2import time
 3import random
 4
 5sem = asyncio.Semaphore(5)
 6
 7def heavy_sync_processing(data):
 8    # simulate heavy CPU-bound work
 9    time.sleep(1)
10    return f"processed-{data}"
11
12async def api_call(i):
13    await asyncio.sleep(random.random() * 0.5)  # simulate network latency
14    return f"data-{i}"
15
16async def worker(i):
17    async with sem:
18        data = await api_call(i)
19        # offload CPU-bound work to threadpool
20        loop = asyncio.get_running_loop()
21        result = await loop.run_in_executor(None, heavy_sync_processing, data)
22        print(result)
23
24async def main():
25    tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26    await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29    asyncio.run(main())
  • Chúng ta dùng Semaphore để giới hạn số lần gọi API đồng thời, còn xử lý nặng dữ liệu sẽ được chuyển sang pool luồng riêng. Tách riêng xử lý mạng và xử lý CPU giúp nâng cao hiệu quả.

Hủy tác vụ và dọn dẹp

Khi tác vụ bị hủy, việc xử lý đúng finallyasyncio.CancelledError là rất quan trọng. Điều này đảm bảo các tệp, kết nối được giải phóng, trạng thái trung gian được xử lý đúng giúp ứng dụng duy trì tính nhất quán.

 1import asyncio
 2
 3async def long_running():
 4    try:
 5        print("started long_running")
 6        await asyncio.sleep(10)
 7        print("finished long_running")
 8    except asyncio.CancelledError:
 9        print("long_running was cancelled, cleaning up")
10        # perform cleanup here
11        raise
12
13async def main():
14    task = asyncio.create_task(long_running())
15    await asyncio.sleep(1)
16    task.cancel()
17    try:
18        await task
19    except asyncio.CancelledError:
20        print("task cancelled in main")
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • Việc hủy sẽ phát ra ngoại lệ (CancelledError), vì thế, hãy dọn dẹp trong khối except và ném lại ngoại lệ nếu cần.

Những điểm chính trong thiết kế thực tiễn

Sau đây là những điểm hữu ích khi thiết kế xử lý bất đồng bộ.

  • Kiểm soát mức độ đồng thời một cách rõ ràng Khi có giới hạn tài nguyên như API hoặc DB, bạn có thể dùng Semaphore để giới hạn số lượt thực thi đồng thời.

  • Xử lý tài nguyên chia sẻ an toàn Nếu cần cập nhật trạng thái từ nhiều tác vụ, hãy dùng Lock. Giảm dùng trạng thái chia sẻ và thiết kế dựa trên dữ liệu bất biến giúp an toàn hơn.

  • Chọn cách nhận kết quả Nếu bạn muốn xử lý tác vụ ngay khi chúng hoàn thành, dùng asyncio.as_completed; nếu cần kết quả theo thứ tự, dùng gather.

  • Tách rời xử lý đồng bộ nặng Đối với các hàm CPU nặng hoặc đồng bộ, hãy dùng run_in_executor hoặc ProcessPoolExecutor để không chặn event loop.

  • Lên kế hoạch xử lý hủy tác vụ và ngoại lệ Viết mã xử lý ngoại lệ hợp lý để dọn dẹp an toàn dù tác vụ bị hủy giữa chừng.

  • Dễ dàng kiểm thử Trừu tượng hóa các tác động phụ như I/O, thời gian, random để dễ thay thế, giúp kiểm thử mã bất đồng bộ dễ dàng hơn.

Tóm tắt

asyncio rất mạnh mẽ, nhưng nếu chỉ chú ý đến “chạy song song”, bạn có thể gặp phải các vấn đề như tranh chấp tài nguyên, vượt quá giới hạn, hoặc chặn event loop. Bằng cách kết hợp Semaphore, Lock, Event, Queue, run_in_executor cùng xử lý hủy tác vụ đúng cách, bạn có thể thiết kế ứng dụng bất đồng bộ an toàn, hiệu quả. Bằng cách áp dụng các cơ chế như mẫu sản xuất-tiêu thụ, giới hạn đồng thời hoặc tách biệt xử lý bất đồng bộ và chặn, bạn có thể xây dựng luồng công việc bất đồng bộ an toàn, hiệu quả hơn.

Bạn có thể làm theo bài viết trên bằng cách sử dụng Visual Studio Code trên kênh YouTube của chúng tôi. Vui lòng ghé thăm kênh YouTube.

YouTube Video