`multiprocessing` โมดูลใน Python

`multiprocessing` โมดูลใน Python

บทความนี้อธิบายเกี่ยวกับโมดูล multiprocessing ใน Python

บทความนี้แนะนำเคล็ดลับที่ใช้ได้จริงในการเขียนโค้ดประมวลผลแบบขนานอย่างปลอดภัยและมีประสิทธิภาพด้วยโมดูล multiprocessing

YouTube Video

multiprocessing โมดูลใน Python

พื้นฐาน: ทำไมต้องใช้ multiprocessing?

multiprocessing ช่วยให้สามารถทำงานแบบขนานในระดับโปรเซส ทำให้คุณสามารถประมวลผลงานที่ใช้ CPU ได้พร้อมกันโดยไม่ถูกจำกัดด้วย GIL (Global Interpreter Lock) ของ Python สำหรับงานที่ใช้ I/O เป็นหลัก การใช้ threading หรือ asyncio อาจง่ายกว่าและเหมาะสมกว่า

การใช้งาน Process เบื้องต้น

ตัวอย่างเบื้องต้นในการเรียกใช้ฟังก์ชันในโปรเซสแยกโดยใช้ Process ตัวอย่างนี้แสดงวิธีการเริ่มโปรเซส รอให้ทำงานเสร็จ และส่งอาร์กิวเมนต์

 1# Explanation:
 2# This example starts a separate process to run `worker` which prints messages.
 3# It demonstrates starting, joining, and passing arguments.
 4
 5from multiprocessing import Process
 6import time
 7
 8def worker(name, delay):
 9    # English comment in code per user's preference
10    for i in range(3):
11        print(f"Worker {name}: iteration {i}")
12        time.sleep(delay)
13
14if __name__ == "__main__":
15    p = Process(target=worker, args=("A", 0.5))
16    p.start()
17    print("Main: waiting for worker to finish")
18    p.join()
19    print("Main: worker finished")
  • รหัสตัวอย่างนี้แสดงลำดับการทำงานที่โปรเซสหลักเรียกโปรเซสลูก worker และรอจนงานเสร็จโดยใช้ join() คุณสามารถส่งอาร์กิวเมนต์ได้โดยใช้ args

การประมวลผลแบบขนานอย่างง่ายด้วย Pool (API ระดับสูง)

Pool.map มีประโยชน์เมื่อคุณต้องการใช้ฟังก์ชันเดียวกันกับงานอิสระหลายงาน มันจะจัดการโปรเซสผู้ช่วย (worker) ให้โดยอัตโนมัติ

 1# Explanation:
 2# Use Pool.map to parallelize a CPU-bound function across available processes.
 3# Good for "embarrassingly parallel" workloads.
 4
 5from multiprocessing import Pool, cpu_count
 6import math
 7import time
 8
 9def is_prime(n):
10    # Check primality (inefficient but CPU-heavy for demo)
11    if n < 2:
12        return False
13    for i in range(2, int(math.sqrt(n)) + 1):
14        if n % i == 0:
15            return False
16    return True
17
18if __name__ == "__main__":
19    nums = [10_000_000 + i for i in range(50)]
20    start = time.time()
21    with Pool(processes=cpu_count()) as pool:
22        results = pool.map(is_prime, nums)
23    end = time.time()
24    print(f"Found primes: {sum(results)} / {len(nums)} in {end-start:.2f}s")
  • Pool สามารถจัดการจำนวน worker อัตโนมัติ และ map จะคืนค่าผลลัพธ์ตามลำดับเดิม

การสื่อสารระหว่างโปรเซส: รูปแบบ Producer/Consumer ด้วย Queue

Queue คือคิวแบบ First-In-First-Out (FIFO) ที่ช่วยถ่ายโอนวัตถุระหว่างโปรเซสอย่างปลอดภัย ด้านล่างนี้คือลักษณะรูปแบบที่พบได้บ่อย

 1# Explanation:
 2# Demonstrates a producer putting items into a Queue
 3# and consumer reading them.
 4# This is useful for task pipelines between processes.
 5
 6from multiprocessing import Process, Queue
 7import time
 8import random
 9
