Python 异步处理中的同步控制

Python 异步处理中的同步控制

本文将解释 Python 异步处理中的同步控制。

你将循序渐进地学习,从 asyncio 的基础知识到常用于同步控制的实用模式。

YouTube Video

Python 异步处理中的同步控制

在异步处理中,你可以轻松同时运行多个任务。然而在实践中,需要更高级的控制,如限制并发、协调任务、对共享资源进行排他控制、处理耗时的同步操作,以及在任务取消后执行清理等。

在这里,我们将从 asyncio 的基础开始,逐步学习常用的同步模式。

介绍:基础知识(async / awaitcreate_task

让我们先看一个最小的异步代码示例。await 会等待所调用的协程执行完毕,而 asyncio.create_task 用于调度任务并发执行。

 1import asyncio
 2
 3async def worker(name, delay):
 4    # Simulate async work
 5    await asyncio.sleep(delay)
 6    return f"{name} done after {delay}s"
 7
 8async def main():
 9    # Create two tasks that run concurrently.
10    t1 = asyncio.create_task(worker("A", 1))
11    t2 = asyncio.create_task(worker("B", 2))
12
13    # Await both results (this suspends until both are done).
14    result1 = await t1
15    result2 = await t2
16    print(result1, result2)
17
18if __name__ == "__main__":
19    asyncio.run(main())
  • 这段代码展示了一个典型模式:显式创建任务、并行执行,并最终通过 await 获取结果。create_task 可以实现并发执行。

asyncio.gatherasyncio.waitasyncio.as_completed 的区别

在并发运行多个协程时,你可以根据需要获取结果的方式来选择使用哪一个。gather 会等待所有任务完成并按输入顺序返回结果,而 as_completed 则允许你按完成顺序处理结果,不考虑原始顺序。

 1import asyncio
 2import random
 3
 4async def job(i):
 5    delay = random.random() * 2
 6    await asyncio.sleep(delay)
 7    return (i, delay)
 8
 9async def gather_example():
10    # gather waits for all tasks and returns results in the same order as input
11    results = await asyncio.gather(*(job(i) for i in range(5)))
12    print("gather order:", results)
13
14async def as_completed_example():
15    # as_completed yields results as they finish (useful to process early results)
16    tasks = [asyncio.create_task(job(i)) for i in range(5)]
17    for coro in asyncio.as_completed(tasks):
18        res = await coro
19        print("completed:", res)
20
21async def main():
22    await gather_example()
23    await as_completed_example()
24
25if __name__ == "__main__":
26    asyncio.run(main())
  • 如代码所示,gather 会按照输入顺序返回结果,因此在需要保持顺序时很有用。当你希望结果一完成就立即处理时,可以使用 as_completed

控制并发:使用 asyncio.Semaphore 限制同时执行的任务数量

当存在外部 API 速率限制或数据库连接限制时,可以使用 Semaphore 控制并发执行数量。

 1import asyncio
 2import random
 3
 4sem = asyncio.Semaphore(3)  # allow up to 3 concurrent workers
 5
 6async def limited_worker(i):
 7    async with sem:
 8        # Only 3 tasks can be inside this block concurrently
 9        delay = random.random() * 2
10        await asyncio.sleep(delay)
11        print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14    tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15    await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18    asyncio.run(main())
  • 结合 async with 使用 Semaphore,可以轻松限制同时执行的数量。这在受外部限制的情况下非常有效。

共享资源的排他控制:asyncio.Lock

Lock 用于防止多个任务同时更新共享数据。asyncio.Lock 是用于异步环境的排他锁原语。

 1import asyncio
 2
 3counter = 0
 4lock = asyncio.Lock()
 5
 6async def incrementer(n_times):
 7    global counter
 8    for _ in range(n_times):
 9        # Acquire lock to update shared counter safely
10        async with lock:
11            tmp = counter
12            await asyncio.sleep(0)  # yield control to increase race likelihood
13            counter = tmp + 1
14
15async def main():
16    tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17    await asyncio.gather(*tasks)
18    print("final counter:", counter)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • 如果多个任务同时更新共享变量(如全局 counter),可能会发生冲突。通过使用 Lock 包裹操作,可以保持一致性。

任务协调:asyncio.Event

Event 用于一个任务发出已准备好的信号,其他任务等待该信号。这是任务之间共享信号并同步执行的一种简单方式。

 1import asyncio
 2
 3event = asyncio.Event()
 4
 5async def waiter(name):
 6    print(f"{name} is waiting for event")
 7    await event.wait()
 8    print(f"{name} resumed after event set")
 9
10async def setter():
11    print("setter will sleep and then set the event")
12    await asyncio.sleep(1)
13    event.set()
14    print("event set by setter")
15
16async def main():
17    tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18    await asyncio.create_task(setter())
19    await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22    asyncio.run(main())
  • Event 有一个布尔标志,调用 set() 会唤醒所有等待的任务。它适用于简单的同步场景。

生产者-消费者模式:asyncio.Queue

通过使用 Queue,生产者(产生数据)与消费者(处理数据)可以顺畅且异步地协调。此外,当队列已满时,生产者会自动等待,自然形成背压机制防止过度生产。

 1import asyncio
 2import random
 3
 4async def producer(q, n_items):
 5    for i in range(n_items):
 6        await asyncio.sleep(random.random() * 0.5)
 7        item = f"item-{i}"
 8        await q.put(item)
 9        print("produced", item)
10    # signal consumers to stop
11    await q.put(None)
12
13async def consumer(q, name):
14    while True:
15        item = await q.get()
16        if item is None:
17            # put sentinel back for other consumers and break
18            await q.put(None)
19            break
20        await asyncio.sleep(random.random() * 1)
21        print(name, "consumed", item)
22        q.task_done()
23
24async def main():
25    q = asyncio.Queue(maxsize=5)  # bounded queue to apply backpressure
26    prod = asyncio.create_task(producer(q, 10))
27    cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28    await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31    asyncio.run(main())
  • Queue 有助于异步协调生产者与消费者。另外,通过设置 maxsize,当队列满时,生产者在执行 put 时会等待,从而防止过度生产。

处理同步阻塞操作:run_in_executor

对于 CPU 密集型处理或不支持异步的库,可以使用 run_in_executor 将处理委派给其他线程或进程。这样可以防止主事件循环被阻塞,使其他异步任务能够顺利运行。

 1import asyncio
 2import time
 3
 4def blocking_io(x):
 5    # simulate blocking I/O or CPU-bound work
 6    time.sleep(2)
 7    return x * x
 8
 9async def main():
10    loop = asyncio.get_running_loop()
11    # run blocking_io in default thread pool
12    result = await loop.run_in_executor(None, blocking_io, 3)
13    print("blocking result:", result)
14
15if __name__ == "__main__":
16    asyncio.run(main())
  • 直接调用同步函数会阻塞事件循环。通过 run_in_executor,代码会在独立线程中运行,异步任务可以继续并发执行。

示例:受速率限制的 API 调用(结合 Semaphore + run_in_executor)

以下示例展示了 API 调用受速率限制并对结果进行重处理的场景。结合使用 Semaphorerun_in_executor 可以实现安全且高效的处理流程。

 1import asyncio
 2import time
 3import random
 4
 5sem = asyncio.Semaphore(5)
 6
 7def heavy_sync_processing(data):
 8    # simulate heavy CPU-bound work
 9    time.sleep(1)
10    return f"processed-{data}"
11
12async def api_call(i):
13    await asyncio.sleep(random.random() * 0.5)  # simulate network latency
14    return f"data-{i}"
15
16async def worker(i):
17    async with sem:
18        data = await api_call(i)
19        # offload CPU-bound work to threadpool
20        loop = asyncio.get_running_loop()
21        result = await loop.run_in_executor(None, heavy_sync_processing, data)
22        print(result)
23
24async def main():
25    tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26    await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29    asyncio.run(main())
  • 我们使用 Semaphore 限制 API 调用的并发数量,并将结果数据的重处理委派给线程池。将网络处理与 CPU 处理分离能够提高效率。

任务取消与清理

当任务被取消时,正确处理 finallyasyncio.CancelledError 非常重要。这样可以确保文件与连接被释放,中间状态得到妥善处理,从而保持应用程序一致性。

 1import asyncio
 2
 3async def long_running():
 4    try:
 5        print("started long_running")
 6        await asyncio.sleep(10)
 7        print("finished long_running")
 8    except asyncio.CancelledError:
 9        print("long_running was cancelled, cleaning up")
10        # perform cleanup here
11        raise
12
13async def main():
14    task = asyncio.create_task(long_running())
15    await asyncio.sleep(1)
16    task.cancel()
17    try:
18        await task
19    except asyncio.CancelledError:
20        print("task cancelled in main")
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • 任务取消会以异常(CancelledError)的形式传递,因此应在 except 中执行必要的清理,并在需要时重新抛出异常。

实用设计要点

以下是设计异步处理时有用的实践要点。

  • 显式控制并发 当 API 或数据库等资源有限时,可以使用 Semaphore 限制并发执行数量。

  • 安全处理共享资源 如果需要从多个任务更新状态,请使用 Lock。减少共享状态并以不可变数据为中心设计会更加安全。

  • 选择如何获取结果 如果希望任务完成即处理,使用 asyncio.as_completed;如果希望按输入顺序处理结果,使用 gather

  • 隔离沉重的同步处理 对于 CPU 密集型任务或同步库调用,使用 run_in_executorProcessPoolExecutor 避免阻塞事件循环。

  • 为取消与异常做好规划 编写适当的异常处理以确保任务被取消时也能安全清理资源。

  • 使测试更容易 将 I/O、时间、随机性等副作用抽象化以便替换,从而更容易测试异步代码。

总结

asyncio 功能强大,但如果只关注“并行运行任务”,可能会遇到共享资源争用、资源限制冲突或事件循环阻塞等问题。通过结合使用 SemaphoreLockEventQueuerun_in_executor 以及适当的取消处理,可以设计出安全且高效的异步应用。通过利用生产者-消费者模式、并发限制、异步与阻塞处理分离等机制,可以更安全高效地构建异步工作流。

您可以在我们的YouTube频道上使用Visual Studio Code跟随上述文章进行学习。 请也查看我们的YouTube频道。

YouTube Video