Python中的`threading`模块

Python中的`threading`模块

这篇文章解释了Python中的threading模块。

YouTube Video

Python中的threading模块

Python中的threading模块是一个支持多线程编程的标准库。使用线程可以让多个进程并发运行,从而提高程序性能,特别是在涉及I/O等待等阻塞操作的情况下。由于Python的全局解释器锁 (GIL),多线程对CPU密集型操作的效率有限,但对于I/O密集型操作可以高效运行。

以下章节将介绍如何使用threading模块的基本知识以及如何控制线程。

线程的基本用法

创建和运行线程

要创建线程并进行并发处理,可以使用threading.Thread类。指定目标函数以创建线程并执行该线程。

 1import threading
 2import time
 3
 4# Function to be executed in a thread
 5def worker():
 6    print("Worker thread started")
 7    time.sleep(2)
 8    print("Worker thread finished")
 9
10# Create and start the thread
11thread = threading.Thread(target=worker)
12thread.start()
13
14# Processing in the main thread
15print("Main thread continues to run")
16
17# Wait for the thread to finish
18thread.join()
19print("Main thread finished")
  • 在此示例中,worker函数在一个单独的线程中执行,而主线程继续运行。通过调用join()方法,主线程会等待子线程完成。

线程命名

为线程赋予有意义的名称可以让日志记录和调试更加容易。你可以通过 name 参数来指定线程名称。

 1import threading
 2import time
 3
 4# Function to be executed in a thread
 5def worker():
 6    print("Worker thread started")
 7    time.sleep(2)
 8    print("Worker thread finished")
 9
10t = threading.Thread(
11    target=worker,
12    args=("named-worker", 0.3),
13    name="MyWorkerThread"
14)
15
16t.start()
17
18print("Active threads:", threading.active_count())
19for th in threading.enumerate():
20    print(" -", th.name)
21
22t.join()
  • threading.enumerate() 返回当前线程的列表,这对于调试和监控状态非常有用。

继承Thread类

如果要自定义执行线程的类,可以通过继承threading.Thread类来定义一个新类。

 1import threading
 2import time
 3
 4# Inherit from the Thread class
 5class WorkerThread(threading.Thread):
 6    def __init__(self, name, delay, repeat=3):
 7        super().__init__(name=name)
 8        self.delay = delay
 9        self.repeat = repeat
10        self.results = []
11
12    def run(self):
13        for i in range(self.repeat):
14            msg = f"{self.name} step {i+1}"
15            print(msg)
16            self.results.append(msg)
17            time.sleep(self.delay)
18
19# Create and start the threads
20t1 = WorkerThread("Worker-A", delay=0.4, repeat=3)
21t2 = WorkerThread("Worker-B", delay=0.2, repeat=5)
22
23t1.start()
24t2.start()
25
26t1.join()
27t2.join()
28
29print("Results A:", t1.results)
30print("Results B:", t2.results)
  • 在此示例中,重写了 run() 方法来自定义线程的行为,从而允许每个线程维护各自的数据。当线程执行复杂处理或希望每个线程拥有自己的独立数据时,这种方式很有用。

线程之间的同步

当多个线程同时访问共享资源时,可能会发生数据竞争。为防止这种情况,threading模块提供了几种同步机制。

锁 (Lock)

Lock对象用于在线程间实现对资源的独占控制。当一个线程锁定资源时,其他线程无法访问该资源。

 1import threading
 2
 3lock = threading.Lock()
 4shared_resource = 0
 5
 6def worker():
 7    global shared_resource
 8    with lock:  # Acquire the lock
 9        local_copy = shared_resource
10        local_copy += 1
11        shared_resource = local_copy
12
13threads = [threading.Thread(target=worker) for _ in range(5)]
14
15for t in threads:
16    t.start()
17
18for t in threads:
19    t.join()
20
21print(f"Final value of shared resource: {shared_resource}")
  • 在此示例中,五个线程访问一个共享资源,但通过使用Lock来防止多个线程同时修改数据。

