`Multiprocessing`-modulen i Python

`Multiprocessing`-modulen i Python

Den här artikeln förklarar multiprocessing-modulen i Python.

Denna artikel introducerar praktiska tips för att skriva säker och effektiv parallell bearbetningskod med hjälp av multiprocessing-modulen.

YouTube Video

Multiprocessing-modulen i Python

Grunder: Varför använda multiprocessing?

multiprocessing möjliggör parallellisering på processnivå, så du kan parallellisera CPU-intensiva uppgifter utan att begränsas av Pythons GIL (Global Interpreter Lock). För I/O-intensiva uppgifter kan threading eller asyncio vara enklare och mer lämpligt.

Enkel användning av Process

Först, här är ett grundläggande exempel på att köra en funktion i en separat process med Process. Detta visar hur man startar en process, väntar på att den ska slutföras och skickar argument.

 1# Explanation:
 2# This example starts a separate process to run `worker` which prints messages.
 3# It demonstrates starting, joining, and passing arguments.
 4
 5from multiprocessing import Process
 6import time
 7
 8def worker(name, delay):
 9    # English comment in code per user's preference
10    for i in range(3):
11        print(f"Worker {name}: iteration {i}")
12        time.sleep(delay)
13
14if __name__ == "__main__":
15    p = Process(target=worker, args=("A", 0.5))
16    p.start()
17    print("Main: waiting for worker to finish")
18    p.join()
19    print("Main: worker finished")
  • Denna kod visar flödet där huvudprocessen startar en underprocess worker och väntar på att den ska slutföras med hjälp av join(). Du kan skicka argument med args.

Enkel parallellisering med Pool (högnivå-API)

Pool.map är användbart när du vill applicera samma funktion på flera oberoende uppgifter. Den hanterar arbetsprocesser internt åt dig.

 1# Explanation:
 2# Use Pool.map to parallelize a CPU-bound function across available processes.
 3# Good for "embarrassingly parallel" workloads.
 4
 5from multiprocessing import Pool, cpu_count
 6import math
 7import time
 8
 9def is_prime(n):
10    # Check primality (inefficient but CPU-heavy for demo)
11    if n < 2:
12        return False
13    for i in range(2, int(math.sqrt(n)) + 1):
14        if n % i == 0:
15            return False
16    return True
17
18if __name__ == "__main__":
19    nums = [10_000_000 + i for i in range(50)]
20    start = time.time()
21    with Pool(processes=cpu_count()) as pool:
22        results = pool.map(is_prime, nums)
23    end = time.time()
24    print(f"Found primes: {sum(results)} / {len(nums)} in {end-start:.2f}s")
  • Pool kan automatiskt styra antalet arbetare, och map returnerar resultat i ursprunglig ordning.

Kommunikation mellan processer: Producer/Consumer-mönster med Queue

Queue är en först in, först ut (FIFO) kö som säkert överför objekt mellan processer. Nedan följer några typiska mönster.

 1# Explanation:
 2# Demonstrates a producer putting items into a Queue
 3# and consumer reading them.
 4# This is useful for task pipelines between processes.
 5
 6from multiprocessing import Process, Queue
 7import time
 8import random
 9
10def producer(q, n):
11    for i in range(n):
12        item = f"item-{i}"
13        print("Producer: putting", item)
14        q.put(item)
15        time.sleep(random.random() * 0.5)
16    q.put(None)  # sentinel to signal consumer to stop
17
18def consumer(q):
19    while True:
20        item = q.get()
21        if item is None:
22            break
23        print("Consumer: got", item)
24        time.sleep(0.2)
25
26if __name__ == "__main__":
27    q = Queue()
28    p = Process(target=producer, args=(q, 5))
29    c = Process(target=consumer, args=(q,))
30    p.start()
31    c.start()
32    p.join()
33    c.join()
34    print("Main: done")
  • Queue gör att du säkert kan skicka data mellan processer. Det är vanligt att använda ett speciellt värde såsom None för att signalera avslut.

