Python中的`concurrent`模块

Python中的`concurrent`模块

在本文中,我们将介绍Python中的concurrent模块。

在阐明并发和并行概念的同时,我们将通过实际示例介绍如何使用concurrent模块实现异步处理。

YouTube Video

Python中的concurrent模块

在加快Python处理速度时,注意并发和并行之间的差异非常重要。concurrent模块是在考虑这些差异的情况下,安全且简单地处理异步处理的重要手段

并发与并行的区别

  • 并发是指将流程设计为多个任务以小的工作单元进行切换来推进。 即使任务实际上没有同时运行,利用“等待时间”也能让整个流程更加高效。

  • 并行是一种物理上同时执行多个任务的机制。 通过使用多个CPU核心,可以同时推进处理。

两者都是加速处理的技术,但并发关注的是“如何推进”的设计问题,而并行关注的是“如何运行”的执行问题,本质上是不同的。

concurrent模块是什么?

concurrent是Python的标准库,提供了用于安全且简单处理并发和并行的高级API。它被设计为让你专注于“执行任务”,无需关心如创建和管理线程或进程等低层操作。

ThreadPoolExecutorProcessPoolExecutor的作用

concurrent模块根据任务的性质提供了两种主要选择。

  • ThreadPoolExecutor 这适用于并发实现,尤其是有大量I/O等待的任务,如网络或文件操作。通过在任务间切换,有效利用等待时间。

  • ProcessPoolExecutor 此实现面向并行处理,适合CPU密集型任务。它使用多个进程,充分利用可用的CPU核心实现并行。

因此,concurrent模块的一个主要特点是,它提供了结构使你能根据需要恰当选择并发还是并行

ThreadPoolExecutor基础(面向I/O任务)

ThreadPoolExecutor适用于I/O受限任务,如网络通信和文件操作。它将在多个线程间分配任务,有效利用等待时间。

 1from concurrent.futures import ThreadPoolExecutor
 2import time
 3
 4def fetch_data(n):
 5    # Simulate an I/O-bound task
 6    time.sleep(1)
 7    return f"data-{n}"
 8
 9with ThreadPoolExecutor(max_workers=3) as executor:
10    futures = [executor.submit(fetch_data, i) for i in range(5)]
11
12    for future in futures:
13        print(future.result())
  • 在本例中,多个等待一秒的I/O任务被并发执行。通过使用submit,将函数调用注册为异步任务,然后通过调用result()等待完成并获取结果,让你能够简洁地实现有效利用等待时间的并发处理

使用map实现简单的并发

如果不需要复杂的控制,使用map能让代码更加简洁。

 1from concurrent.futures import ThreadPoolExecutor
 2import time
 3
 4def fetch_data(n):
 5    # Simulate an I/O-bound task
 6    time.sleep(1)
 7    return f"data-{n}"
 8
 9with ThreadPoolExecutor(max_workers=3) as executor:
10    results = executor.map(fetch_data, range(5))
11
12    for result in results:
13        print(result)
  • 在本例中,借助ThreadPoolExecutor.map,多个I/O任务被并发执行。由于map返回的结果顺序和输入一致,你可以编写接近顺序处理的代码,无需关注异步处理即可实现并发执行——这是一个巨大优势。

ProcessPoolExecutor基础(面向CPU任务)

对于充分利用CPU的重型计算,你应该使用进程而不是线程。这样可以避开全局解释器锁(GIL)的限制。

 1from concurrent.futures import ProcessPoolExecutor
 2
 3def heavy_calculation(n):
 4    # Simulate a CPU-bound task
 5    total = 0
 6    for i in range(10_000_000):
 7        total += i * n
 8    return total
 9
10if __name__ == "__main__":
11    with ProcessPoolExecutor(max_workers=4) as executor:
12        results = executor.map(heavy_calculation, range(4))
13
14        for result in results:
15            print(result)

在本例中,利用ProcessPoolExecutor并行执行CPU密集型计算。由于涉及创建进程,因此需要__main__保护,这样可以安全地利用多个CPU核心进行并行处理

通过as_completed按照完成顺序处理

当你想按照任务完成顺序处理结果时,as_completed非常有用。

 1from concurrent.futures import ThreadPoolExecutor, as_completed
 2import time
 3
 4def fetch_data(n):
 5    # Simulate an I/O-bound task
 6    time.sleep(1)
 7    return f"data-{n}"
 8
 9with ThreadPoolExecutor(max_workers=3) as executor:
10    futures = [executor.submit(fetch_data, i) for i in range(5)]
11
12    for future in as_completed(futures):
13        print(future.result())
  • 在本例中,多个异步任务同时执行,并以它们完成的顺序获取结果。使用as_completed可以让你不论任务顺序都能快速处理结果,适合进度显示或需要按完成顺序处理的场景。

异常处理

concurrent中,在调用result()时会抛出异常

 1from concurrent.futures import ThreadPoolExecutor
 2
 3def risky_task(n):
 4    # Simulate a task that may fail for a specific input
 5    if n == 3:
 6        raise ValueError("Something went wrong")
 7    return n * 2
 8
 9with ThreadPoolExecutor() as executor:
10    futures = [executor.submit(risky_task, i) for i in range(5)]
11
12    for future in futures:
13        try:
14            print(future.result())
15        except Exception as e:
16            print("Error:", e)
  • 本例说明,即使某些任务抛出异常,其他任务也会继续执行,你可以在获取结果时单独处理异常。借助concurrentFuture,能够安全地处理异步处理中成功和失败的情况非常重要。

选择线程还是进程的指导原则

为了有效利用并发和并行,根据任务性质选择合适的方法非常重要

在实际中,以下标准有助于你做出选择。

  • 对于具有大量 I/O 等待的进程,例如通信或文件操作,请使用 ThreadPoolExecutor
  • 对于CPU受限的重型计算任务,应使用ProcessPoolExecutor
  • 如果有很多简单任务,使用map可以让代码更简洁。
  • 如果需要精确控制执行顺序或异常处理,可以结合使用submitas_completed

使用concurrent的好处

通过使用concurrent模块,你可以安全且直观地处理异步操作。

主要好处如下:。

  • 你无需处理底层的线程或进程管理。
  • 它作为Python标准库的一部分提供,你可以放心使用。
  • 代码更具可读性且易于维护。
  • 它是学习并发和并行的理想第一步。

只要牢记这些指导原则,使用concurrent的实现失败率会大大降低。

总结

concurrent模块是Python实际并发与并行的标准选项。它让你无需大幅改变处理逻辑即可提升性能,这在实际应用中是个重要好处。借助concurrent,你可以简洁地实现异步处理,并安全地管理异常处理和执行控制

您可以在我们的YouTube频道上使用Visual Studio Code跟随上述文章进行学习。 请也查看我们的YouTube频道。

YouTube Video