10def producer(q, n):
11    for i in range(n):
12        item = f"item-{i}"
13        print("Producer: putting", item)
14        q.put(item)
15        time.sleep(random.random() * 0.5)
16    q.put(None)  # sentinel to signal consumer to stop
17
18def consumer(q):
19    while True:
20        item = q.get()
21        if item is None:
22            break
23        print("Consumer: got", item)
24        time.sleep(0.2)
25
26if __name__ == "__main__":
27    q = Queue()
28    p = Process(target=producer, args=(q, 5))
29    c = Process(target=consumer, args=(q,))
30    p.start()
31    c.start()
32    p.join()
33    c.join()
34    print("Main: done")
  • Queue ช่วยให้คุณส่งข้อมูลระหว่างโปรเซสได้อย่างปลอดภัย โดยปกติจะใช้ค่าพิเศษเช่น None เพื่อบอกว่าการทำงานสิ้นสุดแล้ว

หน่วยความจำที่ใช้ร่วมกัน: Value และ Array

คุณสามารถใช้ Value และ Array เพื่อแบ่งปันตัวเลขหรืออาร์เรย์ขนาดเล็กระหว่างโปรเซสได้ คุณจำเป็นต้องใช้ล็อกเพื่อหลีกเลี่ยงความขัดแย้ง

 1# Explanation:
 2# Use Value to share a single integer counter
 3# and Array for a small numeric array.
 4# Show how to use a Lock to avoid race conditions.
 5
 6from multiprocessing import Process, Value, Array, Lock
 7import time
 8
 9def increment(counter, lock, times):
10    for _ in range(times):
11        with lock:
12            counter.value += 1
13
14def update_array(arr):
15    for i in range(len(arr)):
16        arr[i] = arr[i] + 1
17
18if __name__ == "__main__":
19    lock = Lock()
20    counter = Value('i', 0)  # 'i' = signed int
21    shared_arr = Array('i', [0, 0, 0])
22
23    p1 = Process(target=increment, args=(counter, lock, 1000))
24    p2 = Process(target=increment, args=(counter, lock, 1000))
25    a = Process(target=update_array, args=(shared_arr,))
26
27    p1.start(); p2.start(); a.start()
28    p1.join(); p2.join(); a.join()
29
30    print("Counter:", counter.value)
31    print("Array:", list(shared_arr))
  • Value และ Array ใช้กลไกระดับล่าง (หน่วยความจำร่วมที่ระดับภาษา C) ในการแบ่งปันข้อมูลระหว่างโปรเซส ไม่ใช่กลไกของ Python ดังนั้น จึงเหมาะสำหรับการอ่านและเขียนข้อมูลปริมาณน้อยอย่างรวดเร็ว แต่ไม่เหมาะสำหรับการจัดการข้อมูลปริมาณมาก

การใช้ข้อมูลร่วมกันขั้นสูง: แชร์อ็อบเจกต์ (dicts, lists) ด้วย Manager

หากคุณต้องการใช้อ็อบเจกต์ที่ยืดหยุ่นกว่าสำหรับการแชร์ เช่น list หรือ dictionary ให้ใช้ Manager()

 1# Explanation:
 2# Manager provides proxy objects like dict/list
 3# that can be shared across processes.
 4# Good for moderate-size shared state and easier programming model.
 5
 6from multiprocessing import Process, Manager
 7import time
 8
 9def worker(shared_dict, key, value):
10    shared_dict[key] = value
11
12if __name__ == "__main__":
13    with Manager() as manager:
14        d = manager.dict()
15        processes = []
16        for i in range(5):
17            p = Process(target=worker, args=(d, f"k{i}", i*i))
18            p.start()
19            processes.append(p)
20        for p in processes:
21            p.join()
22        print("Shared dict:", dict(d))
  • Manager สะดวกสำหรับการใช้งานแบ่งปัน dictionary และ list แต่ทุกการเข้าถึงจะส่งข้อมูลระหว่าง process และต้องมีการแปลง pickle ดังนั้น การอัปเดตข้อมูลจำนวนมากบ่อยๆ จะทำให้การประมวลผลช้าลง

