Python中的`threading`模块
这篇文章解释了Python中的threading模块。
YouTube Video
Python中的threading模块
Python中的threading模块是一个支持多线程编程的标准库。使用线程可以让多个进程并发运行,从而提高程序性能,特别是在涉及I/O等待等阻塞操作的情况下。由于Python的全局解释器锁 (GIL),多线程对CPU密集型操作的效率有限,但对于I/O密集型操作可以高效运行。
以下章节将介绍如何使用threading模块的基本知识以及如何控制线程。
线程的基本用法
创建和运行线程
要创建线程并进行并发处理,可以使用threading.Thread类。指定目标函数以创建线程并执行该线程。
1import threading
2import time
3
4# Function to be executed in a thread
5def worker():
6 print("Worker thread started")
7 time.sleep(2)
8 print("Worker thread finished")
9
10# Create and start the thread
11thread = threading.Thread(target=worker)
12thread.start()
13
14# Processing in the main thread
15print("Main thread continues to run")
16
17# Wait for the thread to finish
18thread.join()
19print("Main thread finished")- 在此示例中,
worker函数在一个单独的线程中执行,而主线程继续运行。通过调用join()方法,主线程会等待子线程完成。
线程命名
为线程赋予有意义的名称可以让日志记录和调试更加容易。你可以通过 name 参数来指定线程名称。
1import threading
2import time
3
4# Function to be executed in a thread
5def worker():
6 print("Worker thread started")
7 time.sleep(2)
8 print("Worker thread finished")
9
10t = threading.Thread(
11 target=worker,
12 args=("named-worker", 0.3),
13 name="MyWorkerThread"
14)
15
16t.start()
17
18print("Active threads:", threading.active_count())
19for th in threading.enumerate():
20 print(" -", th.name)
21
22t.join()threading.enumerate()返回当前线程的列表,这对于调试和监控状态非常有用。
继承Thread类
如果要自定义执行线程的类,可以通过继承threading.Thread类来定义一个新类。
1import threading
2import time
3
4# Inherit from the Thread class
5class WorkerThread(threading.Thread):
6 def __init__(self, name, delay, repeat=3):
7 super().__init__(name=name)
8 self.delay = delay
9 self.repeat = repeat
10 self.results = []
11
12 def run(self):
13 for i in range(self.repeat):
14 msg = f"{self.name} step {i+1}"
15 print(msg)
16 self.results.append(msg)
17 time.sleep(self.delay)
18
19# Create and start the threads
20t1 = WorkerThread("Worker-A", delay=0.4, repeat=3)
21t2 = WorkerThread("Worker-B", delay=0.2, repeat=5)
22
23t1.start()
24t2.start()
25
26t1.join()
27t2.join()
28
29print("Results A:", t1.results)
30print("Results B:", t2.results)- 在此示例中,重写了
run()方法来自定义线程的行为,从而允许每个线程维护各自的数据。当线程执行复杂处理或希望每个线程拥有自己的独立数据时,这种方式很有用。
线程之间的同步
当多个线程同时访问共享资源时,可能会发生数据竞争。为防止这种情况,threading模块提供了几种同步机制。
锁 (Lock)
Lock对象用于在线程间实现对资源的独占控制。当一个线程锁定资源时,其他线程无法访问该资源。
1import threading
2
3lock = threading.Lock()
4shared_resource = 0
5
6def worker():
7 global shared_resource
8 with lock: # Acquire the lock
9 local_copy = shared_resource
10 local_copy += 1
11 shared_resource = local_copy
12
13threads = [threading.Thread(target=worker) for _ in range(5)]
14
15for t in threads:
16 t.start()
17
18for t in threads:
19 t.join()
20
21print(f"Final value of shared resource: {shared_resource}")- 在此示例中,五个线程访问一个共享资源,但通过使用
Lock来防止多个线程同时修改数据。
可重入锁(RLock)
如果同一个线程需要多次获取锁,应使用 RLock(可重入锁)。这在递归调用或库函数在不同调用中获取锁时非常有用。
1import threading
2
3rlock = threading.RLock()
4shared = []
5
6def outer():
7 with rlock:
8 shared.append("outer")
9 inner()
10
11def inner():
12 with rlock:
13 shared.append("inner")
14
15t = threading.Thread(target=outer)
16t.start()
17t.join()
18print(shared)- 使用
RLock,同一个线程可以重新获取它已经持有的锁,这有助于防止在嵌套获取锁时产生死锁。
条件变量 (Condition)
“Condition”用于在满足特定条件之前让线程等待。当一个线程满足某个条件时,你可以调用 notify() 通知另一个线程,或者调用 notify_all() 通知所有等待中的线程。
下面是一个生产者和消费者使用 Condition 的示例。
1import threading
2
3condition = threading.Condition()
4shared_data = []
5
6def producer():
7 with condition:
8 shared_data.append(1)
9 print("Produced an item")
10 condition.notify() # Notify the consumer
11
12def consumer():
13 with condition:
14 condition.wait() # Wait until the condition is met
15
16 item = shared_data.pop(0)
17 print(f"Consumed an item: {item}")
18
19# Create the threads
20producer_thread = threading.Thread(target=producer)
21consumer_thread = threading.Thread(target=consumer)
22
23consumer_thread.start()
24producer_thread.start()
25
26producer_thread.join()
27consumer_thread.join()- 此代码使用
Condition,让生产者在添加数据时通知,消费者在接收到通知后再取数据,实现同步。
线程守护化
当主线程结束时,守护线程会被强制终止。普通线程必须等待终止,而守护线程会自动终止。
1import threading
2import time
3
4def worker():
5 while True:
6 print("Working...")
7 time.sleep(1)
8
9# Create a daemon thread
10thread = threading.Thread(target=worker)
11thread.daemon = True # Set as a daemon thread
12
13thread.start()
14
15# Processing in the main thread
16time.sleep(3)
17print("Main thread finished")- 在此示例中,
worker线程被设置为守护线程,因此当主线程结束时,它会被强制终止。
使用ThreadPoolExecutor进行线程管理
除了threading模块,还可以使用concurrent.futures模块中的ThreadPoolExecutor管理线程池并并行执行任务。
1from concurrent.futures import ThreadPoolExecutor
2import time
3
4def worker(seconds):
5 print(f"Sleeping for {seconds} second(s)")
6 time.sleep(seconds)
7 return f"Finished sleeping for {seconds} second(s)"
8
9with ThreadPoolExecutor(max_workers=3) as executor:
10 futures = [executor.submit(worker, i) for i in range(1, 4)]
11 for future in futures:
12 print(future.result())ThreadPoolExecutor创建线程池并高效处理任务。通过max_workers指定同时运行的线程数。
线程之间的事件通信
使用threading.Event,可以在线程之间设置标志以通知其他线程某事件的发生。
1import threading
2import time
3
4event = threading.Event()
5
6def worker():
7 print("Waiting for event to be set")
8 event.wait() # Wait until the event is set
9
10 print("Event received, continuing work")
11
12thread = threading.Thread(target=worker)
13thread.start()
14
15time.sleep(2)
16print("Setting the event")
17event.set() # Set the event and notify the thread- 此代码演示了这样一种机制:工作线程等待
Event信号,而主线程调用event.set()时,工作线程恢复处理。
线程中的异常处理与线程终止
线程中发生异常时不会直接传递到主线程,因此需要制定捕获和共享异常的模式。
1import threading
2import queue
3
4def worker(err_q):
5 try:
6 raise ValueError("Something bad")
7 except Exception as e:
8 err_q.put(e)
9
10q = queue.Queue()
11t = threading.Thread(target=worker, args=(q,))
12t.start()
13t.join()
14if not q.empty():
15 exc = q.get()
16 print("Worker raised:", exc)- 通过将异常放入
Queue并在主线程中取出,你可以可靠地检测到故障。如果使用concurrent.futures.ThreadPoolExecutor,可以通过future.result()重新抛出异常,从而更容易处理。
GIL(全局解释器锁)及其影响
由于 CPython 中 GIL(全局解释器锁) 的机制,多个 Python 字节码实际上不会在同一进程中同时运行。针对 CPU 密集型任务,如大量计算,建议使用 multiprocessing。而对于 I/O 密集型任务,比如文件读取或网络通信,threading 可以有效地工作。
总结
使用Python的threading模块,可以实现多线程程序并同时执行多个进程。使用诸如Lock和Condition之类的同步机制,可以安全地访问共享资源并执行复杂的同步操作。此外,使用守护线程或ThreadPoolExecutor,线程管理和高效并行处理变得更加容易。
您可以在我们的YouTube频道上使用Visual Studio Code跟随上述文章进行学习。 请也查看我们的YouTube频道。