Pythonにおける`concurrent`モジュール

Pythonにおける`concurrent`モジュール

この記事ではPythonにおけるconcurrentモジュールについて説明します。

並行処理と並列処理の考え方を整理しながら、concurrent モジュールを用いた非同期処理の実装方法を、実践的なサンプルとともに解説します。

YouTube Video

Pythonにおけるconcurrentモジュール

Python で処理を高速化するとき、「並行処理」と「並列処理」の違いを意識することは大切です。concurrent モジュールは、その違いを意識しながら 安全かつシンプルに非同期処理を扱うための重要な手段です。

「並行処理」と「並列処理」の違い

  • 並行処理(Concurrency)とは、複数の仕事を短い単位で切り替えながら進める設計の考え方です。 実際には同時に動いていなくても、「待ち時間」を有効活用することで、全体の処理を効率よく進められます。

  • 並列処理(Parallelism)とは、複数の仕事を物理的に同時に実行する仕組みです。 CPU コアを複数使い、計算処理そのものを同時に進めます。

この 2 つはどちらも処理を速くする手法ですが、並行処理は「どう進めるか」という設計の問題、並列処理は「どう実行されるか」という実行形態の問題という点で、本質的に異なります。

concurrent モジュールとは何か

concurrent は、Python で並行処理、並列処理を安全かつシンプルに扱うための高水準 APIを提供する標準ライブラリです。スレッドやプロセスの生成、管理といった低レベルな操作を意識せず、「タスクを実行する」という視点に集中できるよう設計されています。

ThreadPoolExecutorProcessPoolExecutor の役割

concurrent モジュールでは、処理の性質に応じて次の 2 つを使い分けます。

  • ThreadPoolExecutor 並行処理向けの実装で、通信やファイル操作などI/O 待ちが多い処理に適しています。処理を切り替えながら進めることで、待ち時間を無駄にしません。

  • ProcessPoolExecutor 並列処理向けの実装で、CPU 計算が重い処理に適しています。複数のプロセスを使い、CPU コアを並列に活用します。

このように concurrent モジュールは、「並行か、並列か」という違いを意識しながら正しく選択できる構造を提供している点が大きな特徴です。

ThreadPoolExecutor の基本(I/O 処理向け)

ThreadPoolExecutor は、ネットワーク通信やファイル操作などの I/O 待ち処理に適しています。複数スレッドで処理を分担し、待ち時間を有効活用します。

 1from concurrent.futures import ThreadPoolExecutor
 2import time
 3
 4def fetch_data(n):
 5    # Simulate an I/O-bound task
 6    time.sleep(1)
 7    return f"data-{n}"
 8
 9with ThreadPoolExecutor(max_workers=3) as executor:
10    futures = [executor.submit(fetch_data, i) for i in range(5)]
11
12    for future in futures:
13        print(future.result())
  • この例では、1 秒待つ I/O 処理を複数並行して実行しています。submit によって関数呼び出しは非同期タスクとして登録され、result() を呼ぶことで処理の完了を待ちながら結果を取得できるため、待ち時間を有効活用した並行処理を簡潔に記述できます。

map を使ったシンプルな並行処理

複雑な制御が不要な場合は、map を使うとコードが簡潔になります。

 1from concurrent.futures import ThreadPoolExecutor
 2import time
 3
 4def fetch_data(n):
 5    # Simulate an I/O-bound task
 6    time.sleep(1)
 7    return f"data-{n}"
 8
 9with ThreadPoolExecutor(max_workers=3) as executor:
10    results = executor.map(fetch_data, range(5))
11
12    for result in results:
13        print(result)
  • この例では、ThreadPoolExecutor.map を使って複数の I/O 処理を並行実行しています。map は入力の順序を保ったまま結果を返すため、逐次処理に近い書き方ができ、非同期処理を意識せずに並行化できる点が大きな特徴です。

ProcessPoolExecutor の基本(CPU 処理向け)

