Il modulo `multiprocessing` in Python

Il modulo `multiprocessing` in Python

Questo articolo spiega il modulo multiprocessing in Python.

Questo articolo introduce suggerimenti pratici per scrivere codice di elaborazione parallela sicuro ed efficiente utilizzando il modulo multiprocessing.

YouTube Video

Il modulo multiprocessing in Python

Basi: Perché usare multiprocessing?

multiprocessing consente la parallelizzazione a livello di processo, quindi puoi parallelizzare compiti orientati alla CPU senza essere limitato dal GIL (Global Interpreter Lock) di Python. Per i compiti legati all'I/O, threading o asyncio possono essere opzioni più semplici e adatte.

Uso semplice di Process

Ecco un esempio di base per eseguire una funzione in un processo separato usando Process. Questo dimostra come avviare un processo, attendere il suo completamento e passare argomenti.

 1# Explanation:
 2# This example starts a separate process to run `worker` which prints messages.
 3# It demonstrates starting, joining, and passing arguments.
 4
 5from multiprocessing import Process
 6import time
 7
 8def worker(name, delay):
 9    # English comment in code per user's preference
10    for i in range(3):
11        print(f"Worker {name}: iteration {i}")
12        time.sleep(delay)
13
14if __name__ == "__main__":
15    p = Process(target=worker, args=("A", 0.5))
16    p.start()
17    print("Main: waiting for worker to finish")
18    p.join()
19    print("Main: worker finished")
  • Questo codice mostra il flusso in cui il processo principale avvia un processo figlio worker e attende il suo completamento usando join(). È possibile passare argomenti utilizzando args.

Parallelizzazione semplice con Pool (API di alto livello)

Pool.map è utile quando vuoi applicare la stessa funzione a diversi compiti indipendenti. Gestisce internamente i processi worker per te.

 1# Explanation:
 2# Use Pool.map to parallelize a CPU-bound function across available processes.
 3# Good for "embarrassingly parallel" workloads.
 4
 5from multiprocessing import Pool, cpu_count
 6import math
 7import time
 8
 9def is_prime(n):
10    # Check primality (inefficient but CPU-heavy for demo)
11    if n < 2:
12        return False
13    for i in range(2, int(math.sqrt(n)) + 1):
14        if n % i == 0:
15            return False
16    return True
17
18if __name__ == "__main__":
19    nums = [10_000_000 + i for i in range(50)]
20    start = time.time()
21    with Pool(processes=cpu_count()) as pool:
22        results = pool.map(is_prime, nums)
23    end = time.time()
24    print(f"Found primes: {sum(results)} / {len(nums)} in {end-start:.2f}s")
  • Pool può controllare automaticamente il numero di worker e map restituisce i risultati nell'ordine originale.

Comunicazione tra processi: pattern Produttore/Consumatore usando Queue

Queue è una coda First-In-First-Out (FIFO) che trasferisce oggetti in modo sicuro tra i processi. Di seguito sono riportati alcuni schemi tipici.

 1# Explanation:
 2# Demonstrates a producer putting items into a Queue
 3# and consumer reading them.
 4# This is useful for task pipelines between processes.
 5
 6from multiprocessing import Process, Queue
 7import time
 8import random
 9
10def producer(q, n):
11    for i in range(n):
12        item = f"item-{i}"
13        print("Producer: putting", item)
14        q.put(item)
15        time.sleep(random.random() * 0.5)
16    q.put(None)  # sentinel to signal consumer to stop
17
18def consumer(q):
19    while True:
20        item = q.get()
21        if item is None:
22            break
23        print("Consumer: got", item)
24        time.sleep(0.2)
25
26if __name__ == "__main__":
27    q = Queue()
28    p = Process(target=producer, args=(q, 5))
29    c = Process(target=consumer, args=(q,))
30    p.start()
31    c.start()
32    p.join()
33    c.join()
34    print("Main: done")
  • Queue permette di passare dati tra processi in modo sicuro. È comune utilizzare un valore speciale come None per segnalare la terminazione.

Memoria condivisa: Value e Array

