Modul `multiprocessing` di Python

Modul `multiprocessing` di Python

Artikel ini menjelaskan modul multiprocessing di Python.

Artikel ini memperkenalkan tips praktis untuk menulis kode pemrosesan paralel yang aman dan efisien menggunakan modul multiprocessing.

YouTube Video

Modul multiprocessing di Python

Dasar-dasar: Mengapa menggunakan multiprocessing?

multiprocessing memungkinkan paralelisasi berbasis proses, sehingga Anda dapat menjalankan tugas CPU-bound secara paralel tanpa dibatasi oleh GIL (Global Interpreter Lock) Python. Untuk tugas I/O-bound, threading atau asyncio mungkin lebih mudah dan lebih sesuai.

Penggunaan sederhana Process

Pertama, berikut adalah contoh dasar menjalankan fungsi di proses terpisah menggunakan Process. Ini menunjukkan bagaimana memulai proses, menunggu hingga selesai, dan mengirim argumen.

 1# Explanation:
 2# This example starts a separate process to run `worker` which prints messages.
 3# It demonstrates starting, joining, and passing arguments.
 4
 5from multiprocessing import Process
 6import time
 7
 8def worker(name, delay):
 9    # English comment in code per user's preference
10    for i in range(3):
11        print(f"Worker {name}: iteration {i}")
12        time.sleep(delay)
13
14if __name__ == "__main__":
15    p = Process(target=worker, args=("A", 0.5))
16    p.start()
17    print("Main: waiting for worker to finish")
18    p.join()
19    print("Main: worker finished")
  • Kode ini menunjukkan alur ketika proses utama menjalankan proses anak worker dan menunggu hingga selesai menggunakan join(). Anda dapat mengirim argumen menggunakan args.

Paralelisasi sederhana dengan Pool (API tingkat tinggi)

Pool.map berguna saat Anda ingin menerapkan fungsi yang sama ke beberapa tugas independen. Modul ini mengelola proses worker secara internal untuk Anda.

 1# Explanation:
 2# Use Pool.map to parallelize a CPU-bound function across available processes.
 3# Good for "embarrassingly parallel" workloads.
 4
 5from multiprocessing import Pool, cpu_count
 6import math
 7import time
 8
 9def is_prime(n):
10    # Check primality (inefficient but CPU-heavy for demo)
11    if n < 2:
12        return False
13    for i in range(2, int(math.sqrt(n)) + 1):
14        if n % i == 0:
15            return False
16    return True
17
18if __name__ == "__main__":
19    nums = [10_000_000 + i for i in range(50)]
20    start = time.time()
21    with Pool(processes=cpu_count()) as pool:
22        results = pool.map(is_prime, nums)
23    end = time.time()
24    print(f"Found primes: {sum(results)} / {len(nums)} in {end-start:.2f}s")
  • Pool dapat mengatur jumlah worker secara otomatis, dan map mengembalikan hasil sesuai urutan aslinya.

Komunikasi antar-proses: Pola Producer/Consumer menggunakan Queue

Queue adalah antrian First-In-First-Out (FIFO) yang secara aman mentransfer objek antar proses. Di bawah ini adalah beberapa pola tipikal.

 1# Explanation:
 2# Demonstrates a producer putting items into a Queue
 3# and consumer reading them.
 4# This is useful for task pipelines between processes.
 5
 6from multiprocessing import Process, Queue
 7import time
 8import random
 9
10def producer(q, n):
11    for i in range(n):
12        item = f"item-{i}"
13        print("Producer: putting", item)
14        q.put(item)
15        time.sleep(random.random() * 0.5)
16    q.put(None)  # sentinel to signal consumer to stop
17
18def consumer(q):
19    while True:
20        item = q.get()
21        if item is None:
22            break
23        print("Consumer: got", item)
24        time.sleep(0.2)
25
26if __name__ == "__main__":
27    q = Queue()
28    p = Process(target=producer, args=(q, 5))
29    c = Process(target=consumer, args=(q,))
30    p.start()
31    c.start()
32    p.join()
33    c.join()
34    print("Main: done")
  • Queue memungkinkan Anda mentransfer data secara aman antar-proses. Umum menggunakan nilai khusus seperti None untuk menandakan penghentian.

