Modul `multiprocessing` dalam Python

Modul `multiprocessing` dalam Python

Artikel ini menerangkan modul multiprocessing dalam Python.

Artikel ini memperkenalkan tip praktikal untuk menulis kod pemprosesan selari yang selamat dan cekap menggunakan modul multiprocessing.

YouTube Video

Modul multiprocessing dalam Python

Asas: Mengapa guna multiprocessing?

multiprocessing membolehkan pemprosesan selari di peringkat proses, jadi anda boleh selarikan tugas yang bergantung pada CPU tanpa dihadkan oleh GIL (Global Interpreter Lock) Python. Untuk tugas yang terikat I/O, threading atau asyncio mungkin lebih mudah dan sesuai.

Penggunaan asas Process

Pertama, berikut adalah contoh asas menjalankan satu fungsi dalam proses berasingan menggunakan Process. Ini menunjukkan cara untuk memulakan proses, menunggu sehingga proses selesai, dan melepaskan argumen.

 1# Explanation:
 2# This example starts a separate process to run `worker` which prints messages.
 3# It demonstrates starting, joining, and passing arguments.
 4
 5from multiprocessing import Process
 6import time
 7
 8def worker(name, delay):
 9    # English comment in code per user's preference
10    for i in range(3):
11        print(f"Worker {name}: iteration {i}")
12        time.sleep(delay)
13
14if __name__ == "__main__":
15    p = Process(target=worker, args=("A", 0.5))
16    p.start()
17    print("Main: waiting for worker to finish")
18    p.join()
19    print("Main: worker finished")
  • Kod ini menunjukkan aliran di mana proses utama melancarkan proses anak worker dan menunggu sehingga selesai dengan menggunakan join(). Anda boleh menghantar argumen menggunakan args.

Penjelasan selari mudah dengan Pool (API peringkat tinggi)

Pool.map berguna apabila anda ingin melaksanakan fungsi yang sama pada beberapa tugasan yang bebas. Ia mengurus proses pekerja secara dalaman untuk anda.

 1# Explanation:
 2# Use Pool.map to parallelize a CPU-bound function across available processes.
 3# Good for "embarrassingly parallel" workloads.
 4
 5from multiprocessing import Pool, cpu_count
 6import math
 7import time
 8
 9def is_prime(n):
10    # Check primality (inefficient but CPU-heavy for demo)
11    if n < 2:
12        return False
13    for i in range(2, int(math.sqrt(n)) + 1):
14        if n % i == 0:
15            return False
16    return True
17
18if __name__ == "__main__":
19    nums = [10_000_000 + i for i in range(50)]
20    start = time.time()
21    with Pool(processes=cpu_count()) as pool:
22        results = pool.map(is_prime, nums)
23    end = time.time()
24    print(f"Found primes: {sum(results)} / {len(nums)} in {end-start:.2f}s")
  • Pool boleh mengawal bilangan pekerja secara automatik, dan map mengembalikan hasil mengikut susunan asal.

Komunikasi antara proses: Corak Pengeluar/Pengguna menggunakan Queue

Queue ialah barisan First-In-First-Out (FIFO) yang memindahkan objek antara proses dengan selamat. Di bawah adalah beberapa corak lazim.

 1# Explanation:
 2# Demonstrates a producer putting items into a Queue
 3# and consumer reading them.
 4# This is useful for task pipelines between processes.
 5
 6from multiprocessing import Process, Queue
 7import time
 8import random
 9
10def producer(q, n):
11    for i in range(n):
12        item = f"item-{i}"
13        print("Producer: putting", item)
14        q.put(item)
15        time.sleep(random.random() * 0.5)
16    q.put(None)  # sentinel to signal consumer to stop
17
18def consumer(q):
19    while True:
20        item = q.get()
21        if item is None:
22            break
23        print("Consumer: got", item)
24        time.sleep(0.2)
25
26if __name__ == "__main__":
27    q = Queue()
28    p = Process(target=producer, args=(q, 5))
29    c = Process(target=consumer, args=(q,))
30    p.start()
31    c.start()
32    p.join()
33    c.join()
34    print("Main: done")
  • Queue membolehkan anda menghantar data antara proses dengan selamat. Adalah perkara biasa untuk menggunakan nilai khas seperti None untuk menandakan penamatan.

Memori bersama: Value dan Array

