`Multiprocessing`-modulen i Python
Den här artikeln förklarar multiprocessing-modulen i Python.
Denna artikel introducerar praktiska tips för att skriva säker och effektiv parallell bearbetningskod med hjälp av multiprocessing-modulen.
YouTube Video
Multiprocessing-modulen i Python
Grunder: Varför använda multiprocessing?
multiprocessing möjliggör parallellisering på processnivå, så du kan parallellisera CPU-intensiva uppgifter utan att begränsas av Pythons GIL (Global Interpreter Lock). För I/O-intensiva uppgifter kan threading eller asyncio vara enklare och mer lämpligt.
Enkel användning av Process
Först, här är ett grundläggande exempel på att köra en funktion i en separat process med Process. Detta visar hur man startar en process, väntar på att den ska slutföras och skickar argument.
1# Explanation:
2# This example starts a separate process to run `worker` which prints messages.
3# It demonstrates starting, joining, and passing arguments.
4
5from multiprocessing import Process
6import time
7
8def worker(name, delay):
9 # English comment in code per user's preference
10 for i in range(3):
11 print(f"Worker {name}: iteration {i}")
12 time.sleep(delay)
13
14if __name__ == "__main__":
15 p = Process(target=worker, args=("A", 0.5))
16 p.start()
17 print("Main: waiting for worker to finish")
18 p.join()
19 print("Main: worker finished")- Denna kod visar flödet där huvudprocessen startar en underprocess
workeroch väntar på att den ska slutföras med hjälp avjoin(). Du kan skicka argument medargs.
Enkel parallellisering med Pool (högnivå-API)
Pool.map är användbart när du vill applicera samma funktion på flera oberoende uppgifter. Den hanterar arbetsprocesser internt åt dig.
1# Explanation:
2# Use Pool.map to parallelize a CPU-bound function across available processes.
3# Good for "embarrassingly parallel" workloads.
4
5from multiprocessing import Pool, cpu_count
6import math
7import time
8
9def is_prime(n):
10 # Check primality (inefficient but CPU-heavy for demo)
11 if n < 2:
12 return False
13 for i in range(2, int(math.sqrt(n)) + 1):
14 if n % i == 0:
15 return False
16 return True
17
18if __name__ == "__main__":
19 nums = [10_000_000 + i for i in range(50)]
20 start = time.time()
21 with Pool(processes=cpu_count()) as pool:
22 results = pool.map(is_prime, nums)
23 end = time.time()
24 print(f"Found primes: {sum(results)} / {len(nums)} in {end-start:.2f}s")Poolkan automatiskt styra antalet arbetare, ochmapreturnerar resultat i ursprunglig ordning.
Kommunikation mellan processer: Producer/Consumer-mönster med Queue
Queue är en först in, först ut (FIFO) kö som säkert överför objekt mellan processer. Nedan följer några typiska mönster.
1# Explanation:
2# Demonstrates a producer putting items into a Queue
3# and consumer reading them.
4# This is useful for task pipelines between processes.
5
6from multiprocessing import Process, Queue
7import time
8import random
9
10def producer(q, n):
11 for i in range(n):
12 item = f"item-{i}"
13 print("Producer: putting", item)
14 q.put(item)
15 time.sleep(random.random() * 0.5)
16 q.put(None) # sentinel to signal consumer to stop
17
18def consumer(q):
19 while True:
20 item = q.get()
21 if item is None:
22 break
23 print("Consumer: got", item)
24 time.sleep(0.2)
25
26if __name__ == "__main__":
27 q = Queue()
28 p = Process(target=producer, args=(q, 5))
29 c = Process(target=consumer, args=(q,))
30 p.start()
31 c.start()
32 p.join()
33 c.join()
34 print("Main: done")Queuegör att du säkert kan skicka data mellan processer. Det är vanligt att använda ett speciellt värde såsomNoneför att signalera avslut.
Delat minne: Value och Array
Du kan använda Value och Array när du vill dela små tal eller arrayer mellan processer. Du måste använda lås för att undvika konflikter.
1# Explanation:
2# Use Value to share a single integer counter
3# and Array for a small numeric array.
4# Show how to use a Lock to avoid race conditions.
5
6from multiprocessing import Process, Value, Array, Lock
7import time
8
9def increment(counter, lock, times):
10 for _ in range(times):
11 with lock:
12 counter.value += 1
13
14def update_array(arr):
15 for i in range(len(arr)):
16 arr[i] = arr[i] + 1
17
18if __name__ == "__main__":
19 lock = Lock()
20 counter = Value('i', 0) # 'i' = signed int
21 shared_arr = Array('i', [0, 0, 0])
22
23 p1 = Process(target=increment, args=(counter, lock, 1000))
24 p2 = Process(target=increment, args=(counter, lock, 1000))
25 a = Process(target=update_array, args=(shared_arr,))
26
27 p1.start(); p2.start(); a.start()
28 p1.join(); p2.join(); a.join()
29
30 print("Counter:", counter.value)
31 print("Array:", list(shared_arr))ValueochArraydelar data mellan processer med hjälp av lågnivåmekanismer (delat minne på C-språksnivå), inte via Python själv. Därför är det lämpligt för snabb läsning och skrivning av små datamängder, men det är inte lämpligt för hantering av stora datamängder.
Avancerad delning: Delade objekt (dictionaries, listor) med Manager
Om du vill använda mer flexibla delade objekt som listor eller dictionaries, använd Manager().
1# Explanation:
2# Manager provides proxy objects like dict/list
3# that can be shared across processes.
4# Good for moderate-size shared state and easier programming model.
5
6from multiprocessing import Process, Manager
7import time
8
9def worker(shared_dict, key, value):
10 shared_dict[key] = value
11
12if __name__ == "__main__":
13 with Manager() as manager:
14 d = manager.dict()
15 processes = []
16 for i in range(5):
17 p = Process(target=worker, args=(d, f"k{i}", i*i))
18 p.start()
19 processes.append(p)
20 for p in processes:
21 p.join()
22 print("Shared dict:", dict(d))Managerär bekvämt för att dela ordböcker och listor, men varje åtkomst skickar data mellan processer och kräverpickle-konvertering. Därför kommer frekventa uppdateringar av stora datamängder att sakta ner bearbetningen.
Synkroniseringsmekanismer: Hur man använder Lock och Semaphore
Använd Lock eller Semaphore för att styra samtidig åtkomst till delade resurser. Du kan använda dem koncist med with-satsen.
1# Explanation:
2# Demonstrates using Lock to prevent simultaneous access to a critical section.
3# Locks are necessary when shared resources are not atomic.
4
5from multiprocessing import Process, Lock, Value
6
7def safe_add(counter, lock):
8 for _ in range(10000):
9 with lock:
10 counter.value += 1
11
12if __name__ == "__main__":
13 lock = Lock()
14 counter = Value('i', 0)
15 p1 = Process(target=safe_add, args=(counter, lock))
16 p2 = Process(target=safe_add, args=(counter, lock))
17 p1.start(); p2.start()
18 p1.join(); p2.join()
19 print("Counter:", counter.value)- Lås förhindrar datakonflikter, men om det låsta området är för stort kommer prestandan vid parallell bearbetning att minska. Endast de nödvändiga delarna bör skyddas som en kritisk sektion.
Skillnader mellan fork på UNIX och beteendet på Windows
På UNIX-system dupliceras processer med fork som standard vilket gör copy-on-write för minne effektivt. Windows startar processer med spawn (vilket återimporterar moduler), så du måste vara försiktig med skydd av startpunkt och global initialisering.
1# Explanation: Check start method (fork/spawn) and set it if needed.
2# Useful for debugging platform-dependent behavior.
3
4from multiprocessing import get_start_method, set_start_method
5
6if __name__ == "__main__":
7 print("Start method:", get_start_method())
8
9 # uncomment to force spawn on Unix for testing
10 # set_start_method('spawn')set_start_methodkan bara anropas en gång i början av ditt program. Det är säkrare att inte ändra detta godtyckligt inne i bibliotek.
Praktiskt exempel: Benchmarking av CPU-intensiva arbetslaster (jämförelse)
Nedan finns ett skript som enkelt jämför hur mycket snabbare bearbetningen kan bli med parallellisering med hjälp av multiprocessing. Här använder vi Pool.
1# Explanation:
2# Compare sequential vs parallel execution times for CPU-bound task.
3# Helps understand speedup and overhead.
4
5import time
6from multiprocessing import Pool, cpu_count
7import math
8
9def heavy_task(n):
10 s = 0
11 for i in range(1, n):
12 s += math.sqrt(i)
13 return s
14
15def run_sequential(nums):
16 return [heavy_task(n) for n in nums]
17
18def run_parallel(nums):
19 with Pool(processes=cpu_count()) as p:
20 return p.map(heavy_task, nums)
21
22if __name__ == "__main__":
23 nums = [2000000] * 8 # heavy tasks
24 t0 = time.time()
25 run_sequential(nums)
26 seq = time.time() - t0
27 t1 = time.time()
28 run_parallel(nums)
29 par = time.time() - t1
30 print(f"Sequential: {seq:.2f}s, Parallel: {par:.2f}s")- Detta exempel visar att beroende på uppgiftsbelastning och antal processer kan parallellisering ibland vara ineffektiv på grund av overhead. Ju större (“tyngre”) och mer oberoende uppgifterna är, desto större fördel.
Viktiga grundläggande regler
Nedan är de grundläggande punkterna för att använda multiprocessing säkert och effektivt.
- På Windows återimporteras moduler när underprocesser startar, så du måste skydda din scripts startpunkt med
if __name__ == "__main__":. - Kommunikation mellan processer är serialiserad (med
pickle-konvertering), så överföring av stora objekt blir kostsamt. - Eftersom
multiprocessingskapar processer är det vanligt att bestämma antalet processer baserat påmultiprocessing.cpu_count(). - Att skapa en annan
Poolinom en arbetare blir komplicerat, så du bör undvika att nästlaPool-instanser så mycket som möjligt. - Eftersom undantag som uppstår i barnprocesser är svåra att upptäcka från huvudprocessen, är det nödvändigt att uttryckligen implementera loggning och felhantering.
- Sätt antalet processer enligt CPU:n och överväg att använda trådar för I/O-intensiva uppgifter.
Praktiska designråd
Nedan följer några användbara koncept och mönster för att designa parallell bearbetning.
- Det är effektivt att dela upp processer i roller som inläsning (I/O), förbearbetning (multi-CPU) och aggregering (seriell) via 'pipelinering.'.
- För att förenkla felsökning, kontrollera först funktionen i en enda process innan du parallelliserar.
- För loggning, separera loggutmatning per process (t.ex. inkludera PID i filnamnet) för att underlätta felsökning.
- Förbered omförsök- och timeout-mekanismer så att du kan återhämta dig säkert även om en process hänger sig.
Sammanfattning (Viktiga punkter du kan använda direkt)
Parallell bearbetning är kraftfullt, men det är viktigt att korrekt bedöma uppgifternas natur, datamängdens storlek och kostnaden för kommunikation mellan processer. multiprocessing är effektivt för CPU-intensiv bearbetning, men dålig design eller synkroniseringsfel kan försämra prestandan. Om du följer de grundläggande reglerna och mönstren kan du bygga säkra och effektiva parallella program.
Du kan följa med i artikeln ovan med hjälp av Visual Studio Code på vår YouTube-kanal. Vänligen kolla även in YouTube-kanalen.