非同步輸入/輸出
本文將說明非同步輸入/輸出。
本指南將循序漸進地溫和解釋在 Python 實際運用中非常有用的非同步輸入/輸出概念與模式。
YouTube Video
非同步輸入/輸出(I/O)
非同步 I/O 的基本概念
非同步 I/O 是一種機制,允許在等待耗時的 I/O(例如文件操作或網路通訊)時,其他操作能夠並行執行。在 Python 中,asyncio 作為標準非同步框架被提供,許多函式庫也都設計為遵循這一機制。
基礎:async / await 與事件循環
首先,介紹如何撰寫基本協程,以及如何用 asyncio.gather 同時執行多個協程的範例。
以下程式碼是一個最簡的範例,用於定義並並行執行非同步函式。sleep 函式用於展示並行執行的效果。
1import asyncio
2
3async def worker(name: str, delay: float):
4 # Simulate I/O-bound work by sleeping
5 print(f"{name} started")
6 await asyncio.sleep(delay)
7 print(f"{name} finished after {delay}s")
8 return name, delay
9
10async def main():
11 # Run multiple coroutines concurrently
12 tasks = [
13 worker("A", 1.5),
14 worker("B", 1.0),
15 worker("C", 0.5),
16 ]
17 results = await asyncio.gather(*tasks)
18 print("Results:", results)
19
20if __name__ == "__main__":
21 asyncio.run(main())- 這段程式碼使用
asyncio.run()啟動事件循環,並同時運行三個協程。
async with 與非同步上下文管理器
在非同步處理中,像是開啟連線及關閉文件等資源管理會很容易變得複雜。這時候,使用 async with 的非同步上下文管理器就會非常有用。這種語法跟同步的 with 陳述式一樣用,但內部處理是非同步的,非常適合融入 async/await 流程中。
使用 async with 有兩個主要原因:。
- 可以可靠地釋放像是連線、檔案控制權或會話等資源。即使發生異常中止,也能確保資源正確被釋放,讓人安心。
- 自動化初始化與清理作業,例如建立或關閉連線、刷新等都可在非同步的流程中自動完成。這樣可以省去手動編寫的麻煩,並讓程式碼更清晰易懂。
以下是從零開始創建簡單非同步上下文管理器的範例。
1import asyncio
2
3class AsyncResource:
4 async def __aenter__(self):
5 print("Opening resource...")
6 await asyncio.sleep(0.5)
7 print("Resource opened")
8 return self
9
10 async def __aexit__(self, exc_type, exc, tb):
11 print("Closing resource...")
12 await asyncio.sleep(0.5)
13 print("Resource closed")
14
15async def main():
16 async with AsyncResource() as r:
17 print("Using resource...")
18
19if __name__ == "__main__":
20 asyncio.run(main())- 只要定義
__aenter__和__aexit__,就可以使用async with。 - 進入與離開
async with區塊時的處理都會以非同步、安全的方式執行。
非同步文件 I/O(aiofiles)
檔案處理是典型的阻塞性例子。透過 aiofiles,你可以安全地進行檔案的非同步操作。它內部會使用執行緒池,並搭配 async with 確保檔案能被正確關閉。
以下範例展示了如何平行非同步讀取多個檔案。這段程式碼需先用 pip install aiofiles 安裝 aiofiles 後才能執行。
1# pip install aiofiles
2import asyncio
3import aiofiles
4from pathlib import Path
5
6async def read_file(path: Path):
7 # Read file content asynchronously
8 async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
9 contents = await f.read()
10 return path.name, len(contents)
11
12async def main():
13 files = [
14 Path("example1.txt"),
15 Path("example2.txt"),
16 Path("example3.txt")
17 ]
18 tasks = [read_file(p) for p in files]
19 results = await asyncio.gather(*tasks)
20 for name, size in results:
21 print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24 asyncio.run(main())- 這段程式碼會平行處理每一份檔案的讀取。
aiofiles通常會在內部使用執行緒池,讓你透過非同步界面處理阻塞的檔案 I/O。
非同步 HTTP 用戶端(aiohttp)
作為網路 I/O 的經典例子,以下介紹如何非同步執行 HTTP 請求。特別是在需要同時平行發送大量 HTTP 請求時,效果非常顯著。
以下是用 aiohttp 平行獲取多個 URL 的範例。你需要先用 pip install aiohttp 安裝 aiohttp。
1# pip install aiohttp
2import asyncio
3import aiohttp
4
5async def fetch(session: aiohttp.ClientSession, url: str):
6 # Fetch a URL asynchronously and return status and size
7 async with session.get(url) as resp:
8 text = await resp.text()
9 return url, resp.status, len(text)
10
11async def main(urls):
12 async with aiohttp.ClientSession() as session:
13 tasks = [fetch(session, u) for u in urls]
14 for coro in asyncio.as_completed(tasks):
15 url, status, size = await coro
16 print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19 urls = [
20 "https://codesparklab.com/json/example1.json",
21 "https://codesparklab.com/json/example2.json",
22 "https://codesparklab.com/json/example3.json",
23 ]
24 asyncio.run(main(urls))- 搭配
asyncio.as_completed,可以按照任務完成的順序來處理結果。對於有效地處理大量請求這十分有用。
與阻塞式I/O的共存:run_in_executor
在非同步程式碼中遇到 CPU 密集型任務或現有的阻塞 API 時,請透過 loop.run_in_executor 使用 ThreadPoolExecutor 或 ProcessPoolExecutor。
以下程式碼示範了如何使用執行緒池同時運行假設為阻塞 I/O 的任務。
1import asyncio
2from concurrent.futures import ThreadPoolExecutor
3import time
4
5def blocking_io(n):
6 # Simulate a blocking I/O or CPU-bound function
7 time.sleep(n)
8 return f"slept {n}s"
9
10async def main():
11 loop = asyncio.get_running_loop()
12 with ThreadPoolExecutor() as pool:
13 tasks = [
14 loop.run_in_executor(pool, blocking_io, 1),
15 loop.run_in_executor(pool, blocking_io, 2),
16 ]
17 results = await asyncio.gather(*tasks)
18 print("Blocking results:", results)
19
20if __name__ == "__main__":
21 asyncio.run(main())- 利用
run_in_executor,您可以將現有同步程式碼納入非同步流程,而無需進行大幅度重寫。不過,須注意執行緒數量及 CPU 負載。 ProcessPoolExecutor適用於需大量 CPU 計算的任務。
非同步伺服器:以 asyncio 為基礎的 TCP Echo 伺服器
如果要直接操作 socket,只需用 asyncio.start_server 就能輕鬆建立非同步伺服器。
以下是簡單的 echo 伺服器範例,會將從 client 收到的資料原封不動傳回去。
1import asyncio
2
3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
4 # Handle a single client: read data and echo it back
5 addr = writer.get_extra_info('peername')
6 print(f"Connection from {addr}")
7 while True:
8 data = await reader.read(1024)
9 if not data:
10 break
11 writer.write(data) # echo back
12 await writer.drain()
13 writer.close()
14 await writer.wait_closed()
15 print(f"Connection closed {addr}")
16
17async def main():
18 server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19 addr = server.sockets[0].getsockname()
20 print(f"Serving on {addr}")
21 async with server:
22 await server.serve_forever()
23
24if __name__ == "__main__":
25 asyncio.run(main())-
在使用
asyncio進行的TCP通信中,StreamReader和StreamWriter在非同步輸入輸出中扮演著核心角色。StreamReader非同步地讀取客戶端傳來的資料,而StreamWriter則用於從伺服器將回應發送回客戶端。 -
即使不需要自己處理socket的詳細操作,也可以透過
asyncio.start_server簡單且高效地啟動非同步伺服器。 -
當你將處理函式傳遞給
asyncio.start_server時,該函式會收到reader和writer作為參數。利用這些工具,可以比直接操作低階socket API更安全、更清晰地實現通訊流程。例如,透過reader.read()接收資料,再結合writer.write()與writer.drain(),可以實現確保傳輸完成的非同步發送。 -
這種架構適合處理大量同時連線,也很適合用於簡易協議或小型 TCP 服務。
處理大型串流資料
當需依序處理大型檔案或回應時,請分塊(chunk)讀寫資料,以降低記憶體使用量。以下為使用 aiohttp 進行串流讀取的範例。
下列程式碼會將 HTTP 回應分區塊處理,接收資料時就直接寫入磁碟。
1import aiohttp
2import asyncio
3import aiofiles
4
5async def stream_download(url: str, dest: str):
6 # Stream download and write to file in chunks
7 async with aiohttp.ClientSession() as session:
8 async with session.get(url) as resp:
9 async with aiofiles.open(dest, 'wb') as f:
10 async for chunk in resp.content.iter_chunked(1024 * 64):
11 await f.write(chunk)
12
13if __name__ == "__main__":
14 asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))-
這段程式碼並不是一次性載入大型檔案,而是將資料分成多個區塊分批接收,並以非同步的方式寫入檔案。因此,它能在維持低內存使用量的同時,快速且高效地進行下載。
aiohttp能非同步地取得資料,而aiofiles則不會阻塞地將資料寫入檔案,使其容易與其他程序並行執行。 -
這種設計很適合高效下載大型檔案並儲存,又能極小化記憶體用量。
非同步子程序執行
若想非同步執行外部指令並且即時讀取其輸出,可以使用 asyncio.create_subprocess_exec。
以下展示啟動外部指令並即時讀取標準輸出的範例。
1import asyncio
2
3async def run_cmd(cmd):
4 # Run external command asynchronously and capture output line by line
5 proc = await asyncio.create_subprocess_exec(
6 *cmd,
7 stdout=asyncio.subprocess.PIPE,
8 stderr=asyncio.subprocess.PIPE
9 )
10
11 async def read_stream(stream, name):
12 while True:
13 line = await stream.readline()
14 if not line:
15 break
16 print(f"[{name}] {line.decode().rstrip()}")
17
18 await asyncio.gather(
19 read_stream(proc.stdout, "stdout"),
20 read_stream(proc.stderr, "stderr"),
21 )
22 await proc.wait()
23 return proc.returncode
24
25if __name__ == "__main__":
26 asyncio.run(run_cmd(["python", "--version"]))- 透過非同步控制子程序,可以即時取得外部工具的日誌,或同時並行運作多個外部程序。
處理取消與超時
非同步任務可被取消。要設置超時只需使用 asyncio.wait_for 即可。
以下是在限定超時時間下運行任務的範例。
1import asyncio
2
3async def slow_task():
4 await asyncio.sleep(5)
5 return "done"
6
7async def main():
8 try:
9 result = await asyncio.wait_for(slow_task(), timeout=2.0)
10 print("Result:", result)
11 except asyncio.TimeoutError:
12 print("Task timed out")
13
14if __name__ == "__main__":
15 asyncio.run(main())wait_for超時時會丟出TimeoutError,必要時同時取消任務。需注意任務取消後的傳遞及善後處理。
控制並發數量(Semaphore)
由於同時發出大量連線/請求會耗盡資源,請用 asyncio.Semaphore 限制併發數。
以下是用 semaphore 限制同時下載數的範例。
1import asyncio
2import aiohttp
3
4semaphore = asyncio.Semaphore(3) # allow up to 3 concurrent tasks
5
6async def limited_fetch(session, url):
7 async with semaphore:
8 async with session.get(url) as resp:
9 return url, resp.status
10
11async def main(urls):
12 async with aiohttp.ClientSession() as session:
13 tasks = [limited_fetch(session, u) for u in urls]
14 results = await asyncio.gather(*tasks)
15 print(results)
16
17if __name__ == "__main__":
18 urls = ["https://codesparklab.com/json/example.json"] * 10
19 asyncio.run(main(urls))- 用這方法可以溫和地存取外部服務,也能避免讓自己程序超載。
錯誤處理與重試策略
即使在非同步處理中,也不可避免地會發生錯誤。請適當捕捉例外,同時實作例如指數退避(exponential backoff)等重試策略。
以下為簡單重試至 N 次的實作範例。
1import asyncio
2import aiohttp
3import random
4
5async def fetch_with_retry(session, url, retries=3):
6 for attempt in range(1, retries + 1):
7 try:
8 async with session.get(url) as resp:
9 resp.raise_for_status()
10 text = await resp.text()
11 return text
12 except Exception as e:
13 if attempt == retries:
14 raise
15 await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18 async with aiohttp.ClientSession() as session:
19 text = await fetch_with_retry(session, "https://codesparklab.com/")
20 print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23 asyncio.run(main())- 良好的重試邏輯對於平衡一致性和流量控制非常重要。
除錯與記錄技巧
非同步處理時,任務會並行進行,因此問題發生的原因往往不易查明。若要有效追蹤問題,請注意以下幾點,能讓除錯更加順利。
asyncio.run()及Task產生的例外容易被忽略,請確保未處理例外有被記錄下來。- 當使用
logging時,若在日誌中包含協程名稱,或Python 3.8以上版本使用task.get_name(),可讓追蹤更容易。 - 你可以使用
asyncio.Task.all_tasks()來查看目前任務的狀態。然而,這個API是為了除錯用途設計,在生產環境中使用時應特別留意,避免產生效能問題或出現意外干擾。
效能優化重點
雖然非同步程式特別擅長處理 I/O 等待,但若誤用反而會降低效能。最佳化時請考慮以下幾點:。
- 非同步處理適合 I/O 密集型任務,但不適用於 CPU 密集,這時該考慮用 process pool。
- 使用執行緒池或程序池時,請考量池的大小與任務特性。
- 一口氣啟動過多的小任務會造成事件循環負擔,請搭配分批(batch)或 semaphore 做調整。
總結
Python 的非同步 I/O 能有效利用 I/O 等待時間,並高效並行處理網路與文件操作,是一套強大的機制。結合 asyncio、aiohttp、aiofiles 與 run_in_executor 等技巧,可靈活構築實用的非同步應用程式。使用 async with 自動取得和釋放資源,能安全且可靠地管理文件、HTTP 會話、鎖等非同步資源。配合適當的錯誤處理與併發控制,能讓高可靠的非同步應用安全穩健地運行。
您可以在我們的 YouTube 頻道上使用 Visual Studio Code 來跟隨上述文章一起學習。 請也查看我們的 YouTube 頻道。