可重入锁(RLock

如果同一个线程需要多次获取锁,应使用 RLock(可重入锁)。这在递归调用或库函数在不同调用中获取锁时非常有用。

 1import threading
 2
 3rlock = threading.RLock()
 4shared = []
 5
 6def outer():
 7    with rlock:
 8        shared.append("outer")
 9        inner()
10
11def inner():
12    with rlock:
13        shared.append("inner")
14
15t = threading.Thread(target=outer)
16t.start()
17t.join()
18print(shared)
  • 使用 RLock,同一个线程可以重新获取它已经持有的锁,这有助于防止在嵌套获取锁时产生死锁。

条件变量 (Condition)

“Condition”用于在满足特定条件之前让线程等待。当一个线程满足某个条件时,你可以调用 notify() 通知另一个线程,或者调用 notify_all() 通知所有等待中的线程。

下面是一个生产者和消费者使用 Condition 的示例。

 1import threading
 2
 3condition = threading.Condition()
 4shared_data = []
 5
 6def producer():
 7    with condition:
 8        shared_data.append(1)
 9        print("Produced an item")
10        condition.notify()  # Notify the consumer
11
12def consumer():
13    with condition:
14        condition.wait()  # Wait until the condition is met
15
16        item = shared_data.pop(0)
17        print(f"Consumed an item: {item}")
18
19# Create the threads
20producer_thread = threading.Thread(target=producer)
21consumer_thread = threading.Thread(target=consumer)
22
23consumer_thread.start()
24producer_thread.start()
25
26producer_thread.join()
27consumer_thread.join()
  • 此代码使用 Condition,让生产者在添加数据时通知,消费者在接收到通知后再取数据,实现同步。

线程守护化

当主线程结束时,守护线程会被强制终止。普通线程必须等待终止,而守护线程会自动终止。

 1import threading
 2import time
 3
 4def worker():
 5    while True:
 6        print("Working...")
 7        time.sleep(1)
 8
 9# Create a daemon thread
10thread = threading.Thread(target=worker)
11thread.daemon = True  # Set as a daemon thread
12
13thread.start()
14
15# Processing in the main thread
16time.sleep(3)
17print("Main thread finished")
  • 在此示例中,worker线程被设置为守护线程,因此当主线程结束时,它会被强制终止。

使用ThreadPoolExecutor进行线程管理

除了threading模块,还可以使用concurrent.futures模块中的ThreadPoolExecutor管理线程池并并行执行任务。

 1from concurrent.futures import ThreadPoolExecutor
 2import time
 3
 4def worker(seconds):
 5    print(f"Sleeping for {seconds} second(s)")
 6    time.sleep(seconds)
 7    return f"Finished sleeping for {seconds} second(s)"
 8
 9with ThreadPoolExecutor(max_workers=3) as executor:
10    futures = [executor.submit(worker, i) for i in range(1, 4)]
11    for future in futures:
12        print(future.result())
  • ThreadPoolExecutor创建线程池并高效处理任务。通过max_workers指定同时运行的线程数。

线程之间的事件通信

使用threading.Event,可以在线程之间设置标志以通知其他线程某事件的发生。

 1import threading
 2import time
 3
 4event = threading.Event()
 5
 6def worker():
 7    print("Waiting for event to be set")
 8    event.wait()  # Wait until the event is set
 9
10    print("Event received, continuing work")
11
12thread = threading.Thread(target=worker)
13thread.start()
14
15time.sleep(2)
16print("Setting the event")
17event.set()  # Set the event and notify the thread
  • 此代码演示了这样一种机制:工作线程等待 Event 信号,而主线程调用 event.set() 时,工作线程恢复处理。

线程中的异常处理与线程终止

线程中发生异常时不会直接传递到主线程,因此需要制定捕获和共享异常的模式。

 1import threading
 2import queue
 3
 4def worker(err_q):
 5    try:
 6        raise ValueError("Something bad")
 7    except Exception as e:
 8        err_q.put(e)
 9
10q = queue.Queue()
11t = threading.Thread(target=worker, args=(q,))
12t.start()
13t.join()
14if not q.empty():
15    exc = q.get()
16    print("Worker raised:", exc)
  • 通过将异常放入 Queue 并在主线程中取出,你可以可靠地检测到故障。如果使用 concurrent.futures.ThreadPoolExecutor,可以通过 future.result() 重新抛出异常,从而更容易处理。

GIL(全局解释器锁)及其影响

由于 CPython 中 GIL(全局解释器锁) 的机制,多个 Python 字节码实际上不会在同一进程中同时运行。针对 CPU 密集型任务,如大量计算,建议使用 multiprocessing。而对于 I/O 密集型任务,比如文件读取或网络通信,threading 可以有效地工作。

总结

使用Python的threading模块,可以实现多线程程序并同时执行多个进程。使用诸如LockCondition之类的同步机制,可以安全地访问共享资源并执行复杂的同步操作。此外,使用守护线程或ThreadPoolExecutor,线程管理和高效并行处理变得更加容易。

您可以在我们的YouTube频道上使用Visual Studio Code跟随上述文章进行学习。 请也查看我们的YouTube频道。

YouTube Video