Pythonにおける`multiprocessing`モジュール
この記事ではPythonにおけるmultiprocessingモジュールについて説明します。
multiprocessingモジュールを使って並列処理を安全かつ効率的に書くための実践的なポイントを紹介します。
YouTube Video
Pythonにおけるmultiprocessingモジュール
基本:なぜ multiprocessing を使うか
multiprocessing はプロセス単位で並列化するため、Python の GIL(グローバル・インタプリタ・ロック)による制限を受けずに CPU バウンドな仕事を並列化できます。I/O バウンドの処理では threading や asyncio の方が簡潔な場合もあります。
シンプルな Process の使い方
まずは Process を使って別プロセスで関数を動かす基本例です。プロセスの開始、終了待ち、引数渡しがわかります。
1# Explanation:
2# This example starts a separate process to run `worker` which prints messages.
3# It demonstrates starting, joining, and passing arguments.
4
5from multiprocessing import Process
6import time
7
8def worker(name, delay):
9 # English comment in code per user's preference
10 for i in range(3):
11 print(f"Worker {name}: iteration {i}")
12 time.sleep(delay)
13
14if __name__ == "__main__":
15 p = Process(target=worker, args=("A", 0.5))
16 p.start()
17 print("Main: waiting for worker to finish")
18 p.join()
19 print("Main: worker finished")- このコードは、メインプロセスが子プロセス
workerを起動してjoin()で完了を待つ流れを示します。argsで引数を渡せます。
Pool を使った簡単な並列化(高レベルAPI)
複数の独立したタスクに同じ関数を適用するなら Pool.map が便利です。内部でワーカープロセスを管理してくれます。
1# Explanation:
2# Use Pool.map to parallelize a CPU-bound function across available processes.
3# Good for "embarrassingly parallel" workloads.
4
5from multiprocessing import Pool, cpu_count
6import math
7import time
8
9def is_prime(n):
10 # Check primality (inefficient but CPU-heavy for demo)
11 if n < 2:
12 return False
13 for i in range(2, int(math.sqrt(n)) + 1):
14 if n % i == 0:
15 return False
16 return True
17
18if __name__ == "__main__":
19 nums = [10_000_000 + i for i in range(50)]
20 start = time.time()
21 with Pool(processes=cpu_count()) as pool:
22 results = pool.map(is_prime, nums)
23 end = time.time()
24 print(f"Found primes: {sum(results)} / {len(nums)} in {end-start:.2f}s")Poolはワーカーの数を自動で制御でき、mapは結果を元の順序で返します。
プロセス間通信:Queue を使ったプロデューサー/コンシューマー
Queue は、プロセス間でオブジェクトを安全にやり取りできる FIFO(先入れ先出し) のキューです。以下に典型的なパターンを示します。
1# Explanation:
2# Demonstrates a producer putting items into a Queue
3# and consumer reading them.
4# This is useful for task pipelines between processes.
5
6from multiprocessing import Process, Queue
7import time
8import random
9
10def producer(q, n):
11 for i in range(n):
12 item = f"item-{i}"
13 print("Producer: putting", item)
14 q.put(item)
15 time.sleep(random.random() * 0.5)
16 q.put(None) # sentinel to signal consumer to stop
17
18def consumer(q):
19 while True:
20 item = q.get()
21 if item is None:
22 break
23 print("Consumer: got", item)
24 time.sleep(0.2)
25
26if __name__ == "__main__":
27 q = Queue()
28 p = Process(target=producer, args=(q, 5))
29 c = Process(target=consumer, args=(q,))
30 p.start()
31 c.start()
32 p.join()
33 c.join()
34 print("Main: done")Queueはプロセス間で安全にデータを渡せます。終了時にNoneのような特別な値を使って終了を伝えるのが一般的です。
共有メモリ:Value と Array
プロセス間で小さな数値や配列を共有したい場合は Value と Array が使えます。ロックを使って競合を避ける必要があります。
1# Explanation:
2# Use Value to share a single integer counter
3# and Array for a small numeric array.
4# Show how to use a Lock to avoid race conditions.
5
6from multiprocessing import Process, Value, Array, Lock
7import time
8
9def increment(counter, lock, times):
10 for _ in range(times):
11 with lock:
12 counter.value += 1
13
14def update_array(arr):
15 for i in range(len(arr)):
16 arr[i] = arr[i] + 1
17
18if __name__ == "__main__":
19 lock = Lock()
20 counter = Value('i', 0) # 'i' = signed int
21 shared_arr = Array('i', [0, 0, 0])
22
23 p1 = Process(target=increment, args=(counter, lock, 1000))
24 p2 = Process(target=increment, args=(counter, lock, 1000))
25 a = Process(target=update_array, args=(shared_arr,))
26
27 p1.start(); p2.start(); a.start()
28 p1.join(); p2.join(); a.join()
29
30 print("Counter:", counter.value)
31 print("Array:", list(shared_arr))ValueやArrayは、Python ではなく より低い層の仕組み(C言語レベルの共有メモリ) を使ってプロセス間でデータを共有します。そのため、小さなデータを高速に読み書きするのに向いていますが、大きなデータを扱うのには適していません。
高度な共有:Manager を使った共有オブジェクト(辞書やリスト)
リストや辞書のようなもっと柔軟な共有オブジェクトを使いたい場合は Manager() を用います。
1# Explanation:
2# Manager provides proxy objects like dict/list
3# that can be shared across processes.
4# Good for moderate-size shared state and easier programming model.
5
6from multiprocessing import Process, Manager
7import time
8
9def worker(shared_dict, key, value):
10 shared_dict[key] = value
11
12if __name__ == "__main__":
13 with Manager() as manager:
14 d = manager.dict()
15 processes = []
16 for i in range(5):
17 p = Process(target=worker, args=(d, f"k{i}", i*i))
18 p.start()
19 processes.append(p)
20 for p in processes:
21 p.join()
22 print("Shared dict:", dict(d))Managerは辞書やリストを共有できて便利ですが、アクセスのたびにデータがプロセス間で送られ、pickle変換 が発生します。そのため、頻繁に大きなデータを更新すると処理が遅くなります。
同期機構:Lock と Semaphore の使い方
共有リソースへの同時アクセスを制御するには Lock や Semaphore を使います。with文で簡潔に使えます。
1# Explanation:
2# Demonstrates using Lock to prevent simultaneous access to a critical section.
3# Locks are necessary when shared resources are not atomic.
4
5from multiprocessing import Process, Lock, Value
6
7def safe_add(counter, lock):
8 for _ in range(10000):
9 with lock:
10 counter.value += 1
11
12if __name__ == "__main__":
13 lock = Lock()
14 counter = Value('i', 0)
15 p1 = Process(target=safe_add, args=(counter, lock))
16 p2 = Process(target=safe_add, args=(counter, lock))
17 p1.start(); p2.start()
18 p1.join(); p2.join()
19 print("Counter:", counter.value)- ロックはデータ競合を防ぎますが、ロック範囲が大きすぎると並列処理の性能が低下します。必要な部分だけをクリティカルセクションとして保護するようにします。
UNIX向けの fork と Windows の挙動の違い
UNIX 系ではデフォルトで fork を使いプロセスを複製するためメモリのコピーオンライトが効き効率的です。Windows は spawn(モジュール再インポート)で起動するためエントリーポイント保護やグローバル初期化の扱いに注意が必要です。
1# Explanation: Check start method (fork/spawn) and set it if needed.
2# Useful for debugging platform-dependent behavior.
3
4from multiprocessing import get_start_method, set_start_method
5
6if __name__ == "__main__":
7 print("Start method:", get_start_method())
8
9 # uncomment to force spawn on Unix for testing
10 # set_start_method('spawn')set_start_methodはプログラム開始時に一度だけ呼べます。ライブラリでは勝手に変更しない方が安全です。
実用例:CPUバウンド作業のベンチマーク(比較)
以下は、multiprocessing を使った並列化でどれだけ速くなるかを簡単に比較するスクリプトです。ここでは Pool を使います。
1# Explanation:
2# Compare sequential vs parallel execution times for CPU-bound task.
3# Helps understand speedup and overhead.
4
5import time
6from multiprocessing import Pool, cpu_count
7import math
8
9def heavy_task(n):
10 s = 0
11 for i in range(1, n):
12 s += math.sqrt(i)
13 return s
14
15def run_sequential(nums):
16 return [heavy_task(n) for n in nums]
17
18def run_parallel(nums):
19 with Pool(processes=cpu_count()) as p:
20 return p.map(heavy_task, nums)
21
22if __name__ == "__main__":
23 nums = [2000000] * 8 # heavy tasks
24 t0 = time.time()
25 run_sequential(nums)
26 seq = time.time() - t0
27 t1 = time.time()
28 run_parallel(nums)
29 par = time.time() - t1
30 print(f"Sequential: {seq:.2f}s, Parallel: {par:.2f}s")- この例は、タスクの重さとプロセスの数によっては並列化が効かない(オーバーヘッドが勝る)ケースもあることを示します。タスクは“重め”で独立しているほど恩恵が大きいです。
重要な基本ルール
以下は、multiprocessing を安全かつ効率的に使うための基本的なポイントです。
- Windows では子プロセス起動時にモジュールが再インポートされるため、スクリプトのエントリポイントを
if __name__ == "__main__":で保護する必要があります。 - プロセス間通信はシリアライズ(
pickle変換)されるため、大きなオブジェクトのやり取りはコストが高くなります。 multiprocessingはプロセスを生成するため、プロセス数はmultiprocessing.cpu_count()を参考に決めるのが一般的です。- ワーカー内部でさらに
Poolを作ると複雑になるため、できる限りPoolをネストしないようにします。 - 子プロセスで例外が起きても、メインプロセスでは気づきにくいため、ログ出力やエラーハンドリングを明確に実装しておく必要があります。
- プロセス数は CPU に合わせ、I/O バウンドの場合はスレッドの利用を検討します。
実務での設計アドバイス
以下には、並列処理を設計するときに役立つ考え方やパターンをまとめています。
- 「パイプライン化」を行い、入力読み込み(I/O)、前処理(複数CPU)、集計(シリアル)といった役割にプロセスを分けると効率的です。
- デバッグを容易にするため、まずは単一プロセスで動作を確認してから並列化します。
- ロギングについては、各プロセスでログの出力先を分け(ファイル名に PID を含めるなど)、問題の切り分けをしやすくします。
- リトライやタイムアウト処理を用意し、プロセスがハングした場合にも安全に復旧できるようにします。
まとめ(すぐ役立つ要点)
並列処理は強力ですが、タスクの性質やデータの大きさ、プロセス間通信のコストを正しく判断することが重要です。multiprocessing は CPU バウンド処理で効果を発揮する一方、設計や同期を誤ると性能低下につながります。基本ルールとパターンを押さえておけば、安全で効率的な並列プログラムを構築できます。
YouTubeチャンネルでは、Visual Studio Codeを用いて上記の記事を見ながら確認できます。 ぜひYouTubeチャンネルもご覧ください。