Python 中的 `multiprocessing` 模块

Python 中的 `multiprocessing` 模块

本文解释了 Python 中的 multiprocessing 模块。

本文介绍了使用multiprocessing模块编写安全高效并行处理代码的实用技巧。

YouTube Video

Python 中的 multiprocessing 模块

基础知识:为什么使用multiprocessing

multiprocessing实现了进程级并行,因此可以在不受Python全局解释器锁(GIL)限制的情况下并行处理CPU密集型任务。对于I/O密集型任务,threadingasyncio可能更简单、更合适。

Process的简单用法

首先,这是一个用Process在独立进程中运行函数的基本示例。这演示了如何启动进程、等待其完成以及传递参数。

 1# Explanation:
 2# This example starts a separate process to run `worker` which prints messages.
 3# It demonstrates starting, joining, and passing arguments.
 4
 5from multiprocessing import Process
 6import time
 7
 8def worker(name, delay):
 9    # English comment in code per user's preference
10    for i in range(3):
11        print(f"Worker {name}: iteration {i}")
12        time.sleep(delay)
13
14if __name__ == "__main__":
15    p = Process(target=worker, args=("A", 0.5))
16    p.start()
17    print("Main: waiting for worker to finish")
18    p.join()
19    print("Main: worker finished")
  • 此代码展示了主进程启动子进程worker并通过join()等待其完成的流程。可以通过args传递参数。

使用Pool进行简单并行(高级API)

当你需要对多个独立任务应用相同的函数时,Pool.map非常有用。它会自动帮你管理工作进程。

 1# Explanation:
 2# Use Pool.map to parallelize a CPU-bound function across available processes.
 3# Good for "embarrassingly parallel" workloads.
 4
 5from multiprocessing import Pool, cpu_count
 6import math
 7import time
 8
 9def is_prime(n):
10    # Check primality (inefficient but CPU-heavy for demo)
11    if n < 2:
12        return False
13    for i in range(2, int(math.sqrt(n)) + 1):
14        if n % i == 0:
15            return False
16    return True
17
18if __name__ == "__main__":
19    nums = [10_000_000 + i for i in range(50)]
20    start = time.time()
21    with Pool(processes=cpu_count()) as pool:
22        results = pool.map(is_prime, nums)
23    end = time.time()
24    print(f"Found primes: {sum(results)} / {len(nums)} in {end-start:.2f}s")
  • Pool可以自动控制工作进程数量,map按原顺序返回结果。

进程间通信:使用Queue实现生产者/消费者模式

Queue 是一个先进先出 (FIFO) 队列,可以在进程之间安全地传递对象。以下是一些典型的模式。

 1# Explanation:
 2# Demonstrates a producer putting items into a Queue
 3# and consumer reading them.
 4# This is useful for task pipelines between processes.
 5
 6from multiprocessing import Process, Queue
 7import time
 8import random
 9
10def producer(q, n):
11    for i in range(n):
12        item = f"item-{i}"
13        print("Producer: putting", item)
14        q.put(item)
15        time.sleep(random.random() * 0.5)
16    q.put(None)  # sentinel to signal consumer to stop
17
18def consumer(q):
19    while True:
20        item = q.get()
21        if item is None:
22            break
23        print("Consumer: got", item)
24        time.sleep(0.2)
25
26if __name__ == "__main__":
27    q = Queue()
28    p = Process(target=producer, args=(q, 5))
29    c = Process(target=consumer, args=(q,))
30    p.start()
31    c.start()
32    p.join()
33    c.join()
34    print("Main: done")
  • Queue允许你在进程间安全传递数据。通常会使用像None这样的特殊值来表示终止。

共享内存:ValueArray

当你需要在进程间共享少量数字或数组时,可使用ValueArray。你需要使用锁来避免冲突。

 1# Explanation:
 2# Use Value to share a single integer counter
 3# and Array for a small numeric array.
 4# Show how to use a Lock to avoid race conditions.
 5
 6from multiprocessing import Process, Value, Array, Lock
 7import time
 8
 9def increment(counter, lock, times):
10    for _ in range(times):
11        with lock:
12            counter.value += 1
13
14def update_array(arr):
15    for i in range(len(arr)):
16        arr[i] = arr[i] + 1
17
18if __name__ == "__main__":
19    lock = Lock()
20    counter = Value('i', 0)  # 'i' = signed int
21    shared_arr = Array('i', [0, 0, 0])
22
23    p1 = Process(target=increment, args=(counter, lock, 1000))
24    p2 = Process(target=increment, args=(counter, lock, 1000))
25    a = Process(target=update_array, args=(shared_arr,))
26
27    p1.start(); p2.start(); a.start()
28    p1.join(); p2.join(); a.join()
29
30    print("Counter:", counter.value)
31    print("Array:", list(shared_arr))
  • ValueArray通过**底层机制(C语言级别的共享内存)**在进程间共享数据,而非Python本身。因此,它适合快速读写少量数据,但不适合处理大量数据。

高级共享:使用Manager共享对象(字典、列表等)