Delat minne: Value och Array

Du kan använda Value och Array när du vill dela små tal eller arrayer mellan processer. Du måste använda lås för att undvika konflikter.

 1# Explanation:
 2# Use Value to share a single integer counter
 3# and Array for a small numeric array.
 4# Show how to use a Lock to avoid race conditions.
 5
 6from multiprocessing import Process, Value, Array, Lock
 7import time
 8
 9def increment(counter, lock, times):
10    for _ in range(times):
11        with lock:
12            counter.value += 1
13
14def update_array(arr):
15    for i in range(len(arr)):
16        arr[i] = arr[i] + 1
17
18if __name__ == "__main__":
19    lock = Lock()
20    counter = Value('i', 0)  # 'i' = signed int
21    shared_arr = Array('i', [0, 0, 0])
22
23    p1 = Process(target=increment, args=(counter, lock, 1000))
24    p2 = Process(target=increment, args=(counter, lock, 1000))
25    a = Process(target=update_array, args=(shared_arr,))
26
27    p1.start(); p2.start(); a.start()
28    p1.join(); p2.join(); a.join()
29
30    print("Counter:", counter.value)
31    print("Array:", list(shared_arr))
  • Value och Array delar data mellan processer med hjälp av lågnivåmekanismer (delat minne på C-språksnivå), inte via Python själv. Därför är det lämpligt för snabb läsning och skrivning av små datamängder, men det är inte lämpligt för hantering av stora datamängder.

Avancerad delning: Delade objekt (dictionaries, listor) med Manager

Om du vill använda mer flexibla delade objekt som listor eller dictionaries, använd Manager().

 1# Explanation:
 2# Manager provides proxy objects like dict/list
 3# that can be shared across processes.
 4# Good for moderate-size shared state and easier programming model.
 5
 6from multiprocessing import Process, Manager
 7import time
 8
 9def worker(shared_dict, key, value):
10    shared_dict[key] = value
11
12if __name__ == "__main__":
13    with Manager() as manager:
14        d = manager.dict()
15        processes = []
16        for i in range(5):
17            p = Process(target=worker, args=(d, f"k{i}", i*i))
18            p.start()
19            processes.append(p)
20        for p in processes:
21            p.join()
22        print("Shared dict:", dict(d))
  • Manager är bekvämt för att dela ordböcker och listor, men varje åtkomst skickar data mellan processer och kräver pickle-konvertering. Därför kommer frekventa uppdateringar av stora datamängder att sakta ner bearbetningen.

Synkroniseringsmekanismer: Hur man använder Lock och Semaphore

Använd Lock eller Semaphore för att styra samtidig åtkomst till delade resurser. Du kan använda dem koncist med with-satsen.

 1# Explanation:
 2# Demonstrates using Lock to prevent simultaneous access to a critical section.
 3# Locks are necessary when shared resources are not atomic.
 4
 5from multiprocessing import Process, Lock, Value
 6
 7def safe_add(counter, lock):
 8    for _ in range(10000):
 9        with lock:
10            counter.value += 1
11
12if __name__ == "__main__":
13    lock = Lock()
14    counter = Value('i', 0)
15    p1 = Process(target=safe_add, args=(counter, lock))
16    p2 = Process(target=safe_add, args=(counter, lock))
17    p1.start(); p2.start()
18    p1.join(); p2.join()
19    print("Counter:", counter.value)
  • Lås förhindrar datakonflikter, men om det låsta området är för stort kommer prestandan vid parallell bearbetning att minska. Endast de nödvändiga delarna bör skyddas som en kritisk sektion.

Skillnader mellan fork på UNIX och beteendet på Windows

På UNIX-system dupliceras processer med fork som standard vilket gör copy-on-write för minne effektivt. Windows startar processer med spawn (vilket återimporterar moduler), så du måste vara försiktig med skydd av startpunkt och global initialisering.

 1# Explanation: Check start method (fork/spawn) and set it if needed.
 2# Useful for debugging platform-dependent behavior.
 3
 4from multiprocessing import get_start_method, set_start_method
 5
 6if __name__ == "__main__":
 7    print("Start method:", get_start_method())
 8
 9    # uncomment to force spawn on Unix for testing
