Das `multiprocessing`-Modul in Python

Das `multiprocessing`-Modul in Python

Dieser Artikel erklärt das multiprocessing-Modul in Python.

Dieser Artikel stellt praktische Tipps für das Schreiben von sicherem und effizientem Parallelverarbeitungscode mit dem multiprocessing-Modul vor.

YouTube Video

Das multiprocessing-Modul in Python

Grundlagen: Warum sollte man multiprocessing verwenden?

multiprocessing ermöglicht eine Parallelisierung auf Prozessebene, sodass Sie CPU-intensive Aufgaben parallelisieren können, ohne durch den Global Interpreter Lock (GIL) von Python eingeschränkt zu werden. Für I/O-lastige Aufgaben sind threading oder asyncio oft einfacher und besser geeignet.

Einfache Verwendung von Process

Hier zunächst ein einfaches Beispiel, wie man eine Funktion in einem separaten Prozess mit Process ausführt. Dies zeigt, wie ein Prozess gestartet, auf seine Fertigstellung gewartet und Argumente übergeben werden können.

 1# Explanation:
 2# This example starts a separate process to run `worker` which prints messages.
 3# It demonstrates starting, joining, and passing arguments.
 4
 5from multiprocessing import Process
 6import time
 7
 8def worker(name, delay):
 9    # English comment in code per user's preference
10    for i in range(3):
11        print(f"Worker {name}: iteration {i}")
12        time.sleep(delay)
13
14if __name__ == "__main__":
15    p = Process(target=worker, args=("A", 0.5))
16    p.start()
17    print("Main: waiting for worker to finish")
18    p.join()
19    print("Main: worker finished")
  • Dieser Code zeigt den Ablauf, bei dem der Hauptprozess einen Kindprozess (worker) startet und mit join() auf dessen Abschluss wartet. Sie können Argumente mit args übergeben.

Einfache Parallelisierung mit Pool (High-Level-API)

Pool.map ist nützlich, wenn Sie die gleiche Funktion auf mehrere unabhängige Aufgaben anwenden möchten. Die Verwaltung der Arbeitsprozesse erfolgt intern automatisch.

 1# Explanation:
 2# Use Pool.map to parallelize a CPU-bound function across available processes.
 3# Good for "embarrassingly parallel" workloads.
 4
 5from multiprocessing import Pool, cpu_count
 6import math
 7import time
 8
 9def is_prime(n):
10    # Check primality (inefficient but CPU-heavy for demo)
11    if n < 2:
12        return False
13    for i in range(2, int(math.sqrt(n)) + 1):
14        if n % i == 0:
15            return False
16    return True
17
18if __name__ == "__main__":
19    nums = [10_000_000 + i for i in range(50)]
20    start = time.time()
21    with Pool(processes=cpu_count()) as pool:
22        results = pool.map(is_prime, nums)
23    end = time.time()
24    print(f"Found primes: {sum(results)} / {len(nums)} in {end-start:.2f}s")
  • Pool steuert die Anzahl der Worker automatisch, und map liefert die Ergebnisse in der ursprünglichen Reihenfolge zurück.

Interprozesskommunikation: Produzent/Konsument-Muster mit Queue

Queue ist eine First-In-First-Out (FIFO)-Warteschlange, die Objekte sicher zwischen Prozessen überträgt. Im Folgenden sind einige typische Muster aufgeführt.

 1# Explanation:
 2# Demonstrates a producer putting items into a Queue
 3# and consumer reading them.
 4# This is useful for task pipelines between processes.
 5
 6from multiprocessing import Process, Queue
 7import time
 8import random
 9
10def producer(q, n):
11    for i in range(n):
12        item = f"item-{i}"
13        print("Producer: putting", item)
14        q.put(item)
15        time.sleep(random.random() * 0.5)
16    q.put(None)  # sentinel to signal consumer to stop
17
18def consumer(q):
19    while True:
20        item = q.get()
21        if item is None:
22            break
23        print("Consumer: got", item)
24        time.sleep(0.2)
25
26if __name__ == "__main__":
27    q = Queue()
28    p = Process(target=producer, args=(q, 5))
29    c = Process(target=consumer, args=(q,))
30    p.start()
31    c.start()
32    p.join()
33    c.join()
34    print("Main: done")
  • Queue ermöglicht das sichere Übergeben von Daten zwischen Prozessen. Es ist üblich, einen speziellen Wert wie None zu verwenden, um das Ende zu signalisieren.

Gemeinsam genutzter Speicher: Value und Array

