Python'daki `multiprocessing` modülü

Python'daki `multiprocessing` modülü

Bu makale, Python'daki multiprocessing modülünü açıklamaktadır.

multiprocessing modülünü kullanarak güvenli ve verimli paralel işleme kodu yazmak için pratik ipuçları bu makalede tanıtılmaktadır.

YouTube Video

Python'daki multiprocessing modülü

Temel Bilgiler: Neden multiprocessing kullanılır?

multiprocessing, işlemler bazında paralelleştirme sağlar; böylece, Python'un GIL'i (Global Interpreter Lock) ile sınırlı kalmadan CPU ağırlıklı görevleri paralel olarak çalıştırabilirsiniz. G/Ç (I/O) ağırlıklı görevler için, threading veya asyncio daha basit ve daha uygun olabilir.

Process'in Basit Kullanımı

Öncelikle, bir fonksiyonu Process kullanarak ayrı bir işlemde çalıştırmanın temel bir örneği burada verilmiştir. Bu, bir işlemi başlatma, tamamlanmasını bekleme ve argümanları iletme süreçlerini göstermektedir.

 1# Explanation:
 2# This example starts a separate process to run `worker` which prints messages.
 3# It demonstrates starting, joining, and passing arguments.
 4
 5from multiprocessing import Process
 6import time
 7
 8def worker(name, delay):
 9    # English comment in code per user's preference
10    for i in range(3):
11        print(f"Worker {name}: iteration {i}")
12        time.sleep(delay)
13
14if __name__ == "__main__":
15    p = Process(target=worker, args=("A", 0.5))
16    p.start()
17    print("Main: waiting for worker to finish")
18    p.join()
19    print("Main: worker finished")
  • Bu kod, ana işlemin bir alt işlem (worker) başlattığı ve join() ile bitmesini beklediği akışı göstermektedir. Argümanları args kullanarak iletebilirsiniz.

Pool ile Basit Paralelleştirme (yüksek seviyeli API)

Aynı fonksiyonu birden fazla bağımsız göreve uygulamak istediğinizde Pool.map faydalıdır. Çalışan işlemleri sizin için dahili olarak yönetir.

 1# Explanation:
 2# Use Pool.map to parallelize a CPU-bound function across available processes.
 3# Good for "embarrassingly parallel" workloads.
 4
 5from multiprocessing import Pool, cpu_count
 6import math
 7import time
 8
 9def is_prime(n):
10    # Check primality (inefficient but CPU-heavy for demo)
11    if n < 2:
12        return False
13    for i in range(2, int(math.sqrt(n)) + 1):
14        if n % i == 0:
15            return False
16    return True
17
18if __name__ == "__main__":
19    nums = [10_000_000 + i for i in range(50)]
20    start = time.time()
21    with Pool(processes=cpu_count()) as pool:
22        results = pool.map(is_prime, nums)
23    end = time.time()
24    print(f"Found primes: {sum(results)} / {len(nums)} in {end-start:.2f}s")
  • Pool, çalışan sayısını otomatik olarak kontrol edebilir ve map, sonuçları orijinal sırasıyla döndürür.

İşlemler arası iletişim: Queue kullanarak Üretici/Tüketici deseni

Queue, nesneleri süreçler arasında güvenli bir şekilde aktaran bir İlk Giren İlk Çıkar (FIFO) kuyruğudur. Aşağıda bazı tipik desenler bulunmaktadır.

 1# Explanation:
 2# Demonstrates a producer putting items into a Queue
 3# and consumer reading them.
 4# This is useful for task pipelines between processes.
 5
 6from multiprocessing import Process, Queue
 7import time
 8import random
 9
10def producer(q, n):
11    for i in range(n):
12        item = f"item-{i}"
13        print("Producer: putting", item)
14        q.put(item)
15        time.sleep(random.random() * 0.5)
16    q.put(None)  # sentinel to signal consumer to stop
17
18def consumer(q):
19    while True:
20        item = q.get()
21        if item is None:
22            break
23        print("Consumer: got", item)
24        time.sleep(0.2)
25
26if __name__ == "__main__":
27    q = Queue()
28    p = Process(target=producer, args=(q, 5))
29    c = Process(target=consumer, args=(q,))
30    p.start()
31    c.start()
32    p.join()
33    c.join()
34    print("Main: done")
  • Queue, işlemler arasında verileri güvenli şekilde iletmenizi sağlar. Sonlandırmayı belirtmek için None gibi özel bir değer kullanmak yaygındır.

Paylaşılan Bellek: Value ve Array

