非同步輸入/輸出

非同步輸入/輸出

本文將說明非同步輸入/輸出。

本指南將循序漸進地溫和解釋在 Python 實際運用中非常有用的非同步輸入/輸出概念與模式。

YouTube Video

非同步輸入/輸出(I/O)

非同步 I/O 的基本概念

非同步 I/O 是一種機制,允許在等待耗時的 I/O(例如文件操作或網路通訊)時,其他操作能夠並行執行。在 Python 中,asyncio 作為標準非同步框架被提供,許多函式庫也都設計為遵循這一機制。

基礎:async / await 與事件循環

首先,介紹如何撰寫基本協程,以及如何用 asyncio.gather 同時執行多個協程的範例。

以下程式碼是一個最簡的範例,用於定義並並行執行非同步函式。sleep 函式用於展示並行執行的效果。

 1import asyncio
 2
 3async def worker(name: str, delay: float):
 4    # Simulate I/O-bound work by sleeping
 5    print(f"{name} started")
 6    await asyncio.sleep(delay)
 7    print(f"{name} finished after {delay}s")
 8    return name, delay
 9
10async def main():
11    # Run multiple coroutines concurrently
12    tasks = [
13        worker("A", 1.5),
14        worker("B", 1.0),
15        worker("C", 0.5),
16    ]
17    results = await asyncio.gather(*tasks)
18    print("Results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • 這段程式碼使用 asyncio.run() 啟動事件循環,並同時運行三個協程。

async with 與非同步上下文管理器

在非同步處理中,像是開啟連線及關閉文件等資源管理會很容易變得複雜。這時候,使用 async with 的非同步上下文管理器就會非常有用。這種語法跟同步的 with 陳述式一樣用,但內部處理是非同步的,非常適合融入 async/await 流程中。

使用 async with 有兩個主要原因:。

  • 可以可靠地釋放像是連線、檔案控制權或會話等資源。即使發生異常中止,也能確保資源正確被釋放,讓人安心。
  • 自動化初始化與清理作業,例如建立或關閉連線、刷新等都可在非同步的流程中自動完成。這樣可以省去手動編寫的麻煩,並讓程式碼更清晰易懂。

以下是從零開始創建簡單非同步上下文管理器的範例。

 1import asyncio
 2
 3class AsyncResource:
 4    async def __aenter__(self):
 5        print("Opening resource...")
 6        await asyncio.sleep(0.5)
 7        print("Resource opened")
 8        return self
 9
10    async def __aexit__(self, exc_type, exc, tb):
11        print("Closing resource...")
12        await asyncio.sleep(0.5)
13        print("Resource closed")
14
15async def main():
16    async with AsyncResource() as r:
17        print("Using resource...")
18
19if __name__ == "__main__":
20    asyncio.run(main())
  • 只要定義 __aenter____aexit__,就可以使用 async with
  • 進入與離開 async with 區塊時的處理都會以非同步、安全的方式執行。

非同步文件 I/O(aiofiles

檔案處理是典型的阻塞性例子。透過 aiofiles,你可以安全地進行檔案的非同步操作。它內部會使用執行緒池,並搭配 async with 確保檔案能被正確關閉。

以下範例展示了如何平行非同步讀取多個檔案。這段程式碼需先用 pip install aiofiles 安裝 aiofiles 後才能執行。

 1# pip install aiofiles
 2import asyncio
 3import aiofiles
 4from pathlib import Path
 5
 6async def read_file(path: Path):
 7    # Read file content asynchronously
 8    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
 9        contents = await f.read()
10    return path.name, len(contents)
11
12async def main():
13    files = [
14        Path("example1.txt"),
15        Path("example2.txt"),
16        Path("example3.txt")
17    ]
18    tasks = [read_file(p) for p in files]
19    results = await asyncio.gather(*tasks)
20    for name, size in results:
21        print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24    asyncio.run(main())
  • 這段程式碼會平行處理每一份檔案的讀取。aiofiles 通常會在內部使用執行緒池,讓你透過非同步界面處理阻塞的檔案 I/O。

非同步 HTTP 用戶端(aiohttp

作為網路 I/O 的經典例子,以下介紹如何非同步執行 HTTP 請求。特別是在需要同時平行發送大量 HTTP 請求時,效果非常顯著。

以下是用 aiohttp 平行獲取多個 URL 的範例。你需要先用 pip install aiohttp 安裝 aiohttp

 1# pip install aiohttp
 2import asyncio
 3import aiohttp
 4
 5async def fetch(session: aiohttp.ClientSession, url: str):
 6    # Fetch a URL asynchronously and return status and size
 7    async with session.get(url) as resp:
 8        text = await resp.text()
 9        return url, resp.status, len(text)
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [fetch(session, u) for u in urls]
14        for coro in asyncio.as_completed(tasks):
15            url, status, size = await coro
16            print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19    urls = [
20        "https://codesparklab.com/json/example1.json",
21        "https://codesparklab.com/json/example2.json",
22        "https://codesparklab.com/json/example3.json",
23    ]
24    asyncio.run(main(urls))
  • 搭配 asyncio.as_completed,可以按照任務完成的順序來處理結果。對於有效地處理大量請求這十分有用。

與阻塞式I/O的共存:run_in_executor

在非同步程式碼中遇到 CPU 密集型任務或現有的阻塞 API 時,請透過 loop.run_in_executor 使用 ThreadPoolExecutorProcessPoolExecutor

以下程式碼示範了如何使用執行緒池同時運行假設為阻塞 I/O 的任務。

 1import asyncio
 2from concurrent.futures import ThreadPoolExecutor
 3import time
 4
 5def blocking_io(n):
 6    # Simulate a blocking I/O or CPU-bound function
 7    time.sleep(n)
 8    return f"slept {n}s"
 9
10async def main():
11    loop = asyncio.get_running_loop()
12    with ThreadPoolExecutor() as pool:
13        tasks = [
14            loop.run_in_executor(pool, blocking_io, 1),
15            loop.run_in_executor(pool, blocking_io, 2),
16        ]
17        results = await asyncio.gather(*tasks)
18        print("Blocking results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • 利用 run_in_executor,您可以將現有同步程式碼納入非同步流程,而無需進行大幅度重寫。不過,須注意執行緒數量及 CPU 負載。
  • ProcessPoolExecutor 適用於需大量 CPU 計算的任務。

非同步伺服器:以 asyncio 為基礎的 TCP Echo 伺服器

如果要直接操作 socket,只需用 asyncio.start_server 就能輕鬆建立非同步伺服器。

以下是簡單的 echo 伺服器範例,會將從 client 收到的資料原封不動傳回去。

 1import asyncio
 2
 3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
 4    # Handle a single client: read data and echo it back
 5    addr = writer.get_extra_info('peername')
 6    print(f"Connection from {addr}")
 7    while True:
 8        data = await reader.read(1024)
 9        if not data:
10            break
11        writer.write(data)  # echo back
12        await writer.drain()
13    writer.close()
14    await writer.wait_closed()
15    print(f"Connection closed {addr}")
16
17async def main():
18    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19    addr = server.sockets[0].getsockname()
20    print(f"Serving on {addr}")
21    async with server:
22        await server.serve_forever()
23
24if __name__ == "__main__":
25    asyncio.run(main())
  • 在使用asyncio進行的TCP通信中,StreamReaderStreamWriter在非同步輸入輸出中扮演著核心角色。StreamReader非同步地讀取客戶端傳來的資料,而StreamWriter則用於從伺服器將回應發送回客戶端。

  • 即使不需要自己處理socket的詳細操作,也可以透過asyncio.start_server簡單且高效地啟動非同步伺服器。

  • 當你將處理函式傳遞給asyncio.start_server時,該函式會收到readerwriter作為參數。利用這些工具,可以比直接操作低階socket API更安全、更清晰地實現通訊流程。例如,透過reader.read()接收資料,再結合writer.write()writer.drain(),可以實現確保傳輸完成的非同步發送。

  • 這種架構適合處理大量同時連線,也很適合用於簡易協議或小型 TCP 服務。

處理大型串流資料

當需依序處理大型檔案或回應時,請分塊(chunk)讀寫資料,以降低記憶體使用量。以下為使用 aiohttp 進行串流讀取的範例。

下列程式碼會將 HTTP 回應分區塊處理,接收資料時就直接寫入磁碟。

 1import aiohttp
 2import asyncio
 3import aiofiles
 4
 5async def stream_download(url: str, dest: str):
 6    # Stream download and write to file in chunks
 7    async with aiohttp.ClientSession() as session:
 8        async with session.get(url) as resp:
 9            async with aiofiles.open(dest, 'wb') as f:
10                async for chunk in resp.content.iter_chunked(1024 * 64):
11                    await f.write(chunk)
12
13if __name__ == "__main__":
14    asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))
  • 這段程式碼並不是一次性載入大型檔案,而是將資料分成多個區塊分批接收,並以非同步的方式寫入檔案。因此,它能在維持低內存使用量的同時,快速且高效地進行下載。aiohttp能非同步地取得資料,而aiofiles則不會阻塞地將資料寫入檔案,使其容易與其他程序並行執行。

  • 這種設計很適合高效下載大型檔案並儲存,又能極小化記憶體用量。

非同步子程序執行

若想非同步執行外部指令並且即時讀取其輸出,可以使用 asyncio.create_subprocess_exec

以下展示啟動外部指令並即時讀取標準輸出的範例。

 1import asyncio
 2
 3async def run_cmd(cmd):
 4    # Run external command asynchronously and capture output line by line
 5    proc = await asyncio.create_subprocess_exec(
 6        *cmd,
 7        stdout=asyncio.subprocess.PIPE,
 8        stderr=asyncio.subprocess.PIPE
 9    )
10
11    async def read_stream(stream, name):
12        while True:
13            line = await stream.readline()
14            if not line:
15                break
16            print(f"[{name}] {line.decode().rstrip()}")
17
18    await asyncio.gather(
19        read_stream(proc.stdout, "stdout"),
20        read_stream(proc.stderr, "stderr"),
21    )
22    await proc.wait()
23    return proc.returncode
24
25if __name__ == "__main__":
26    asyncio.run(run_cmd(["python", "--version"]))
  • 透過非同步控制子程序,可以即時取得外部工具的日誌,或同時並行運作多個外部程序。

處理取消與超時

非同步任務可被取消。要設置超時只需使用 asyncio.wait_for 即可。

以下是在限定超時時間下運行任務的範例。

 1import asyncio
 2
 3async def slow_task():
 4    await asyncio.sleep(5)
 5    return "done"
 6
 7async def main():
 8    try:
 9        result = await asyncio.wait_for(slow_task(), timeout=2.0)
10        print("Result:", result)
11    except asyncio.TimeoutError:
12        print("Task timed out")
13
14if __name__ == "__main__":
15    asyncio.run(main())
  • wait_for 超時時會丟出 TimeoutError,必要時同時取消任務。需注意任務取消後的傳遞及善後處理。

控制並發數量(Semaphore

由於同時發出大量連線/請求會耗盡資源,請用 asyncio.Semaphore 限制併發數。

以下是用 semaphore 限制同時下載數的範例。

 1import asyncio
 2import aiohttp
 3
 4semaphore = asyncio.Semaphore(3)  # allow up to 3 concurrent tasks
 5
 6async def limited_fetch(session, url):
 7    async with semaphore:
 8        async with session.get(url) as resp:
 9            return url, resp.status
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [limited_fetch(session, u) for u in urls]
14        results = await asyncio.gather(*tasks)
15        print(results)
16
17if __name__ == "__main__":
18    urls = ["https://codesparklab.com/json/example.json"] * 10
19    asyncio.run(main(urls))
  • 用這方法可以溫和地存取外部服務,也能避免讓自己程序超載。

錯誤處理與重試策略

即使在非同步處理中,也不可避免地會發生錯誤。請適當捕捉例外,同時實作例如指數退避(exponential backoff)等重試策略。

以下為簡單重試至 N 次的實作範例。

 1import asyncio
 2import aiohttp
 3import random
 4
 5async def fetch_with_retry(session, url, retries=3):
 6    for attempt in range(1, retries + 1):
 7        try:
 8            async with session.get(url) as resp:
 9                resp.raise_for_status()
10                text = await resp.text()
11                return text
12        except Exception as e:
13            if attempt == retries:
14                raise
15            await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18    async with aiohttp.ClientSession() as session:
19        text = await fetch_with_retry(session, "https://codesparklab.com/")
20        print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • 良好的重試邏輯對於平衡一致性和流量控制非常重要。

除錯與記錄技巧

非同步處理時,任務會並行進行,因此問題發生的原因往往不易查明。若要有效追蹤問題,請注意以下幾點,能讓除錯更加順利。

  • asyncio.run()Task 產生的例外容易被忽略,請確保未處理例外有被記錄下來。
  • 當使用logging時,若在日誌中包含協程名稱,或Python 3.8以上版本使用task.get_name(),可讓追蹤更容易。
  • 你可以使用asyncio.Task.all_tasks()來查看目前任務的狀態。然而,這個API是為了除錯用途設計,在生產環境中使用時應特別留意,避免產生效能問題或出現意外干擾。

效能優化重點

雖然非同步程式特別擅長處理 I/O 等待,但若誤用反而會降低效能。最佳化時請考慮以下幾點:。

  • 非同步處理適合 I/O 密集型任務,但不適用於 CPU 密集,這時該考慮用 process pool。
  • 使用執行緒池或程序池時,請考量池的大小與任務特性。
  • 一口氣啟動過多的小任務會造成事件循環負擔,請搭配分批(batch)或 semaphore 做調整。

總結

Python 的非同步 I/O 能有效利用 I/O 等待時間,並高效並行處理網路與文件操作,是一套強大的機制。結合 asyncioaiohttpaiofilesrun_in_executor 等技巧,可靈活構築實用的非同步應用程式。使用 async with 自動取得和釋放資源,能安全且可靠地管理文件、HTTP 會話、鎖等非同步資源。配合適當的錯誤處理與併發控制,能讓高可靠的非同步應用安全穩健地運行。

您可以在我們的 YouTube 頻道上使用 Visual Studio Code 來跟隨上述文章一起學習。 請也查看我們的 YouTube 頻道。

YouTube Video