Python 中的 `multiprocessing` 模組

Python 中的 `multiprocessing` 模組

本文介紹了 Python 中的 multiprocessing 模組。

本文介紹如何使用 multiprocessing 模組編寫安全且高效的並行處理程式碼的實用技巧。

YouTube Video

Python 中的 multiprocessing 模組

基礎知識:為什麼要使用 multiprocessing

multiprocessing 允許以程序為單位進行並行處理,因此可以在不受 Python 全域直譯器鎖(GIL)限制的情況下,並行執行 CPU 密集型任務。對於 I/O 密集型任務,threadingasyncio 可能更簡單且更合適。

Process 的簡單用法

首先,以下是一個使用 Process 在獨立程序中執行函式的基本範例。這示範了如何啟動一個程序、等待其完成,並且傳遞參數。

 1# Explanation:
 2# This example starts a separate process to run `worker` which prints messages.
 3# It demonstrates starting, joining, and passing arguments.
 4
 5from multiprocessing import Process
 6import time
 7
 8def worker(name, delay):
 9    # English comment in code per user's preference
10    for i in range(3):
11        print(f"Worker {name}: iteration {i}")
12        time.sleep(delay)
13
14if __name__ == "__main__":
15    p = Process(target=worker, args=("A", 0.5))
16    p.start()
17    print("Main: waiting for worker to finish")
18    p.join()
19    print("Main: worker finished")
  • 這段程式碼展示主程序如何啟動子程序 worker,並用 join() 等待其完成的流程。可以用 args 傳遞參數。

使用 Pool 進行簡單的平行化處理(高階 API)

Pool.map 適用於將同一個函式應用於多個獨立任務的情境。它會自動在內部幫你管理工作程序。

 1# Explanation:
 2# Use Pool.map to parallelize a CPU-bound function across available processes.
 3# Good for "embarrassingly parallel" workloads.
 4
 5from multiprocessing import Pool, cpu_count
 6import math
 7import time
 8
 9def is_prime(n):
10    # Check primality (inefficient but CPU-heavy for demo)
11    if n < 2:
12        return False
13    for i in range(2, int(math.sqrt(n)) + 1):
14        if n % i == 0:
15            return False
16    return True
17
18if __name__ == "__main__":
19    nums = [10_000_000 + i for i in range(50)]
20    start = time.time()
21    with Pool(processes=cpu_count()) as pool:
22        results = pool.map(is_prime, nums)
23    end = time.time()
24    print(f"Found primes: {sum(results)} / {len(nums)} in {end-start:.2f}s")
  • Pool 能自動控制工作程序數量,而 map 會照原本的順序返回結果。

程序間通訊:使用 Queue 的生產者/消費者模式

Queue 是一個**先進先出(FIFO)**的佇列,可以在進程間安全地傳遞物件。以下是一些典型的模式。

 1# Explanation:
 2# Demonstrates a producer putting items into a Queue
 3# and consumer reading them.
 4# This is useful for task pipelines between processes.
 5
 6from multiprocessing import Process, Queue
 7import time
 8import random
 9
10def producer(q, n):
11    for i in range(n):
12        item = f"item-{i}"
13        print("Producer: putting", item)
14        q.put(item)
15        time.sleep(random.random() * 0.5)
16    q.put(None)  # sentinel to signal consumer to stop
17
18def consumer(q):
19    while True:
20        item = q.get()
21        if item is None:
22            break
23        print("Consumer: got", item)
24        time.sleep(0.2)
25
26if __name__ == "__main__":
27    q = Queue()
28    p = Process(target=producer, args=(q, 5))
29    c = Process(target=consumer, args=(q,))
30    p.start()
31    c.start()
32    p.join()
33    c.join()
34    print("Main: done")
  • Queue 讓你可以在程序間安全地傳遞資料。通常會使用像是 None 這樣的特殊值來表示終止。

共用記憶體:ValueArray

當你需要在程序間共享小數據(如數字或小型陣列)時,可以使用 ValueArray。你需要使用鎖來避免衝突。

 1# Explanation:
 2# Use Value to share a single integer counter
 3# and Array for a small numeric array.
 4# Show how to use a Lock to avoid race conditions.
 5
 6from multiprocessing import Process, Value, Array, Lock
 7import time
 8
 9def increment(counter, lock, times):
10    for _ in range(times):
11        with lock:
12            counter.value += 1
13
14def update_array(arr):
15    for i in range(len(arr)):
16        arr[i] = arr[i] + 1
17
18if __name__ == "__main__":
19    lock = Lock()
20    counter = Value('i', 0)  # 'i' = signed int
21    shared_arr = Array('i', [0, 0, 0])
22
23    p1 = Process(target=increment, args=(counter, lock, 1000))
24    p2 = Process(target=increment, args=(counter, lock, 1000))
25    a = Process(target=update_array, args=(shared_arr,))
26
27    p1.start(); p2.start(); a.start()
28    p1.join(); p2.join(); a.join()
29
30    print("Counter:", counter.value)
31    print("Array:", list(shared_arr))
  • ValueArray 是用底層(C 語言層級)的共用記憶體機制而非 Python 本身來實現跨程序共享資料。因此,適合用於快速讀寫少量數據,但不適用於處理大量數據。

進階共享:利用 Manager 共用物件(如 dict、list)

如果需要更靈活地共用列表或字典等物件,可以使用 Manager()

 1# Explanation:
 2# Manager provides proxy objects like dict/list
 3# that can be shared across processes.
 4# Good for moderate-size shared state and easier programming model.
 5
 6from multiprocessing import Process, Manager
 7import time
 8
 9def worker(shared_dict, key, value):
