De `multiprocessing`-module in Python

De `multiprocessing`-module in Python

Dit artikel legt de multiprocessing-module in Python uit.

Dit artikel geeft praktische tips voor het schrijven van veilige en efficiënte parallelle code met de multiprocessing-module.

YouTube Video

De multiprocessing-module in Python

Basisprincipes: Waarom multiprocessing gebruiken?

multiprocessing maakt procesgebaseerde parallelisatie mogelijk, zodat je CPU-intensieve taken kunt paralleliseren zonder beperkt te worden door de GIL (Global Interpreter Lock) van Python. Voor I/O-intensieve taken zijn threading of asyncio vaak eenvoudiger en geschikter.

Eenvoudig gebruik van Process

Hier volgt eerst een eenvoudig voorbeeld van het uitvoeren van een functie in een apart proces met behulp van Process. Dit laat zien hoe je een proces start, wacht tot het klaar is en argumenten doorgeeft.

 1# Explanation:
 2# This example starts a separate process to run `worker` which prints messages.
 3# It demonstrates starting, joining, and passing arguments.
 4
 5from multiprocessing import Process
 6import time
 7
 8def worker(name, delay):
 9    # English comment in code per user's preference
10    for i in range(3):
11        print(f"Worker {name}: iteration {i}")
12        time.sleep(delay)
13
14if __name__ == "__main__":
15    p = Process(target=worker, args=("A", 0.5))
16    p.start()
17    print("Main: waiting for worker to finish")
18    p.join()
19    print("Main: worker finished")
  • Deze code toont het proces waarbij het hoofdproces een subproces worker start en wacht tot het voltooid is met join(). Je kunt argumenten doorgeven via args.

Eenvoudige parallelisatie met Pool (high-level API)

Pool.map is handig als je dezelfde functie wilt toepassen op meerdere onafhankelijke taken. Deze beheert de werkprocessen intern voor je.

 1# Explanation:
 2# Use Pool.map to parallelize a CPU-bound function across available processes.
 3# Good for "embarrassingly parallel" workloads.
 4
 5from multiprocessing import Pool, cpu_count
 6import math
 7import time
 8
 9def is_prime(n):
10    # Check primality (inefficient but CPU-heavy for demo)
11    if n < 2:
12        return False
13    for i in range(2, int(math.sqrt(n)) + 1):
14        if n % i == 0:
15            return False
16    return True
17
18if __name__ == "__main__":
19    nums = [10_000_000 + i for i in range(50)]
20    start = time.time()
21    with Pool(processes=cpu_count()) as pool:
22        results = pool.map(is_prime, nums)
23    end = time.time()
24    print(f"Found primes: {sum(results)} / {len(nums)} in {end-start:.2f}s")
  • Pool kan automatisch het aantal werkers regelen, en map geeft de resultaten terug in de oorspronkelijke volgorde.

Interprocess-communicatie: Producer/Consumer-patroon met Queue

Queue is een First-In-First-Out (FIFO) wachtrij die veilig objecten tussen processen overdraagt. Hieronder staan enkele typische patronen.

 1# Explanation:
 2# Demonstrates a producer putting items into a Queue
 3# and consumer reading them.
 4# This is useful for task pipelines between processes.
 5
 6from multiprocessing import Process, Queue
 7import time
 8import random
 9
10def producer(q, n):
11    for i in range(n):
12        item = f"item-{i}"
13        print("Producer: putting", item)
14        q.put(item)
15        time.sleep(random.random() * 0.5)
16    q.put(None)  # sentinel to signal consumer to stop
17
18def consumer(q):
19    while True:
20        item = q.get()
21        if item is None:
22            break
23        print("Consumer: got", item)
24        time.sleep(0.2)
25
26if __name__ == "__main__":
27    q = Queue()
28    p = Process(target=producer, args=(q, 5))
29    c = Process(target=consumer, args=(q,))
30    p.start()
31    c.start()
32    p.join()
33    c.join()
34    print("Main: done")
  • Queue maakt het mogelijk om veilig data tussen processen uit te wisselen. Het is gebruikelijk om een speciale waarde zoals None te gebruiken om beëindiging aan te geven.

Gedeeld geheugen: Value en Array