Anda boleh menggunakan Value dan Array jika anda mahu berkongsi nombor atau tatasusunan kecil di antara proses. Anda perlu menggunakan kunci (locks) untuk mengelakkan konflik.

 1# Explanation:
 2# Use Value to share a single integer counter
 3# and Array for a small numeric array.
 4# Show how to use a Lock to avoid race conditions.
 5
 6from multiprocessing import Process, Value, Array, Lock
 7import time
 8
 9def increment(counter, lock, times):
10    for _ in range(times):
11        with lock:
12            counter.value += 1
13
14def update_array(arr):
15    for i in range(len(arr)):
16        arr[i] = arr[i] + 1
17
18if __name__ == "__main__":
19    lock = Lock()
20    counter = Value('i', 0)  # 'i' = signed int
21    shared_arr = Array('i', [0, 0, 0])
22
23    p1 = Process(target=increment, args=(counter, lock, 1000))
24    p2 = Process(target=increment, args=(counter, lock, 1000))
25    a = Process(target=update_array, args=(shared_arr,))
26
27    p1.start(); p2.start(); a.start()
28    p1.join(); p2.join(); a.join()
29
30    print("Counter:", counter.value)
31    print("Array:", list(shared_arr))
  • Value dan Array berkongsi data antara proses menggunakan mekanisme peringkat rendah (memori bersama di peringkat bahasa C), bukan Python itu sendiri. Oleh itu, ia sesuai untuk membaca dan menulis sejumlah kecil data dengan cepat, tetapi tidak sesuai untuk mengendalikan sejumlah besar data..

Perkongsian lanjutan: Objek bersama (dict, list) dengan Manager

Jika anda mahu menggunakan objek bersama yang lebih fleksibel seperti senarai atau kamus, gunakan Manager().

 1# Explanation:
 2# Manager provides proxy objects like dict/list
 3# that can be shared across processes.
 4# Good for moderate-size shared state and easier programming model.
 5
 6from multiprocessing import Process, Manager
 7import time
 8
 9def worker(shared_dict, key, value):
10    shared_dict[key] = value
11
12if __name__ == "__main__":
13    with Manager() as manager:
14        d = manager.dict()
15        processes = []
16        for i in range(5):
17            p = Process(target=worker, args=(d, f"k{i}", i*i))
18            p.start()
19            processes.append(p)
20        for p in processes:
21            p.join()
22        print("Shared dict:", dict(d))
  • Manager sangat mudah untuk berkongsi kamus (dictionary) dan senarai (list), tetapi setiap akses menghantar data antara proses dan memerlukan penukaran pickle. Oleh itu, kemas kini sejumlah besar data secara kerap akan memperlahankan pemprosesan.

Mekanisme penyegerakan: Cara menggunakan Lock dan Semaphore

Gunakan Lock atau Semaphore untuk mengawal akses serentak ke atas sumber bersama. Anda boleh menggunakannya dengan ringkas menggunakan pernyataan with.

 1# Explanation:
 2# Demonstrates using Lock to prevent simultaneous access to a critical section.
 3# Locks are necessary when shared resources are not atomic.
 4
 5from multiprocessing import Process, Lock, Value
 6
 7def safe_add(counter, lock):
 8    for _ in range(10000):
 9        with lock:
10            counter.value += 1
11
12if __name__ == "__main__":
13    lock = Lock()
14    counter = Value('i', 0)
15    p1 = Process(target=safe_add, args=(counter, lock))
16    p2 = Process(target=safe_add, args=(counter, lock))
17    p1.start(); p2.start()
18    p1.join(); p2.join()
19    print("Counter:", counter.value)
  • Kunci menghalang perlumbaan data, tetapi jika kawasan yang dikunci terlalu besar, prestasi pemprosesan selari akan berkurangan. Hanya bahagian yang perlu sahaja harus dilindungi sebagai seksyen kritikal.

Perbezaan antara fork di UNIX dan tingkah laku di Windows

Dalam sistem UNIX, proses diduplikasi menggunakan fork secara lalai, membolehkan copy-on-write yang cekap untuk memori. Windows memulakan proses dengan spawn (yang memuat semula modul), jadi anda perlu berhati-hati dengan perlindungan titik masuk dan inisialisasi global.

 1# Explanation: Check start method (fork/spawn) and set it if needed.
 2# Useful for debugging platform-dependent behavior.
 3
 4from multiprocessing import get_start_method, set_start_method
 5
 6if __name__ == "__main__":
 7    print("Start method:", get_start_method())
 8
 9    # uncomment to force spawn on Unix for testing
