`multiprocessing`-modulet i Python
Denne artikel forklarer multiprocessing-modulet i Python.
Denne artikel introducerer praktiske tips til at skrive sikker og effektiv parallel kode med multiprocessing-modulet.
YouTube Video
multiprocessing-modulet i Python
Grundlæggende: Hvorfor bruge multiprocessing?
multiprocessing muliggør parallelisering på procesniveau, så du kan parallelisere CPU-tunge opgaver uden at blive begrænset af Pythons GIL (Global Interpreter Lock). Til I/O-tunge opgaver kan threading eller asyncio være enklere og mere velegnet.
Simpel brug af Process
Først er her et grundlæggende eksempel på at køre en funktion i en separat proces med Process. Dette demonstrerer, hvordan du starter en proces, venter på at den afslutter, og overfører argumenter.
1# Explanation:
2# This example starts a separate process to run `worker` which prints messages.
3# It demonstrates starting, joining, and passing arguments.
4
5from multiprocessing import Process
6import time
7
8def worker(name, delay):
9 # English comment in code per user's preference
10 for i in range(3):
11 print(f"Worker {name}: iteration {i}")
12 time.sleep(delay)
13
14if __name__ == "__main__":
15 p = Process(target=worker, args=("A", 0.5))
16 p.start()
17 print("Main: waiting for worker to finish")
18 p.join()
19 print("Main: worker finished")- Denne kode viser flowet, hvor hovedprocessen starter en underproces
workerog venter på dens afslutning medjoin(). Du kan overføre argumenter ved hjælp afargs.
Simpel parallelisering med Pool (høj-niveau API)
Pool.map er nyttig, når du vil anvende den samme funktion på flere uafhængige opgaver. Den håndterer arbejdersprocesser internt for dig.
1# Explanation:
2# Use Pool.map to parallelize a CPU-bound function across available processes.
3# Good for "embarrassingly parallel" workloads.
4
5from multiprocessing import Pool, cpu_count
6import math
7import time
8
9def is_prime(n):
10 # Check primality (inefficient but CPU-heavy for demo)
11 if n < 2:
12 return False
13 for i in range(2, int(math.sqrt(n)) + 1):
14 if n % i == 0:
15 return False
16 return True
17
18if __name__ == "__main__":
19 nums = [10_000_000 + i for i in range(50)]
20 start = time.time()
21 with Pool(processes=cpu_count()) as pool:
22 results = pool.map(is_prime, nums)
23 end = time.time()
24 print(f"Found primes: {sum(results)} / {len(nums)} in {end-start:.2f}s")Poolkan automatisk styre antallet af arbejdsprocesser, ogmapreturnerer resultaterne i den oprindelige rækkefølge.
Interproces-kommunikation: Producer/Consumer-mønster ved brug af Queue
Queue er en Først-ind-først-ud (FIFO) kø, der sikkert overfører objekter mellem processer. Nedenfor er nogle typiske mønstre.
1# Explanation:
2# Demonstrates a producer putting items into a Queue
3# and consumer reading them.
4# This is useful for task pipelines between processes.
5
6from multiprocessing import Process, Queue
7import time
8import random
9
10def producer(q, n):
11 for i in range(n):
12 item = f"item-{i}"
13 print("Producer: putting", item)
14 q.put(item)
15 time.sleep(random.random() * 0.5)
16 q.put(None) # sentinel to signal consumer to stop
17
18def consumer(q):
19 while True:
20 item = q.get()
21 if item is None:
22 break
23 print("Consumer: got", item)
24 time.sleep(0.2)
25
26if __name__ == "__main__":
27 q = Queue()
28 p = Process(target=producer, args=(q, 5))
29 c = Process(target=consumer, args=(q,))
30 p.start()
31 c.start()
32 p.join()
33 c.join()
34 print("Main: done")Queuegiver dig mulighed for sikkert at overføre data mellem processer. Det er almindeligt at bruge en særlig værdi såsomNonefor at signalere afslutning.
Delt hukommelse: Value og Array
Du kan bruge Value og Array, når du vil dele små tal eller arrays mellem processer. Du skal bruge låse for at undgå konflikter.
1# Explanation:
2# Use Value to share a single integer counter
3# and Array for a small numeric array.
4# Show how to use a Lock to avoid race conditions.
5
6from multiprocessing import Process, Value, Array, Lock
7import time
8
9def increment(counter, lock, times):
10 for _ in range(times):
11 with lock:
12 counter.value += 1
13
14def update_array(arr):
15 for i in range(len(arr)):
16 arr[i] = arr[i] + 1
17
18if __name__ == "__main__":
19 lock = Lock()
20 counter = Value('i', 0) # 'i' = signed int
21 shared_arr = Array('i', [0, 0, 0])
22
23 p1 = Process(target=increment, args=(counter, lock, 1000))
24 p2 = Process(target=increment, args=(counter, lock, 1000))
25 a = Process(target=update_array, args=(shared_arr,))
26
27 p1.start(); p2.start(); a.start()
28 p1.join(); p2.join(); a.join()
29
30 print("Counter:", counter.value)
31 print("Array:", list(shared_arr))ValueogArraydeler data mellem processer ved hjælp af lavniveau-mekanismer (delt hukommelse på C-sprogniveau), ikke via Python selv. Derfor er det velegnet til hurtigt at læse og skrive små mængder data, men det er ikke egnet til håndtering af store datamængder..
Avanceret deling: Delte objekter (dicts, lister) med Manager
Hvis du vil bruge mere fleksible delte objekter som lister eller ordbøger, kan du bruge Manager().
1# Explanation:
2# Manager provides proxy objects like dict/list
3# that can be shared across processes.
4# Good for moderate-size shared state and easier programming model.
5
6from multiprocessing import Process, Manager
7import time
8
9def worker(shared_dict, key, value):
10 shared_dict[key] = value
11
12if __name__ == "__main__":
13 with Manager() as manager:
14 d = manager.dict()
15 processes = []
16 for i in range(5):
17 p = Process(target=worker, args=(d, f"k{i}", i*i))
18 p.start()
19 processes.append(p)
20 for p in processes:
21 p.join()
22 print("Shared dict:", dict(d))Managerer praktisk til at dele ordbøger og lister, men hver adgang sender data mellem processer og kræverpickle-konvertering. Derfor vil hyppig opdatering af store datamængder nedsætte behandlingshastigheden.
Synkroniseringsmekanismer: Sådan bruger du Lock og Semaphore
Brug Lock eller Semaphore til at styre samtidig adgang til delte ressourcer. Du kan bruge dem enkelt med with-sætningen.
1# Explanation:
2# Demonstrates using Lock to prevent simultaneous access to a critical section.
3# Locks are necessary when shared resources are not atomic.
4
5from multiprocessing import Process, Lock, Value
6
7def safe_add(counter, lock):
8 for _ in range(10000):
9 with lock:
10 counter.value += 1
11
12if __name__ == "__main__":
13 lock = Lock()
14 counter = Value('i', 0)
15 p1 = Process(target=safe_add, args=(counter, lock))
16 p2 = Process(target=safe_add, args=(counter, lock))
17 p1.start(); p2.start()
18 p1.join(); p2.join()
19 print("Counter:", counter.value)- Låse forhindrer datakapløb, men hvis det låste område er for stort, vil ydeevnen ved parallel behandling falde. Kun de nødvendige dele bør beskyttes som en kritisk sektion.
Forskelle mellem fork på UNIX og adfærd på Windows
På UNIX-systemer duplikeres processer som standard ved brug af fork, hvilket gør copy-on-write effektivt for hukommelsen. Windows starter processer med spawn (som genimporterer moduler), så du skal være opmærksom på entry point-beskyttelse og global initialisering.
1# Explanation: Check start method (fork/spawn) and set it if needed.
2# Useful for debugging platform-dependent behavior.
3
4from multiprocessing import get_start_method, set_start_method
5
6if __name__ == "__main__":
7 print("Start method:", get_start_method())
8
9 # uncomment to force spawn on Unix for testing
10 # set_start_method('spawn')set_start_methodkan kun kaldes én gang i begyndelsen af dit program. Det er sikrere ikke at ændre dette vilkårligt i biblioteker.
Praktisk eksempel: Benchmark af CPU-tunge arbejdsbelastninger (sammenligning)
Nedenfor er et script, der enkelt sammenligner, hvor meget hurtigere behandling kan være med parallelisering ved brug af multiprocessing. Her bruger vi Pool.
1# Explanation:
2# Compare sequential vs parallel execution times for CPU-bound task.
3# Helps understand speedup and overhead.
4
5import time
6from multiprocessing import Pool, cpu_count
7import math
8
9def heavy_task(n):
10 s = 0
11 for i in range(1, n):
12 s += math.sqrt(i)
13 return s
14
15def run_sequential(nums):
16 return [heavy_task(n) for n in nums]
17
18def run_parallel(nums):
19 with Pool(processes=cpu_count()) as p:
20 return p.map(heavy_task, nums)
21
22if __name__ == "__main__":
23 nums = [2000000] * 8 # heavy tasks
24 t0 = time.time()
25 run_sequential(nums)
26 seq = time.time() - t0
27 t1 = time.time()
28 run_parallel(nums)
29 par = time.time() - t1
30 print(f"Sequential: {seq:.2f}s, Parallel: {par:.2f}s")- Dette eksempel viser, at afhængigt af opgavens belastning og antallet af processer, kan parallelisering nogle gange være ineffektiv på grund af overhead. Jo større ('tungere') og jo mere uafhængige opgaverne er, desto større er fordelen.
Vigtige grundregler
Nedenfor er de grundlæggende punkter for at bruge multiprocessing sikkert og effektivt.
- På Windows genimporteres moduler, når underprocesser starter, så du skal beskytte dit scripts entry point med
if __name__ == "__main__":. - Kommunikation mellem processer er seriel (med
pickle-konvertering), så overførsel af store objekter bliver dyrt. - Da
multiprocessingopretter processer, er det almindeligt at bestemme antallet af processer baseret påmultiprocessing.cpu_count(). - Det bliver komplekst at oprette endnu en
Poolinden for en worker, så du bør så vidt muligt undgå at læggePool-instanser ind i hinanden. - Da undtagelser, der opstår i underprocesser, er svære at detektere fra hovedprocessen, er det nødvendigt eksplicit at implementere logning og fejlhåndtering.
- Indstil antallet af processer efter CPU'en, og overvej at bruge tråde til I/O-tunge opgaver.
Praktiske designråd
Nedenfor er nogle nyttige begreber og mønstre til design af parallel behandling.
- Det er effektivt at opdele processer i roller som inputlæsning (I/O), forbehandling (multi-CPU) og aggregering (seriel) via 'pipelining'.
- For at forenkle fejlfinding skal du først kontrollere funktionen i en enkelt proces, før du paralleliserer.
- For logning bør du adskille loguddata pr. proces (f.eks. inkluder PID i filnavne) for nemmere at isolere problemer.
- Forbered retry- og timeout-mekanismer, så du kan komme dig sikkert, selvom en proces hænger.
Opsummering (Nøglepunkter du straks kan anvende)
Parallel behandling er kraftfuld, men det er vigtigt korrekt at vurdere opgavernes karakter, datastørrelse og omkostningen ved interproces-kommunikation. multiprocessing er effektivt til CPU-tung behandling, men dårlig design eller synkroniseringsfejl kan forringe ydeevnen. Hvis du følger de grundlæggende regler og mønstre, kan du bygge sikre og effektive parallelle programmer.
Du kan følge med i ovenstående artikel ved hjælp af Visual Studio Code på vores YouTube-kanal. Husk også at tjekke YouTube-kanalen.