CPU をフルに使う重い計算では、スレッドではなく プロセス を使います。これにより GIL(グローバルインタプリタロック)の制限を回避できます。

 1from concurrent.futures import ProcessPoolExecutor
 2
 3def heavy_calculation(n):
 4    # Simulate a CPU-bound task
 5    total = 0
 6    for i in range(10_000_000):
 7        total += i * n
 8    return total
 9
10if __name__ == "__main__":
11    with ProcessPoolExecutor(max_workers=4) as executor:
12        results = executor.map(heavy_calculation, range(4))
13
14        for result in results:
15            print(result)

この例では、CPU を長時間使用する計算処理を ProcessPoolExecutor で並列実行しています。プロセスを生成する都合上、__main__ ガードが必須となり、これによって 複数の CPU コアを安全に活用した並列処理が可能になります。

as_completed を使った完了順処理

処理が終わった順に結果を扱いたい場合は as_completed が便利です。

 1from concurrent.futures import ThreadPoolExecutor, as_completed
 2import time
 3
 4def fetch_data(n):
 5    # Simulate an I/O-bound task
 6    time.sleep(1)
 7    return f"data-{n}"
 8
 9with ThreadPoolExecutor(max_workers=3) as executor:
10    futures = [executor.submit(fetch_data, i) for i in range(5)]
11
12    for future in as_completed(futures):
13        print(future.result())
  • この例では、複数の非同期タスクを同時に実行し、完了したものから順に結果を取得しています。as_completed を使うことで、処理順に依存せず素早く結果を扱えるため、進捗表示や逐次処理が必要な場面に適しています。

例外処理の扱い方

concurrent では、例外は result() を呼んだタイミングで発生します。

 1from concurrent.futures import ThreadPoolExecutor
 2
 3def risky_task(n):
 4    # Simulate a task that may fail for a specific input
 5    if n == 3:
 6        raise ValueError("Something went wrong")
 7    return n * 2
 8
 9with ThreadPoolExecutor() as executor:
10    futures = [executor.submit(risky_task, i) for i in range(5)]
11
12    for future in futures:
13        try:
14            print(future.result())
15        except Exception as e:
16            print("Error:", e)
  • この例では、複数のタスクを並行実行しつつ、一部のタスクが例外を発生させても他のタスクの実行は継続され、結果取得時に個別に例外を処理できることを示しています。concurrentFuture を使うことで、非同期処理における成功、失敗を安全に扱える点が重要です。

スレッドとプロセスの使い分け指針

並行処理、並列処理を正しく使うためには、処理の性質に応じて手法を選ぶことが重要です。

実務では、次の基準を目安にすると判断しやすくなります。

  • 通信やファイル操作などの I/O 待ちが多い処理の場合は、ThreadPoolExecutor を使います。
  • CPU を使う重い計算処理の場合は、ProcessPoolExecutor を使います。
  • 処理の数が多く、内容が単純な場合は、map を使うとコードを簡潔に書けます。
  • 実行順や例外処理などの細かい制御が重要な場合は、submitas_completed を組み合わせて使います。

concurrent を使うメリット

concurrent モジュールを使うことで、非同期処理を安全かつ分かりやすく扱えるようになります。

主なメリットは次のとおりです。

  • 低レベルなスレッドやプロセス管理を意識する必要がありません。
  • Python の標準ライブラリで提供されており、安心して利用できます。
  • コードの見通しが良くなり、可読性や保守性が向上します。
  • 並行処理、並列処理を学ぶ最初のステップとして最適です。

この指針を意識するだけでも、concurrent を使った実装の失敗を大きく減らすことができます。

まとめ

concurrent モジュールは、Python で並行処理、並列処理を実用レベルで扱うための標準的な選択肢です。処理の中身を大きく変えずに性能を向上させられるため、実務においても非常に大きなメリットになります。concurrent を使うことで、実装を簡潔にしつつ、例外処理や実行管理を安全に制御できる非同期処理を実現できます。

YouTubeチャンネルでは、Visual Studio Codeを用いて上記の記事を見ながら確認できます。 ぜひYouTubeチャンネルもご覧ください。

YouTube Video