Python 中的 `threading` 模塊

Python 中的 `threading` 模塊

本文將介紹 Python 中的 threading 模塊。

YouTube Video

Python 中的 threading 模塊

Python 中的 threading 模塊是一個支持多線程編程的標準庫。使用線程允許多個進程同時運行,這可以提高程序性能,特別是在涉及 I/O 等待之類的阻塞操作的情況下。由於 Python 的全局解釋器鎖(GIL),多線程對於 CPU 密集型操作的有效性有限,但在處理 I/O 密集型操作時則表現良好。

以下部分將介紹如何使用 threading 模塊以及控制線程的基本操作。

線程的基本用法

創建並運行線程

要創建線程並執行並發處理,可以使用 threading.Thread 類。指定目標函數以創建線程並執行該線程。

 1import threading
 2import time
 3
 4# Function to be executed in a thread
 5def worker():
 6    print("Worker thread started")
 7    time.sleep(2)
 8    print("Worker thread finished")
 9
10# Create and start the thread
11thread = threading.Thread(target=worker)
12thread.start()
13
14# Processing in the main thread
15print("Main thread continues to run")
16
17# Wait for the thread to finish
18thread.join()
19print("Main thread finished")
  • 在此示例中,worker 函數在一個單獨的線程中執行,而主線程繼續運行。通過調用 join() 方法,主線程會等待子線程完成。

線程命名

給線程指定有意義的名稱能讓日誌和除錯更加容易。你可以通過 name 參數來指定名稱。

 1import threading
 2import time
 3
 4# Function to be executed in a thread
 5def worker():
 6    print("Worker thread started")
 7    time.sleep(2)
 8    print("Worker thread finished")
 9
10t = threading.Thread(
11    target=worker,
12    args=("named-worker", 0.3),
13    name="MyWorkerThread"
14)
15
16t.start()
17
18print("Active threads:", threading.active_count())
19for th in threading.enumerate():
20    print(" -", th.name)
21
22t.join()
  • threading.enumerate() 會返回目前線程的列表,這對於除錯以及監控狀態很有幫助。

繼承 Thread 類

如果您想自定義線程執行類,可以通過繼承 threading.Thread 類來定義一個新類。

 1import threading
 2import time
 3
 4# Inherit from the Thread class
 5class WorkerThread(threading.Thread):
 6    def __init__(self, name, delay, repeat=3):
 7        super().__init__(name=name)
 8        self.delay = delay
 9        self.repeat = repeat
10        self.results = []
11
12    def run(self):
13        for i in range(self.repeat):
14            msg = f"{self.name} step {i+1}"
15            print(msg)
16            self.results.append(msg)
17            time.sleep(self.delay)
18
19# Create and start the threads
20t1 = WorkerThread("Worker-A", delay=0.4, repeat=3)
21t2 = WorkerThread("Worker-B", delay=0.2, repeat=5)
22
23t1.start()
24t2.start()
25
26t1.join()
27t2.join()
28
29print("Results A:", t1.results)
30print("Results B:", t2.results)
  • 在這個範例中,通過覆寫 run() 方法來定義線程的行為,使每個線程都能擁有自己的資料。當線程需要執行複雜處理或你希望每個線程有自己的獨立資料時,這種做法十分有用。

線程之間的同步

當多個線程同時訪問共享資源時,可能會發生數據競爭。為了防止這種情況,threading 模塊提供了多種同步機制。