Value und Array können verwendet werden, um kleine Zahlen oder Arrays zwischen Prozessen zu teilen. Sie müssen Locks verwenden, um Konflikte zu vermeiden.

 1# Explanation:
 2# Use Value to share a single integer counter
 3# and Array for a small numeric array.
 4# Show how to use a Lock to avoid race conditions.
 5
 6from multiprocessing import Process, Value, Array, Lock
 7import time
 8
 9def increment(counter, lock, times):
10    for _ in range(times):
11        with lock:
12            counter.value += 1
13
14def update_array(arr):
15    for i in range(len(arr)):
16        arr[i] = arr[i] + 1
17
18if __name__ == "__main__":
19    lock = Lock()
20    counter = Value('i', 0)  # 'i' = signed int
21    shared_arr = Array('i', [0, 0, 0])
22
23    p1 = Process(target=increment, args=(counter, lock, 1000))
24    p2 = Process(target=increment, args=(counter, lock, 1000))
25    a = Process(target=update_array, args=(shared_arr,))
26
27    p1.start(); p2.start(); a.start()
28    p1.join(); p2.join(); a.join()
29
30    print("Counter:", counter.value)
31    print("Array:", list(shared_arr))
  • Value und Array teilen Daten zwischen Prozessen mittels Low-Level-Mechanismen (Shared Memory auf C-Ebene), nicht auf Python-Ebene. Daher eignet es sich zum schnellen Lesen und Schreiben kleiner Datenmengen, ist jedoch für die Verarbeitung großer Datenmengen nicht geeignet..

Fortgeschrittenes Teilen: Gemeinsame Objekte (dicts, lists) mit Manager

Wenn Sie flexiblere gemeinsame Objekte wie Listen oder Dictionaries verwenden möchten, nutzen Sie Manager().

 1# Explanation:
 2# Manager provides proxy objects like dict/list
 3# that can be shared across processes.
 4# Good for moderate-size shared state and easier programming model.
 5
 6from multiprocessing import Process, Manager
 7import time
 8
 9def worker(shared_dict, key, value):
10    shared_dict[key] = value
11
12if __name__ == "__main__":
13    with Manager() as manager:
14        d = manager.dict()
15        processes = []
16        for i in range(5):
17            p = Process(target=worker, args=(d, f"k{i}", i*i))
18            p.start()
19            processes.append(p)
20        for p in processes:
21            p.join()
22        print("Shared dict:", dict(d))
  • Manager ist praktisch, um Dictionaries und Listen zu teilen, aber jeder Zugriff sendet Daten zwischen den Prozessen und erfordert eine pickle-Umwandlung. Daher verlangsamt das häufige Aktualisieren großer Datenmengen die Verarbeitung.

Synchronisationsmechanismen: Die Verwendung von Lock und Semaphore

Verwenden Sie Lock oder Semaphore, um gleichzeitigen Zugriff auf gemeinsame Ressourcen zu steuern. Sie können diese mit der with-Anweisung elegant einsetzen.

 1# Explanation:
 2# Demonstrates using Lock to prevent simultaneous access to a critical section.
 3# Locks are necessary when shared resources are not atomic.
 4
 5from multiprocessing import Process, Lock, Value
 6
 7def safe_add(counter, lock):
 8    for _ in range(10000):
 9        with lock:
10            counter.value += 1
11
12if __name__ == "__main__":
13    lock = Lock()
14    counter = Value('i', 0)
15    p1 = Process(target=safe_add, args=(counter, lock))
16    p2 = Process(target=safe_add, args=(counter, lock))
17    p1.start(); p2.start()
18    p1.join(); p2.join()
19    print("Counter:", counter.value)
  • Sperren verhindern Datenkonflikte, aber wenn der gesperrte Bereich zu groß ist, nimmt die Parallelverarbeitungsleistung ab. Nur die notwendigen Teile sollten als kritischer Abschnitt geschützt werden.

Unterschiede zwischen fork auf UNIX und dem Verhalten unter Windows

Auf UNIX-Systemen werden Prozesse standardmäßig per fork dupliziert, wodurch Speicher effizient per Copy-on-Write geteilt wird. Unter Windows werden Prozesse mit spawn gestartet (dabei werden Module neu importiert), daher müssen Sie besonders auf den Programmeintritt und die Initialisierung globaler Variablen achten.

 1# Explanation: Check start method (fork/spawn) and set it if needed.
 2# Useful for debugging platform-dependent behavior.
 3
 4from multiprocessing import get_start_method, set_start_method
 5
 6if __name__ == "__main__":
 7    print("Start method:", get_start_method())
 8
 9    # uncomment to force spawn on Unix for testing
