Python中的 `concurrent` 模組

Python中的 `concurrent` 模組

在本文中,我們將說明 Python 中的 concurrent 模組。

我們將在釐清並行與平行處理概念的同時,通過實際範例說明如何利用 concurrent 模組實現非同步處理。

YouTube Video

Python中的 concurrent 模組

在加快 Python 處理速度時,留意並行與平行處理之間的差異是非常重要的。concurrent 模組是一種重要方式,能在意識到這些差異的同時,安全且輕鬆地處理非同步流程

並行 (Concurrency) 與平行 (Parallelism) 的差異

  • 並行指的是設計流程讓多個任務能夠以小單元的方式相互切換進行。 即使任務並未同時執行,也可以善用「等待時間」來提升整體流程效率。

  • 平行處理則是指實際上同時執行多個任務的機制。 通過活用多個CPU核心,能同時推進多項處理。

兩者都是加快處理速度的技術,但並行是「如何進行」的設計問題,而平行則是「如何執行」的問題,兩者本質上是不同的。

concurrent 模組是什麼?

concurrent 是 Python 的標準函式庫,提供了可以安全且簡單地處理並行與平行的高階 API。它的設計讓你能專注於「執行任務」,而不必費心於底層的執行緒或程序的建立與管理。

ThreadPoolExecutorProcessPoolExecutor 的角色

concurrent 模組根據任務性質,主要提供兩種選擇。

  • ThreadPoolExecutor 這適合進行並行實作,特別是有大量 I/O 等待的任務,比如網路或檔案操作。透過在多個任務之間切換,可以有效利用等待時間。

  • ProcessPoolExecutor 這種實現目標是平行運算,尤其適用於 CPU 密集型任務。它利用多個程序,能充分發揮可用 CPU 核心進行平行處理。

因此,concurrent 模組的一大特點是它提供了可根據需求正確選擇並行或平行的架構。

ThreadPoolExecutor 基礎(適合 I/O 任務)

ThreadPoolExecutor 適用於I/O 密集型的任務,例如網路通訊及檔案操作。它將任務分配到多個執行緒,能有效利用等待時間。

 1from concurrent.futures import ThreadPoolExecutor
 2import time
 3
 4def fetch_data(n):
 5    # Simulate an I/O-bound task
 6    time.sleep(1)
 7    return f"data-{n}"
 8
 9with ThreadPoolExecutor(max_workers=3) as executor:
10    futures = [executor.submit(fetch_data, i) for i in range(5)]
11
12    for future in futures:
13        print(future.result())
  • 在此範例中,會同時並行地執行多個需等待一秒的 I/O 任務。利用 submit 能將函數呼叫註冊為非同步任務,通過 result() 等待其完成並獲得結果,實現簡潔而有效利用等待時間的並行處理

利用 map 的簡易並行處理

如果不需要複雜控制,利用 map 方法可以讓程式碼更簡潔。

 1from concurrent.futures import ThreadPoolExecutor
 2import time
 3
 4def fetch_data(n):
 5    # Simulate an I/O-bound task
 6    time.sleep(1)
 7    return f"data-{n}"
 8
 9with ThreadPoolExecutor(max_workers=3) as executor:
10    results = executor.map(fetch_data, range(5))
11
12    for result in results:
13        print(result)
  • 此範例中,會利用 ThreadPoolExecutor.map 同時並行地執行多個 I/O 任務。由於 map 會按輸入順序返回結果,所以可寫出與順序處理接近的程式,同時無需擔心非同步實現就能進行並行運算,這是一大優點。

ProcessPoolExecutor 基礎(適合 CPU 密集型任務)

對於完全活用 CPU 的重度計算任務,應選用程序而非執行緒。如此可避免全域直譯器鎖(GIL)的限制。

 1from concurrent.futures import ProcessPoolExecutor
 2
 3def heavy_calculation(n):
 4    # Simulate a CPU-bound task
 5    total = 0
 6    for i in range(10_000_000):
 7        total += i * n
 8    return total
 9
10if __name__ == "__main__":
11    with ProcessPoolExecutor(max_workers=4) as executor:
12        results = executor.map(heavy_calculation, range(4))
13
14        for result in results:
15            print(result)

在此範例中,利用 ProcessPoolExecutor 來平行處理 CPU 密集型運算。由於會建立新程序,所以需加上 __main__ 防護,確保能安全利用多核心進行平行運算

以完成順序處理(利用 as_completed

當你希望按照任務完成順序來處理結果時,as_completed 非常有用。

 1from concurrent.futures import ThreadPoolExecutor, as_completed
 2import time
 3
 4def fetch_data(n):
 5    # Simulate an I/O-bound task
 6    time.sleep(1)
 7    return f"data-{n}"
 8
 9with ThreadPoolExecutor(max_workers=3) as executor:
10    futures = [executor.submit(fetch_data, i) for i in range(5)]
11
12    for future in as_completed(futures):
13        print(future.result())
  • 此範例會同時執行多個非同步任務,並依各自完成順序獲取結果。利用 as_completed 可以不受任務排序影響,及時處理已完成任務,非常適合用於進度顯示或需要依完成順序處理的場合。

例外狀況的處理

concurrent 中,當你呼叫 result() 時會拋出例外

 1from concurrent.futures import ThreadPoolExecutor
 2
 3def risky_task(n):
 4    # Simulate a task that may fail for a specific input
 5    if n == 3:
 6        raise ValueError("Something went wrong")
 7    return n * 2
 8
 9with ThreadPoolExecutor() as executor:
10    futures = [executor.submit(risky_task, i) for i in range(5)]
11
12    for future in futures:
13        try:
14            print(future.result())
15        except Exception as e:
16            print("Error:", e)
  • 此範例展示即使部分任務發生例外,其他任務仍會繼續執行,你也可在獲取每個結果時分別處理例外。透過 concurrentFuture,可安全地處理非同步任務的成功與失敗,這點非常重要。

選擇執行緒與程序的指引

要有效運用並行與平行,根據任務性質選擇正確方式非常關鍵

實際上,可根據以下標準進行選擇。

  • 對於具有大量 I/O 等待的程序,例如通訊或檔案操作,請使用 ThreadPoolExecutor
  • 若是 CPU 密集型、需要大量運算的任務,則選用 ProcessPoolExecutor
  • 多數簡單任務可利用 map 讓程式碼更精簡。
  • 若需要精確控制執行順序或例外處理,請結合 submitas_completed 使用。

使用 concurrent 的優點

利用 concurrent 模組,你可以安全且直觀地處理非同步流程。

其主要優點如下:。

  • 你不必煩心於底層執行緒或程序的管理。
  • 它是 Python 標準函式庫的一部分,可以安心使用。
  • 可讓程式碼更容易閱讀與維護。
  • 非常適合作為學習並行與平行的入門工具。

只要記住這些指引,就能大幅減少使用 concurrent 時的實作錯誤。

總結

concurrent 模組是在 Python 中實現並行與平行的標準選擇。它能在不須大幅更改處理內容的情況下提升性能,這在實務上是非常重要的優點。利用 concurrent,你可以簡明實現非同步處理,並安全地管理例外狀況處理和執行控制

您可以在我們的 YouTube 頻道上使用 Visual Studio Code 來跟隨上述文章一起學習。 請也查看我們的 YouTube 頻道。

YouTube Video