鎖(Lock

Lock 對象用於實現線程之間對資源的互斥控制。當一個線程鎖定一個資源時,其他線程無法訪問該資源。

 1import threading
 2
 3lock = threading.Lock()
 4shared_resource = 0
 5
 6def worker():
 7    global shared_resource
 8    with lock:  # Acquire the lock
 9        local_copy = shared_resource
10        local_copy += 1
11        shared_resource = local_copy
12
13threads = [threading.Thread(target=worker) for _ in range(5)]
14
15for t in threads:
16    t.start()
17
18for t in threads:
19    t.join()
20
21print(f"Final value of shared resource: {shared_resource}")
  • 在這個示例中,五個線程訪問共享資源,但通過使用 Lock 防止多個線程同時修改數據。

可重入鎖 (RLock)

如果同一個線程需要多次獲取同一把鎖,請使用 RLock(可重入鎖)。這對於遞迴調用或庫可能在不同調用中獲取鎖的情況很有用。

 1import threading
 2
 3rlock = threading.RLock()
 4shared = []
 5
 6def outer():
 7    with rlock:
 8        shared.append("outer")
 9        inner()
10
11def inner():
12    with rlock:
13        shared.append("inner")
14
15t = threading.Thread(target=outer)
16t.start()
17t.join()
18print(shared)
  • 使用 RLock 時,已持有鎖的線程可以再次獲取該鎖,有助於防止嵌套加鎖時發生死鎖。

條件(Condition

“Condition” 用於讓線程等待直到特定條件滿足。當一個執行緒滿足某個條件時,你可以呼叫 notify() 來通知另一個執行緒,或呼叫 notify_all() 來通知所有正在等待的執行緒。

以下是一個生產者與消費者使用 Condition 的範例。

 1import threading
 2
 3condition = threading.Condition()
 4shared_data = []
 5
 6def producer():
 7    with condition:
 8        shared_data.append(1)
 9        print("Produced an item")
10        condition.notify()  # Notify the consumer
11
12def consumer():
13    with condition:
14        condition.wait()  # Wait until the condition is met
15
16        item = shared_data.pop(0)
17        print(f"Consumed an item: {item}")
18
19# Create the threads
20producer_thread = threading.Thread(target=producer)
21consumer_thread = threading.Thread(target=consumer)
22
23consumer_thread.start()
24producer_thread.start()
25
26producer_thread.join()
27consumer_thread.join()
  • 這段程式碼利用 Condition,讓生產者在添加資料後通知,消費者在收到通知後再取出資料,從而實現同步。

線程守護(Daemonization)

當主線程結束時,守護線程會被強制終止。普通線程必須等待才能終止,而守護線程則會自動終止。

 1import threading
 2import time
 3
 4def worker():
 5    while True:
 6        print("Working...")
 7        time.sleep(1)
 8
 9# Create a daemon thread
10thread = threading.Thread(target=worker)
11thread.daemon = True  # Set as a daemon thread
12
13thread.start()
14
15# Processing in the main thread
16time.sleep(3)
17print("Main thread finished")
  • 在此示例中,worker 線程已被設置為守護線程,因此當主線程結束時它將被強制終止。

使用 ThreadPoolExecutor 進行線程管理

除了 threading 模塊,還可以使用 concurrent.futures 模塊中的 ThreadPoolExecutor 來管理線程池並并行執行任務。

 1from concurrent.futures import ThreadPoolExecutor
 2import time
 3
 4def worker(seconds):
 5    print(f"Sleeping for {seconds} second(s)")
 6    time.sleep(seconds)
 7    return f"Finished sleeping for {seconds} second(s)"
 8
 9with ThreadPoolExecutor(max_workers=3) as executor:
10    futures = [executor.submit(worker, i) for i in range(1, 4)]
11    for future in futures:
12        print(future.result())
  • ThreadPoolExecutor 用於創建線程池並高效處理任務。可以通過 max_workers 指定同時運行的線程數量。

線程間的事件通信

使用 threading.Event,您可以在線程之間設置標誌以通知其他線程事件的發生。

 1import threading
 2import time
 3
 4event = threading.Event()
 5
 6def worker():
 7    print("Waiting for event to be set")
 8    event.wait()  # Wait until the event is set
 9
10    print("Event received, continuing work")
11
12thread = threading.Thread(target=worker)
13thread.start()
14
15time.sleep(2)
16print("Setting the event")
17event.set()  # Set the event and notify the thread
  • 這段程式碼展示了一種機制:工作線程會等待 Event 訊號,當主線程呼叫 event.set() 時,工作線程會恢復處理。

線程中的例外處理與終止

當線程中發生例外時,不會直接傳遞到主線程,因此需要一種方式來捕捉和共享例外資訊。

 1import threading
 2import queue
 3
 4def worker(err_q):
 5    try:
 6        raise ValueError("Something bad")
 7    except Exception as e:
 8        err_q.put(e)
 9
10q = queue.Queue()
11t = threading.Thread(target=worker, args=(q,))
12t.start()
13t.join()
14if not q.empty():
15    exc = q.get()
16    print("Worker raised:", exc)
  • 通過將例外放入 Queue 並在主執行緒中取出,你可以可靠地檢測到錯誤。如果你使用 concurrent.futures.ThreadPoolExecutor,可以透過 future.result() 重新拋出例外,使其更易於處理。

GIL(全域直譯器鎖)及其影響

由於 CPython 的 GIL(全域直譯器鎖) 機制,相同進程中的多個 Python 位元碼無法真正同時執行。對於需要大量 CPU 運算的任務,如複雜計算,建議使用 multiprocessing。而對於 I/O 密集型任務,例如檔案讀取或網路通信,則建議使用 threading

總結

使用 Python 的 threading 模塊,可以實現 多線程程序,並同時運行多個過程。通過使用像 LockCondition 這類同步機制,可以安全地訪問共享資源並執行復雜的同步操作。此外,使用守護線程或 ThreadPoolExecutor,線程管理與高效並行處理將更加方便。

您可以在我們的 YouTube 頻道上使用 Visual Studio Code 來跟隨上述文章一起學習。 請也查看我們的 YouTube 頻道。

YouTube Video