De `multiprocessing`-module in Python
Dit artikel legt de multiprocessing-module in Python uit.
Dit artikel geeft praktische tips voor het schrijven van veilige en efficiënte parallelle code met de multiprocessing-module.
YouTube Video
De multiprocessing-module in Python
Basisprincipes: Waarom multiprocessing gebruiken?
multiprocessing maakt procesgebaseerde parallelisatie mogelijk, zodat je CPU-intensieve taken kunt paralleliseren zonder beperkt te worden door de GIL (Global Interpreter Lock) van Python. Voor I/O-intensieve taken zijn threading of asyncio vaak eenvoudiger en geschikter.
Eenvoudig gebruik van Process
Hier volgt eerst een eenvoudig voorbeeld van het uitvoeren van een functie in een apart proces met behulp van Process. Dit laat zien hoe je een proces start, wacht tot het klaar is en argumenten doorgeeft.
1# Explanation:
2# This example starts a separate process to run `worker` which prints messages.
3# It demonstrates starting, joining, and passing arguments.
4
5from multiprocessing import Process
6import time
7
8def worker(name, delay):
9 # English comment in code per user's preference
10 for i in range(3):
11 print(f"Worker {name}: iteration {i}")
12 time.sleep(delay)
13
14if __name__ == "__main__":
15 p = Process(target=worker, args=("A", 0.5))
16 p.start()
17 print("Main: waiting for worker to finish")
18 p.join()
19 print("Main: worker finished")- Deze code toont het proces waarbij het hoofdproces een subproces
workerstart en wacht tot het voltooid is metjoin(). Je kunt argumenten doorgeven viaargs.
Eenvoudige parallelisatie met Pool (high-level API)
Pool.map is handig als je dezelfde functie wilt toepassen op meerdere onafhankelijke taken. Deze beheert de werkprocessen intern voor je.
1# Explanation:
2# Use Pool.map to parallelize a CPU-bound function across available processes.
3# Good for "embarrassingly parallel" workloads.
4
5from multiprocessing import Pool, cpu_count
6import math
7import time
8
9def is_prime(n):
10 # Check primality (inefficient but CPU-heavy for demo)
11 if n < 2:
12 return False
13 for i in range(2, int(math.sqrt(n)) + 1):
14 if n % i == 0:
15 return False
16 return True
17
18if __name__ == "__main__":
19 nums = [10_000_000 + i for i in range(50)]
20 start = time.time()
21 with Pool(processes=cpu_count()) as pool:
22 results = pool.map(is_prime, nums)
23 end = time.time()
24 print(f"Found primes: {sum(results)} / {len(nums)} in {end-start:.2f}s")Poolkan automatisch het aantal werkers regelen, enmapgeeft de resultaten terug in de oorspronkelijke volgorde.
Interprocess-communicatie: Producer/Consumer-patroon met Queue
Queue is een First-In-First-Out (FIFO) wachtrij die veilig objecten tussen processen overdraagt. Hieronder staan enkele typische patronen.
1# Explanation:
2# Demonstrates a producer putting items into a Queue
3# and consumer reading them.
4# This is useful for task pipelines between processes.
5
6from multiprocessing import Process, Queue
7import time
8import random
9
10def producer(q, n):
11 for i in range(n):
12 item = f"item-{i}"
13 print("Producer: putting", item)
14 q.put(item)
15 time.sleep(random.random() * 0.5)
16 q.put(None) # sentinel to signal consumer to stop
17
18def consumer(q):
19 while True:
20 item = q.get()
21 if item is None:
22 break
23 print("Consumer: got", item)
24 time.sleep(0.2)
25
26if __name__ == "__main__":
27 q = Queue()
28 p = Process(target=producer, args=(q, 5))
29 c = Process(target=consumer, args=(q,))
30 p.start()
31 c.start()
32 p.join()
33 c.join()
34 print("Main: done")Queuemaakt het mogelijk om veilig data tussen processen uit te wisselen. Het is gebruikelijk om een speciale waarde zoalsNonete gebruiken om beëindiging aan te geven.
Gedeeld geheugen: Value en Array
Je kunt Value en Array gebruiken als je kleine nummers of arrays tussen processen wilt delen. Je moet sloten (locks) gebruiken om conflicten te voorkomen.
1# Explanation:
2# Use Value to share a single integer counter
3# and Array for a small numeric array.
4# Show how to use a Lock to avoid race conditions.
5
6from multiprocessing import Process, Value, Array, Lock
7import time
8
9def increment(counter, lock, times):
10 for _ in range(times):
11 with lock:
12 counter.value += 1
13
14def update_array(arr):
15 for i in range(len(arr)):
16 arr[i] = arr[i] + 1
17
18if __name__ == "__main__":
19 lock = Lock()
20 counter = Value('i', 0) # 'i' = signed int
21 shared_arr = Array('i', [0, 0, 0])
22
23 p1 = Process(target=increment, args=(counter, lock, 1000))
24 p2 = Process(target=increment, args=(counter, lock, 1000))
25 a = Process(target=update_array, args=(shared_arr,))
26
27 p1.start(); p2.start(); a.start()
28 p1.join(); p2.join(); a.join()
29
30 print("Counter:", counter.value)
31 print("Array:", list(shared_arr))ValueenArraydelen data tussen processen via low-level mechanismen (gedeeld geheugen op C-niveau), niet via Python zelf. Daarom is het geschikt voor het snel lezen en schrijven van kleine hoeveelheden gegevens, maar niet geschikt voor het verwerken van grote hoeveelheden gegevens..
Geavanceerd delen: Gedeelde objecten (dicts, lijsten) met Manager
Als je flexibelere gedeelde objecten zoals lijsten of dictionaries wilt gebruiken, gebruik dan Manager().
1# Explanation:
2# Manager provides proxy objects like dict/list
3# that can be shared across processes.
4# Good for moderate-size shared state and easier programming model.
5
6from multiprocessing import Process, Manager
7import time
8
9def worker(shared_dict, key, value):
10 shared_dict[key] = value
11
12if __name__ == "__main__":
13 with Manager() as manager:
14 d = manager.dict()
15 processes = []
16 for i in range(5):
17 p = Process(target=worker, args=(d, f"k{i}", i*i))
18 p.start()
19 processes.append(p)
20 for p in processes:
21 p.join()
22 print("Shared dict:", dict(d))Manageris handig om woordenboeken en lijsten te delen, maar bij elk gebruik wordt data tussen processen verstuurd en ispickle-conversie vereist. Daarom zal het frequent bijwerken van grote hoeveelheden data de verwerking vertragen.
Synchronisatiemechanismen: Hoe gebruik je Lock en Semaphore
Gebruik Lock of Semaphore om gelijktijdige toegang tot gedeelde resources te regelen. Je kunt ze beknopt gebruiken met de with-statement.
1# Explanation:
2# Demonstrates using Lock to prevent simultaneous access to a critical section.
3# Locks are necessary when shared resources are not atomic.
4
5from multiprocessing import Process, Lock, Value
6
7def safe_add(counter, lock):
8 for _ in range(10000):
9 with lock:
10 counter.value += 1
11
12if __name__ == "__main__":
13 lock = Lock()
14 counter = Value('i', 0)
15 p1 = Process(target=safe_add, args=(counter, lock))
16 p2 = Process(target=safe_add, args=(counter, lock))
17 p1.start(); p2.start()
18 p1.join(); p2.join()
19 print("Counter:", counter.value)- Locks voorkomen datastrijd, maar als het vergrendelde gebied te groot is, zal de prestaties van parallelle verwerking afnemen. Alleen de noodzakelijke delen moeten als een kritieke sectie worden beschermd.
Verschillen tussen fork op UNIX en gedrag op Windows
Op UNIX-systemen worden processen standaard gedupliceerd met fork, waardoor copy-on-write van geheugen efficiënt is. Windows start processen met spawn (dat modules opnieuw importeert), dus moet je letten op bescherming van het entry point en globale initialisatie.
1# Explanation: Check start method (fork/spawn) and set it if needed.
2# Useful for debugging platform-dependent behavior.
3
4from multiprocessing import get_start_method, set_start_method
5
6if __name__ == "__main__":
7 print("Start method:", get_start_method())
8
9 # uncomment to force spawn on Unix for testing
10 # set_start_method('spawn')set_start_methodkan slechts één keer aan het begin van je programma worden aangeroepen. Het is veiliger om dit niet willekeurig in libraries aan te passen.
Praktisch voorbeeld: Benchmarken van CPU-intensieve taken (vergelijking)
Hieronder staat een script dat simpelweg vergelijkt hoeveel sneller verwerking kan zijn met parallelisatie via multiprocessing. Hier gebruiken we Pool.
1# Explanation:
2# Compare sequential vs parallel execution times for CPU-bound task.
3# Helps understand speedup and overhead.
4
5import time
6from multiprocessing import Pool, cpu_count
7import math
8
9def heavy_task(n):
10 s = 0
11 for i in range(1, n):
12 s += math.sqrt(i)
13 return s
14
15def run_sequential(nums):
16 return [heavy_task(n) for n in nums]
17
18def run_parallel(nums):
19 with Pool(processes=cpu_count()) as p:
20 return p.map(heavy_task, nums)
21
22if __name__ == "__main__":
23 nums = [2000000] * 8 # heavy tasks
24 t0 = time.time()
25 run_sequential(nums)
26 seq = time.time() - t0
27 t1 = time.time()
28 run_parallel(nums)
29 par = time.time() - t1
30 print(f"Sequential: {seq:.2f}s, Parallel: {par:.2f}s")- Dit voorbeeld laat zien dat afhankelijk van de taaklast en het aantal processen, parallelisatie soms door overhead niet effectief is. Hoe groter (zwaarder) en onafhankelijker de taken zijn, hoe groter het voordeel.
Belangrijke basisregels
Hieronder staan de basispunten voor veilig en efficiënt gebruik van multiprocessing.
- Op Windows worden modules opnieuw geïmporteerd wanneer sub-processen starten, dus je moet het entry point van je script beschermen met
if __name__ == "__main__":. - Interprocescommunicatie wordt geserialiseerd (met
pickle-conversie), waardoor het overdragen van grote objecten duur wordt. - Aangezien
multiprocessingprocessen aanmaakt, wordt het aantal processen meestal bepaald aan de hand vanmultiprocessing.cpu_count(). - Het aanmaken van een andere
Poolbinnen een worker wordt complex, dus je moet het nesten vanPool-instanties zoveel mogelijk vermijden. - Aangezien uitzonderingen die zich voordoen in subprocessen moeilijk te detecteren zijn vanuit het hoofdproces, is het noodzakelijk om expliciet logging en foutafhandeling te implementeren.
- Stel het aantal processen in naar het aantal CPU's, en overweeg threads voor I/O-intensieve taken.
Praktisch ontwerpadvies
Hieronder staan enkele nuttige concepten en patronen voor het ontwerpen van parallelle verwerking.
- Het is efficiënt om processen op te splitsen in rollen zoals inlezen (I/O), voorbewerking (multi-CPU) en samenvoeging (serie) via 'pipelining'.
- Om debuggen te vereenvoudigen, controleer je eerst de werking in een enkel proces voordat je paralleliseert.
- Voor logging kun je loguitvoer per proces scheiden (bijvoorbeeld de PID in de bestandsnaam zetten), zodat je problemen makkelijker kunt isoleren.
- Voorzie retry- en timeoutmechanismen zodat je veilig kunt herstellen als een proces vastloopt.
Samenvatting (Belangrijke punten die je direct kunt toepassen)
Parallelle verwerking is krachtig, maar het is belangrijk om goed te letten op taaksoort, dataomvang en de kosten van interprocess-communicatie. multiprocessing is effectief voor CPU-intensieve verwerking, maar slechte ontwerpkeuzes of synchronisatieproblemen kunnen de prestaties verlagen. Als je de basisregels en -patronen volgt, kun je veilige en efficiënte parallelle programma’s maken.
Je kunt het bovenstaande artikel volgen met Visual Studio Code op ons YouTube-kanaal. Bekijk ook het YouTube-kanaal.