Il modulo `multiprocessing` in Python
Questo articolo spiega il modulo multiprocessing in Python.
Questo articolo introduce suggerimenti pratici per scrivere codice di elaborazione parallela sicuro ed efficiente utilizzando il modulo multiprocessing.
YouTube Video
Il modulo multiprocessing in Python
Basi: Perché usare multiprocessing?
multiprocessing consente la parallelizzazione a livello di processo, quindi puoi parallelizzare compiti orientati alla CPU senza essere limitato dal GIL (Global Interpreter Lock) di Python. Per i compiti legati all'I/O, threading o asyncio possono essere opzioni più semplici e adatte.
Uso semplice di Process
Ecco un esempio di base per eseguire una funzione in un processo separato usando Process. Questo dimostra come avviare un processo, attendere il suo completamento e passare argomenti.
1# Explanation:
2# This example starts a separate process to run `worker` which prints messages.
3# It demonstrates starting, joining, and passing arguments.
4
5from multiprocessing import Process
6import time
7
8def worker(name, delay):
9 # English comment in code per user's preference
10 for i in range(3):
11 print(f"Worker {name}: iteration {i}")
12 time.sleep(delay)
13
14if __name__ == "__main__":
15 p = Process(target=worker, args=("A", 0.5))
16 p.start()
17 print("Main: waiting for worker to finish")
18 p.join()
19 print("Main: worker finished")- Questo codice mostra il flusso in cui il processo principale avvia un processo figlio
workere attende il suo completamento usandojoin(). È possibile passare argomenti utilizzandoargs.
Parallelizzazione semplice con Pool (API di alto livello)
Pool.map è utile quando vuoi applicare la stessa funzione a diversi compiti indipendenti. Gestisce internamente i processi worker per te.
1# Explanation:
2# Use Pool.map to parallelize a CPU-bound function across available processes.
3# Good for "embarrassingly parallel" workloads.
4
5from multiprocessing import Pool, cpu_count
6import math
7import time
8
9def is_prime(n):
10 # Check primality (inefficient but CPU-heavy for demo)
11 if n < 2:
12 return False
13 for i in range(2, int(math.sqrt(n)) + 1):
14 if n % i == 0:
15 return False
16 return True
17
18if __name__ == "__main__":
19 nums = [10_000_000 + i for i in range(50)]
20 start = time.time()
21 with Pool(processes=cpu_count()) as pool:
22 results = pool.map(is_prime, nums)
23 end = time.time()
24 print(f"Found primes: {sum(results)} / {len(nums)} in {end-start:.2f}s")Poolpuò controllare automaticamente il numero di worker emaprestituisce i risultati nell'ordine originale.
Comunicazione tra processi: pattern Produttore/Consumatore usando Queue
Queue è una coda First-In-First-Out (FIFO) che trasferisce oggetti in modo sicuro tra i processi. Di seguito sono riportati alcuni schemi tipici.
1# Explanation:
2# Demonstrates a producer putting items into a Queue
3# and consumer reading them.
4# This is useful for task pipelines between processes.
5
6from multiprocessing import Process, Queue
7import time
8import random
9
10def producer(q, n):
11 for i in range(n):
12 item = f"item-{i}"
13 print("Producer: putting", item)
14 q.put(item)
15 time.sleep(random.random() * 0.5)
16 q.put(None) # sentinel to signal consumer to stop
17
18def consumer(q):
19 while True:
20 item = q.get()
21 if item is None:
22 break
23 print("Consumer: got", item)
24 time.sleep(0.2)
25
26if __name__ == "__main__":
27 q = Queue()
28 p = Process(target=producer, args=(q, 5))
29 c = Process(target=consumer, args=(q,))
30 p.start()
31 c.start()
32 p.join()
33 c.join()
34 print("Main: done")Queuepermette di passare dati tra processi in modo sicuro. È comune utilizzare un valore speciale comeNoneper segnalare la terminazione.
Memoria condivisa: Value e Array
Puoi usare Value e Array quando vuoi condividere piccoli numeri o array tra processi. È necessario utilizzare i lock per evitare conflitti.
1# Explanation:
2# Use Value to share a single integer counter
3# and Array for a small numeric array.
4# Show how to use a Lock to avoid race conditions.
5
6from multiprocessing import Process, Value, Array, Lock
7import time
8
9def increment(counter, lock, times):
10 for _ in range(times):
11 with lock:
12 counter.value += 1
13
14def update_array(arr):
15 for i in range(len(arr)):
16 arr[i] = arr[i] + 1
17
18if __name__ == "__main__":
19 lock = Lock()
20 counter = Value('i', 0) # 'i' = signed int
21 shared_arr = Array('i', [0, 0, 0])
22
23 p1 = Process(target=increment, args=(counter, lock, 1000))
24 p2 = Process(target=increment, args=(counter, lock, 1000))
25 a = Process(target=update_array, args=(shared_arr,))
26
27 p1.start(); p2.start(); a.start()
28 p1.join(); p2.join(); a.join()
29
30 print("Counter:", counter.value)
31 print("Array:", list(shared_arr))ValueeArraycondividono dati tra processi utilizzando meccanismi di basso livello (memoria condivisa a livello C), non direttamente da Python. Pertanto, è adatto per leggere e scrivere rapidamente piccole quantità di dati, ma non è adatto per gestire grandi quantità di dati..
Condivisione avanzata: Oggetti condivisi (dizionari, liste) con Manager
Se vuoi usare oggetti condivisi più flessibili come liste o dizionari, usa Manager().
1# Explanation:
2# Manager provides proxy objects like dict/list
3# that can be shared across processes.
4# Good for moderate-size shared state and easier programming model.
5
6from multiprocessing import Process, Manager
7import time
8
9def worker(shared_dict, key, value):
10 shared_dict[key] = value
11
12if __name__ == "__main__":
13 with Manager() as manager:
14 d = manager.dict()
15 processes = []
16 for i in range(5):
17 p = Process(target=worker, args=(d, f"k{i}", i*i))
18 p.start()
19 processes.append(p)
20 for p in processes:
21 p.join()
22 print("Shared dict:", dict(d))Managerè comodo per condividere dizionari e liste, ma ogni accesso invia dati tra processi e richiede la conversione conpickle. Pertanto, aggiornare frequentemente grandi quantità di dati rallenterà l'elaborazione.
Meccanismi di sincronizzazione: Come usare Lock e Semaphore
Usa Lock o Semaphore per controllare l'accesso concorrente alle risorse condivise. Puoi usarli in modo conciso con l'istruzione with.
1# Explanation:
2# Demonstrates using Lock to prevent simultaneous access to a critical section.
3# Locks are necessary when shared resources are not atomic.
4
5from multiprocessing import Process, Lock, Value
6
7def safe_add(counter, lock):
8 for _ in range(10000):
9 with lock:
10 counter.value += 1
11
12if __name__ == "__main__":
13 lock = Lock()
14 counter = Value('i', 0)
15 p1 = Process(target=safe_add, args=(counter, lock))
16 p2 = Process(target=safe_add, args=(counter, lock))
17 p1.start(); p2.start()
18 p1.join(); p2.join()
19 print("Counter:", counter.value)- I lock prevengono le condizioni di race sui dati, ma se la regione bloccata è troppo ampia, le prestazioni dell'elaborazione parallela diminuiranno. Solo le parti necessarie dovrebbero essere protette come sezione critica.
Differenze tra fork su UNIX e comportamento su Windows
Nei sistemi UNIX, i processi vengono duplicati di default usando fork, rendendo efficiente la memoria tramite copy-on-write. Windows avvia i processi utilizzando spawn (che re-importa i moduli), quindi è necessario prestare attenzione alla protezione del punto di ingresso e all'inizializzazione globale.
1# Explanation: Check start method (fork/spawn) and set it if needed.
2# Useful for debugging platform-dependent behavior.
3
4from multiprocessing import get_start_method, set_start_method
5
6if __name__ == "__main__":
7 print("Start method:", get_start_method())
8
9 # uncomment to force spawn on Unix for testing
10 # set_start_method('spawn')set_start_methodpuò essere chiamato solo una volta all'inizio del programma. È più sicuro non modificare questa impostazione arbitrariamente all'interno delle librerie.
Esempio pratico: Benchmark di carichi di lavoro orientati alla CPU (confronto)
Di seguito è riportato uno script che confronta semplicemente quanto possa essere più veloce l’elaborazione con la parallelizzazione utilizzando multiprocessing. Qui, usiamo Pool.
1# Explanation:
2# Compare sequential vs parallel execution times for CPU-bound task.
3# Helps understand speedup and overhead.
4
5import time
6from multiprocessing import Pool, cpu_count
7import math
8
9def heavy_task(n):
10 s = 0
11 for i in range(1, n):
12 s += math.sqrt(i)
13 return s
14
15def run_sequential(nums):
16 return [heavy_task(n) for n in nums]
17
18def run_parallel(nums):
19 with Pool(processes=cpu_count()) as p:
20 return p.map(heavy_task, nums)
21
22if __name__ == "__main__":
23 nums = [2000000] * 8 # heavy tasks
24 t0 = time.time()
25 run_sequential(nums)
26 seq = time.time() - t0
27 t1 = time.time()
28 run_parallel(nums)
29 par = time.time() - t1
30 print(f"Sequential: {seq:.2f}s, Parallel: {par:.2f}s")- Questo esempio mostra che, a seconda del carico dei compiti e del numero di processi, la parallelizzazione può a volte essere inefficace a causa del sovraccarico. Più grandi (“pesanti”) e indipendenti sono i compiti, maggiore è il beneficio.
Regole di base importanti
Di seguito sono riportati i punti fondamentali per utilizzare multiprocessing in modo sicuro ed efficiente.
- Su Windows, i moduli vengono reimportati all'avvio dei processi figli, quindi è necessario proteggere il punto di ingresso dello script con
if __name__ == "__main__":. - La comunicazione tra processi è serializzata (con conversione
pickle), quindi il trasferimento di oggetti di grandi dimensioni risulta costoso. - Poiché
multiprocessingcrea processi, è comune decidere il numero di processi in base amultiprocessing.cpu_count(). - Creare un altro
Poolall'interno di un worker diventa complesso, quindi è consigliabile evitare il più possibile il nesting delle istanze diPool. - Poiché le eccezioni che si verificano nei processi figli sono difficili da rilevare dal processo principale, è necessario implementare esplicitamente il logging e la gestione degli errori.
- Imposta il numero di processi in base alla CPU e valuta l'uso dei thread per i compiti legati all'I/O.
Consigli pratici di progettazione
Di seguito sono riportati alcuni concetti e schemi utili per progettare l'elaborazione parallela.
- È efficiente separare i processi in ruoli come lettura input (I/O), pre-elaborazione (multi-CPU) e aggregazione (seriale) tramite 'pipelining'.
- Per semplificare il debug, verifica prima il funzionamento in un singolo processo prima di parallelizzare.
- Per il logging, separa gli output di log per processo (es. inserisci il PID nei nomi dei file) per facilitare la risoluzione dei problemi.
- Prepara meccanismi di retry e timeout in modo da poter recuperare in sicurezza anche se un processo si blocca.
Riepilogo (Punti chiave che puoi usare subito)
L'elaborazione parallela è potente, ma è importante valutare correttamente la natura dei compiti, la dimensione dei dati e il costo della comunicazione tra processi. multiprocessing è efficace per l'elaborazione orientata alla CPU, ma una progettazione scorretta o errori di sincronizzazione possono ridurre le prestazioni. Se segui le regole di base e i pattern, puoi realizzare programmi paralleli sicuri ed efficienti.
Puoi seguire l'articolo sopra utilizzando Visual Studio Code sul nostro canale YouTube. Controlla anche il nostro canale YouTube.