Moduł `multiprocessing` w Pythonie

Moduł `multiprocessing` w Pythonie

Ten artykuł wyjaśnia moduł multiprocessing w Pythonie.

Ten artykuł przedstawia praktyczne wskazówki dotyczące pisania bezpiecznego i wydajnego kodu z równoległym przetwarzaniem przy użyciu modułu multiprocessing.

YouTube Video

Moduł multiprocessing w Pythonie

Podstawy: Dlaczego używać multiprocessing?

multiprocessing pozwala na równoległość na poziomie procesów, dzięki czemu można równolegle wykonywać zadania obciążające procesor bez ograniczeń przez GIL (Global Interpreter Lock) w Pythonie. Dla zadań ograniczonych przez I/O, prostsze i bardziej odpowiednie mogą być threading lub asyncio.

Proste użycie Process

Oto podstawowy przykład uruchamiania funkcji w osobnym procesie za pomocą Process. Pokazuje to, jak uruchomić proces, poczekać na jego zakończenie i przekazać argumenty.

 1# Explanation:
 2# This example starts a separate process to run `worker` which prints messages.
 3# It demonstrates starting, joining, and passing arguments.
 4
 5from multiprocessing import Process
 6import time
 7
 8def worker(name, delay):
 9    # English comment in code per user's preference
10    for i in range(3):
11        print(f"Worker {name}: iteration {i}")
12        time.sleep(delay)
13
14if __name__ == "__main__":
15    p = Process(target=worker, args=("A", 0.5))
16    p.start()
17    print("Main: waiting for worker to finish")
18    p.join()
19    print("Main: worker finished")
  • Ten kod pokazuje, jak proces główny uruchamia proces potomny worker i czeka na jego zakończenie przy użyciu join(). Można przekazywać argumenty przy użyciu args.

Prosta paralelizacja przy użyciu Pool (interfejs wysokiego poziomu)

Pool.map jest przydatne, gdy chcesz zastosować tę samą funkcję do wielu niezależnych zadań. Automatycznie zarządza procesami roboczymi.

 1# Explanation:
 2# Use Pool.map to parallelize a CPU-bound function across available processes.
 3# Good for "embarrassingly parallel" workloads.
 4
 5from multiprocessing import Pool, cpu_count
 6import math
 7import time
 8
 9def is_prime(n):
10    # Check primality (inefficient but CPU-heavy for demo)
11    if n < 2:
12        return False
13    for i in range(2, int(math.sqrt(n)) + 1):
14        if n % i == 0:
15            return False
16    return True
17
18if __name__ == "__main__":
19    nums = [10_000_000 + i for i in range(50)]
20    start = time.time()
21    with Pool(processes=cpu_count()) as pool:
22        results = pool.map(is_prime, nums)
23    end = time.time()
24    print(f"Found primes: {sum(results)} / {len(nums)} in {end-start:.2f}s")
  • Pool może automatycznie kontrolować liczbę procesów roboczych, a map zwraca wyniki w oryginalnej kolejności.

Komunikacja międzyprocesowa: Wzorzec producent/konsument z użyciem Queue

Queue to kolejka First-In-First-Out (FIFO), która bezpiecznie przekazuje obiekty pomiędzy procesami. Poniżej znajdują się niektóre typowe wzorce.

 1# Explanation:
 2# Demonstrates a producer putting items into a Queue
 3# and consumer reading them.
 4# This is useful for task pipelines between processes.
 5
 6from multiprocessing import Process, Queue
 7import time
 8import random
 9
10def producer(q, n):
11    for i in range(n):
12        item = f"item-{i}"
13        print("Producer: putting", item)
14        q.put(item)
15        time.sleep(random.random() * 0.5)
16    q.put(None)  # sentinel to signal consumer to stop
17
18def consumer(q):
19    while True:
20        item = q.get()
21        if item is None:
22            break
23        print("Consumer: got", item)
24        time.sleep(0.2)
25
26if __name__ == "__main__":
27    q = Queue()
28    p = Process(target=producer, args=(q, 5))
29    c = Process(target=consumer, args=(q,))
30    p.start()
31    c.start()
32    p.join()
33    c.join()
34    print("Main: done")
  • Queue umożliwia bezpieczne przekazywanie danych między procesami. Powszechną praktyką jest używanie specjalnej wartości, takiej jak None, aby zasygnalizować zakończenie działania.

Pamięć współdzielona: Value i Array