Je kunt Value en Array gebruiken als je kleine nummers of arrays tussen processen wilt delen. Je moet sloten (locks) gebruiken om conflicten te voorkomen.

 1# Explanation:
 2# Use Value to share a single integer counter
 3# and Array for a small numeric array.
 4# Show how to use a Lock to avoid race conditions.
 5
 6from multiprocessing import Process, Value, Array, Lock
 7import time
 8
 9def increment(counter, lock, times):
10    for _ in range(times):
11        with lock:
12            counter.value += 1
13
14def update_array(arr):
15    for i in range(len(arr)):
16        arr[i] = arr[i] + 1
17
18if __name__ == "__main__":
19    lock = Lock()
20    counter = Value('i', 0)  # 'i' = signed int
21    shared_arr = Array('i', [0, 0, 0])
22
23    p1 = Process(target=increment, args=(counter, lock, 1000))
24    p2 = Process(target=increment, args=(counter, lock, 1000))
25    a = Process(target=update_array, args=(shared_arr,))
26
27    p1.start(); p2.start(); a.start()
28    p1.join(); p2.join(); a.join()
29
30    print("Counter:", counter.value)
31    print("Array:", list(shared_arr))
  • Value en Array delen data tussen processen via low-level mechanismen (gedeeld geheugen op C-niveau), niet via Python zelf. Daarom is het geschikt voor het snel lezen en schrijven van kleine hoeveelheden gegevens, maar niet geschikt voor het verwerken van grote hoeveelheden gegevens..

Geavanceerd delen: Gedeelde objecten (dicts, lijsten) met Manager

Als je flexibelere gedeelde objecten zoals lijsten of dictionaries wilt gebruiken, gebruik dan Manager().

 1# Explanation:
 2# Manager provides proxy objects like dict/list
 3# that can be shared across processes.
 4# Good for moderate-size shared state and easier programming model.
 5
 6from multiprocessing import Process, Manager
 7import time
 8
 9def worker(shared_dict, key, value):
10    shared_dict[key] = value
11
12if __name__ == "__main__":
13    with Manager() as manager:
14        d = manager.dict()
15        processes = []
16        for i in range(5):
17            p = Process(target=worker, args=(d, f"k{i}", i*i))
18            p.start()
19            processes.append(p)
20        for p in processes:
21            p.join()
22        print("Shared dict:", dict(d))
  • Manager is handig om woordenboeken en lijsten te delen, maar bij elk gebruik wordt data tussen processen verstuurd en is pickle-conversie vereist. Daarom zal het frequent bijwerken van grote hoeveelheden data de verwerking vertragen.

Synchronisatiemechanismen: Hoe gebruik je Lock en Semaphore

Gebruik Lock of Semaphore om gelijktijdige toegang tot gedeelde resources te regelen. Je kunt ze beknopt gebruiken met de with-statement.

 1# Explanation:
 2# Demonstrates using Lock to prevent simultaneous access to a critical section.
 3# Locks are necessary when shared resources are not atomic.
 4
 5from multiprocessing import Process, Lock, Value
 6
 7def safe_add(counter, lock):
 8    for _ in range(10000):
 9        with lock:
10            counter.value += 1
11
12if __name__ == "__main__":
13    lock = Lock()
14    counter = Value('i', 0)
15    p1 = Process(target=safe_add, args=(counter, lock))
16    p2 = Process(target=safe_add, args=(counter, lock))
17    p1.start(); p2.start()
18    p1.join(); p2.join()
19    print("Counter:", counter.value)
  • Locks voorkomen datastrijd, maar als het vergrendelde gebied te groot is, zal de prestaties van parallelle verwerking afnemen. Alleen de noodzakelijke delen moeten als een kritieke sectie worden beschermd.

Verschillen tussen fork op UNIX en gedrag op Windows

Op UNIX-systemen worden processen standaard gedupliceerd met fork, waardoor copy-on-write van geheugen efficiënt is. Windows start processen met spawn (dat modules opnieuw importeert), dus moet je letten op bescherming van het entry point en globale initialisatie.

 1# Explanation: Check start method (fork/spawn) and set it if needed.
 2# Useful for debugging platform-dependent behavior.
 3
 4from multiprocessing import get_start_method, set_start_method
 5
 6if __name__ == "__main__":
 7    print("Start method:", get_start_method())
 8
 9    # uncomment to force spawn on Unix for testing