如果你想使用更灵活的共享对象(如列表或字典),请使用Manager()

 1# Explanation:
 2# Manager provides proxy objects like dict/list
 3# that can be shared across processes.
 4# Good for moderate-size shared state and easier programming model.
 5
 6from multiprocessing import Process, Manager
 7import time
 8
 9def worker(shared_dict, key, value):
10    shared_dict[key] = value
11
12if __name__ == "__main__":
13    with Manager() as manager:
14        d = manager.dict()
15        processes = []
16        for i in range(5):
17            p = Process(target=worker, args=(d, f"k{i}", i*i))
18            p.start()
19            processes.append(p)
20        for p in processes:
21            p.join()
22        print("Shared dict:", dict(d))
  • Manager方便于共享字典和列表,但每次访问都会在进程间传输数据,并且需要进行**pickle转换**。因此,频繁更新大量数据会导致处理速度变慢。

同步机制:如何使用LockSemaphore

使用LockSemaphore控制对共享资源的并发访问。可以结合with语句简单地使用它们。

 1# Explanation:
 2# Demonstrates using Lock to prevent simultaneous access to a critical section.
 3# Locks are necessary when shared resources are not atomic.
 4
 5from multiprocessing import Process, Lock, Value
 6
 7def safe_add(counter, lock):
 8    for _ in range(10000):
 9        with lock:
10            counter.value += 1
11
12if __name__ == "__main__":
13    lock = Lock()
14    counter = Value('i', 0)
15    p1 = Process(target=safe_add, args=(counter, lock))
16    p2 = Process(target=safe_add, args=(counter, lock))
17    p1.start(); p2.start()
18    p1.join(); p2.join()
19    print("Counter:", counter.value)
  • 锁可以防止数据竞争,但如果加锁范围过大,会降低并行处理性能。只有必要的部分才应作为临界区进行保护。

UNIX上的fork与Windows上的行为差异

在UNIX系统中,进程默认通过fork复制,内存采用写时复制(COW),效率较高。Windows通过spawn启动进程(会重新导入模块),因此需要注意入口保护及全局初始化。

 1# Explanation: Check start method (fork/spawn) and set it if needed.
 2# Useful for debugging platform-dependent behavior.
 3
 4from multiprocessing import get_start_method, set_start_method
 5
 6if __name__ == "__main__":
 7    print("Start method:", get_start_method())
 8
 9    # uncomment to force spawn on Unix for testing
10    # set_start_method('spawn')
  • set_start_method只能在程序启动时调用一次。在库中随意更改该设置并不安全。

实用示例:CPU密集型任务的性能基准测试(对比)

下面是一个脚本,简单对比了使用multiprocessing实现并行处理后速度提高了多少。这里我们使用Pool

 1# Explanation:
 2# Compare sequential vs parallel execution times for CPU-bound task.
 3# Helps understand speedup and overhead.
 4
 5import time
 6from multiprocessing import Pool, cpu_count
 7import math
 8
 9def heavy_task(n):
10    s = 0
11    for i in range(1, n):
12        s += math.sqrt(i)
13    return s
14
15def run_sequential(nums):
16    return [heavy_task(n) for n in nums]
17
18def run_parallel(nums):
19    with Pool(processes=cpu_count()) as p:
20        return p.map(heavy_task, nums)
21
22if __name__ == "__main__":
23    nums = [2000000] * 8  # heavy tasks
24    t0 = time.time()
25    run_sequential(nums)
26    seq = time.time() - t0
27    t1 = time.time()
28    run_parallel(nums)
29    par = time.time() - t1
30    print(f"Sequential: {seq:.2f}s, Parallel: {par:.2f}s")
  • 该示例表明,根据任务负载和进程数量,并行化有时因开销过大反而效果不佳。任务越大(越重)、越独立,并行化收益越大。

重要的基本规则

以下是安全高效使用multiprocessing的基本要点。

  • 在Windows上,子进程启动时会重新导入模块,因此必须用if __name__ == "__main__":保护脚本入口。
  • 进程间通信是序列化的(要经过pickle转换),所以传输大型对象的开销会很高。
  • 由于multiprocessing创建的是进程,通常根据multiprocessing.cpu_count()来决定进程数。
  • 在一个工作进程中创建另一个Pool会变得很复杂,所以应尽量避免嵌套Pool实例。
  • 由于主进程很难检测到子进程中发生的异常,因此有必要明确实现日志记录和错误处理。
  • 根据CPU设置进程数量,I/O密集型任务可考虑使用线程。

实用设计建议

以下是设计并行处理时一些有用的概念和模式。

  • 通过“流水线”方式将进程划分为输入读取(I/O)、预处理(多CPU)、聚合(串行)等角色更高效。
  • 为简化调试,建议先单进程验证功能后再并行化。
  • 日志方面,建议每个进程分别输出日志(如文件名中包含PID),便于定位问题。
  • 请准备重试与超时机制,即使进程挂起也能安全恢复。

总结(可立即应用的要点)

并行处理很强大,但需要正确判断任务性质、数据量和进程间通信开销。multiprocessing适用于CPU密集型任务,但设计不当或同步错误会降低性能。遵循基本规则和模式即可构建安全高效的并行程序。

您可以在我们的YouTube频道上使用Visual Studio Code跟随上述文章进行学习。 请也查看我们的YouTube频道。

YouTube Video