Można używać Value i Array, gdy chcesz współdzielić małe liczby lub tablice między procesami. Aby uniknąć konfliktów, musisz używać blokad (locków).

 1# Explanation:
 2# Use Value to share a single integer counter
 3# and Array for a small numeric array.
 4# Show how to use a Lock to avoid race conditions.
 5
 6from multiprocessing import Process, Value, Array, Lock
 7import time
 8
 9def increment(counter, lock, times):
10    for _ in range(times):
11        with lock:
12            counter.value += 1
13
14def update_array(arr):
15    for i in range(len(arr)):
16        arr[i] = arr[i] + 1
17
18if __name__ == "__main__":
19    lock = Lock()
20    counter = Value('i', 0)  # 'i' = signed int
21    shared_arr = Array('i', [0, 0, 0])
22
23    p1 = Process(target=increment, args=(counter, lock, 1000))
24    p2 = Process(target=increment, args=(counter, lock, 1000))
25    a = Process(target=update_array, args=(shared_arr,))
26
27    p1.start(); p2.start(); a.start()
28    p1.join(); p2.join(); a.join()
29
30    print("Counter:", counter.value)
31    print("Array:", list(shared_arr))
  • Value i Array współdzielą dane między procesami przy użyciu mechanizmów niskiego poziomu (pamięci współdzielonej na poziomie języka C), a nie samego Pythona. Dlatego nadaje się do szybkiego odczytu i zapisu niewielkich ilości danych, ale nie nadaje się do obsługi dużych ilości danych..

Zaawansowane współdzielenie: Obiekty współdzielone (słowniki, listy) z Manager

Jeśli chcesz używać bardziej elastycznych obiektów współdzielonych, takich jak listy lub słowniki, użyj Manager().

 1# Explanation:
 2# Manager provides proxy objects like dict/list
 3# that can be shared across processes.
 4# Good for moderate-size shared state and easier programming model.
 5
 6from multiprocessing import Process, Manager
 7import time
 8
 9def worker(shared_dict, key, value):
10    shared_dict[key] = value
11
12if __name__ == "__main__":
13    with Manager() as manager:
14        d = manager.dict()
15        processes = []
16        for i in range(5):
17            p = Process(target=worker, args=(d, f"k{i}", i*i))
18            p.start()
19            processes.append(p)
20        for p in processes:
21            p.join()
22        print("Shared dict:", dict(d))
  • Manager jest wygodny do współdzielenia słowników i list, ale każde odwołanie przesyła dane między procesami i wymaga konwersji za pomocą pickle. Dlatego częste aktualizowanie dużych ilości danych spowolni przetwarzanie.

Mechanizmy synchronizacji: Jak używać Lock i Semaphore

Używaj Lock lub Semaphore, aby kontrolować współbieżny dostęp do współdzielonych zasobów. Możesz ich używać w wygodny sposób za pomocą instrukcji with.

 1# Explanation:
 2# Demonstrates using Lock to prevent simultaneous access to a critical section.
 3# Locks are necessary when shared resources are not atomic.
 4
 5from multiprocessing import Process, Lock, Value
 6
 7def safe_add(counter, lock):
 8    for _ in range(10000):
 9        with lock:
10            counter.value += 1
11
12if __name__ == "__main__":
13    lock = Lock()
14    counter = Value('i', 0)
15    p1 = Process(target=safe_add, args=(counter, lock))
16    p2 = Process(target=safe_add, args=(counter, lock))
17    p1.start(); p2.start()
18    p1.join(); p2.join()
19    print("Counter:", counter.value)
  • Blokady zapobiegają konfliktom danych, ale jeśli zablokowany obszar jest zbyt duży, wydajność przetwarzania równoległego się zmniejszy. Tylko niezbędne części powinny być chronione jako sekcja krytyczna.

Różnice między fork w UNIX a zachowaniem w Windows

W systemach UNIX procesy są domyślnie duplikowane przez fork, co sprawia, że kopiowanie przy zapisie (copy-on-write) jest wydajne dla pamięci. Windows uruchamia procesy przy użyciu spawn (który ponownie importuje moduły), więc należy zadbać o ochronę punktu wejścia i inicjalizację globalną.

 1# Explanation: Check start method (fork/spawn) and set it if needed.
 2# Useful for debugging platform-dependent behavior.
 3
 4from multiprocessing import get_start_method, set_start_method
 5
 6if __name__ == "__main__":
 7    print("Start method:", get_start_method())
 8
 9    # uncomment to force spawn on Unix for testing
