Python の非同期処理における同期制御

Python の非同期処理における同期制御

この記事ではPython の非同期処理における同期制御について説明します。

asyncio の基本から、同期制御によく使う実践パターンまで順に学びます。

YouTube Video

Python の非同期処理における同期制御

非同期処理では「複数のタスクを同時に動かす」ことが簡単にできます。しかし、実際には並行度の制御やタスク同士の待ち合わせ、共有資源の排他制御、同期的な重たい処理への対処、キャンセル時の後片付けなど、より高度な調整が必要になります。

ここでは asyncio の基本から、同期制御によく使う実践パターンまで順に学びます。

イントロ:基礎(async / awaitcreate_task

まずは最小限の非同期コードについて見てみましょう。await は呼んだコルーチンが完了するまでその場で待ち、asyncio.create_task は並列実行のためにタスクをスケジュールします。

 1import asyncio
 2
 3async def worker(name, delay):
 4    # Simulate async work
 5    await asyncio.sleep(delay)
 6    return f"{name} done after {delay}s"
 7
 8async def main():
 9    # Create two tasks that run concurrently.
10    t1 = asyncio.create_task(worker("A", 1))
11    t2 = asyncio.create_task(worker("B", 2))
12
13    # Await both results (this suspends until both are done).
14    result1 = await t1
15    result2 = await t2
16    print(result1, result2)
17
18if __name__ == "__main__":
19    asyncio.run(main())
  • このコードは、タスクを明示的に作って並列に実行し、最後に await して結果を受け取る典型的なパターンです。create_task により同時進行が可能になります。

asyncio.gatherasyncio.wait / asyncio.as_completed の違い

複数のコルーチンを同時に実行するとき、結果をどう受け取るかで使い分けます。gather は「全部終わるまで待って、入力順に結果を返す」方法で、as_completed は「終わったものから順に処理できる」方法です。

 1import asyncio
 2import random
 3
 4async def job(i):
 5    delay = random.random() * 2
 6    await asyncio.sleep(delay)
 7    return (i, delay)
 8
 9async def gather_example():
10    # gather waits for all tasks and returns results in the same order as input
11    results = await asyncio.gather(*(job(i) for i in range(5)))
12    print("gather order:", results)
13
14async def as_completed_example():
15    # as_completed yields results as they finish (useful to process early results)
16    tasks = [asyncio.create_task(job(i)) for i in range(5)]
17    for coro in asyncio.as_completed(tasks):
18        res = await coro
19        print("completed:", res)
20
21async def main():
22    await gather_example()
23    await as_completed_example()
24
25if __name__ == "__main__":
26    asyncio.run(main())
  • このコードのように、gather は入力順に結果を返すので順序を保持したい場合に便利です。as_completed は早く終わったものから処理したいときに使います。

並列度の制御:asyncio.Semaphore で同時実行数を制限する

外部 API のレート制限や DB 接続数の制限があるとき、Semaphore で同時実行数を管理できます。

 1import asyncio
 2import random
 3
 4sem = asyncio.Semaphore(3)  # allow up to 3 concurrent workers
 5
 6async def limited_worker(i):
 7    async with sem:
 8        # Only 3 tasks can be inside this block concurrently
 9        delay = random.random() * 2
10        await asyncio.sleep(delay)
11        print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14    tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15    await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18    asyncio.run(main())
  • Semaphoreasync with で使うと、同時に実行できる数を簡単に制限できます。外部制約がある場面で有効です。

共有資源の排他制御:asyncio.Lock

共有データの同時更新を防ぐために Lock を使います。asyncio.Lock は非同期用の排他プリミティブです。

 1import asyncio
 2
 3counter = 0
 4lock = asyncio.Lock()
 5
 6async def incrementer(n_times):
 7    global counter
 8    for _ in range(n_times):
 9        # Acquire lock to update shared counter safely
10        async with lock:
11            tmp = counter
12            await asyncio.sleep(0)  # yield control to increase race likelihood
13            counter = tmp + 1
14
15async def main():
16    tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17    await asyncio.gather(*tasks)
18    print("final counter:", counter)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • グローバルな counter のような共有変数を複数タスクで更新すると競合が起きます。Lock で囲むことで整合性を保てます。

タスク間の待ち合わせ:asyncio.Event

あるタスクが「準備ができた」合図を出し、それを他のタスクが待ち受けるときに Event を使います。これは、タスク同士で合図を共有するための簡単な同期手段です。

 1import asyncio
 2
 3event = asyncio.Event()
 4
 5async def waiter(name):
 6    print(f"{name} is waiting for event")
 7    await event.wait()
 8    print(f"{name} resumed after event set")
 9
10async def setter():
11    print("setter will sleep and then set the event")
12    await asyncio.sleep(1)
13    event.set()
14    print("event set by setter")
15
16async def main():
17    tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18    await asyncio.create_task(setter())
19    await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22    asyncio.run(main())
  • Event はブール的なフラグを持ち、set() で待っているすべてのタスクを再開させます。単純な同期に便利です。

生産者・消費者パターン:asyncio.Queue

Queue を使うと、データを作る側(プロデューサ)と処理する側(コンシューマ)が非同期でスムーズに連携できます。また、キューが満杯になると自動的に待ちが発生するため、生産しすぎを防ぐ仕組み(バックプレッシャー)も自然に実現できます。

 1import asyncio
 2import random
 3
 4async def producer(q, n_items):
 5    for i in range(n_items):
 6        await asyncio.sleep(random.random() * 0.5)
 7        item = f"item-{i}"
 8        await q.put(item)
 9        print("produced", item)
10    # signal consumers to stop
11    await q.put(None)
12
13async def consumer(q, name):
14    while True:
15        item = await q.get()
16        if item is None:
17            # put sentinel back for other consumers and break
18            await q.put(None)
19            break
20        await asyncio.sleep(random.random() * 1)
21        print(name, "consumed", item)
22        q.task_done()
23
24async def main():
25    q = asyncio.Queue(maxsize=5)  # bounded queue to apply backpressure
26    prod = asyncio.create_task(producer(q, 10))
27    cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28    await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31    asyncio.run(main())
  • Queue は、プロデューサとコンシューマの動きを非同期でうまく調整してくれます。さらに maxsize を設定すると、キューが満杯のときプロデューサは put で待たされるため、作りすぎを防げます。

同期的なブロッキング処理を扱う:run_in_executor

CPU を多く使う処理や、非同期に対応していないライブラリを使う場合は、run_in_executor を使って別のスレッドやプロセスに処理を任せます。こうすることで、メインのイベントループが止まらず、ほかの非同期タスクがスムーズに動き続けられます。

 1import asyncio
 2import time
 3
 4def blocking_io(x):
 5    # simulate blocking I/O or CPU-bound work
 6    time.sleep(2)
 7    return x * x
 8
 9async def main():
10    loop = asyncio.get_running_loop()
11    # run blocking_io in default thread pool
12    result = await loop.run_in_executor(None, blocking_io, 3)
13    print("blocking result:", result)
14
15if __name__ == "__main__":
16    asyncio.run(main())
  • 同期関数を直接呼ぶとイベントループが停止します。run_in_executor を使うと別スレッドで実行され、非同期タスクは並行して進行できます。

例:レート制限つき API 呼び出し(Semaphore + run_in_executor の組合せ)

以下は、外部 API を呼び出す回数を制限しながら、その結果に対して重い処理を行う状況を想定したサンプルです。Semaphorerun_in_executor を組み合わせて、安全かつ効率的に処理を進めます。

 1import asyncio
 2import time
 3import random
 4
 5sem = asyncio.Semaphore(5)
 6
 7def heavy_sync_processing(data):
 8    # simulate heavy CPU-bound work
 9    time.sleep(1)
10    return f"processed-{data}"
11
12async def api_call(i):
13    await asyncio.sleep(random.random() * 0.5)  # simulate network latency
14    return f"data-{i}"
15
16async def worker(i):
17    async with sem:
18        data = await api_call(i)
19        # offload CPU-bound work to threadpool
20        loop = asyncio.get_running_loop()
21        result = await loop.run_in_executor(None, heavy_sync_processing, data)
22        print(result)
23
24async def main():
25    tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26    await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29    asyncio.run(main())
  • ここでは Semaphore で同時 API 呼び出し数を制限し、得られたデータに対する重い処理はスレッドプールへ投げています。ネットワークと CPU を分離して効率的に処理できます。

タスクのキャンセルとクリーンアップ

タスクがキャンセルされた場合は、finallyasyncio.CancelledError を正しく処理することがとても重要です。これにより、開いたファイルや接続の解放、途中状態の整理などを確実に行い、アプリ全体の一貫性を保てます。

 1import asyncio
 2
 3async def long_running():
 4    try:
 5        print("started long_running")
 6        await asyncio.sleep(10)
 7        print("finished long_running")
 8    except asyncio.CancelledError:
 9        print("long_running was cancelled, cleaning up")
10        # perform cleanup here
11        raise
12
13async def main():
14    task = asyncio.create_task(long_running())
15    await asyncio.sleep(1)
16    task.cancel()
17    try:
18        await task
19    except asyncio.CancelledError:
20        print("task cancelled in main")
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • キャンセルは例外 (CancelledError) として届くため、except ブロックで必要なクリーンアップを行い、必要に応じて例外を再送出します。

実践的な設計のポイント

以下は、非同期処理を設計するときに役立つ実践的なポイントです。

  • 並列度は明確に制御する 外部 API や DB などリソースに上限がある場合は、Semaphore で同時実行数を絞れます。

  • 共有資源は安全に扱う 複数タスクで状態を更新する必要があるなら Lock を使います。共有状態を減らし、不変データを中心に設計するとより安全になります。

  • 結果の受け取り方を選ぶ 完了したタスクから順に処理したいときは asyncio.as_completed、入力順のまままとめて処理したいときは gather を使います。

  • 重たい同期処理は隔離する CPU を使う処理や同期ライブラリの呼び出しは run_in_executorProcessPoolExecutor に任せ、イベントループのブロックを避けます。

  • キャンセルと例外を前提にする タスクが途中でキャンセルされても安全に終われるように、例外処理を適切に書いてリソースを確実に解放します。

  • テストしやすくする I/O、時間、乱数などの副作用は抽象化して差し替え可能にしておくと、非同期コードでもテストが容易になります。

まとめ

asyncio は強力ですが「並列に動かせる」ことだけに気を取られると、共有資源競合や外部制限違反、イベントループのブロックなどに遭遇します。SemaphoreLockEventQueuerun_in_executor、そして適切なキャンセル処理を組み合わせることで、安全で効率的な非同期アプリケーションを設計できます。「生産者・消費者」パターンをはじめ、「同時実行数の制限」や「非同期処理とブロッキング処理の切り離し」といった仕組みを活用することで、非同期処理をより安全で効率的に組み立てられます。

YouTubeチャンネルでは、Visual Studio Codeを用いて上記の記事を見ながら確認できます。 ぜひYouTubeチャンネルもご覧ください。

YouTube Video