10    # set_start_method('spawn')
  • set_start_method kan slechts één keer aan het begin van je programma worden aangeroepen. Het is veiliger om dit niet willekeurig in libraries aan te passen.

Praktisch voorbeeld: Benchmarken van CPU-intensieve taken (vergelijking)

Hieronder staat een script dat simpelweg vergelijkt hoeveel sneller verwerking kan zijn met parallelisatie via multiprocessing. Hier gebruiken we Pool.

 1# Explanation:
 2# Compare sequential vs parallel execution times for CPU-bound task.
 3# Helps understand speedup and overhead.
 4
 5import time
 6from multiprocessing import Pool, cpu_count
 7import math
 8
 9def heavy_task(n):
10    s = 0
11    for i in range(1, n):
12        s += math.sqrt(i)
13    return s
14
15def run_sequential(nums):
16    return [heavy_task(n) for n in nums]
17
18def run_parallel(nums):
19    with Pool(processes=cpu_count()) as p:
20        return p.map(heavy_task, nums)
21
22if __name__ == "__main__":
23    nums = [2000000] * 8  # heavy tasks
24    t0 = time.time()
25    run_sequential(nums)
26    seq = time.time() - t0
27    t1 = time.time()
28    run_parallel(nums)
29    par = time.time() - t1
30    print(f"Sequential: {seq:.2f}s, Parallel: {par:.2f}s")
  • Dit voorbeeld laat zien dat afhankelijk van de taaklast en het aantal processen, parallelisatie soms door overhead niet effectief is. Hoe groter (zwaarder) en onafhankelijker de taken zijn, hoe groter het voordeel.

Belangrijke basisregels

Hieronder staan de basispunten voor veilig en efficiënt gebruik van multiprocessing.

  • Op Windows worden modules opnieuw geïmporteerd wanneer sub-processen starten, dus je moet het entry point van je script beschermen met if __name__ == "__main__":.
  • Interprocescommunicatie wordt geserialiseerd (met pickle-conversie), waardoor het overdragen van grote objecten duur wordt.
  • Aangezien multiprocessing processen aanmaakt, wordt het aantal processen meestal bepaald aan de hand van multiprocessing.cpu_count().
  • Het aanmaken van een andere Pool binnen een worker wordt complex, dus je moet het nesten van Pool-instanties zoveel mogelijk vermijden.
  • Aangezien uitzonderingen die zich voordoen in subprocessen moeilijk te detecteren zijn vanuit het hoofdproces, is het noodzakelijk om expliciet logging en foutafhandeling te implementeren.
  • Stel het aantal processen in naar het aantal CPU's, en overweeg threads voor I/O-intensieve taken.

Praktisch ontwerpadvies

Hieronder staan enkele nuttige concepten en patronen voor het ontwerpen van parallelle verwerking.

  • Het is efficiënt om processen op te splitsen in rollen zoals inlezen (I/O), voorbewerking (multi-CPU) en samenvoeging (serie) via 'pipelining'.
  • Om debuggen te vereenvoudigen, controleer je eerst de werking in een enkel proces voordat je paralleliseert.
  • Voor logging kun je loguitvoer per proces scheiden (bijvoorbeeld de PID in de bestandsnaam zetten), zodat je problemen makkelijker kunt isoleren.
  • Voorzie retry- en timeoutmechanismen zodat je veilig kunt herstellen als een proces vastloopt.

Samenvatting (Belangrijke punten die je direct kunt toepassen)

Parallelle verwerking is krachtig, maar het is belangrijk om goed te letten op taaksoort, dataomvang en de kosten van interprocess-communicatie. multiprocessing is effectief voor CPU-intensieve verwerking, maar slechte ontwerpkeuzes of synchronisatieproblemen kunnen de prestaties verlagen. Als je de basisregels en -patronen volgt, kun je veilige en efficiënte parallelle programma’s maken.

Je kunt het bovenstaande artikel volgen met Visual Studio Code op ons YouTube-kanaal. Bekijk ook het YouTube-kanaal.

YouTube Video