10    # set_start_method('spawn')
  • set_start_method można wywołać tylko raz na początku programu. Bezpieczniej jest nie zmieniać tego dowolnie w bibliotekach.

Praktyczny przykład: Testy porównawcze obciążeń procesora (porównanie)

Poniżej znajduje się skrypt, który w prosty sposób porównuje, o ile szybsze może być przetwarzanie dzięki równoległości przy użyciu multiprocessing. Tutaj używamy Pool.

 1# Explanation:
 2# Compare sequential vs parallel execution times for CPU-bound task.
 3# Helps understand speedup and overhead.
 4
 5import time
 6from multiprocessing import Pool, cpu_count
 7import math
 8
 9def heavy_task(n):
10    s = 0
11    for i in range(1, n):
12        s += math.sqrt(i)
13    return s
14
15def run_sequential(nums):
16    return [heavy_task(n) for n in nums]
17
18def run_parallel(nums):
19    with Pool(processes=cpu_count()) as p:
20        return p.map(heavy_task, nums)
21
22if __name__ == "__main__":
23    nums = [2000000] * 8  # heavy tasks
24    t0 = time.time()
25    run_sequential(nums)
26    seq = time.time() - t0
27    t1 = time.time()
28    run_parallel(nums)
29    par = time.time() - t1
30    print(f"Sequential: {seq:.2f}s, Parallel: {par:.2f}s")
  • Ten przykład pokazuje, że przy określonym obciążeniu i liczbie procesów, równoległość czasem może być nieefektywna z powodu narzutu. Im większe („cięższe”) i bardziej niezależne zadania, tym większa korzyść.

Ważne podstawowe zasady

Poniżej przedstawiono podstawowe zasady bezpiecznego i wydajnego korzystania z multiprocessing.

  • W systemie Windows moduły są ponownie importowane przy uruchomieniu procesów potomnych, więc należy chronić punkt wejścia skryptu przy użyciu if __name__ == "__main__":.
  • Komunikacja międzyprocesowa jest serializowana (z użyciem konwersji pickle), więc przesyłanie dużych obiektów staje się kosztowne.
  • Ponieważ multiprocessing tworzy procesy, liczbę procesów zwykle ustala się na podstawie multiprocessing.cpu_count().
  • Tworzenie kolejnego Pool wewnątrz procesu roboczego staje się skomplikowane, dlatego należy unikać zagnieżdżania instancji Pool w miarę możliwości.
  • Ponieważ wyjątki występujące w procesach podrzędnych są trudne do wykrycia z poziomu procesu głównego, konieczne jest jawne zaimplementowanie logowania i obsługi błędów.
  • Ustal liczbę procesów zgodnie z liczbą rdzeni CPU i rozważ użycie wątków dla zadań ograniczonych przez I/O.

Praktyczne wskazówki dotyczące projektowania

Poniżej znajdują się przydatne koncepcje i wzorce do projektowania przetwarzania równoległego.

  • Efektywne jest podzielenie procesów na role, takie jak odczyt wejścia (I/O), wstępne przetwarzanie (wielordzeniowe) i agregacja (szeregowa) poprzez „pipelining”.
  • Aby uprościć debugowanie, najpierw sprawdź działanie programu w jednym procesie, zanim wdrożysz równoległość.
  • Dla logowania rozdzielaj pliki logów na proces (np. dodawaj PID do nazw plików), aby łatwiej izolować problemy.
  • Przygotuj mechanizmy ponownej próby i limitów czasu, aby bezpiecznie odzyskać kontrolę nawet w przypadku zawieszenia procesu.

Podsumowanie (Najważniejsze punkty do natychmiastowego użycia)

Równoległe przetwarzanie jest potężne, ale ważne jest właściwe ocenienie charakteru zadań, rozmiaru danych i kosztu komunikacji międzyprocesowej. multiprocessing jest skuteczny w zadaniach obciążających procesor, ale zła konstrukcja lub błędy w synchronizacji mogą zmniejszyć wydajność. Stosując podstawowe zasady i wzorce, można budować bezpieczne i wydajne programy równoległe.

Możesz śledzić ten artykuł, korzystając z Visual Studio Code na naszym kanale YouTube. Proszę również sprawdzić nasz kanał YouTube.

YouTube Video