Puoi usare Value e Array quando vuoi condividere piccoli numeri o array tra processi. È necessario utilizzare i lock per evitare conflitti.

 1# Explanation:
 2# Use Value to share a single integer counter
 3# and Array for a small numeric array.
 4# Show how to use a Lock to avoid race conditions.
 5
 6from multiprocessing import Process, Value, Array, Lock
 7import time
 8
 9def increment(counter, lock, times):
10    for _ in range(times):
11        with lock:
12            counter.value += 1
13
14def update_array(arr):
15    for i in range(len(arr)):
16        arr[i] = arr[i] + 1
17
18if __name__ == "__main__":
19    lock = Lock()
20    counter = Value('i', 0)  # 'i' = signed int
21    shared_arr = Array('i', [0, 0, 0])
22
23    p1 = Process(target=increment, args=(counter, lock, 1000))
24    p2 = Process(target=increment, args=(counter, lock, 1000))
25    a = Process(target=update_array, args=(shared_arr,))
26
27    p1.start(); p2.start(); a.start()
28    p1.join(); p2.join(); a.join()
29
30    print("Counter:", counter.value)
31    print("Array:", list(shared_arr))
  • Value e Array condividono dati tra processi utilizzando meccanismi di basso livello (memoria condivisa a livello C), non direttamente da Python. Pertanto, è adatto per leggere e scrivere rapidamente piccole quantità di dati, ma non è adatto per gestire grandi quantità di dati..

Condivisione avanzata: Oggetti condivisi (dizionari, liste) con Manager

Se vuoi usare oggetti condivisi più flessibili come liste o dizionari, usa Manager().

 1# Explanation:
 2# Manager provides proxy objects like dict/list
 3# that can be shared across processes.
 4# Good for moderate-size shared state and easier programming model.
 5
 6from multiprocessing import Process, Manager
 7import time
 8
 9def worker(shared_dict, key, value):
10    shared_dict[key] = value
11
12if __name__ == "__main__":
13    with Manager() as manager:
14        d = manager.dict()
15        processes = []
16        for i in range(5):
17            p = Process(target=worker, args=(d, f"k{i}", i*i))
18            p.start()
19            processes.append(p)
20        for p in processes:
21            p.join()
22        print("Shared dict:", dict(d))
  • Manager è comodo per condividere dizionari e liste, ma ogni accesso invia dati tra processi e richiede la conversione con pickle. Pertanto, aggiornare frequentemente grandi quantità di dati rallenterà l'elaborazione.

Meccanismi di sincronizzazione: Come usare Lock e Semaphore

Usa Lock o Semaphore per controllare l'accesso concorrente alle risorse condivise. Puoi usarli in modo conciso con l'istruzione with.

 1# Explanation:
 2# Demonstrates using Lock to prevent simultaneous access to a critical section.
 3# Locks are necessary when shared resources are not atomic.
 4
 5from multiprocessing import Process, Lock, Value
 6
 7def safe_add(counter, lock):
 8    for _ in range(10000):
 9        with lock:
10            counter.value += 1
11
12if __name__ == "__main__":
13    lock = Lock()
14    counter = Value('i', 0)
15    p1 = Process(target=safe_add, args=(counter, lock))
16    p2 = Process(target=safe_add, args=(counter, lock))
17    p1.start(); p2.start()
18    p1.join(); p2.join()
19    print("Counter:", counter.value)
  • I lock prevengono le condizioni di race sui dati, ma se la regione bloccata è troppo ampia, le prestazioni dell'elaborazione parallela diminuiranno. Solo le parti necessarie dovrebbero essere protette come sezione critica.

Differenze tra fork su UNIX e comportamento su Windows

Nei sistemi UNIX, i processi vengono duplicati di default usando fork, rendendo efficiente la memoria tramite copy-on-write. Windows avvia i processi utilizzando spawn (che re-importa i moduli), quindi è necessario prestare attenzione alla protezione del punto di ingresso e all'inizializzazione globale.

 1# Explanation: Check start method (fork/spawn) and set it if needed.
 2# Useful for debugging platform-dependent behavior.
 3
 4from multiprocessing import get_start_method, set_start_method
 5
 6if __name__ == "__main__":
 7    print("Start method:", get_start_method())
 8
 9    # uncomment to force spawn on Unix for testing