10    # set_start_method('spawn')
  • set_start_method kann nur einmal zu Beginn des Programms aufgerufen werden. Es ist sicherer, dies nicht beliebig innerhalb von Bibliotheken zu ändern.

Praktisches Beispiel: Benchmarking von CPU-intensiven Workloads (Vergleich)

Unten ist ein Skript, das einfach vergleicht, wie viel schneller die Verarbeitung durch Parallelisierung mit multiprocessing sein kann. Hier verwenden wir Pool.

 1# Explanation:
 2# Compare sequential vs parallel execution times for CPU-bound task.
 3# Helps understand speedup and overhead.
 4
 5import time
 6from multiprocessing import Pool, cpu_count
 7import math
 8
 9def heavy_task(n):
10    s = 0
11    for i in range(1, n):
12        s += math.sqrt(i)
13    return s
14
15def run_sequential(nums):
16    return [heavy_task(n) for n in nums]
17
18def run_parallel(nums):
19    with Pool(processes=cpu_count()) as p:
20        return p.map(heavy_task, nums)
21
22if __name__ == "__main__":
23    nums = [2000000] * 8  # heavy tasks
24    t0 = time.time()
25    run_sequential(nums)
26    seq = time.time() - t0
27    t1 = time.time()
28    run_parallel(nums)
29    par = time.time() - t1
30    print(f"Sequential: {seq:.2f}s, Parallel: {par:.2f}s")
  • Dieses Beispiel zeigt, dass die Parallelisierung je nach Aufgabenlast und Anzahl der Prozesse durch Overhead manchmal ineffektiv sein kann. Je größer und unabhängiger die Aufgaben, desto größer der Vorteil durch Parallelisierung.

Wichtige Grundregeln

Unten sind die grundlegenden Punkte für die sichere und effiziente Nutzung von multiprocessing aufgeführt.

  • Unter Windows werden beim Start von Kindprozessen die Module neu importiert. Daher muss der Programmeinstieg mit if __name__ == "__main__": geschützt werden.
  • Die Kommunikation zwischen Prozessen wird serialisiert (mit pickle-Umwandlung), daher wird die Übertragung großer Objekte teuer.
  • Da multiprocessing echte Prozesse erzeugt, sollte die Anzahl der Prozesse meist an multiprocessing.cpu_count() orientiert werden.
  • Das Erstellen eines weiteren Pool innerhalb eines Workers wird komplex, daher sollten Sie geschachtelte Pool-Instanzen möglichst vermeiden.
  • Da Ausnahmen, die in Kindprozessen auftreten, vom Hauptprozess nur schwer erkannt werden können, ist es notwendig, Logging und Fehlerbehandlung explizit zu implementieren.
  • Wählen Sie die Anzahl der Prozesse passend zur CPU und überlegen Sie bei I/O-lastigen Aufgaben den Einsatz von Threads.

Praktische Gestaltungstipps

Im Folgenden sind einige nützliche Konzepte und Muster für die Gestaltung der Parallelverarbeitung aufgeführt.

  • Es ist effizient, Prozesse über ein 'Pipelining' in Rollen wie Eingabeverarbeitung (I/O), Vorverarbeitung (Multi-CPU) und Aggregation (seriell) zu unterteilen.
  • Um das Debugging zu vereinfachen, testen Sie die Ausführung zuerst in einem einzelnen Prozess, bevor Sie parallelisieren.
  • Beim Logging sollten Sie pro Prozess getrennte Protokolle schreiben (z. B. die PID im Dateinamen verwenden), um Probleme besser eingrenzen zu können.
  • Implementieren Sie Wiederholungs- und Timeout-Mechanismen, um sich abzusichern, falls ein Prozess hängen bleibt.

Zusammenfassung (praktische Schlüsselpunkte)

Parallelverarbeitung ist leistungsstark, aber es ist wichtig, die Art der Aufgaben, die Datenmenge und die Kosten der Interprozesskommunikation richtig einzuschätzen. multiprocessing ist für CPU-lastige Aufgaben effektiv, doch eine schlechte Architektur oder Synchronisationsfehler können die Leistung erheblich mindern. Wenn Sie die grundlegenden Regeln und Muster befolgen, können Sie sichere und effiziente Parallelprogramme erstellen.

Sie können den obigen Artikel mit Visual Studio Code auf unserem YouTube-Kanal verfolgen. Bitte schauen Sie sich auch den YouTube-Kanal an.

YouTube Video