İşlemler arasında küçük sayılar veya diziler paylaşmak istediğinizde Value ve Array kullanabilirsiniz. Çakışmaları önlemek için kilitler kullanmanız gerekir.

 1# Explanation:
 2# Use Value to share a single integer counter
 3# and Array for a small numeric array.
 4# Show how to use a Lock to avoid race conditions.
 5
 6from multiprocessing import Process, Value, Array, Lock
 7import time
 8
 9def increment(counter, lock, times):
10    for _ in range(times):
11        with lock:
12            counter.value += 1
13
14def update_array(arr):
15    for i in range(len(arr)):
16        arr[i] = arr[i] + 1
17
18if __name__ == "__main__":
19    lock = Lock()
20    counter = Value('i', 0)  # 'i' = signed int
21    shared_arr = Array('i', [0, 0, 0])
22
23    p1 = Process(target=increment, args=(counter, lock, 1000))
24    p2 = Process(target=increment, args=(counter, lock, 1000))
25    a = Process(target=update_array, args=(shared_arr,))
26
27    p1.start(); p2.start(); a.start()
28    p1.join(); p2.join(); a.join()
29
30    print("Counter:", counter.value)
31    print("Array:", list(shared_arr))
  • Value ve Array, verileri işlemler arasında daha düşük seviyeli mekanizmalarla (C dilinde paylaşılan bellek), yani Python'un kendisiyle değil, paylaşır. Bu nedenle, küçük miktarlarda veriyi hızlıca okumak ve yazmak için uygundur, ancak büyük miktarda veriyle başa çıkmak için uygun değildir..

Gelişmiş Paylaşım: Manager ile Paylaşılan Nesneler (dict, list)

Liste veya sözlük gibi daha esnek ortak nesneler kullanmak istiyorsanız, Manager() kullanın.

 1# Explanation:
 2# Manager provides proxy objects like dict/list
 3# that can be shared across processes.
 4# Good for moderate-size shared state and easier programming model.
 5
 6from multiprocessing import Process, Manager
 7import time
 8
 9def worker(shared_dict, key, value):
10    shared_dict[key] = value
11
12if __name__ == "__main__":
13    with Manager() as manager:
14        d = manager.dict()
15        processes = []
16        for i in range(5):
17            p = Process(target=worker, args=(d, f"k{i}", i*i))
18            p.start()
19            processes.append(p)
20        for p in processes:
21            p.join()
22        print("Shared dict:", dict(d))
  • Manager, sözlükleri ve listeleri paylaşmak için kullanışlıdır, ancak her erişimde veriler süreçler arasında gönderilir ve pickle dönüştürmesi gerektirir. Bu nedenle, büyük miktarda verinin sık sık güncellenmesi işlemi yavaşlatacaktır.

Senkronizasyon Mekanizmaları: Lock ve Semaphore Nasıl Kullanılır?

Lock veya Semaphore kullanarak paylaşılan kaynaklara eşzamanlı erişimi kontrol edin. with deyimiyle bunları kolayca kullanabilirsiniz.

 1# Explanation:
 2# Demonstrates using Lock to prevent simultaneous access to a critical section.
 3# Locks are necessary when shared resources are not atomic.
 4
 5from multiprocessing import Process, Lock, Value
 6
 7def safe_add(counter, lock):
 8    for _ in range(10000):
 9        with lock:
10            counter.value += 1
11
12if __name__ == "__main__":
13    lock = Lock()
14    counter = Value('i', 0)
15    p1 = Process(target=safe_add, args=(counter, lock))
16    p2 = Process(target=safe_add, args=(counter, lock))
17    p1.start(); p2.start()
18    p1.join(); p2.join()
19    print("Counter:", counter.value)
  • Kilitler veri yarışlarını önler, ancak kilitli alan çok büyükse paralel işlem performansı azalır. Sadece gerekli bölümler kritik bölüm olarak korunmalıdır.

UNIX'te fork ile Windows'taki davranış arasındaki farklar

UNIX sistemlerinde, işlemler varsayılan olarak fork ile çoğaltılır ve bu, bellek verimliliği için copy-on-write sağlar. Windows, işlemleri spawn ile başlatır (modülleri yeniden içe aktarır), bu yüzden giriş noktası koruması ve global başlatma konusunda dikkatli olmalısınız.

 1# Explanation: Check start method (fork/spawn) and set it if needed.
 2# Useful for debugging platform-dependent behavior.
 3
 4from multiprocessing import get_start_method, set_start_method
 5
 6if __name__ == "__main__":
 7    print("Start method:", get_start_method())
 8
 9    # uncomment to force spawn on Unix for testing
