Python 非同步處理中的同步控制

Python 非同步處理中的同步控制

本文說明了在 Python 非同步處理中的同步控制方法。

你將一步步學習,從 asyncio 的基礎到常用於同步控制的實用模式。

YouTube Video

Python 非同步處理中的同步控制

在非同步處理中,可以輕鬆地同時執行多個任務。然而,在實際應用中,還需要更進階的調整,例如控制並發數量、協調任務、對共用資源進行排他控制、處理重量級同步操作,以及在取消後進行清理等。

在這裡,我們將從 asyncio 的基礎開始,逐步學習常見的同步實用範式。

導論:基礎用法(asyncawaitcreate_task

首先來看看一些最基本的非同步程式碼。await 會在此處等待直至被調用的協程完成,而 asyncio.create_task 用於排程協程以並發執行。

 1import asyncio
 2
 3async def worker(name, delay):
 4    # Simulate async work
 5    await asyncio.sleep(delay)
 6    return f"{name} done after {delay}s"
 7
 8async def main():
 9    # Create two tasks that run concurrently.
10    t1 = asyncio.create_task(worker("A", 1))
11    t2 = asyncio.create_task(worker("B", 2))
12
13    # Await both results (this suspends until both are done).
14    result1 = await t1
15    result2 = await t2
16    print(result1, result2)
17
18if __name__ == "__main__":
19    asyncio.run(main())
  • 這段程式碼是典型的模式:顯式建立任務、平行執行,並在最後以 await 取得所有結果。create_task 能夠讓任務進行並發執行。

asyncio.gatherasyncio.waitasyncio.as_completed 的差異

當要同時運行多個協程時,可以根據想獲取結果的方式選擇不同方法。gather 會等待所有任務完成並按照輸入順序返回結果,而 as_completed 則能在各個任務先後完成時即時處理結果,無需顧慮順序。

 1import asyncio
 2import random
 3
 4async def job(i):
 5    delay = random.random() * 2
 6    await asyncio.sleep(delay)
 7    return (i, delay)
 8
 9async def gather_example():
10    # gather waits for all tasks and returns results in the same order as input
11    results = await asyncio.gather(*(job(i) for i in range(5)))
12    print("gather order:", results)
13
14async def as_completed_example():
15    # as_completed yields results as they finish (useful to process early results)
16    tasks = [asyncio.create_task(job(i)) for i in range(5)]
17    for coro in asyncio.as_completed(tasks):
18        res = await coro
19        print("completed:", res)
20
21async def main():
22    await gather_example()
23    await as_completed_example()
24
25if __name__ == "__main__":
26    asyncio.run(main())
  • 如這段程式碼所示,gather 會依據輸入順序返回結果,當需要保留順序時很有用。as_completed 適合於希望任務只要完成就立即處理其結果的情境。

控制並發數量:利用 asyncio.Semaphore 限制同時執行數

當有外部 API 的速率限制或資料庫連線限制時,可利用 Semaphore 控制並發執行數。

 1import asyncio
 2import random
 3
 4sem = asyncio.Semaphore(3)  # allow up to 3 concurrent workers
 5
 6async def limited_worker(i):
 7    async with sem:
 8        # Only 3 tasks can be inside this block concurrently
 9        delay = random.random() * 2
10        await asyncio.sleep(delay)
11        print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14    tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15    await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18    asyncio.run(main())
  • 透過將 Semaphoreasync with 搭配使用,可以輕鬆設置同時執行數量的上限。這種方式在有外部限制的情境下特別有效。

共用資源的排他控制:asyncio.Lock

Lock 用於防止多個任務同時更新共用數據。asyncio.Lock 是供非同步使用的排他性原語。

 1import asyncio
 2
 3counter = 0
 4lock = asyncio.Lock()
 5
 6async def incrementer(n_times):
 7    global counter
 8    for _ in range(n_times):
 9        # Acquire lock to update shared counter safely
10        async with lock:
11            tmp = counter
12            await asyncio.sleep(0)  # yield control to increase race likelihood
13            counter = tmp + 1
14
15async def main():
16    tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17    await asyncio.gather(*tasks)
18    print("final counter:", counter)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • 如果多個任務同時更新像全域變數 counter 這類共用變數,可能會發生衝突。將操作包在 Lock 內,即可維持資料一致性。

任務協調:asyncio.Event

Event 適用於一個任務發出已就緒信號,其他任務則等待該信號的場合。這是一種簡單的方式,讓多個任務共享訊號並相互同步。

 1import asyncio
 2
 3event = asyncio.Event()
 4
 5async def waiter(name):
 6    print(f"{name} is waiting for event")
 7    await event.wait()
 8    print(f"{name} resumed after event set")
 9
10async def setter():
11    print("setter will sleep and then set the event")
12    await asyncio.sleep(1)
13    event.set()
14    print("event set by setter")
15
16async def main():
17    tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18    await asyncio.create_task(setter())
19    await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22    asyncio.run(main())
  • Event 擁有布林旗標,透過呼叫 set(),可讓所有等待中的任務恢復執行。適用於簡單的同步場景。

生產者-消費者模式:asyncio.Queue

透過 Queue,生產者(產生數據者)與消費者(處理數據者)能平順且非同步地協作。另外,當隊列已滿時,生產者會自動等待,從而自然實現阻塞壓力,防止過度生產。

 1import asyncio
 2import random
 3
 4async def producer(q, n_items):
 5    for i in range(n_items):
 6        await asyncio.sleep(random.random() * 0.5)
 7        item = f"item-{i}"
 8        await q.put(item)
 9        print("produced", item)
10    # signal consumers to stop
11    await q.put(None)
12
13async def consumer(q, name):
14    while True:
15        item = await q.get()
16        if item is None:
17            # put sentinel back for other consumers and break
18            await q.put(None)
19            break
20        await asyncio.sleep(random.random() * 1)
21        print(name, "consumed", item)
22        q.task_done()
23
24async def main():
25    q = asyncio.Queue(maxsize=5)  # bounded queue to apply backpressure
26    prod = asyncio.create_task(producer(q, 10))
27    cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28    await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31    asyncio.run(main())
  • Queue 協助生產者與消費者進行非同步協調。另外,為隊列設定 maxsize,若已滿則生產者在執行 put 時會自動等待,防止過度生產。

處理同步阻塞操作:run_in_executor

針對 CPU 密集型運算,或必須使用不支援 async 的函式庫時,可以利用 run_in_executor 將處理委派至其他執行緒或程序。這麼做可避免主事件迴圈被阻塞,允許其他非同步任務順利運作。

 1import asyncio
 2import time
 3
 4def blocking_io(x):
 5    # simulate blocking I/O or CPU-bound work
 6    time.sleep(2)
 7    return x * x
 8
 9async def main():
10    loop = asyncio.get_running_loop()
11    # run blocking_io in default thread pool
12    result = await loop.run_in_executor(None, blocking_io, 3)
13    print("blocking result:", result)
14
15if __name__ == "__main__":
16    asyncio.run(main())
  • 如果直接呼叫同步函式會導致事件迴圈被阻塞。透過 run_in_executor,程式碼會在獨立執行緒執行,非同步任務也能持續並行運作。

實例:API 呼叫頻率限制(結合 Semaphore 與 run_in_executor)

以下是一個 API 呼叫有頻率限制、並且會對結果進行重量級處理的範例場景。利用 Semaphore 結合 run_in_executor,能安全且有效地執行這類處理。

 1import asyncio
 2import time
 3import random
 4
 5sem = asyncio.Semaphore(5)
 6
 7def heavy_sync_processing(data):
 8    # simulate heavy CPU-bound work
 9    time.sleep(1)
10    return f"processed-{data}"
11
12async def api_call(i):
13    await asyncio.sleep(random.random() * 0.5)  # simulate network latency
14    return f"data-{i}"
15
16async def worker(i):
17    async with sem:
18        data = await api_call(i)
19        # offload CPU-bound work to threadpool
20        loop = asyncio.get_running_loop()
21        result = await loop.run_in_executor(None, heavy_sync_processing, data)
22        print(result)
23
24async def main():
25    tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26    await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29    asyncio.run(main())
  • 我們用 Semaphore 限制 API 同時呼叫數量,再將重度運算任務交給執行緒池處理。將網絡存取和 CPU 運算分離,能顯著提升效率。

任務取消與清理

任務被取消時,正確處理 finallyasyncio.CancelledError 是非常重要的。這能確保檔案、連線獲得釋放,中間狀態被妥善處理,維持應用程式一致性。

 1import asyncio
 2
 3async def long_running():
 4    try:
 5        print("started long_running")
 6        await asyncio.sleep(10)
 7        print("finished long_running")
 8    except asyncio.CancelledError:
 9        print("long_running was cancelled, cleaning up")
10        # perform cleanup here
11        raise
12
13async def main():
14    task = asyncio.create_task(long_running())
15    await asyncio.sleep(1)
16    task.cancel()
17    try:
18        await task
19    except asyncio.CancelledError:
20        print("task cancelled in main")
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • 取消會以例外(CancelledError)傳遞,因此請在 except 區塊內執行必要的清理,並在需要時重新拋出例外。

實務設計重點

以下是設計非同步處理時有用的實用重點。

  • 明確控制並發數 當有 API 或資料庫等資源限制時,可以用 Semaphore 控制同時執行的數量。

  • 安全管理共用資源 當多個任務需同時更新狀態時,請使用 Lock。減少共用狀態並以不可變資料為主的設計,更能提升安全性。

  • 選擇接收結果的方法 如需在任務完成時即時處理,用 asyncio.as_completed;如要依照輸入順序處理,則用 gather

  • 隔離重量級同步處理 針對 CPU 密集或同步函式庫呼叫,請用 run_in_executorProcessPoolExecutor 避免事件迴圈被阻塞。

  • 提前規劃取消與例外處理 正確編寫例外處理,確保即使任務中途被取消也能安全釋放資源。

  • 讓測試更容易 將 I/O、時間、隨機等副作用抽象化,方便替換與測試非同步程式碼。

總結

asyncio 雖然功能強大,但如果只關心「並行執行」,容易遇到共用資源競爭、突破資源上限或事件迴圈阻塞等問題。善用 SemaphoreLockEventQueuerun_in_executor 並妥善處理取消,即可設計出安全且高效的非同步應用。透過運用生產者-消費者模式、並發限制、分離非同步與阻塞處理等機制,能更安全有效地構建非同步工作流。

您可以在我們的 YouTube 頻道上使用 Visual Studio Code 來跟隨上述文章一起學習。 請也查看我們的 YouTube 頻道。

YouTube Video