Memori bersama: Value dan Array

Gunakan Value dan Array jika Anda ingin berbagi nilai kecil atau array antar-proses. Anda perlu menggunakan kunci (lock) untuk menghindari konflik.

 1# Explanation:
 2# Use Value to share a single integer counter
 3# and Array for a small numeric array.
 4# Show how to use a Lock to avoid race conditions.
 5
 6from multiprocessing import Process, Value, Array, Lock
 7import time
 8
 9def increment(counter, lock, times):
10    for _ in range(times):
11        with lock:
12            counter.value += 1
13
14def update_array(arr):
15    for i in range(len(arr)):
16        arr[i] = arr[i] + 1
17
18if __name__ == "__main__":
19    lock = Lock()
20    counter = Value('i', 0)  # 'i' = signed int
21    shared_arr = Array('i', [0, 0, 0])
22
23    p1 = Process(target=increment, args=(counter, lock, 1000))
24    p2 = Process(target=increment, args=(counter, lock, 1000))
25    a = Process(target=update_array, args=(shared_arr,))
26
27    p1.start(); p2.start(); a.start()
28    p1.join(); p2.join(); a.join()
29
30    print("Counter:", counter.value)
31    print("Array:", list(shared_arr))
  • Value dan Array berbagi data antar-proses dengan mekanisme tingkat rendah (shared memory di level bahasa C), bukan Python itu sendiri. Oleh karena itu, cocok untuk membaca dan menulis sejumlah kecil data dengan cepat, tetapi tidak cocok untuk menangani sejumlah besar data..

Berbagi tingkat lanjut: Objek berbagi (dict, list) dengan Manager

Jika Anda ingin membagikan objek yang lebih fleksibel seperti list atau dictionary, gunakan Manager().

 1# Explanation:
 2# Manager provides proxy objects like dict/list
 3# that can be shared across processes.
 4# Good for moderate-size shared state and easier programming model.
 5
 6from multiprocessing import Process, Manager
 7import time
 8
 9def worker(shared_dict, key, value):
10    shared_dict[key] = value
11
12if __name__ == "__main__":
13    with Manager() as manager:
14        d = manager.dict()
15        processes = []
16        for i in range(5):
17            p = Process(target=worker, args=(d, f"k{i}", i*i))
18            p.start()
19            processes.append(p)
20        for p in processes:
21            p.join()
22        print("Shared dict:", dict(d))
  • Manager memudahkan untuk berbagi kamus dan daftar, namun setiap akses mengirim data antar-proses dan memerlukan konversi pickle. Oleh karena itu, sering memperbarui sejumlah besar data akan memperlambat pemrosesan.

Mekanisme sinkronisasi: Cara menggunakan Lock dan Semaphore

Gunakan Lock atau Semaphore untuk mengontrol akses bersamaan ke sumber daya bersama. Anda dapat menggunakannya secara ringkas dengan pernyataan with.

 1# Explanation:
 2# Demonstrates using Lock to prevent simultaneous access to a critical section.
 3# Locks are necessary when shared resources are not atomic.
 4
 5from multiprocessing import Process, Lock, Value
 6
 7def safe_add(counter, lock):
 8    for _ in range(10000):
 9        with lock:
10            counter.value += 1
11
12if __name__ == "__main__":
13    lock = Lock()
14    counter = Value('i', 0)
15    p1 = Process(target=safe_add, args=(counter, lock))
16    p2 = Process(target=safe_add, args=(counter, lock))
17    p1.start(); p2.start()
18    p1.join(); p2.join()
19    print("Counter:", counter.value)
  • Kunci (locks) mencegah terjadinya data race, namun jika area yang dikunci terlalu besar, kinerja pemrosesan paralel akan menurun. Hanya bagian yang diperlukan saja yang harus dilindungi sebagai critical section.

Perbedaan antara fork di UNIX dan perilaku di Windows

Pada sistem UNIX, proses diduplikasi menggunakan fork secara default, sehingga memory copy-on-write menjadi efisien. Windows memulai proses dengan spawn (yang mengimpor ulang modul), jadi Anda harus berhati-hati dengan proteksi entry point dan inisialisasi global.

 1# Explanation: Check start method (fork/spawn) and set it if needed.
 2# Useful for debugging platform-dependent behavior.
 3
 4from multiprocessing import get_start_method, set_start_method
 5
 6if __name__ == "__main__":
 7    print("Start method:", get_start_method())
 8
 9    # uncomment to force spawn on Unix for testing
