Modul `multiprocessing` dalam Python
Artikel ini menerangkan modul multiprocessing dalam Python.
Artikel ini memperkenalkan tip praktikal untuk menulis kod pemprosesan selari yang selamat dan cekap menggunakan modul multiprocessing.
YouTube Video
Modul multiprocessing dalam Python
Asas: Mengapa guna multiprocessing?
multiprocessing membolehkan pemprosesan selari di peringkat proses, jadi anda boleh selarikan tugas yang bergantung pada CPU tanpa dihadkan oleh GIL (Global Interpreter Lock) Python. Untuk tugas yang terikat I/O, threading atau asyncio mungkin lebih mudah dan sesuai.
Penggunaan asas Process
Pertama, berikut adalah contoh asas menjalankan satu fungsi dalam proses berasingan menggunakan Process. Ini menunjukkan cara untuk memulakan proses, menunggu sehingga proses selesai, dan melepaskan argumen.
1# Explanation:
2# This example starts a separate process to run `worker` which prints messages.
3# It demonstrates starting, joining, and passing arguments.
4
5from multiprocessing import Process
6import time
7
8def worker(name, delay):
9 # English comment in code per user's preference
10 for i in range(3):
11 print(f"Worker {name}: iteration {i}")
12 time.sleep(delay)
13
14if __name__ == "__main__":
15 p = Process(target=worker, args=("A", 0.5))
16 p.start()
17 print("Main: waiting for worker to finish")
18 p.join()
19 print("Main: worker finished")- Kod ini menunjukkan aliran di mana proses utama melancarkan proses anak
workerdan menunggu sehingga selesai dengan menggunakanjoin(). Anda boleh menghantar argumen menggunakanargs.
Penjelasan selari mudah dengan Pool (API peringkat tinggi)
Pool.map berguna apabila anda ingin melaksanakan fungsi yang sama pada beberapa tugasan yang bebas. Ia mengurus proses pekerja secara dalaman untuk anda.
1# Explanation:
2# Use Pool.map to parallelize a CPU-bound function across available processes.
3# Good for "embarrassingly parallel" workloads.
4
5from multiprocessing import Pool, cpu_count
6import math
7import time
8
9def is_prime(n):
10 # Check primality (inefficient but CPU-heavy for demo)
11 if n < 2:
12 return False
13 for i in range(2, int(math.sqrt(n)) + 1):
14 if n % i == 0:
15 return False
16 return True
17
18if __name__ == "__main__":
19 nums = [10_000_000 + i for i in range(50)]
20 start = time.time()
21 with Pool(processes=cpu_count()) as pool:
22 results = pool.map(is_prime, nums)
23 end = time.time()
24 print(f"Found primes: {sum(results)} / {len(nums)} in {end-start:.2f}s")Poolboleh mengawal bilangan pekerja secara automatik, danmapmengembalikan hasil mengikut susunan asal.
Komunikasi antara proses: Corak Pengeluar/Pengguna menggunakan Queue
Queue ialah barisan First-In-First-Out (FIFO) yang memindahkan objek antara proses dengan selamat. Di bawah adalah beberapa corak lazim.
1# Explanation:
2# Demonstrates a producer putting items into a Queue
3# and consumer reading them.
4# This is useful for task pipelines between processes.
5
6from multiprocessing import Process, Queue
7import time
8import random
9
10def producer(q, n):
11 for i in range(n):
12 item = f"item-{i}"
13 print("Producer: putting", item)
14 q.put(item)
15 time.sleep(random.random() * 0.5)
16 q.put(None) # sentinel to signal consumer to stop
17
18def consumer(q):
19 while True:
20 item = q.get()
21 if item is None:
22 break
23 print("Consumer: got", item)
24 time.sleep(0.2)
25
26if __name__ == "__main__":
27 q = Queue()
28 p = Process(target=producer, args=(q, 5))
29 c = Process(target=consumer, args=(q,))
30 p.start()
31 c.start()
32 p.join()
33 c.join()
34 print("Main: done")Queuemembolehkan anda menghantar data antara proses dengan selamat. Adalah perkara biasa untuk menggunakan nilai khas sepertiNoneuntuk menandakan penamatan.
Memori bersama: Value dan Array
Anda boleh menggunakan Value dan Array jika anda mahu berkongsi nombor atau tatasusunan kecil di antara proses. Anda perlu menggunakan kunci (locks) untuk mengelakkan konflik.
1# Explanation:
2# Use Value to share a single integer counter
3# and Array for a small numeric array.
4# Show how to use a Lock to avoid race conditions.
5
6from multiprocessing import Process, Value, Array, Lock
7import time
8
9def increment(counter, lock, times):
10 for _ in range(times):
11 with lock:
12 counter.value += 1
13
14def update_array(arr):
15 for i in range(len(arr)):
16 arr[i] = arr[i] + 1
17
18if __name__ == "__main__":
19 lock = Lock()
20 counter = Value('i', 0) # 'i' = signed int
21 shared_arr = Array('i', [0, 0, 0])
22
23 p1 = Process(target=increment, args=(counter, lock, 1000))
24 p2 = Process(target=increment, args=(counter, lock, 1000))
25 a = Process(target=update_array, args=(shared_arr,))
26
27 p1.start(); p2.start(); a.start()
28 p1.join(); p2.join(); a.join()
29
30 print("Counter:", counter.value)
31 print("Array:", list(shared_arr))ValuedanArrayberkongsi data antara proses menggunakan mekanisme peringkat rendah (memori bersama di peringkat bahasa C), bukan Python itu sendiri. Oleh itu, ia sesuai untuk membaca dan menulis sejumlah kecil data dengan cepat, tetapi tidak sesuai untuk mengendalikan sejumlah besar data..
Perkongsian lanjutan: Objek bersama (dict, list) dengan Manager
Jika anda mahu menggunakan objek bersama yang lebih fleksibel seperti senarai atau kamus, gunakan Manager().
1# Explanation:
2# Manager provides proxy objects like dict/list
3# that can be shared across processes.
4# Good for moderate-size shared state and easier programming model.
5
6from multiprocessing import Process, Manager
7import time
8
9def worker(shared_dict, key, value):
10 shared_dict[key] = value
11
12if __name__ == "__main__":
13 with Manager() as manager:
14 d = manager.dict()
15 processes = []
16 for i in range(5):
17 p = Process(target=worker, args=(d, f"k{i}", i*i))
18 p.start()
19 processes.append(p)
20 for p in processes:
21 p.join()
22 print("Shared dict:", dict(d))Managersangat mudah untuk berkongsi kamus (dictionary) dan senarai (list), tetapi setiap akses menghantar data antara proses dan memerlukan penukaranpickle. Oleh itu, kemas kini sejumlah besar data secara kerap akan memperlahankan pemprosesan.
Mekanisme penyegerakan: Cara menggunakan Lock dan Semaphore
Gunakan Lock atau Semaphore untuk mengawal akses serentak ke atas sumber bersama. Anda boleh menggunakannya dengan ringkas menggunakan pernyataan with.
1# Explanation:
2# Demonstrates using Lock to prevent simultaneous access to a critical section.
3# Locks are necessary when shared resources are not atomic.
4
5from multiprocessing import Process, Lock, Value
6
7def safe_add(counter, lock):
8 for _ in range(10000):
9 with lock:
10 counter.value += 1
11
12if __name__ == "__main__":
13 lock = Lock()
14 counter = Value('i', 0)
15 p1 = Process(target=safe_add, args=(counter, lock))
16 p2 = Process(target=safe_add, args=(counter, lock))
17 p1.start(); p2.start()
18 p1.join(); p2.join()
19 print("Counter:", counter.value)- Kunci menghalang perlumbaan data, tetapi jika kawasan yang dikunci terlalu besar, prestasi pemprosesan selari akan berkurangan. Hanya bahagian yang perlu sahaja harus dilindungi sebagai seksyen kritikal.
Perbezaan antara fork di UNIX dan tingkah laku di Windows
Dalam sistem UNIX, proses diduplikasi menggunakan fork secara lalai, membolehkan copy-on-write yang cekap untuk memori. Windows memulakan proses dengan spawn (yang memuat semula modul), jadi anda perlu berhati-hati dengan perlindungan titik masuk dan inisialisasi global.
1# Explanation: Check start method (fork/spawn) and set it if needed.
2# Useful for debugging platform-dependent behavior.
3
4from multiprocessing import get_start_method, set_start_method
5
6if __name__ == "__main__":
7 print("Start method:", get_start_method())
8
9 # uncomment to force spawn on Unix for testing
10 # set_start_method('spawn')set_start_methodhanya boleh dipanggil sekali sahaja pada permulaan program anda. Lebih selamat jika anda tidak menukar tetapan ini secara sewenang-wenang dalam perpustakaan (libraries).
Contoh praktikal: Penanda aras beban tugas berasaskan CPU (perbandingan)
Di bawah ialah skrip yang secara ringkas membandingkan seberapa pantas pemprosesan boleh menjadi dengan penggunaan paralel melalui multiprocessing. Di sini, kami menggunakan Pool.
1# Explanation:
2# Compare sequential vs parallel execution times for CPU-bound task.
3# Helps understand speedup and overhead.
4
5import time
6from multiprocessing import Pool, cpu_count
7import math
8
9def heavy_task(n):
10 s = 0
11 for i in range(1, n):
12 s += math.sqrt(i)
13 return s
14
15def run_sequential(nums):
16 return [heavy_task(n) for n in nums]
17
18def run_parallel(nums):
19 with Pool(processes=cpu_count()) as p:
20 return p.map(heavy_task, nums)
21
22if __name__ == "__main__":
23 nums = [2000000] * 8 # heavy tasks
24 t0 = time.time()
25 run_sequential(nums)
26 seq = time.time() - t0
27 t1 = time.time()
28 run_parallel(nums)
29 par = time.time() - t1
30 print(f"Sequential: {seq:.2f}s, Parallel: {par:.2f}s")- Contoh ini menunjukkan bahawa bergantung kepada beban tugas dan bilangan proses, pemprosesan selari kadangkala tidak berkesan disebabkan oleh beban lebihan. Semakin besar (“berat”) dan bebas tugas-tugas tersebut, semakin besar keuntungannya.
Peraturan asas yang penting
Di bawah adalah perkara asas untuk menggunakan multiprocessing secara selamat dan berkesan.
- Di Windows, modul akan diimport semula apabila proses anak bermula, jadi anda mesti melindungi titik masuk skrip dengan
if __name__ == "__main__":. - Komunikasi antara proses disirikan (dengan penukaran
pickle), jadi pemindahan objek besar menjadi mahal dari segi prestasi. - Oleh kerana
multiprocessingmencipta proses, adalah biasa untuk menentukan bilangan proses berdasarkanmultiprocessing.cpu_count(). - Mewujudkan
Poollain dalam satu pekerja menjadi rumit, jadi anda harus mengelakkan sarang (nesting) contohPoolsebanyak mungkin. - Oleh kerana pengecualian yang berlaku dalam proses anak sukar dikesan daripada proses utama, adalah perlu untuk melaksanakan log dan pengendalian ralat secara jelas.
- Tetapkan bilangan proses mengikut CPU, dan pertimbangkan penggunaan threads untuk tugas I/O.
Nasihat rekabentuk praktikal
Di bawah adalah beberapa konsep dan corak yang berguna untuk mereka bentuk pemprosesan selari.
- Adalah cekap jika memisahkan proses mengikut peranan seperti bacaan input (I/O), pra-pemprosesan (multi-CPU), dan pengagregatan (bersiri) melalui 'pipelining'.
- Untuk memudahkan penyahpepijatan, semak dahulu operasi dalam satu proses sebelum membuat pemprosesan selari.
- Untuk log, asingkan keluaran log mengikut proses (contohnya, sertakan PID dalam nama fail) bagi memudahkan pengesanan masalah.
- Sediakan mekanisme ulang-cuba (retry) dan had masa (timeout) supaya anda boleh pulihkan dengan selamat walaupun proses tergantung.
Ringkasan (Perkara utama yang boleh anda gunakan terus)
Pemprosesan selari adalah berkuasa, namun penting untuk menilai sifat tugas, saiz data, dan kos komunikasi antara proses dengan betul. multiprocessing berkesan untuk pemprosesan yang terikat CPU, tetapi reka bentuk yang lemah atau kesilapan penyegerakan boleh mengurangkan prestasi. Jika anda mengikuti peraturan dan corak asas, anda boleh membina program selari yang selamat dan cekap.
Anda boleh mengikuti artikel di atas menggunakan Visual Studio Code di saluran YouTube kami. Sila lihat juga saluran YouTube kami.