El módulo `multiprocessing` en Python
Este artículo explica el módulo multiprocessing en Python.
Este artículo presenta consejos prácticos para escribir código de procesamiento paralelo seguro y eficiente utilizando el módulo multiprocessing.
YouTube Video
El módulo multiprocessing en Python
Conceptos básicos: ¿Por qué usar multiprocessing?
multiprocessing permite la paralelización a nivel de procesos, por lo que puedes paralelizar tareas intensivas en CPU sin estar limitado por el GIL (Global Interpreter Lock) de Python. Para tareas dependientes de I/O, threading o asyncio pueden ser más simples y adecuadas.
Uso sencillo de Process
Primero, aquí tienes un ejemplo básico de cómo ejecutar una función en un proceso separado usando Process. Esto demuestra cómo iniciar un proceso, esperar su finalización y pasarle argumentos.
1# Explanation:
2# This example starts a separate process to run `worker` which prints messages.
3# It demonstrates starting, joining, and passing arguments.
4
5from multiprocessing import Process
6import time
7
8def worker(name, delay):
9 # English comment in code per user's preference
10 for i in range(3):
11 print(f"Worker {name}: iteration {i}")
12 time.sleep(delay)
13
14if __name__ == "__main__":
15 p = Process(target=worker, args=("A", 0.5))
16 p.start()
17 print("Main: waiting for worker to finish")
18 p.join()
19 print("Main: worker finished")- Este código muestra el flujo donde el proceso principal lanza un subproceso
workery espera su finalización usandojoin(). Puedes pasar argumentos usandoargs.
Paralelización sencilla con Pool (API de alto nivel)
Pool.map es útil cuando deseas aplicar la misma función a varias tareas independientes. Gestiona los procesos de trabajo internamente por ti.
1# Explanation:
2# Use Pool.map to parallelize a CPU-bound function across available processes.
3# Good for "embarrassingly parallel" workloads.
4
5from multiprocessing import Pool, cpu_count
6import math
7import time
8
9def is_prime(n):
10 # Check primality (inefficient but CPU-heavy for demo)
11 if n < 2:
12 return False
13 for i in range(2, int(math.sqrt(n)) + 1):
14 if n % i == 0:
15 return False
16 return True
17
18if __name__ == "__main__":
19 nums = [10_000_000 + i for i in range(50)]
20 start = time.time()
21 with Pool(processes=cpu_count()) as pool:
22 results = pool.map(is_prime, nums)
23 end = time.time()
24 print(f"Found primes: {sum(results)} / {len(nums)} in {end-start:.2f}s")Poolpuede controlar automáticamente el número de procesos, ymapdevuelve los resultados en el orden original.
Comunicación entre procesos: patrón productor/consumidor usando Queue
Queue es una cola Primero en Entrar, Primero en Salir (FIFO) que transfiere objetos de manera segura entre procesos. A continuación se muestran algunos patrones típicos.
1# Explanation:
2# Demonstrates a producer putting items into a Queue
3# and consumer reading them.
4# This is useful for task pipelines between processes.
5
6from multiprocessing import Process, Queue
7import time
8import random
9
10def producer(q, n):
11 for i in range(n):
12 item = f"item-{i}"
13 print("Producer: putting", item)
14 q.put(item)
15 time.sleep(random.random() * 0.5)
16 q.put(None) # sentinel to signal consumer to stop
17
18def consumer(q):
19 while True:
20 item = q.get()
21 if item is None:
22 break
23 print("Consumer: got", item)
24 time.sleep(0.2)
25
26if __name__ == "__main__":
27 q = Queue()
28 p = Process(target=producer, args=(q, 5))
29 c = Process(target=consumer, args=(q,))
30 p.start()
31 c.start()
32 p.join()
33 c.join()
34 print("Main: done")Queuete permite pasar datos de manera segura entre procesos. Es común usar un valor especial comoNonepara señalar la terminación.
Memoria compartida: Value y Array
Puedes usar Value y Array cuando quieras compartir números o arreglos pequeños entre procesos. Es necesario usar bloqueos (locks) para evitar conflictos.
1# Explanation:
2# Use Value to share a single integer counter
3# and Array for a small numeric array.
4# Show how to use a Lock to avoid race conditions.
5
6from multiprocessing import Process, Value, Array, Lock
7import time
8
9def increment(counter, lock, times):
10 for _ in range(times):
11 with lock:
12 counter.value += 1
13
14def update_array(arr):
15 for i in range(len(arr)):
16 arr[i] = arr[i] + 1
17
18if __name__ == "__main__":
19 lock = Lock()
20 counter = Value('i', 0) # 'i' = signed int
21 shared_arr = Array('i', [0, 0, 0])
22
23 p1 = Process(target=increment, args=(counter, lock, 1000))
24 p2 = Process(target=increment, args=(counter, lock, 1000))
25 a = Process(target=update_array, args=(shared_arr,))
26
27 p1.start(); p2.start(); a.start()
28 p1.join(); p2.join(); a.join()
29
30 print("Counter:", counter.value)
31 print("Array:", list(shared_arr))ValueyArraycomparten datos entre procesos usando mecanismos de bajo nivel (memoria compartida a nivel de C), no en el propio Python. Por lo tanto, es adecuado para leer y escribir rápidamente pequeñas cantidades de datos, pero no es adecuado para manejar grandes cantidades de datos..
Compartición avanzada: objetos compartidos (diccionarios, listas) con Manager
Si quieres usar objetos compartidos más flexibles como listas o diccionarios, utiliza Manager().
1# Explanation:
2# Manager provides proxy objects like dict/list
3# that can be shared across processes.
4# Good for moderate-size shared state and easier programming model.
5
6from multiprocessing import Process, Manager
7import time
8
9def worker(shared_dict, key, value):
10 shared_dict[key] = value
11
12if __name__ == "__main__":
13 with Manager() as manager:
14 d = manager.dict()
15 processes = []
16 for i in range(5):
17 p = Process(target=worker, args=(d, f"k{i}", i*i))
18 p.start()
19 processes.append(p)
20 for p in processes:
21 p.join()
22 print("Shared dict:", dict(d))Manageres conveniente para compartir diccionarios y listas, pero cada acceso envía datos entre procesos y requiere conversión conpickle. Por lo tanto, actualizar frecuentemente grandes cantidades de datos ralentizará el procesamiento.
Mecanismos de sincronización: cómo usar Lock y Semaphore
Utiliza Lock o Semaphore para controlar el acceso concurrente a recursos compartidos. Puedes usarlos de forma concisa con la instrucción with.
1# Explanation:
2# Demonstrates using Lock to prevent simultaneous access to a critical section.
3# Locks are necessary when shared resources are not atomic.
4
5from multiprocessing import Process, Lock, Value
6
7def safe_add(counter, lock):
8 for _ in range(10000):
9 with lock:
10 counter.value += 1
11
12if __name__ == "__main__":
13 lock = Lock()
14 counter = Value('i', 0)
15 p1 = Process(target=safe_add, args=(counter, lock))
16 p2 = Process(target=safe_add, args=(counter, lock))
17 p1.start(); p2.start()
18 p1.join(); p2.join()
19 print("Counter:", counter.value)- Los bloqueos previenen condiciones de carrera de datos, pero si la región bloqueada es demasiado grande, el rendimiento del procesamiento paralelo disminuirá. Solo las partes necesarias deben protegerse como una sección crítica.
Diferencias entre fork en UNIX y el comportamiento en Windows
En sistemas UNIX, los procesos se duplican utilizando fork por defecto, haciendo eficiente el uso de memoria mediante copy-on-write. Windows inicia los procesos usando spawn (que reimporta los módulos), así que debes prestar atención a la protección del punto de entrada y la inicialización global.
1# Explanation: Check start method (fork/spawn) and set it if needed.
2# Useful for debugging platform-dependent behavior.
3
4from multiprocessing import get_start_method, set_start_method
5
6if __name__ == "__main__":
7 print("Start method:", get_start_method())
8
9 # uncomment to force spawn on Unix for testing
10 # set_start_method('spawn')set_start_methodsólo puede llamarse una vez al inicio del programa. Es más seguro no cambiar esto arbitrariamente dentro de las librerías.
Ejemplo práctico: comparación de cargas de trabajo intensivas en CPU
A continuación hay un script que simplemente compara cuánto más rápido puede ser el procesamiento con paralelización usando multiprocessing. Aquí usamos Pool.
1# Explanation:
2# Compare sequential vs parallel execution times for CPU-bound task.
3# Helps understand speedup and overhead.
4
5import time
6from multiprocessing import Pool, cpu_count
7import math
8
9def heavy_task(n):
10 s = 0
11 for i in range(1, n):
12 s += math.sqrt(i)
13 return s
14
15def run_sequential(nums):
16 return [heavy_task(n) for n in nums]
17
18def run_parallel(nums):
19 with Pool(processes=cpu_count()) as p:
20 return p.map(heavy_task, nums)
21
22if __name__ == "__main__":
23 nums = [2000000] * 8 # heavy tasks
24 t0 = time.time()
25 run_sequential(nums)
26 seq = time.time() - t0
27 t1 = time.time()
28 run_parallel(nums)
29 par = time.time() - t1
30 print(f"Sequential: {seq:.2f}s, Parallel: {par:.2f}s")- Este ejemplo muestra que, dependiendo de la carga de trabajo y del número de procesos, la paralelización puede ser ineficaz debido a la sobrecarga. Cuanto más grandes e independientes sean las tareas, mayor será el beneficio.
Reglas básicas importantes
A continuación se presentan los puntos básicos para usar multiprocessing de forma segura y eficiente.
- En Windows, los módulos se reimportan cuando se inician los procesos hijos, por lo que debes proteger el punto de entrada de tu script con
if __name__ == "__main__":. - La comunicación entre procesos se serializa (con conversión
pickle), por lo que transferir objetos grandes se vuelve costoso. - Dado que
multiprocessingcrea procesos, es común decidir el número de procesos basándose enmultiprocessing.cpu_count(). - Crear otro
Pooldentro de un trabajador se vuelve complejo, por lo que se debe evitar anidar instancias dePooltanto como sea posible. - Dado que las excepciones que ocurren en los procesos hijos son difíciles de detectar desde el proceso principal, es necesario implementar explícitamente el registro y el manejo de errores.
- Configura el número de procesos de acuerdo al CPU y considera usar hilos para tareas dependientes de I/O.
Consejos prácticos de diseño
A continuación se presentan algunos conceptos y patrones útiles para diseñar el procesamiento paralelo.
- Es eficiente separar los procesos en roles como la lectura de entrada (I/O), preprocesamiento (multi-CPU) y agregación (serial) mediante 'pipelining'.
- Para simplificar la depuración, primero verifica el funcionamiento en un solo proceso antes de paralelizar.
- Para el registro, separa los logs por proceso (por ejemplo, incluye el PID en los nombres de archivo) para facilitar la identificación de problemas.
- Prepara mecanismos de reintentos y de tiempo de espera para poder recuperarte de forma segura si un proceso se queda colgado.
Resumen (Puntos clave que puedes usar de inmediato)
El procesamiento en paralelo es poderoso, pero es importante juzgar correctamente la naturaleza de las tareas, el tamaño de los datos y el costo de la comunicación entre procesos. multiprocessing es efectivo para procesamiento intensivo en CPU, pero un mal diseño o errores de sincronización pueden reducir el rendimiento. Si sigues las reglas y patrones básicos, puedes construir programas paralelos seguros y eficientes.
Puedes seguir el artículo anterior utilizando Visual Studio Code en nuestro canal de YouTube. Por favor, también revisa nuestro canal de YouTube.