10    shared_dict[key] = value
11
12if __name__ == "__main__":
13    with Manager() as manager:
14        d = manager.dict()
15        processes = []
16        for i in range(5):
17            p = Process(target=worker, args=(d, f"k{i}", i*i))
18            p.start()
19            processes.append(p)
20        for p in processes:
21            p.join()
22        print("Shared dict:", dict(d))
  • Manager 方便於共享字典和列表,但每次存取都會在程序間傳送資料,並需要進行 pickle 轉換。因此,頻繁更新大量資料會導致處理速度變慢。

同步機制:如何使用 LockSemaphore

請使用 LockSemaphore 來控制對共用資源的並發存取。可以搭配 with 語法簡潔地使用鎖。

 1# Explanation:
 2# Demonstrates using Lock to prevent simultaneous access to a critical section.
 3# Locks are necessary when shared resources are not atomic.
 4
 5from multiprocessing import Process, Lock, Value
 6
 7def safe_add(counter, lock):
 8    for _ in range(10000):
 9        with lock:
10            counter.value += 1
11
12if __name__ == "__main__":
13    lock = Lock()
14    counter = Value('i', 0)
15    p1 = Process(target=safe_add, args=(counter, lock))
16    p2 = Process(target=safe_add, args=(counter, lock))
17    p1.start(); p2.start()
18    p1.join(); p2.join()
19    print("Counter:", counter.value)
  • 鎖可以防止資料競爭,但如果加鎖的區域過大,並行處理的效能會降低。只有必要的部分應該作為臨界區來保護。

UNIX 的 fork 和 Windows 行為的差異

在 UNIX 系統上,默認使用 fork 複製程序,可以實現高效的寫時複製(copy-on-write)。Windows 使用 spawn 啟動程序(會重新導入模組),因此必須特別注意入口點保護以及全域初始化。

 1# Explanation: Check start method (fork/spawn) and set it if needed.
 2# Useful for debugging platform-dependent behavior.
 3
 4from multiprocessing import get_start_method, set_start_method
 5
 6if __name__ == "__main__":
 7    print("Start method:", get_start_method())
 8
 9    # uncomment to force spawn on Unix for testing
10    # set_start_method('spawn')
  • set_start_method 必須在程式一開始時調用一次,不能多次設定。建議不要在函式庫內隨意更改這個設定,比較安全。

實務範例:CPU 密集型任務的效能比較

以下是一個腳本,簡單比較使用 multiprocessing 進行平行處理能提升多少處理速度。這裡我們使用了 Pool

 1# Explanation:
 2# Compare sequential vs parallel execution times for CPU-bound task.
 3# Helps understand speedup and overhead.
 4
 5import time
 6from multiprocessing import Pool, cpu_count
 7import math
 8
 9def heavy_task(n):
10    s = 0
11    for i in range(1, n):
12        s += math.sqrt(i)
13    return s
14
15def run_sequential(nums):
16    return [heavy_task(n) for n in nums]
17
18def run_parallel(nums):
19    with Pool(processes=cpu_count()) as p:
20        return p.map(heavy_task, nums)
21
22if __name__ == "__main__":
23    nums = [2000000] * 8  # heavy tasks
24    t0 = time.time()
25    run_sequential(nums)
26    seq = time.time() - t0
27    t1 = time.time()
28    run_parallel(nums)
29    par = time.time() - t1
30    print(f"Sequential: {seq:.2f}s, Parallel: {par:.2f}s")
  • 此例子展示了根據任務負載及程序數量,有時候因資源開銷反而無法提升效能。任務越大且越獨立,從並行化中獲得的效益就越明顯。

重要的基本規則

以下是安全且高效地使用 multiprocessing 的基本要點。

  • 在 Windows 上,每次新建子程序時會重新導入模組,所以必須用 if __name__ == "__main__": 保護腳本的入口點。
  • 跨進程通訊會進行序列化(使用 pickle 轉換),因此傳送大型物件會變得昂貴。
  • multiprocessing 會建立多個程序,一般建議根據 multiprocessing.cpu_count() 來決定程序數量。
  • 在 worker 中建立另一個 Pool 會增加複雜性,因此應盡量避免巢狀使用 Pool 實例。
  • 由於主進程難以偵測到子進程發生的異常,所以必須明確實作日誌紀錄和錯誤處理。
  • 程序數請依照 CPU 核心數設定,I/O 密集型任務建議考慮採用多線程。

實務設計建議

以下是設計並行處理時一些有用的概念和模式。

  • 將流程分為不同角色,如輸入讀取(I/O)、預處理(多 CPU)、彙總(序列化)等做「管線化(pipelining)」設計,效率更高。
  • 為簡化除錯,建議先在單程序中確認運作再做平行化。
  • 紀錄 log 時建議各個程序各自寫檔(例如檔名含 PID),以便追蹤問題。
  • 設計重試(retry)與逾時(timeout)機制,以確保即使程序卡住也能安全復原。

總結(立即可用的重點規則)

並行處理非常強大,但正確判斷任務性質、數據量及程序間通訊成本十分重要。multiprocessing 對 CPU 密集處理很有效,但設計不良或同步錯誤會嚴重影響效能。只要遵循基本規則與設計模式,就能實作安全且高效的並行程式。

您可以在我們的 YouTube 頻道上使用 Visual Studio Code 來跟隨上述文章一起學習。 請也查看我們的 YouTube 頻道。

YouTube Video