Python中的`concurrent`模块
在本文中,我们将介绍Python中的concurrent模块。
在阐明并发和并行概念的同时,我们将通过实际示例介绍如何使用concurrent模块实现异步处理。
YouTube Video
Python中的concurrent模块
在加快Python处理速度时,注意并发和并行之间的差异非常重要。concurrent模块是在考虑这些差异的情况下,安全且简单地处理异步处理的重要手段。
并发与并行的区别
-
并发是指将流程设计为多个任务以小的工作单元进行切换来推进。 即使任务实际上没有同时运行,利用“等待时间”也能让整个流程更加高效。
-
并行是一种物理上同时执行多个任务的机制。 通过使用多个CPU核心,可以同时推进处理。
两者都是加速处理的技术,但并发关注的是“如何推进”的设计问题,而并行关注的是“如何运行”的执行问题,本质上是不同的。
concurrent模块是什么?
concurrent是Python的标准库,提供了用于安全且简单处理并发和并行的高级API。它被设计为让你专注于“执行任务”,无需关心如创建和管理线程或进程等低层操作。
ThreadPoolExecutor和ProcessPoolExecutor的作用
concurrent模块根据任务的性质提供了两种主要选择。
-
ThreadPoolExecutor这适用于并发实现,尤其是有大量I/O等待的任务,如网络或文件操作。通过在任务间切换,有效利用等待时间。 -
ProcessPoolExecutor此实现面向并行处理,适合CPU密集型任务。它使用多个进程,充分利用可用的CPU核心实现并行。
因此,concurrent模块的一个主要特点是,它提供了结构使你能根据需要恰当选择并发还是并行。
ThreadPoolExecutor基础(面向I/O任务)
ThreadPoolExecutor适用于I/O受限任务,如网络通信和文件操作。它将在多个线程间分配任务,有效利用等待时间。
1from concurrent.futures import ThreadPoolExecutor
2import time
3
4def fetch_data(n):
5 # Simulate an I/O-bound task
6 time.sleep(1)
7 return f"data-{n}"
8
9with ThreadPoolExecutor(max_workers=3) as executor:
10 futures = [executor.submit(fetch_data, i) for i in range(5)]
11
12 for future in futures:
13 print(future.result())- 在本例中,多个等待一秒的I/O任务被并发执行。通过使用
submit,将函数调用注册为异步任务,然后通过调用result()等待完成并获取结果,让你能够简洁地实现有效利用等待时间的并发处理。
使用map实现简单的并发
如果不需要复杂的控制,使用map能让代码更加简洁。
1from concurrent.futures import ThreadPoolExecutor
2import time
3
4def fetch_data(n):
5 # Simulate an I/O-bound task
6 time.sleep(1)
7 return f"data-{n}"
8
9with ThreadPoolExecutor(max_workers=3) as executor:
10 results = executor.map(fetch_data, range(5))
11
12 for result in results:
13 print(result)- 在本例中,借助
ThreadPoolExecutor.map,多个I/O任务被并发执行。由于map返回的结果顺序和输入一致,你可以编写接近顺序处理的代码,无需关注异步处理即可实现并发执行——这是一个巨大优势。
ProcessPoolExecutor基础(面向CPU任务)
对于充分利用CPU的重型计算,你应该使用进程而不是线程。这样可以避开全局解释器锁(GIL)的限制。
1from concurrent.futures import ProcessPoolExecutor
2
3def heavy_calculation(n):
4 # Simulate a CPU-bound task
5 total = 0
6 for i in range(10_000_000):
7 total += i * n
8 return total
9
10if __name__ == "__main__":
11 with ProcessPoolExecutor(max_workers=4) as executor:
12 results = executor.map(heavy_calculation, range(4))
13
14 for result in results:
15 print(result)在本例中,利用ProcessPoolExecutor并行执行CPU密集型计算。由于涉及创建进程,因此需要__main__保护,这样可以安全地利用多个CPU核心进行并行处理。
通过as_completed按照完成顺序处理
当你想按照任务完成顺序处理结果时,as_completed非常有用。
1from concurrent.futures import ThreadPoolExecutor, as_completed
2import time
3
4def fetch_data(n):
5 # Simulate an I/O-bound task
6 time.sleep(1)
7 return f"data-{n}"
8
9with ThreadPoolExecutor(max_workers=3) as executor:
10 futures = [executor.submit(fetch_data, i) for i in range(5)]
11
12 for future in as_completed(futures):
13 print(future.result())- 在本例中,多个异步任务同时执行,并以它们完成的顺序获取结果。使用
as_completed可以让你不论任务顺序都能快速处理结果,适合进度显示或需要按完成顺序处理的场景。
异常处理
在concurrent中,在调用result()时会抛出异常。
1from concurrent.futures import ThreadPoolExecutor
2
3def risky_task(n):
4 # Simulate a task that may fail for a specific input
5 if n == 3:
6 raise ValueError("Something went wrong")
7 return n * 2
8
9with ThreadPoolExecutor() as executor:
10 futures = [executor.submit(risky_task, i) for i in range(5)]
11
12 for future in futures:
13 try:
14 print(future.result())
15 except Exception as e:
16 print("Error:", e)- 本例说明,即使某些任务抛出异常,其他任务也会继续执行,你可以在获取结果时单独处理异常。借助
concurrent的Future,能够安全地处理异步处理中成功和失败的情况非常重要。
选择线程还是进程的指导原则
为了有效利用并发和并行,根据任务性质选择合适的方法非常重要。
在实际中,以下标准有助于你做出选择。
- 对于具有大量 I/O 等待的进程,例如通信或文件操作,请使用
ThreadPoolExecutor。 - 对于CPU受限的重型计算任务,应使用
ProcessPoolExecutor。 - 如果有很多简单任务,使用
map可以让代码更简洁。 - 如果需要精确控制执行顺序或异常处理,可以结合使用
submit和as_completed。
使用concurrent的好处
通过使用concurrent模块,你可以安全且直观地处理异步操作。
主要好处如下:。
- 你无需处理底层的线程或进程管理。
- 它作为Python标准库的一部分提供,你可以放心使用。
- 代码更具可读性且易于维护。
- 它是学习并发和并行的理想第一步。
只要牢记这些指导原则,使用concurrent的实现失败率会大大降低。
总结
concurrent模块是Python实际并发与并行的标准选项。它让你无需大幅改变处理逻辑即可提升性能,这在实际应用中是个重要好处。借助concurrent,你可以简洁地实现异步处理,并安全地管理异常处理和执行控制。
您可以在我们的YouTube频道上使用Visual Studio Code跟随上述文章进行学习。 请也查看我们的YouTube频道。