异步输入/输出
本文将解释异步输入/输出。
本指南将以循序渐进的方式,详细讲解在 Python 中实际有用的异步输入/输出的概念和模式。
YouTube Video
异步输入/输出(I/O)
异步I/O的概念
异步I/O是一种机制,可以在等待耗时I/O(如文件操作或网络通信)期间让其他操作并行运行。在Python中,asyncio被作为标准异步框架提供,许多库也是基于这一机制设计的。
基础:async / await 与事件循环
首先,介绍一下如何编写基础协程,以及如何使用asyncio.gather同时运行多个协程的例子。
下面的代码是一个同时定义并运行异步函数的最小示例。sleep函数用于演示并行执行。
1import asyncio
2
3async def worker(name: str, delay: float):
4 # Simulate I/O-bound work by sleeping
5 print(f"{name} started")
6 await asyncio.sleep(delay)
7 print(f"{name} finished after {delay}s")
8 return name, delay
9
10async def main():
11 # Run multiple coroutines concurrently
12 tasks = [
13 worker("A", 1.5),
14 worker("B", 1.0),
15 worker("C", 0.5),
16 ]
17 results = await asyncio.gather(*tasks)
18 print("Results:", results)
19
20if __name__ == "__main__":
21 asyncio.run(main())- 该代码通过
asyncio.run()启动事件循环,并同时运行三个协程。
async with与异步上下文管理器
在异步处理中,资源管理(如打开连接、关闭文件)往往比较复杂。这时,利用async with的异步上下文管理器就变得很有用。这种用法与同步的with语句类似,但内部处理为异步,因此能自然融入async/await流程。
使用async with主要有两个原因:。
- 可以可靠地清理资源,例如连接、文件句柄和会话等。即使发生异常终止,也能确保资源被正确释放。
- 能够以异步方式自动执行初始化和清理操作,比如建立或关闭连接、刷新等。这样可以避免手动编写琐碎代码,使代码更清晰。
下面是从零实现一个简单异步上下文管理器的示例。
1import asyncio
2
3class AsyncResource:
4 async def __aenter__(self):
5 print("Opening resource...")
6 await asyncio.sleep(0.5)
7 print("Resource opened")
8 return self
9
10 async def __aexit__(self, exc_type, exc, tb):
11 print("Closing resource...")
12 await asyncio.sleep(0.5)
13 print("Resource closed")
14
15async def main():
16 async with AsyncResource() as r:
17 print("Using resource...")
18
19if __name__ == "__main__":
20 asyncio.run(main())- 通过定义
__aenter__和__aexit__方法,就可以使用async with。 - 在进入与退出
async with代码块时,相关处理会以异步且安全的方式执行。
异步文件I/O(aiofiles)
文件操作是典型的阻塞性操作。使用aiofiles可以安全地以异步方式执行文件操作。它内部使用线程池,并通过async with确保文件被正确关闭。
下面的例子演示如何异步并行读取多个文件。运行此代码前,你需要通过pip install aiofiles安装aiofiles。
1# pip install aiofiles
2import asyncio
3import aiofiles
4from pathlib import Path
5
6async def read_file(path: Path):
7 # Read file content asynchronously
8 async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
9 contents = await f.read()
10 return path.name, len(contents)
11
12async def main():
13 files = [
14 Path("example1.txt"),
15 Path("example2.txt"),
16 Path("example3.txt")
17 ]
18 tasks = [read_file(p) for p in files]
19 results = await asyncio.gather(*tasks)
20 for name, size in results:
21 print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24 asyncio.run(main())- 该代码会并行读取每个文件。
aiofiles内部常用线程池,使你能通过异步接口处理阻塞的文件I/O操作。
异步HTTP客户端(aiohttp)
HTTP请求是网络I/O的经典例子,以下介绍如何异步执行HTTP请求。在需要大量并发HTTP请求时,它尤其强大。
下面是用aiohttp并行抓取多个URL的示例。你需要使用pip install aiohttp来安装aiohttp。
1# pip install aiohttp
2import asyncio
3import aiohttp
4
5async def fetch(session: aiohttp.ClientSession, url: str):
6 # Fetch a URL asynchronously and return status and size
7 async with session.get(url) as resp:
8 text = await resp.text()
9 return url, resp.status, len(text)
10
11async def main(urls):
12 async with aiohttp.ClientSession() as session:
13 tasks = [fetch(session, u) for u in urls]
14 for coro in asyncio.as_completed(tasks):
15 url, status, size = await coro
16 print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19 urls = [
20 "https://codesparklab.com/json/example1.json",
21 "https://codesparklab.com/json/example2.json",
22 "https://codesparklab.com/json/example3.json",
23 ]
24 asyncio.run(main(urls))- 通过
asyncio.as_completed可以按任务完成的顺序处理结果。这对高效处理大量请求非常有用。
与阻塞I/O共存:run_in_executor
在异步代码中处理 CPU 密集型任务或已有的阻塞 API 时,可以通过 loop.run_in_executor 使用 ThreadPoolExecutor 或 ProcessPoolExecutor。
下面的代码是一个使用线程池并发运行假设为阻塞 I/O 任务的示例。
1import asyncio
2from concurrent.futures import ThreadPoolExecutor
3import time
4
5def blocking_io(n):
6 # Simulate a blocking I/O or CPU-bound function
7 time.sleep(n)
8 return f"slept {n}s"
9
10async def main():
11 loop = asyncio.get_running_loop()
12 with ThreadPoolExecutor() as pool:
13 tasks = [
14 loop.run_in_executor(pool, blocking_io, 1),
15 loop.run_in_executor(pool, blocking_io, 2),
16 ]
17 results = await asyncio.gather(*tasks)
18 print("Blocking results:", results)
19
20if __name__ == "__main__":
21 asyncio.run(main())- 通过使用
run_in_executor,可以在无需大量重写的情况下,将现有的同步代码整合进异步流程中。但要注意线程数量与CPU负载。 ProcessPoolExecutor适用于 CPU 密集型任务。
异步服务器:基于asyncio的TCP回显服务器
如需直接操作socket,可用asyncio.start_server轻松构建异步服务器。
下面是一个简单的回显服务器,它会将来自客户端的数据原样返回。
1import asyncio
2
3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
4 # Handle a single client: read data and echo it back
5 addr = writer.get_extra_info('peername')
6 print(f"Connection from {addr}")
7 while True:
8 data = await reader.read(1024)
9 if not data:
10 break
11 writer.write(data) # echo back
12 await writer.drain()
13 writer.close()
14 await writer.wait_closed()
15 print(f"Connection closed {addr}")
16
17async def main():
18 server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19 addr = server.sockets[0].getsockname()
20 print(f"Serving on {addr}")
21 async with server:
22 await server.serve_forever()
23
24if __name__ == "__main__":
25 asyncio.run(main())-
在使用
asyncio进行TCP通信时,StreamReader和StreamWriter在异步输入输出中起着核心作用。StreamReader可异步读取客户端发送的数据,StreamWriter则用于将服务器的响应写回客户端。 -
即使不需要自己处理套接字的详细操作,你也可以使用
asyncio.start_server简单且高效地启动一个异步服务器。 -
当你向
asyncio.start_server传递处理函数时,该函数会接收reader和writer作为参数。使用这些对象,你可以比直接处理底层套接字 API 更安全、更清晰地实现通信流程。例如,通过使用reader.read()接收数据,并结合使用writer.write()和writer.drain(),你可以实现确保传输完成的异步发送。 -
该结构适合处理大量并发连接,适用于简单协议或小规模TCP服务。
处理大规模流式数据
顺序处理大文件或响应时,应分块读写数据以降低内存消耗。下面是利用aiohttp进行流式读取的示例。
下例将HTTP响应以块为单位处理,并在接收数据时写入磁盘。
1import aiohttp
2import asyncio
3import aiofiles
4
5async def stream_download(url: str, dest: str):
6 # Stream download and write to file in chunks
7 async with aiohttp.ClientSession() as session:
8 async with session.get(url) as resp:
9 async with aiofiles.open(dest, 'wb') as f:
10 async for chunk in resp.content.iter_chunked(1024 * 64):
11 await f.write(chunk)
12
13if __name__ == "__main__":
14 asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))-
此代码不会一次性加载整个大文件;相反,它会分块(小片段)接收数据,并异步写入文件。因此,它能够在保持较低内存占用的同时,实现快速且高效的下载。
aiohttp异步获取数据,而aiofiles在不阻塞的情况下写入文件,使其能够轻松与其他任务并行运行。 -
该模式适合以最小内存高效下载和保存大文件。
异步执行子进程
如果要异步运行外部命令,并实时读取输出,asyncio.create_subprocess_exec会很有用。
下面演示如何启动外部命令并实时读取其标准输出。
1import asyncio
2
3async def run_cmd(cmd):
4 # Run external command asynchronously and capture output line by line
5 proc = await asyncio.create_subprocess_exec(
6 *cmd,
7 stdout=asyncio.subprocess.PIPE,
8 stderr=asyncio.subprocess.PIPE
9 )
10
11 async def read_stream(stream, name):
12 while True:
13 line = await stream.readline()
14 if not line:
15 break
16 print(f"[{name}] {line.decode().rstrip()}")
17
18 await asyncio.gather(
19 read_stream(proc.stdout, "stdout"),
20 read_stream(proc.stderr, "stderr"),
21 )
22 await proc.wait()
23 return proc.returncode
24
25if __name__ == "__main__":
26 asyncio.run(run_cmd(["python", "--version"]))- 通过异步控制子进程,可实时处理外部工具日志,或并行运行多个进程。
处理取消与超时
异步任务可以被取消。实现超时时,使用asyncio.wait_for很简单。
下面是带超时执行任务的示例代码。
1import asyncio
2
3async def slow_task():
4 await asyncio.sleep(5)
5 return "done"
6
7async def main():
8 try:
9 result = await asyncio.wait_for(slow_task(), timeout=2.0)
10 print("Result:", result)
11 except asyncio.TimeoutError:
12 print("Task timed out")
13
14if __name__ == "__main__":
15 asyncio.run(main())wait_for在超时时会抛出TimeoutError,必要时取消任务。要注意任务取消的传递与清理。
控制并发(Semaphore)
由于大量并发连接或请求可能耗尽资源,建议通过asyncio.Semaphore限制并发数。
下面是用信号量限制并发下载的实例。
1import asyncio
2import aiohttp
3
4semaphore = asyncio.Semaphore(3) # allow up to 3 concurrent tasks
5
6async def limited_fetch(session, url):
7 async with semaphore:
8 async with session.get(url) as resp:
9 return url, resp.status
10
11async def main(urls):
12 async with aiohttp.ClientSession() as session:
13 tasks = [limited_fetch(session, u) for u in urls]
14 results = await asyncio.gather(*tasks)
15 print(results)
16
17if __name__ == "__main__":
18 urls = ["https://codesparklab.com/json/example.json"] * 10
19 asyncio.run(main(urls))- 这种方式可以温和访问外部服务,同时避免自身进程过载。
异常处理与重试策略
即使在异步处理中也难免发生错误。要适当捕捉异常,并实现如指数退避等重试策略。
下面是一个简单的最多重试N次的实现示例。
1import asyncio
2import aiohttp
3import random
4
5async def fetch_with_retry(session, url, retries=3):
6 for attempt in range(1, retries + 1):
7 try:
8 async with session.get(url) as resp:
9 resp.raise_for_status()
10 text = await resp.text()
11 return text
12 except Exception as e:
13 if attempt == retries:
14 raise
15 await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18 async with aiohttp.ClientSession() as session:
19 text = await fetch_with_retry(session, "https://codesparklab.com/")
20 print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23 asyncio.run(main())- 合理的重试逻辑有助于平衡一致性与流量控制。
调试与日志记录技巧
异步处理中任务并发进行,导致问题定位变得困难。记住以下要点,有助于高效追踪问题、提升调试效率。
asyncio.run()和Task中未被捕获的异常容易被忽略,请确保记录所有未处理异常。- 使用
logging时,在日志中包含协程名称或(在 Python 3.8 及以上版本中)task.get_name()可以让追踪更容易。 - 你可以使用
asyncio.Task.all_tasks()查看当前任务的状态。然而,该 API 主要用于调试目的,在生产环境中应谨慎使用,以避免性能问题或意外干扰。
性能注意事项
异步编程擅长处理I/O等待,但滥用时反而可能导致性能下降。优化时请注意以下几点:。
- 异步处理适合I/O密集型任务,不适合CPU密集型任务,后者请用进程池。
- 使用线程池或进程池时,请考虑池大小及任务性质。
- 若一次启动大量小任务,事件循环开销会增大——可使用批处理或信号量调节。
总结
Python的异步I/O是一种高效机制,能充分利用I/O等待时间并并发执行网络与文件操作。结合asyncio、aiohttp、aiofiles和run_in_executor等技术,可以灵活地构建实用的异步应用。利用async with自动管理资源的获取与释放,可以安全、可靠地管理文件、HTTP会话、锁等异步资源。通过妥善的异常处理与并发控制,可以安全运行高可靠性的异步程序。
您可以在我们的YouTube频道上使用Visual Studio Code跟随上述文章进行学习。 请也查看我们的YouTube频道。