กลไกซิงโครไนซ์: วิธีใช้ Lock และ Semaphore

ใช้ Lock หรือ Semaphore ในการควบคุมการเข้าถึงทรัพยากรร่วมกันแบบพร้อมกัน คุณสามารถใช้งานได้อย่างกระชับร่วมกับคำสั่ง with

 1# Explanation:
 2# Demonstrates using Lock to prevent simultaneous access to a critical section.
 3# Locks are necessary when shared resources are not atomic.
 4
 5from multiprocessing import Process, Lock, Value
 6
 7def safe_add(counter, lock):
 8    for _ in range(10000):
 9        with lock:
10            counter.value += 1
11
12if __name__ == "__main__":
13    lock = Lock()
14    counter = Value('i', 0)
15    p1 = Process(target=safe_add, args=(counter, lock))
16    p2 = Process(target=safe_add, args=(counter, lock))
17    p1.start(); p2.start()
18    p1.join(); p2.join()
19    print("Counter:", counter.value)
  • Lock จะป้องกัน data race แต่ถ้าขอบเขตที่ล็อคไว้กว้างเกินไป ประสิทธิภาพการประมวลผลแบบขนานจะลดลง ควรปกป้องเฉพาะส่วนที่จำเป็นเป็น critical section เท่านั้น

ความแตกต่างระหว่าง fork บน UNIX และลักษณะการทำงานบน Windows

บนระบบ UNIX จะมีการสร้างโปรเซสโดยใช้ fork โดยค่าเริ่มต้น ซึ่งช่วยให้ memory มีประสิทธิภาพด้วย copy-on-write บน Windows จะเริ่มโปรเซสด้วยวิธี spawn (ซึ่งจะ import โมดูลใหม่อีกครั้ง) ดังนั้นจึงต้องระวังการป้องกัน entry point และ initialization ส่วน global

 1# Explanation: Check start method (fork/spawn) and set it if needed.
 2# Useful for debugging platform-dependent behavior.
 3
 4from multiprocessing import get_start_method, set_start_method
 5
 6if __name__ == "__main__":
 7    print("Start method:", get_start_method())
 8
 9    # uncomment to force spawn on Unix for testing
10    # set_start_method('spawn')
  • set_start_method สามารถเรียกใช้ได้เพียงครั้งเดียวตอนเริ่มโปรแกรม การเปลี่ยนค่านี้ในไลบรารีโดยไม่ชัดเจนอาจไม่ปลอดภัย

ตัวอย่างการใช้งานจริง: ทดสอบประสิทธิภาพงานที่เน้นใช้ CPU (การเปรียบเทียบ)

ด้านล่างนี้คือสคริปต์ที่เปรียบเทียบอย่างง่ายว่าการประมวลผลจะเร็วขึ้นเพียงใดเมื่อใช้ multiprocessing แบบขนาน ในตัวอย่างนี้ เราใช้ Pool

 1# Explanation:
 2# Compare sequential vs parallel execution times for CPU-bound task.
 3# Helps understand speedup and overhead.
 4
 5import time
 6from multiprocessing import Pool, cpu_count
 7import math
 8
 9def heavy_task(n):
10    s = 0
11    for i in range(1, n):
12        s += math.sqrt(i)
13    return s
14
15def run_sequential(nums):
16    return [heavy_task(n) for n in nums]
17
18def run_parallel(nums):
19    with Pool(processes=cpu_count()) as p:
20        return p.map(heavy_task, nums)
21
22if __name__ == "__main__":
23    nums = [2000000] * 8  # heavy tasks
24    t0 = time.time()
25    run_sequential(nums)
26    seq = time.time() - t0
27    t1 = time.time()
28    run_parallel(nums)
29    par = time.time() - t1
30    print(f"Sequential: {seq:.2f}s, Parallel: {par:.2f}s")
  • ตัวอย่างนี้แสดงให้เห็นว่าตามภาระงานและจำนวนโปรเซส การประมวลผลแบบขนานอาจไม่คุ้มค่าเพราะ overhead งานที่มีขนาดใหญ่และเป็นอิสระต่อกันมาก ยิ่งได้รับประโยชน์จากการประมวลผลแบบขนาน

