非同期入出力

非同期入出力

この記事では非同期入出力について説明します。

Pythonで実用的に使える非同期入出力の考え方とパターンを、ステップバイステップでやさしく解説します。

YouTube Video

非同期入出力(I/O)

非同期I/Oの概念

非同期 I/O は、ファイル操作やネットワーク通信など、時間のかかる I/O 待ちのあいだに他の処理を並行して進めるための仕組みです。Python では、asyncio が標準の非同期フレームワークとして用意されており、多くのライブラリもこの仕組みに沿って設計されています。

基本:async / await とイベントループ

まずは基本的なコルーチンの書き方と、複数のコルーチンを同時に走らせる asyncio.gather の例です。

以下のコードは、非同期関数を定義して同時に実行する最小例です。sleep 関数を使って並列実行の様子を示します。

 1import asyncio
 2
 3async def worker(name: str, delay: float):
 4    # Simulate I/O-bound work by sleeping
 5    print(f"{name} started")
 6    await asyncio.sleep(delay)
 7    print(f"{name} finished after {delay}s")
 8    return name, delay
 9
10async def main():
11    # Run multiple coroutines concurrently
12    tasks = [
13        worker("A", 1.5),
14        worker("B", 1.0),
15        worker("C", 0.5),
16    ]
17    results = await asyncio.gather(*tasks)
18    print("Results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • このコードは asyncio.run() を使ってイベントループを起動し、3つのコルーチンを同時に実行します。

async with と非同期コンテキストマネージャ

非同期処理では、「接続を開く」「ファイルを閉じる」といったリソース管理も自然と複雑になりがちです。そこで役に立つのが async with を使った非同期コンテキストマネージャです。これは、同期コードの with と同じ感覚で使える構文ですが、内部処理が非同期で行われるため、async/await の流れに自然に組み込めます。

async with を使う理由は、大きく分けて次の 2 点にあります。

  • 接続やファイルハンドル、セッションなどの リソースを確実に後片づけするため。異常終了した場合でも適切に解放されるので安心です。
  • 接続の確立や切断、フラッシュなど、非同期で行う初期化や後処理を自動化するため。手動で書く手間が省け、コードの見通しもよくなります。

以下は、簡易的な非同期コンテキストマネージャを自作した例です。

 1import asyncio
 2
 3class AsyncResource:
 4    async def __aenter__(self):
 5        print("Opening resource...")
 6        await asyncio.sleep(0.5)
 7        print("Resource opened")
 8        return self
 9
10    async def __aexit__(self, exc_type, exc, tb):
11        print("Closing resource...")
12        await asyncio.sleep(0.5)
13        print("Resource closed")
14
15async def main():
16    async with AsyncResource() as r:
17        print("Using resource...")
18
19if __name__ == "__main__":
20    asyncio.run(main())
  • __aenter____aexit__ を定義することで、async with が使用可能になります。
  • async with のブロックに入るとき・出るときの処理が非同期で安全に実行されます。

非同期ファイルI/O(aiofiles

ファイル操作はブロッキングの代表例です。aiofiles を使うことで、非同期インターフェースとして安全に扱えます。実際には内部でスレッドプールを使いつつ、async with により確実にファイルをクローズします。

次の例は、複数ファイルの非同期読み込みを並列に行うサンプルです。 pip install aiofilesで、aiofiles をインストールしてから実行する必要があります。

 1# pip install aiofiles
 2import asyncio
 3import aiofiles
 4from pathlib import Path
 5
 6async def read_file(path: Path):
 7    # Read file content asynchronously
 8    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
 9        contents = await f.read()
10    return path.name, len(contents)
11
12async def main():
13    files = [
14        Path("example1.txt"),
15        Path("example2.txt"),
16        Path("example3.txt")
17    ]
18    tasks = [read_file(p) for p in files]
19    results = await asyncio.gather(*tasks)
20    for name, size in results:
21        print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24    asyncio.run(main())
  • このコードは各ファイルの読み込みを並列化します。aiofiles は内部でスレッドプールを使うことが多く、ブロッキングなファイルI/Oを非同期インターフェースで扱えます。

非同期HTTPクライアント(aiohttp

ネットワークI/Oの代表例として HTTP リクエストを非同期で行う方法を示します。大量のHTTPリクエストを並列で叩く場面で力を発揮します。

以下は aiohttp を使って複数URLを並列にフェッチする例です。pip install aiohttpで、aiohttpをインストールする必要があります。

 1# pip install aiohttp
 2import asyncio
 3import aiohttp
 4
 5async def fetch(session: aiohttp.ClientSession, url: str):
 6    # Fetch a URL asynchronously and return status and size
 7    async with session.get(url) as resp:
 8        text = await resp.text()
 9        return url, resp.status, len(text)
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [fetch(session, u) for u in urls]
14        for coro in asyncio.as_completed(tasks):
15            url, status, size = await coro
16            print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19    urls = [
20        "https://codesparklab.com/json/example1.json",
21        "https://codesparklab.com/json/example2.json",
22        "https://codesparklab.com/json/example3.json",
23    ]
24    asyncio.run(main(urls))
  • asyncio.as_completed を使うと、タスクが完了した順に結果を処理できます。多数のリクエストを効率よく処理する際に有用です。

ブロッキングI/Oとの共存:run_in_executor

CPUを多く使う処理やブロッキングな既存APIを非同期コードで扱う場合は、ThreadPoolExecutorProcessPoolExecutorloop.run_in_executor 経由で使います。

次のコードは、ブロッキング I/O を想定した処理をスレッドプールで並行実行する例です。

 1import asyncio
 2from concurrent.futures import ThreadPoolExecutor
 3import time
 4
 5def blocking_io(n):
 6    # Simulate a blocking I/O or CPU-bound function
 7    time.sleep(n)
 8    return f"slept {n}s"
 9
10async def main():
11    loop = asyncio.get_running_loop()
12    with ThreadPoolExecutor() as pool:
13        tasks = [
14            loop.run_in_executor(pool, blocking_io, 1),
15            loop.run_in_executor(pool, blocking_io, 2),
16        ]
17        results = await asyncio.gather(*tasks)
18        print("Blocking results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • run_in_executor を活用することで、既存の同期コードを大きく書き換えずに非同期フローに組み込めます。ただし、スレッドの数やCPU負荷に注意が必要です。
  • CPUバウンドな処理には、ProcessPoolExecutor の利用が適しています。

非同期サーバ:asyncio ベースの TCP エコーサーバ

直接ソケットを扱いたい場合、asyncio.start_server を使うと簡単に非同期なサーバを構築できます。

次の例は単純なエコーサーバで、クライアントから受け取ったデータをそのまま返します。

 1import asyncio
 2
 3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
 4    # Handle a single client: read data and echo it back
 5    addr = writer.get_extra_info('peername')
 6    print(f"Connection from {addr}")
 7    while True:
 8        data = await reader.read(1024)
 9        if not data:
10            break
11        writer.write(data)  # echo back
12        await writer.drain()
13    writer.close()
14    await writer.wait_closed()
15    print(f"Connection closed {addr}")
16
17async def main():
18    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19    addr = server.sockets[0].getsockname()
20    print(f"Serving on {addr}")
21    async with server:
22        await server.serve_forever()
23
24if __name__ == "__main__":
25    asyncio.run(main())
  • asyncio の TCP 通信では、StreamReaderStreamWriter が非同期入出力の中心的な役割を担います。StreamReader はクライアントから送られてくるデータを非同期で読み取り、StreamWriter はサーバー側からクライアントへ応答を送り返すために使います。

  • ソケットの細かな操作を自分で扱わなくても、asyncio.start_server を使うことでシンプルかつ効率的に非同期サーバーを立ち上げられます。

  • asyncio.start_server にハンドラ関数を渡すと、その関数には readerwriter が引数として与えられます。これらを使えば、低レベルなソケット API を直接扱うよりも安全でわかりやすい形で通信処理を記述できます。たとえば、reader.read() でデータを受信し、writer.write()writer.drain() を組み合わせることで、送信が確実に完了するまで待つ非同期の送信処理を実現できます。

  • この構成は大量の同時接続を捌くのに適しており、シンプルなプロトコルや小規模のTCPサービスに向きます。

ストリーミング大容量データの扱い

大きなファイルや応答を逐次処理する際は、データをチャンクで読み書きしメモリ使用量を抑えます。以下は aiohttp のストリーミング読み取り例です。

次のコードはHTTPレスポンスをチャンク単位で処理し、受け取りながらディスクに書き込みます。

 1import aiohttp
 2import asyncio
 3import aiofiles
 4
 5async def stream_download(url: str, dest: str):
 6    # Stream download and write to file in chunks
 7    async with aiohttp.ClientSession() as session:
 8        async with session.get(url) as resp:
 9            async with aiofiles.open(dest, 'wb') as f:
10                async for chunk in resp.content.iter_chunked(1024 * 64):
11                    await f.write(chunk)
12
13if __name__ == "__main__":
14    asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))
  • このコードは、大きなファイルを一度に読み込まず「チャンク(小さな塊)」に分けて受信し、そのまま非同期でファイルへ書き込む構造になっています。そのため、メモリ使用量を抑えながら高速かつ効率的にダウンロード処理を行える点が特徴です。aiohttp が非同期でデータを取得し、aiofiles がブロックせずにファイルへ書き込むため、他の処理と並行しやすいという利点もあります。

  • このパターンはメモリ消費を最小化しつつ、大容量ファイルを効率的にダウンロード/保存するのに適しています。

サブプロセスの非同期実行

外部コマンドを非同期に実行してその出力を逐次読み取りたい場合、asyncio.create_subprocess_exec が便利です。

以下は外部コマンドを起動して標準出力をリアルタイムに読み取る例です。

 1import asyncio
 2
 3async def run_cmd(cmd):
 4    # Run external command asynchronously and capture output line by line
 5    proc = await asyncio.create_subprocess_exec(
 6        *cmd,
 7        stdout=asyncio.subprocess.PIPE,
 8        stderr=asyncio.subprocess.PIPE
 9    )
10
11    async def read_stream(stream, name):
12        while True:
13            line = await stream.readline()
14            if not line:
15                break
16            print(f"[{name}] {line.decode().rstrip()}")
17
18    await asyncio.gather(
19        read_stream(proc.stdout, "stdout"),
20        read_stream(proc.stderr, "stderr"),
21    )
22    await proc.wait()
23    return proc.returncode
24
25if __name__ == "__main__":
26    asyncio.run(run_cmd(["python", "--version"]))
  • サブプロセスを非同期で操作することで、外部ツールのログをリアルタイムで扱ったり、複数プロセスを並列実行したりできます。

キャンセルとタイムアウトの取り扱い

非同期タスクはキャンセル可能です。タイムアウトを設ける際は asyncio.wait_for を使うと簡潔です。

下はタイムアウト付きでタスクを実行する例です。

 1import asyncio
 2
 3async def slow_task():
 4    await asyncio.sleep(5)
 5    return "done"
 6
 7async def main():
 8    try:
 9        result = await asyncio.wait_for(slow_task(), timeout=2.0)
10        print("Result:", result)
11    except asyncio.TimeoutError:
12        print("Task timed out")
13
14if __name__ == "__main__":
15    asyncio.run(main())
  • wait_for はタイムアウト時に TimeoutError を投げ、必要に応じてタスクをキャンセルします。キャンセルの伝播やクリーンアップに注意してください。

同時実行数の制御(Semaphore

大量の接続やリクエストを同時に行うとリソース枯渇につながるため、asyncio.Semaphore で同時実行数を制限します。

次はセマフォで同時ダウンロード数を制限する例です。

 1import asyncio
 2import aiohttp
 3
 4semaphore = asyncio.Semaphore(3)  # allow up to 3 concurrent tasks
 5
 6async def limited_fetch(session, url):
 7    async with semaphore:
 8        async with session.get(url) as resp:
 9            return url, resp.status
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [limited_fetch(session, u) for u in urls]
14        results = await asyncio.gather(*tasks)
15        print(results)
16
17if __name__ == "__main__":
18    urls = ["https://codesparklab.com/json/example.json"] * 10
19    asyncio.run(main(urls))
  • この方法で外部サービスに優しいアクセスが可能になり、自分のプロセスへの過負荷も抑えられます。

エラー処理と再試行戦略

非同期処理でもエラーは必ず発生します。適切に例外をキャッチし、指数バックオフ等の再試行戦略を実装しましょう。

以下は最大 N 回の簡単な再試行を実装した例です。

 1import asyncio
 2import aiohttp
 3import random
 4
 5async def fetch_with_retry(session, url, retries=3):
 6    for attempt in range(1, retries + 1):
 7        try:
 8            async with session.get(url) as resp:
 9                resp.raise_for_status()
10                text = await resp.text()
11                return text
12        except Exception as e:
13            if attempt == retries:
14                raise
15            await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18    async with aiohttp.ClientSession() as session:
19        text = await fetch_with_retry(session, "https://codesparklab.com/")
20        print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • 適切な再試行ロジックは一貫性とトラフィック制御のバランスを取る上で重要です。

デバッグとロギングのコツ

非同期処理ではタスクが同時に進むため、問題の原因が分かりにくくなることがあります。効率よく追跡するために、次の点を意識するとデバッグがスムーズになります。

  • asyncio.run()Task の例外は見落としやすいため、未処理例外をログに残す仕組みを入れます。
  • logging を使用する際は、コルーチン名や、Python 3.8以降であればtask.get_name()をログに含めると追跡が楽になります。
  • asyncio.Task.all_tasks()を使って現在のタスク状況を調べられます。ただし、デバッグ用途に向いたAPIであり、本番環境ではパフォーマンスや予期せぬ干渉を避けるためにも慎重に利用する必要があります。

パフォーマンス上の注意点

非同期プログラミングはI/O待ちに強い一方、使い方を誤ると逆にパフォーマンスが低下することがあります。以下の点を押さえて最適化を進めます。

  • 非同期処理はI/O待ちに強いですが、CPUバウンドな処理には向かないため、その場合はプロセスプールを検討します。
  • スレッドプールやプロセスプールを使用する場合は、プールサイズとタスクの性質を考慮します。
  • 大量の小さなタスクを同時に起動するとイベントループのオーバーヘッドが増えるため、バッチ化やセマフォで調整します。

まとめ

Python の非同期 I/O は、I/O 待ちの時間を有効活用し、ネットワーク処理やファイル処理を効率的に並行実行できる強力な仕組みです。asyncio を中心に、aiohttpaiofilesrun_in_executor などの技法を組み合わせることで、実践的な非同期アプリケーションを柔軟に構築できます。また、リソースの取得と解放を自動化する async with を活用することで、ファイルやHTTPセッション、ロックなどの非同期リソースを安全かつ確実に管理できます。正しいエラーハンドリングや同時実行数の管理を取り入れることで、信頼性の高い非同期プログラムを安全に運用できます。

YouTubeチャンネルでは、Visual Studio Codeを用いて上記の記事を見ながら確認できます。 ぜひYouTubeチャンネルもご覧ください。

YouTube Video