10    # set_start_method('spawn')
  • set_start_method kan bara anropas en gång i början av ditt program. Det är säkrare att inte ändra detta godtyckligt inne i bibliotek.

Praktiskt exempel: Benchmarking av CPU-intensiva arbetslaster (jämförelse)

Nedan finns ett skript som enkelt jämför hur mycket snabbare bearbetningen kan bli med parallellisering med hjälp av multiprocessing. Här använder vi Pool.

 1# Explanation:
 2# Compare sequential vs parallel execution times for CPU-bound task.
 3# Helps understand speedup and overhead.
 4
 5import time
 6from multiprocessing import Pool, cpu_count
 7import math
 8
 9def heavy_task(n):
10    s = 0
11    for i in range(1, n):
12        s += math.sqrt(i)
13    return s
14
15def run_sequential(nums):
16    return [heavy_task(n) for n in nums]
17
18def run_parallel(nums):
19    with Pool(processes=cpu_count()) as p:
20        return p.map(heavy_task, nums)
21
22if __name__ == "__main__":
23    nums = [2000000] * 8  # heavy tasks
24    t0 = time.time()
25    run_sequential(nums)
26    seq = time.time() - t0
27    t1 = time.time()
28    run_parallel(nums)
29    par = time.time() - t1
30    print(f"Sequential: {seq:.2f}s, Parallel: {par:.2f}s")
  • Detta exempel visar att beroende på uppgiftsbelastning och antal processer kan parallellisering ibland vara ineffektiv på grund av overhead. Ju större (“tyngre”) och mer oberoende uppgifterna är, desto större fördel.

Viktiga grundläggande regler

Nedan är de grundläggande punkterna för att använda multiprocessing säkert och effektivt.

  • På Windows återimporteras moduler när underprocesser startar, så du måste skydda din scripts startpunkt med if __name__ == "__main__":.
  • Kommunikation mellan processer är serialiserad (med pickle-konvertering), så överföring av stora objekt blir kostsamt.
  • Eftersom multiprocessing skapar processer är det vanligt att bestämma antalet processer baserat på multiprocessing.cpu_count().
  • Att skapa en annan Pool inom en arbetare blir komplicerat, så du bör undvika att nästla Pool-instanser så mycket som möjligt.
  • Eftersom undantag som uppstår i barnprocesser är svåra att upptäcka från huvudprocessen, är det nödvändigt att uttryckligen implementera loggning och felhantering.
  • Sätt antalet processer enligt CPU:n och överväg att använda trådar för I/O-intensiva uppgifter.

Praktiska designråd

Nedan följer några användbara koncept och mönster för att designa parallell bearbetning.

  • Det är effektivt att dela upp processer i roller som inläsning (I/O), förbearbetning (multi-CPU) och aggregering (seriell) via 'pipelinering.'.
  • För att förenkla felsökning, kontrollera först funktionen i en enda process innan du parallelliserar.
  • För loggning, separera loggutmatning per process (t.ex. inkludera PID i filnamnet) för att underlätta felsökning.
  • Förbered omförsök- och timeout-mekanismer så att du kan återhämta dig säkert även om en process hänger sig.

Sammanfattning (Viktiga punkter du kan använda direkt)

Parallell bearbetning är kraftfullt, men det är viktigt att korrekt bedöma uppgifternas natur, datamängdens storlek och kostnaden för kommunikation mellan processer. multiprocessing är effektivt för CPU-intensiv bearbetning, men dålig design eller synkroniseringsfel kan försämra prestandan. Om du följer de grundläggande reglerna och mönstren kan du bygga säkra och effektiva parallella program.

Du kan följa med i artikeln ovan med hjälp av Visual Studio Code på vår YouTube-kanal. Vänligen kolla även in YouTube-kanalen.

YouTube Video