10    # set_start_method('spawn')
  • set_start_method sadece programınızın başında bir kere çağrılabilir. Bunu kütüphaneler içinde rastgele değiştirmemek daha güvenlidir.

Uygulamalı Örnek: CPU ağırlıklı iş yükleri için karşılaştırmalı ölçüm (benchmarking)

Aşağıda, multiprocessing kullanılarak yapılan paralelleştirmenin işlemleri ne kadar hızlandırabileceğini basitçe karşılaştıran bir betik bulunmaktadır. Burada Pool kullanıyoruz.

 1# Explanation:
 2# Compare sequential vs parallel execution times for CPU-bound task.
 3# Helps understand speedup and overhead.
 4
 5import time
 6from multiprocessing import Pool, cpu_count
 7import math
 8
 9def heavy_task(n):
10    s = 0
11    for i in range(1, n):
12        s += math.sqrt(i)
13    return s
14
15def run_sequential(nums):
16    return [heavy_task(n) for n in nums]
17
18def run_parallel(nums):
19    with Pool(processes=cpu_count()) as p:
20        return p.map(heavy_task, nums)
21
22if __name__ == "__main__":
23    nums = [2000000] * 8  # heavy tasks
24    t0 = time.time()
25    run_sequential(nums)
26    seq = time.time() - t0
27    t1 = time.time()
28    run_parallel(nums)
29    par = time.time() - t1
30    print(f"Sequential: {seq:.2f}s, Parallel: {par:.2f}s")
  • Bu örnek, görev yüküne ve işlem sayısına bağlı olarak, paralelleştirmenin bazen ek yük nedeniyle etkisiz kalabileceğini göstermektedir. Görevler ne kadar büyük (“ağır”) ve birbirinden bağımsız olursa, kazanç da o kadar fazla olur.

Önemli Temel Kurallar

Multiprocessing'i güvenli ve verimli bir şekilde kullanmak için temel noktalar aşağıda verilmiştir.

  • Windows'ta alt işlemler başlatıldığında modüller yeniden içe aktarılır; bu nedenle betiğinizin giriş noktasını if __name__ == "__main__": ile korumalısınız.
  • Süreçler arası iletişim seri hale getirilir (pickle dönüştürmesiyle), bu nedenle büyük nesnelerin aktarılması maliyetli hale gelir.
  • multiprocessing işlemler oluşturduğundan, işlem sayısını genellikle multiprocessing.cpu_count() ile belirlemek yaygındır.
  • Bir işçi içinde başka bir Pool oluşturmak karmaşık hale gelir, bu yüzden mümkün olduğunca Pool örneklerini iç içe kullanmaktan kaçınmalısınız.
  • Alt süreçlerde meydana gelen istisnalar ana süreçten tespit edilmesi zor olduğundan, açıkça kayıt ve hata yönetimi uygulanması gerekmektedir.
  • İşlem sayısını CPU'ya göre ayarlayın ve G/Ç (I/O) ağırlıklı görevler için thread (iş parçacığı) kullanımını değerlendirin.

Uygulamalı Tasarım Tavsiyeleri

Aşağıda paralel işlem tasarımı için bazı faydalı kavramlar ve desenler bulunmaktadır.

  • Süreçleri 'pipeline' ile giriş okuma (I/O), ön işleme (çoklu CPU) ve toplama (seri) gibi rollere ayırmak verimlidir.
  • Hata ayıklamayı basitleştirmek için, önce işlemi tek bir işlemde deneyin ve sonra paralelleştirin.
  • Loglama için, her işlem için ayrı log çıktıları kullanın (örn. dosya adlarında PID ekleyin) ve sorunları izole etmeyi kolaylaştırın.
  • Bir işlem takılsa bile güvenli şekilde kurtulabilmek için tekrar deneme ve zaman aşımı mekanizmaları hazırlayın.

Özet (Hemen Kullanabileceğiniz Temel Noktalar)

Paralel işlem gücü yüksektir, ancak görev türü, veri boyutu ve işlemler arası iletişim maliyetini doğru değerlendirmek önemlidir. multiprocessing, CPU ağırlıklı işlemler için etkilidir; ancak kötü tasarım veya senkronizasyon hataları performansı azaltabilir. Temel kurallara ve desenlere uyarsanız, güvenli ve verimli paralel programlar geliştirebilirsiniz.

Yukarıdaki makaleyi, YouTube kanalımızda Visual Studio Code'u kullanarak takip edebilirsiniz. Lütfen YouTube kanalını da kontrol edin.

YouTube Video