异步输入/输出

异步输入/输出

本文将解释异步输入/输出。

本指南将以循序渐进的方式,详细讲解在 Python 中实际有用的异步输入/输出的概念和模式。

YouTube Video

异步输入/输出(I/O)

异步I/O的概念

异步I/O是一种机制,可以在等待耗时I/O(如文件操作或网络通信)期间让其他操作并行运行。在Python中,asyncio被作为标准异步框架提供,许多库也是基于这一机制设计的。

基础:async / await 与事件循环

首先,介绍一下如何编写基础协程,以及如何使用asyncio.gather同时运行多个协程的例子。

下面的代码是一个同时定义并运行异步函数的最小示例。sleep函数用于演示并行执行。

 1import asyncio
 2
 3async def worker(name: str, delay: float):
 4    # Simulate I/O-bound work by sleeping
 5    print(f"{name} started")
 6    await asyncio.sleep(delay)
 7    print(f"{name} finished after {delay}s")
 8    return name, delay
 9
10async def main():
11    # Run multiple coroutines concurrently
12    tasks = [
13        worker("A", 1.5),
14        worker("B", 1.0),
15        worker("C", 0.5),
16    ]
17    results = await asyncio.gather(*tasks)
18    print("Results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • 该代码通过asyncio.run()启动事件循环,并同时运行三个协程。

async with与异步上下文管理器

在异步处理中,资源管理(如打开连接、关闭文件)往往比较复杂。这时,利用async with的异步上下文管理器就变得很有用。这种用法与同步的with语句类似,但内部处理为异步,因此能自然融入async/await流程。

使用async with主要有两个原因:。

  • 可以可靠地清理资源,例如连接、文件句柄和会话等。即使发生异常终止,也能确保资源被正确释放。
  • 能够以异步方式自动执行初始化和清理操作,比如建立或关闭连接、刷新等。这样可以避免手动编写琐碎代码,使代码更清晰。

下面是从零实现一个简单异步上下文管理器的示例。

 1import asyncio
 2
 3class AsyncResource:
 4    async def __aenter__(self):
 5        print("Opening resource...")
 6        await asyncio.sleep(0.5)
 7        print("Resource opened")
 8        return self
 9
10    async def __aexit__(self, exc_type, exc, tb):
11        print("Closing resource...")
12        await asyncio.sleep(0.5)
13        print("Resource closed")
14
15async def main():
16    async with AsyncResource() as r:
17        print("Using resource...")
18
19if __name__ == "__main__":
20    asyncio.run(main())
  • 通过定义__aenter____aexit__方法,就可以使用async with
  • 在进入与退出async with代码块时,相关处理会以异步且安全的方式执行。

异步文件I/O(aiofiles

文件操作是典型的阻塞性操作。使用aiofiles可以安全地以异步方式执行文件操作。它内部使用线程池,并通过async with确保文件被正确关闭。

下面的例子演示如何异步并行读取多个文件。运行此代码前,你需要通过pip install aiofiles安装aiofiles

 1# pip install aiofiles
 2import asyncio
 3import aiofiles
 4from pathlib import Path
 5
 6async def read_file(path: Path):
 7    # Read file content asynchronously
 8    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
 9        contents = await f.read()
10    return path.name, len(contents)
11
12async def main():
13    files = [
14        Path("example1.txt"),
15        Path("example2.txt"),
16        Path("example3.txt")
17    ]
18    tasks = [read_file(p) for p in files]
19    results = await asyncio.gather(*tasks)
20    for name, size in results:
21        print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24    asyncio.run(main())
  • 该代码会并行读取每个文件。aiofiles内部常用线程池,使你能通过异步接口处理阻塞的文件I/O操作。

异步HTTP客户端(aiohttp

HTTP请求是网络I/O的经典例子,以下介绍如何异步执行HTTP请求。在需要大量并发HTTP请求时,它尤其强大。

下面是用aiohttp并行抓取多个URL的示例。你需要使用pip install aiohttp来安装aiohttp

 1# pip install aiohttp
 2import asyncio
 3import aiohttp
 4
 5async def fetch(session: aiohttp.ClientSession, url: str):
 6    # Fetch a URL asynchronously and return status and size
 7    async with session.get(url) as resp:
 8        text = await resp.text()
 9        return url, resp.status, len(text)
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [fetch(session, u) for u in urls]
14        for coro in asyncio.as_completed(tasks):
15            url, status, size = await coro
16            print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19    urls = [
20        "https://codesparklab.com/json/example1.json",
21        "https://codesparklab.com/json/example2.json",
22        "https://codesparklab.com/json/example3.json",
23    ]
24    asyncio.run(main(urls))
  • 通过asyncio.as_completed可以按任务完成的顺序处理结果。这对高效处理大量请求非常有用。

与阻塞I/O共存:run_in_executor

在异步代码中处理 CPU 密集型任务或已有的阻塞 API 时,可以通过 loop.run_in_executor 使用 ThreadPoolExecutorProcessPoolExecutor

下面的代码是一个使用线程池并发运行假设为阻塞 I/O 任务的示例。

 1import asyncio
 2from concurrent.futures import ThreadPoolExecutor
 3import time
 4
 5def blocking_io(n):
 6    # Simulate a blocking I/O or CPU-bound function
 7    time.sleep(n)
 8    return f"slept {n}s"
 9
10async def main():
11    loop = asyncio.get_running_loop()
12    with ThreadPoolExecutor() as pool:
13        tasks = [
14            loop.run_in_executor(pool, blocking_io, 1),
15            loop.run_in_executor(pool, blocking_io, 2),
16        ]
17        results = await asyncio.gather(*tasks)
18        print("Blocking results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • 通过使用 run_in_executor,可以在无需大量重写的情况下,将现有的同步代码整合进异步流程中。但要注意线程数量与CPU负载。
  • ProcessPoolExecutor 适用于 CPU 密集型任务。

异步服务器:基于asyncio的TCP回显服务器

如需直接操作socket,可用asyncio.start_server轻松构建异步服务器。

下面是一个简单的回显服务器,它会将来自客户端的数据原样返回。

 1import asyncio
 2
 3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
 4    # Handle a single client: read data and echo it back
 5    addr = writer.get_extra_info('peername')
 6    print(f"Connection from {addr}")
 7    while True:
 8        data = await reader.read(1024)
 9        if not data:
10            break
11        writer.write(data)  # echo back
12        await writer.drain()
13    writer.close()
14    await writer.wait_closed()
15    print(f"Connection closed {addr}")
16
17async def main():
18    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19    addr = server.sockets[0].getsockname()
20    print(f"Serving on {addr}")
21    async with server:
22        await server.serve_forever()
23
24if __name__ == "__main__":
25    asyncio.run(main())
  • 在使用asyncio进行TCP通信时,StreamReaderStreamWriter在异步输入输出中起着核心作用。StreamReader可异步读取客户端发送的数据,StreamWriter则用于将服务器的响应写回客户端。

  • 即使不需要自己处理套接字的详细操作,你也可以使用 asyncio.start_server 简单且高效地启动一个异步服务器。

  • 当你向 asyncio.start_server 传递处理函数时,该函数会接收 readerwriter 作为参数。使用这些对象,你可以比直接处理底层套接字 API 更安全、更清晰地实现通信流程。例如,通过使用 reader.read() 接收数据,并结合使用 writer.write()writer.drain(),你可以实现确保传输完成的异步发送。

  • 该结构适合处理大量并发连接,适用于简单协议或小规模TCP服务。

处理大规模流式数据

顺序处理大文件或响应时,应分块读写数据以降低内存消耗。下面是利用aiohttp进行流式读取的示例。

下例将HTTP响应以块为单位处理,并在接收数据时写入磁盘。

 1import aiohttp
 2import asyncio
 3import aiofiles
 4
 5async def stream_download(url: str, dest: str):
 6    # Stream download and write to file in chunks
 7    async with aiohttp.ClientSession() as session:
 8        async with session.get(url) as resp:
 9            async with aiofiles.open(dest, 'wb') as f:
10                async for chunk in resp.content.iter_chunked(1024 * 64):
11                    await f.write(chunk)
12
13if __name__ == "__main__":
14    asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))
  • 此代码不会一次性加载整个大文件;相反,它会分块(小片段)接收数据,并异步写入文件。因此,它能够在保持较低内存占用的同时,实现快速且高效的下载。aiohttp 异步获取数据,而 aiofiles 在不阻塞的情况下写入文件,使其能够轻松与其他任务并行运行。

  • 该模式适合以最小内存高效下载和保存大文件。

异步执行子进程

如果要异步运行外部命令,并实时读取输出,asyncio.create_subprocess_exec会很有用。

下面演示如何启动外部命令并实时读取其标准输出。

 1import asyncio
 2
 3async def run_cmd(cmd):
 4    # Run external command asynchronously and capture output line by line
 5    proc = await asyncio.create_subprocess_exec(
 6        *cmd,
 7        stdout=asyncio.subprocess.PIPE,
 8        stderr=asyncio.subprocess.PIPE
 9    )
10
11    async def read_stream(stream, name):
12        while True:
13            line = await stream.readline()
14            if not line:
15                break
16            print(f"[{name}] {line.decode().rstrip()}")
17
18    await asyncio.gather(
19        read_stream(proc.stdout, "stdout"),
20        read_stream(proc.stderr, "stderr"),
21    )
22    await proc.wait()
23    return proc.returncode
24
25if __name__ == "__main__":
26    asyncio.run(run_cmd(["python", "--version"]))
  • 通过异步控制子进程,可实时处理外部工具日志,或并行运行多个进程。

处理取消与超时

异步任务可以被取消。实现超时时,使用asyncio.wait_for很简单。

下面是带超时执行任务的示例代码。

 1import asyncio
 2
 3async def slow_task():
 4    await asyncio.sleep(5)
 5    return "done"
 6
 7async def main():
 8    try:
 9        result = await asyncio.wait_for(slow_task(), timeout=2.0)
10        print("Result:", result)
11    except asyncio.TimeoutError:
12        print("Task timed out")
13
14if __name__ == "__main__":
15    asyncio.run(main())
  • wait_for在超时时会抛出TimeoutError,必要时取消任务。要注意任务取消的传递与清理。

控制并发(Semaphore

由于大量并发连接或请求可能耗尽资源,建议通过asyncio.Semaphore限制并发数。

下面是用信号量限制并发下载的实例。

 1import asyncio
 2import aiohttp
 3
 4semaphore = asyncio.Semaphore(3)  # allow up to 3 concurrent tasks
 5
 6async def limited_fetch(session, url):
 7    async with semaphore:
 8        async with session.get(url) as resp:
 9            return url, resp.status
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [limited_fetch(session, u) for u in urls]
14        results = await asyncio.gather(*tasks)
15        print(results)
16
17if __name__ == "__main__":
18    urls = ["https://codesparklab.com/json/example.json"] * 10
19    asyncio.run(main(urls))
  • 这种方式可以温和访问外部服务,同时避免自身进程过载。

异常处理与重试策略

即使在异步处理中也难免发生错误。要适当捕捉异常,并实现如指数退避等重试策略。

下面是一个简单的最多重试N次的实现示例。

 1import asyncio
 2import aiohttp
 3import random
 4
 5async def fetch_with_retry(session, url, retries=3):
 6    for attempt in range(1, retries + 1):
 7        try:
 8            async with session.get(url) as resp:
 9                resp.raise_for_status()
10                text = await resp.text()
11                return text
12        except Exception as e:
13            if attempt == retries:
14                raise
15            await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18    async with aiohttp.ClientSession() as session:
19        text = await fetch_with_retry(session, "https://codesparklab.com/")
20        print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • 合理的重试逻辑有助于平衡一致性与流量控制。

调试与日志记录技巧

异步处理中任务并发进行,导致问题定位变得困难。记住以下要点,有助于高效追踪问题、提升调试效率。

  • asyncio.run()Task中未被捕获的异常容易被忽略,请确保记录所有未处理异常。
  • 使用 logging 时,在日志中包含协程名称或(在 Python 3.8 及以上版本中)task.get_name() 可以让追踪更容易。
  • 你可以使用 asyncio.Task.all_tasks() 查看当前任务的状态。然而,该 API 主要用于调试目的,在生产环境中应谨慎使用,以避免性能问题或意外干扰。

性能注意事项

异步编程擅长处理I/O等待,但滥用时反而可能导致性能下降。优化时请注意以下几点:。

  • 异步处理适合I/O密集型任务,不适合CPU密集型任务,后者请用进程池。
  • 使用线程池或进程池时,请考虑池大小及任务性质。
  • 若一次启动大量小任务,事件循环开销会增大——可使用批处理或信号量调节。

总结

Python的异步I/O是一种高效机制,能充分利用I/O等待时间并并发执行网络与文件操作。结合asyncioaiohttpaiofilesrun_in_executor等技术,可以灵活地构建实用的异步应用。利用async with自动管理资源的获取与释放,可以安全、可靠地管理文件、HTTP会话、锁等异步资源。通过妥善的异常处理与并发控制,可以安全运行高可靠性的异步程序。

您可以在我们的YouTube频道上使用Visual Studio Code跟随上述文章进行学习。 请也查看我们的YouTube频道。

YouTube Video