กฎพื้นฐานที่สำคัญ

ด้านล่างนี้คือข้อควรพิจารณาพื้นฐานสำหรับการใช้ multiprocessing อย่างปลอดภัยและมีประสิทธิภาพ

  • บน Windows จะมีการ import โมดูลใหม่เมื่อโปรเซสลูกเริ่มทำงาน ดังนั้นต้องป้องกันจุดเริ่มต้นของสคริปต์ด้วย if __name__ == "__main__":
  • การสื่อสารระหว่าง process จะถูก serialize (ด้วยการแปลง pickle) ดังนั้นการส่งวัตถุขนาดใหญ่จะมีต้นทุนสูง
  • เนื่องจาก multiprocessing สร้างโปรเซสใหม่ จึงควรกำหนดจำนวนโปรเซสตามผลลัพธ์ของ multiprocessing.cpu_count()
  • การสร้าง Pool อีกอันภายใน worker จะซับซ้อนขึ้น ดังนั้นควรหลีกเลี่ยงการซ้อน Pool เท่าที่จะทำได้
  • เนื่องจากข้อผิดพลาดที่เกิดใน child process ตรวจจับได้ยากจาก main process จึงจำเป็นต้องเขียน logging และการจัดการข้อผิดพลาดอย่างชัดเจน
  • กำหนดจำนวนโปรเซสตามจำนวน CPU และพิจารณาใช้ thread แทนหากงานเป็น I/O-bound

คำแนะนำด้านการออกแบบเชิงปฏิบัติ

ด้านล่างนี้คือแนวคิดและรูปแบบที่มีประโยชน์สำหรับการออกแบบการประมวลผลแบบขนาน

  • การแยกโปรเซสออกเป็นบทบาทต่าง ๆ เช่น อ่านข้อมูลเข้า (I/O), ประมวลผลหลาย CPU (preprocessing), และสรุปผล (aggregation) แบบลำดับ (serial) ด้วย ‘pipelining’ จะช่วยเพิ่มประสิทธิภาพ
  • เพื่อให้ง่ายต่อการดีบั๊ก ให้ตรวจสอบการทำงานในโหมดโปรเซสเดียวก่อนค่อยขยายสู่แบบขนาน
  • สำหรับการบันทึก log ควรแยกไฟล์ log ต่อโปรเซส (เช่น ใส่ PID ในชื่อไฟล์) เพื่อให้ง่ายต่อการแยกปัญหา
  • เตรียมกลไกสำหรับ retry และ timeout เพื่อแก้ไขและกู้คืนงานได้แม้บาง process จะค้าง

สรุป (ประเด็นสำคัญที่นำไปใช้ได้ทันที)

การประมวลผลแบบขนานทรงพลัง แต่ควรประเมินลักษณะงาน ขนาดข้อมูล และต้นทุนการสื่อสารระหว่างโปรเซสให้เหมาะสม multiprocessing มีประสิทธิภาพสำหรับงานที่ใช้ CPU สูง แต่หากออกแบบผิดหรือซิงโครไนซ์ผิดพลาดจะทำให้ประสิทธิภาพลดลง หากปฏิบัติตามกฎและรูปแบบพื้นฐาน คุณจะสามารถสร้างโปรแกรมแบบขนานที่ปลอดภัยและมีประสิทธิภาพได้

คุณสามารถติดตามบทความข้างต้นโดยใช้ Visual Studio Code บนช่อง YouTube ของเรา กรุณาตรวจสอบช่อง YouTube ด้วย

YouTube Video