10    # set_start_method('spawn')
  • set_start_method hanya dapat dipanggil sekali di awal program Anda. Lebih aman untuk tidak mengubah ini secara sembarangan di dalam library.

Contoh praktis: Benchmarking beban kerja CPU-bound (perbandingan)

Di bawah ini adalah skrip yang membandingkan secara sederhana seberapa cepat pemrosesan dapat dilakukan dengan paralelisasi menggunakan multiprocessing. Di sini kami menggunakan Pool.

 1# Explanation:
 2# Compare sequential vs parallel execution times for CPU-bound task.
 3# Helps understand speedup and overhead.
 4
 5import time
 6from multiprocessing import Pool, cpu_count
 7import math
 8
 9def heavy_task(n):
10    s = 0
11    for i in range(1, n):
12        s += math.sqrt(i)
13    return s
14
15def run_sequential(nums):
16    return [heavy_task(n) for n in nums]
17
18def run_parallel(nums):
19    with Pool(processes=cpu_count()) as p:
20        return p.map(heavy_task, nums)
21
22if __name__ == "__main__":
23    nums = [2000000] * 8  # heavy tasks
24    t0 = time.time()
25    run_sequential(nums)
26    seq = time.time() - t0
27    t1 = time.time()
28    run_parallel(nums)
29    par = time.time() - t1
30    print(f"Sequential: {seq:.2f}s, Parallel: {par:.2f}s")
  • Contoh ini menunjukkan bahwa, tergantung pada beban tugas dan jumlah proses, paralelisasi terkadang tidak efektif karena overhead. Semakin besar dan mandiri tugasnya, semakin besar keuntungannya.

Aturan dasar penting

Berikut adalah poin-poin dasar untuk menggunakan multiprocessing dengan aman dan efisien.

  • Di Windows, modul diimpor ulang saat proses anak dimulai, jadi Anda harus melindungi entry point script dengan if __name__ == "__main__":.
  • Komunikasi antar-proses diserialisasi (dengan konversi pickle), sehingga memindahkan objek besar menjadi mahal.
  • Karena multiprocessing membuat proses, biasanya jumlah proses ditentukan berdasarkan multiprocessing.cpu_count().
  • Membuat Pool lain di dalam pekerja menjadi kompleks, jadi sebaiknya hindari membuat Pool bertingkat (nested) sebisa mungkin.
  • Karena pengecualian yang terjadi di proses anak sulit dideteksi dari proses utama, perlu secara eksplisit mengimplementasikan logging dan penanganan kesalahan.
  • Atur jumlah proses sesuai dengan CPU, dan pertimbangkan menggunakan thread untuk tugas I/O-bound.

Saran desain praktis

Di bawah ini adalah beberapa konsep dan pola yang berguna untuk merancang pemrosesan paralel.

  • Efisien untuk membagi proses menjadi peran seperti pembacaan input (I/O), pra-pemrosesan (multi-CPU), dan agregasi (serial) melalui 'pipelining'.
  • Untuk mempermudah debugging, periksa operasi dalam satu proses terlebih dahulu sebelum melakukan paralelisasi.
  • Untuk logging, pisahkan output log per proses (misalnya, sertakan PID pada nama file) agar lebih mudah mengisolasi masalah.
  • Siapkan mekanisme retry dan timeout agar Anda bisa melakukan recovery jika sebuah proses macet.

Ringkasan (Poin-poin utama yang bisa langsung digunakan)

Pemrosesan paralel sangat kuat, tetapi penting untuk menilai dengan benar sifat tugas, ukuran data, dan biaya komunikasi antar-proses. multiprocessing efektif untuk pemrosesan CPU-bound, namun desain yang buruk atau kesalahan sinkronisasi dapat menurunkan performa. Jika Anda mengikuti aturan dan pola dasar, Anda dapat membangun program paralel yang aman dan efisien.

Anda dapat mengikuti artikel di atas menggunakan Visual Studio Code di saluran YouTube kami. Silakan periksa juga saluran YouTube kami.

YouTube Video