10    # set_start_method('spawn')
  • set_start_method hanya boleh dipanggil sekali sahaja pada permulaan program anda. Lebih selamat jika anda tidak menukar tetapan ini secara sewenang-wenang dalam perpustakaan (libraries).

Contoh praktikal: Penanda aras beban tugas berasaskan CPU (perbandingan)

Di bawah ialah skrip yang secara ringkas membandingkan seberapa pantas pemprosesan boleh menjadi dengan penggunaan paralel melalui multiprocessing. Di sini, kami menggunakan Pool.

 1# Explanation:
 2# Compare sequential vs parallel execution times for CPU-bound task.
 3# Helps understand speedup and overhead.
 4
 5import time
 6from multiprocessing import Pool, cpu_count
 7import math
 8
 9def heavy_task(n):
10    s = 0
11    for i in range(1, n):
12        s += math.sqrt(i)
13    return s
14
15def run_sequential(nums):
16    return [heavy_task(n) for n in nums]
17
18def run_parallel(nums):
19    with Pool(processes=cpu_count()) as p:
20        return p.map(heavy_task, nums)
21
22if __name__ == "__main__":
23    nums = [2000000] * 8  # heavy tasks
24    t0 = time.time()
25    run_sequential(nums)
26    seq = time.time() - t0
27    t1 = time.time()
28    run_parallel(nums)
29    par = time.time() - t1
30    print(f"Sequential: {seq:.2f}s, Parallel: {par:.2f}s")
  • Contoh ini menunjukkan bahawa bergantung kepada beban tugas dan bilangan proses, pemprosesan selari kadangkala tidak berkesan disebabkan oleh beban lebihan. Semakin besar (“berat”) dan bebas tugas-tugas tersebut, semakin besar keuntungannya.

Peraturan asas yang penting

Di bawah adalah perkara asas untuk menggunakan multiprocessing secara selamat dan berkesan.

  • Di Windows, modul akan diimport semula apabila proses anak bermula, jadi anda mesti melindungi titik masuk skrip dengan if __name__ == "__main__":.
  • Komunikasi antara proses disirikan (dengan penukaran pickle), jadi pemindahan objek besar menjadi mahal dari segi prestasi.
  • Oleh kerana multiprocessing mencipta proses, adalah biasa untuk menentukan bilangan proses berdasarkan multiprocessing.cpu_count().
  • Mewujudkan Pool lain dalam satu pekerja menjadi rumit, jadi anda harus mengelakkan sarang (nesting) contoh Pool sebanyak mungkin.
  • Oleh kerana pengecualian yang berlaku dalam proses anak sukar dikesan daripada proses utama, adalah perlu untuk melaksanakan log dan pengendalian ralat secara jelas.
  • Tetapkan bilangan proses mengikut CPU, dan pertimbangkan penggunaan threads untuk tugas I/O.

Nasihat rekabentuk praktikal

Di bawah adalah beberapa konsep dan corak yang berguna untuk mereka bentuk pemprosesan selari.

  • Adalah cekap jika memisahkan proses mengikut peranan seperti bacaan input (I/O), pra-pemprosesan (multi-CPU), dan pengagregatan (bersiri) melalui 'pipelining'.
  • Untuk memudahkan penyahpepijatan, semak dahulu operasi dalam satu proses sebelum membuat pemprosesan selari.
  • Untuk log, asingkan keluaran log mengikut proses (contohnya, sertakan PID dalam nama fail) bagi memudahkan pengesanan masalah.
  • Sediakan mekanisme ulang-cuba (retry) dan had masa (timeout) supaya anda boleh pulihkan dengan selamat walaupun proses tergantung.

Ringkasan (Perkara utama yang boleh anda gunakan terus)

Pemprosesan selari adalah berkuasa, namun penting untuk menilai sifat tugas, saiz data, dan kos komunikasi antara proses dengan betul. multiprocessing berkesan untuk pemprosesan yang terikat CPU, tetapi reka bentuk yang lemah atau kesilapan penyegerakan boleh mengurangkan prestasi. Jika anda mengikuti peraturan dan corak asas, anda boleh membina program selari yang selamat dan cekap.

Anda boleh mengikuti artikel di atas menggunakan Visual Studio Code di saluran YouTube kami. Sila lihat juga saluran YouTube kami.

YouTube Video