Python 中的 `threading` 模塊
本文將介紹 Python 中的 threading 模塊。
YouTube Video
Python 中的 threading 模塊
Python 中的 threading 模塊是一個支持多線程編程的標準庫。使用線程允許多個進程同時運行,這可以提高程序性能,特別是在涉及 I/O 等待之類的阻塞操作的情況下。由於 Python 的全局解釋器鎖(GIL),多線程對於 CPU 密集型操作的有效性有限,但在處理 I/O 密集型操作時則表現良好。
以下部分將介紹如何使用 threading 模塊以及控制線程的基本操作。
線程的基本用法
創建並運行線程
要創建線程並執行並發處理,可以使用 threading.Thread 類。指定目標函數以創建線程並執行該線程。
1import threading
2import time
3
4# Function to be executed in a thread
5def worker():
6 print("Worker thread started")
7 time.sleep(2)
8 print("Worker thread finished")
9
10# Create and start the thread
11thread = threading.Thread(target=worker)
12thread.start()
13
14# Processing in the main thread
15print("Main thread continues to run")
16
17# Wait for the thread to finish
18thread.join()
19print("Main thread finished")- 在此示例中,
worker函數在一個單獨的線程中執行,而主線程繼續運行。通過調用join()方法,主線程會等待子線程完成。
線程命名
給線程指定有意義的名稱能讓日誌和除錯更加容易。你可以通過 name 參數來指定名稱。
1import threading
2import time
3
4# Function to be executed in a thread
5def worker():
6 print("Worker thread started")
7 time.sleep(2)
8 print("Worker thread finished")
9
10t = threading.Thread(
11 target=worker,
12 args=("named-worker", 0.3),
13 name="MyWorkerThread"
14)
15
16t.start()
17
18print("Active threads:", threading.active_count())
19for th in threading.enumerate():
20 print(" -", th.name)
21
22t.join()threading.enumerate()會返回目前線程的列表,這對於除錯以及監控狀態很有幫助。
繼承 Thread 類
如果您想自定義線程執行類,可以通過繼承 threading.Thread 類來定義一個新類。
1import threading
2import time
3
4# Inherit from the Thread class
5class WorkerThread(threading.Thread):
6 def __init__(self, name, delay, repeat=3):
7 super().__init__(name=name)
8 self.delay = delay
9 self.repeat = repeat
10 self.results = []
11
12 def run(self):
13 for i in range(self.repeat):
14 msg = f"{self.name} step {i+1}"
15 print(msg)
16 self.results.append(msg)
17 time.sleep(self.delay)
18
19# Create and start the threads
20t1 = WorkerThread("Worker-A", delay=0.4, repeat=3)
21t2 = WorkerThread("Worker-B", delay=0.2, repeat=5)
22
23t1.start()
24t2.start()
25
26t1.join()
27t2.join()
28
29print("Results A:", t1.results)
30print("Results B:", t2.results)- 在這個範例中,通過覆寫
run()方法來定義線程的行為,使每個線程都能擁有自己的資料。當線程需要執行複雜處理或你希望每個線程有自己的獨立資料時,這種做法十分有用。
線程之間的同步
當多個線程同時訪問共享資源時,可能會發生數據競爭。為了防止這種情況,threading 模塊提供了多種同步機制。
鎖(Lock)
Lock 對象用於實現線程之間對資源的互斥控制。當一個線程鎖定一個資源時,其他線程無法訪問該資源。
1import threading
2
3lock = threading.Lock()
4shared_resource = 0
5
6def worker():
7 global shared_resource
8 with lock: # Acquire the lock
9 local_copy = shared_resource
10 local_copy += 1
11 shared_resource = local_copy
12
13threads = [threading.Thread(target=worker) for _ in range(5)]
14
15for t in threads:
16 t.start()
17
18for t in threads:
19 t.join()
20
21print(f"Final value of shared resource: {shared_resource}")- 在這個示例中,五個線程訪問共享資源,但通過使用
Lock防止多個線程同時修改數據。
可重入鎖 (RLock)
如果同一個線程需要多次獲取同一把鎖,請使用 RLock(可重入鎖)。這對於遞迴調用或庫可能在不同調用中獲取鎖的情況很有用。
1import threading
2
3rlock = threading.RLock()
4shared = []
5
6def outer():
7 with rlock:
8 shared.append("outer")
9 inner()
10
11def inner():
12 with rlock:
13 shared.append("inner")
14
15t = threading.Thread(target=outer)
16t.start()
17t.join()
18print(shared)- 使用
RLock時,已持有鎖的線程可以再次獲取該鎖,有助於防止嵌套加鎖時發生死鎖。
條件(Condition)
“Condition” 用於讓線程等待直到特定條件滿足。當一個執行緒滿足某個條件時,你可以呼叫 notify() 來通知另一個執行緒,或呼叫 notify_all() 來通知所有正在等待的執行緒。
以下是一個生產者與消費者使用 Condition 的範例。
1import threading
2
3condition = threading.Condition()
4shared_data = []
5
6def producer():
7 with condition:
8 shared_data.append(1)
9 print("Produced an item")
10 condition.notify() # Notify the consumer
11
12def consumer():
13 with condition:
14 condition.wait() # Wait until the condition is met
15
16 item = shared_data.pop(0)
17 print(f"Consumed an item: {item}")
18
19# Create the threads
20producer_thread = threading.Thread(target=producer)
21consumer_thread = threading.Thread(target=consumer)
22
23consumer_thread.start()
24producer_thread.start()
25
26producer_thread.join()
27consumer_thread.join()- 這段程式碼利用
Condition,讓生產者在添加資料後通知,消費者在收到通知後再取出資料,從而實現同步。
線程守護(Daemonization)
當主線程結束時,守護線程會被強制終止。普通線程必須等待才能終止,而守護線程則會自動終止。
1import threading
2import time
3
4def worker():
5 while True:
6 print("Working...")
7 time.sleep(1)
8
9# Create a daemon thread
10thread = threading.Thread(target=worker)
11thread.daemon = True # Set as a daemon thread
12
13thread.start()
14
15# Processing in the main thread
16time.sleep(3)
17print("Main thread finished")- 在此示例中,
worker線程已被設置為守護線程,因此當主線程結束時它將被強制終止。
使用 ThreadPoolExecutor 進行線程管理
除了 threading 模塊,還可以使用 concurrent.futures 模塊中的 ThreadPoolExecutor 來管理線程池並并行執行任務。
1from concurrent.futures import ThreadPoolExecutor
2import time
3
4def worker(seconds):
5 print(f"Sleeping for {seconds} second(s)")
6 time.sleep(seconds)
7 return f"Finished sleeping for {seconds} second(s)"
8
9with ThreadPoolExecutor(max_workers=3) as executor:
10 futures = [executor.submit(worker, i) for i in range(1, 4)]
11 for future in futures:
12 print(future.result())ThreadPoolExecutor用於創建線程池並高效處理任務。可以通過max_workers指定同時運行的線程數量。
線程間的事件通信
使用 threading.Event,您可以在線程之間設置標誌以通知其他線程事件的發生。
1import threading
2import time
3
4event = threading.Event()
5
6def worker():
7 print("Waiting for event to be set")
8 event.wait() # Wait until the event is set
9
10 print("Event received, continuing work")
11
12thread = threading.Thread(target=worker)
13thread.start()
14
15time.sleep(2)
16print("Setting the event")
17event.set() # Set the event and notify the thread- 這段程式碼展示了一種機制:工作線程會等待
Event訊號,當主線程呼叫event.set()時,工作線程會恢復處理。
線程中的例外處理與終止
當線程中發生例外時,不會直接傳遞到主線程,因此需要一種方式來捕捉和共享例外資訊。
1import threading
2import queue
3
4def worker(err_q):
5 try:
6 raise ValueError("Something bad")
7 except Exception as e:
8 err_q.put(e)
9
10q = queue.Queue()
11t = threading.Thread(target=worker, args=(q,))
12t.start()
13t.join()
14if not q.empty():
15 exc = q.get()
16 print("Worker raised:", exc)- 通過將例外放入
Queue並在主執行緒中取出,你可以可靠地檢測到錯誤。如果你使用concurrent.futures.ThreadPoolExecutor,可以透過future.result()重新拋出例外,使其更易於處理。
GIL(全域直譯器鎖)及其影響
由於 CPython 的 GIL(全域直譯器鎖) 機制,相同進程中的多個 Python 位元碼無法真正同時執行。對於需要大量 CPU 運算的任務,如複雜計算,建議使用 multiprocessing。而對於 I/O 密集型任務,例如檔案讀取或網路通信,則建議使用 threading。
總結
使用 Python 的 threading 模塊,可以實現 多線程程序,並同時運行多個過程。通過使用像 Lock 和 Condition 這類同步機制,可以安全地訪問共享資源並執行復雜的同步操作。此外,使用守護線程或 ThreadPoolExecutor,線程管理與高效並行處理將更加方便。
您可以在我們的 YouTube 頻道上使用 Visual Studio Code 來跟隨上述文章一起學習。 請也查看我們的 YouTube 頻道。