Python中的 `concurrent` 模組
在本文中,我們將說明 Python 中的 concurrent 模組。
我們將在釐清並行與平行處理概念的同時,通過實際範例說明如何利用 concurrent 模組實現非同步處理。
YouTube Video
Python中的 concurrent 模組
在加快 Python 處理速度時,留意並行與平行處理之間的差異是非常重要的。concurrent 模組是一種重要方式,能在意識到這些差異的同時,安全且輕鬆地處理非同步流程。
並行 (Concurrency) 與平行 (Parallelism) 的差異
-
並行指的是設計流程讓多個任務能夠以小單元的方式相互切換進行。 即使任務並未同時執行,也可以善用「等待時間」來提升整體流程效率。
-
平行處理則是指實際上同時執行多個任務的機制。 通過活用多個CPU核心,能同時推進多項處理。
兩者都是加快處理速度的技術,但並行是「如何進行」的設計問題,而平行則是「如何執行」的問題,兩者本質上是不同的。
concurrent 模組是什麼?
concurrent 是 Python 的標準函式庫,提供了可以安全且簡單地處理並行與平行的高階 API。它的設計讓你能專注於「執行任務」,而不必費心於底層的執行緒或程序的建立與管理。
ThreadPoolExecutor 與 ProcessPoolExecutor 的角色
concurrent 模組根據任務性質,主要提供兩種選擇。
-
ThreadPoolExecutor這適合進行並行實作,特別是有大量 I/O 等待的任務,比如網路或檔案操作。透過在多個任務之間切換,可以有效利用等待時間。 -
ProcessPoolExecutor這種實現目標是平行運算,尤其適用於 CPU 密集型任務。它利用多個程序,能充分發揮可用 CPU 核心進行平行處理。
因此,concurrent 模組的一大特點是它提供了可根據需求正確選擇並行或平行的架構。
ThreadPoolExecutor 基礎(適合 I/O 任務)
ThreadPoolExecutor 適用於I/O 密集型的任務,例如網路通訊及檔案操作。它將任務分配到多個執行緒,能有效利用等待時間。
1from concurrent.futures import ThreadPoolExecutor
2import time
3
4def fetch_data(n):
5 # Simulate an I/O-bound task
6 time.sleep(1)
7 return f"data-{n}"
8
9with ThreadPoolExecutor(max_workers=3) as executor:
10 futures = [executor.submit(fetch_data, i) for i in range(5)]
11
12 for future in futures:
13 print(future.result())- 在此範例中,會同時並行地執行多個需等待一秒的 I/O 任務。利用
submit能將函數呼叫註冊為非同步任務,通過result()等待其完成並獲得結果,實現簡潔而有效利用等待時間的並行處理。
利用 map 的簡易並行處理
如果不需要複雜控制,利用 map 方法可以讓程式碼更簡潔。
1from concurrent.futures import ThreadPoolExecutor
2import time
3
4def fetch_data(n):
5 # Simulate an I/O-bound task
6 time.sleep(1)
7 return f"data-{n}"
8
9with ThreadPoolExecutor(max_workers=3) as executor:
10 results = executor.map(fetch_data, range(5))
11
12 for result in results:
13 print(result)- 此範例中,會利用
ThreadPoolExecutor.map同時並行地執行多個 I/O 任務。由於map會按輸入順序返回結果,所以可寫出與順序處理接近的程式,同時無需擔心非同步實現就能進行並行運算,這是一大優點。
ProcessPoolExecutor 基礎(適合 CPU 密集型任務)
對於完全活用 CPU 的重度計算任務,應選用程序而非執行緒。如此可避免全域直譯器鎖(GIL)的限制。
1from concurrent.futures import ProcessPoolExecutor
2
3def heavy_calculation(n):
4 # Simulate a CPU-bound task
5 total = 0
6 for i in range(10_000_000):
7 total += i * n
8 return total
9
10if __name__ == "__main__":
11 with ProcessPoolExecutor(max_workers=4) as executor:
12 results = executor.map(heavy_calculation, range(4))
13
14 for result in results:
15 print(result)在此範例中,利用 ProcessPoolExecutor 來平行處理 CPU 密集型運算。由於會建立新程序,所以需加上 __main__ 防護,確保能安全利用多核心進行平行運算。
以完成順序處理(利用 as_completed)
當你希望按照任務完成順序來處理結果時,as_completed 非常有用。
1from concurrent.futures import ThreadPoolExecutor, as_completed
2import time
3
4def fetch_data(n):
5 # Simulate an I/O-bound task
6 time.sleep(1)
7 return f"data-{n}"
8
9with ThreadPoolExecutor(max_workers=3) as executor:
10 futures = [executor.submit(fetch_data, i) for i in range(5)]
11
12 for future in as_completed(futures):
13 print(future.result())- 此範例會同時執行多個非同步任務,並依各自完成順序獲取結果。利用
as_completed可以不受任務排序影響,及時處理已完成任務,非常適合用於進度顯示或需要依完成順序處理的場合。
例外狀況的處理
在 concurrent 中,當你呼叫 result() 時會拋出例外。
1from concurrent.futures import ThreadPoolExecutor
2
3def risky_task(n):
4 # Simulate a task that may fail for a specific input
5 if n == 3:
6 raise ValueError("Something went wrong")
7 return n * 2
8
9with ThreadPoolExecutor() as executor:
10 futures = [executor.submit(risky_task, i) for i in range(5)]
11
12 for future in futures:
13 try:
14 print(future.result())
15 except Exception as e:
16 print("Error:", e)- 此範例展示即使部分任務發生例外,其他任務仍會繼續執行,你也可在獲取每個結果時分別處理例外。透過
concurrent的Future,可安全地處理非同步任務的成功與失敗,這點非常重要。
選擇執行緒與程序的指引
要有效運用並行與平行,根據任務性質選擇正確方式非常關鍵。
實際上,可根據以下標準進行選擇。
- 對於具有大量 I/O 等待的程序,例如通訊或檔案操作,請使用
ThreadPoolExecutor。 - 若是 CPU 密集型、需要大量運算的任務,則選用
ProcessPoolExecutor。 - 多數簡單任務可利用
map讓程式碼更精簡。 - 若需要精確控制執行順序或例外處理,請結合
submit與as_completed使用。
使用 concurrent 的優點
利用 concurrent 模組,你可以安全且直觀地處理非同步流程。
其主要優點如下:。
- 你不必煩心於底層執行緒或程序的管理。
- 它是 Python 標準函式庫的一部分,可以安心使用。
- 可讓程式碼更容易閱讀與維護。
- 非常適合作為學習並行與平行的入門工具。
只要記住這些指引,就能大幅減少使用 concurrent 時的實作錯誤。
總結
concurrent 模組是在 Python 中實現並行與平行的標準選擇。它能在不須大幅更改處理內容的情況下提升性能,這在實務上是非常重要的優點。利用 concurrent,你可以簡明實現非同步處理,並安全地管理例外狀況處理和執行控制。
您可以在我們的 YouTube 頻道上使用 Visual Studio Code 來跟隨上述文章一起學習。 請也查看我們的 YouTube 頻道。