10    # set_start_method('spawn')
  • set_start_method può essere chiamato solo una volta all'inizio del programma. È più sicuro non modificare questa impostazione arbitrariamente all'interno delle librerie.

Esempio pratico: Benchmark di carichi di lavoro orientati alla CPU (confronto)

Di seguito è riportato uno script che confronta semplicemente quanto possa essere più veloce l’elaborazione con la parallelizzazione utilizzando multiprocessing. Qui, usiamo Pool.

 1# Explanation:
 2# Compare sequential vs parallel execution times for CPU-bound task.
 3# Helps understand speedup and overhead.
 4
 5import time
 6from multiprocessing import Pool, cpu_count
 7import math
 8
 9def heavy_task(n):
10    s = 0
11    for i in range(1, n):
12        s += math.sqrt(i)
13    return s
14
15def run_sequential(nums):
16    return [heavy_task(n) for n in nums]
17
18def run_parallel(nums):
19    with Pool(processes=cpu_count()) as p:
20        return p.map(heavy_task, nums)
21
22if __name__ == "__main__":
23    nums = [2000000] * 8  # heavy tasks
24    t0 = time.time()
25    run_sequential(nums)
26    seq = time.time() - t0
27    t1 = time.time()
28    run_parallel(nums)
29    par = time.time() - t1
30    print(f"Sequential: {seq:.2f}s, Parallel: {par:.2f}s")
  • Questo esempio mostra che, a seconda del carico dei compiti e del numero di processi, la parallelizzazione può a volte essere inefficace a causa del sovraccarico. Più grandi (“pesanti”) e indipendenti sono i compiti, maggiore è il beneficio.

Regole di base importanti

Di seguito sono riportati i punti fondamentali per utilizzare multiprocessing in modo sicuro ed efficiente.

  • Su Windows, i moduli vengono reimportati all'avvio dei processi figli, quindi è necessario proteggere il punto di ingresso dello script con if __name__ == "__main__":.
  • La comunicazione tra processi è serializzata (con conversione pickle), quindi il trasferimento di oggetti di grandi dimensioni risulta costoso.
  • Poiché multiprocessing crea processi, è comune decidere il numero di processi in base a multiprocessing.cpu_count().
  • Creare un altro Pool all'interno di un worker diventa complesso, quindi è consigliabile evitare il più possibile il nesting delle istanze di Pool.
  • Poiché le eccezioni che si verificano nei processi figli sono difficili da rilevare dal processo principale, è necessario implementare esplicitamente il logging e la gestione degli errori.
  • Imposta il numero di processi in base alla CPU e valuta l'uso dei thread per i compiti legati all'I/O.

Consigli pratici di progettazione

Di seguito sono riportati alcuni concetti e schemi utili per progettare l'elaborazione parallela.

  • È efficiente separare i processi in ruoli come lettura input (I/O), pre-elaborazione (multi-CPU) e aggregazione (seriale) tramite 'pipelining'.
  • Per semplificare il debug, verifica prima il funzionamento in un singolo processo prima di parallelizzare.
  • Per il logging, separa gli output di log per processo (es. inserisci il PID nei nomi dei file) per facilitare la risoluzione dei problemi.
  • Prepara meccanismi di retry e timeout in modo da poter recuperare in sicurezza anche se un processo si blocca.

Riepilogo (Punti chiave che puoi usare subito)

L'elaborazione parallela è potente, ma è importante valutare correttamente la natura dei compiti, la dimensione dei dati e il costo della comunicazione tra processi. multiprocessing è efficace per l'elaborazione orientata alla CPU, ma una progettazione scorretta o errori di sincronizzazione possono ridurre le prestazioni. Se segui le regole di base e i pattern, puoi realizzare programmi paralleli sicuri ed efficienti.

Puoi seguire l'articolo sopra utilizzando Visual Studio Code sul nostro canale YouTube. Controlla anche il nostro canale YouTube.

YouTube Video