Module `multiprocessing` trong Python

Module `multiprocessing` trong Python

Bài viết này giải thích về module multiprocessing trong Python.

Bài viết này giới thiệu các mẹo thực tiễn để viết mã xử lý song song an toàn và hiệu quả bằng cách sử dụng mô-đun multiprocessing.

YouTube Video

Module multiprocessing trong Python

Cơ bản: Tại sao sử dụng multiprocessing?

multiprocessing cho phép song song hóa dựa trên tiến trình, vì vậy bạn có thể xử lý song song các tác vụ tốn CPU mà không bị ràng buộc bởi GIL (Global Interpreter Lock) của Python. Đối với các tác vụ bị giới hạn bởi I/O, threading hoặc asyncio có thể đơn giản và phù hợp hơn.

Cách sử dụng đơn giản của Process

Trước tiên, đây là một ví dụ cơ bản về cách chạy một hàm trong tiến trình riêng biệt bằng Process. Điều này minh họa cách khởi động một tiến trình, chờ nó hoàn thành và truyền đối số.

 1# Explanation:
 2# This example starts a separate process to run `worker` which prints messages.
 3# It demonstrates starting, joining, and passing arguments.
 4
 5from multiprocessing import Process
 6import time
 7
 8def worker(name, delay):
 9    # English comment in code per user's preference
10    for i in range(3):
11        print(f"Worker {name}: iteration {i}")
12        time.sleep(delay)
13
14if __name__ == "__main__":
15    p = Process(target=worker, args=("A", 0.5))
16    p.start()
17    print("Main: waiting for worker to finish")
18    p.join()
19    print("Main: worker finished")
  • Đoạn mã này cho thấy quy trình mà tiến trình chính khởi chạy tiến trình con worker và chờ hoàn thành bằng cách sử dụng join(). Bạn có thể truyền đối số bằng cách sử dụng args.

Song song hóa đơn giản với Pool (API cấp cao)

Pool.map hữu ích khi bạn muốn áp dụng cùng một hàm cho nhiều tác vụ độc lập. Nó tự động quản lý các tiến trình công nhân cho bạn.

 1# Explanation:
 2# Use Pool.map to parallelize a CPU-bound function across available processes.
 3# Good for "embarrassingly parallel" workloads.
 4
 5from multiprocessing import Pool, cpu_count
 6import math
 7import time
 8
 9def is_prime(n):
10    # Check primality (inefficient but CPU-heavy for demo)
11    if n < 2:
12        return False
13    for i in range(2, int(math.sqrt(n)) + 1):
14        if n % i == 0:
15            return False
16    return True
17
18if __name__ == "__main__":
19    nums = [10_000_000 + i for i in range(50)]
20    start = time.time()
21    with Pool(processes=cpu_count()) as pool:
22        results = pool.map(is_prime, nums)
23    end = time.time()
24    print(f"Found primes: {sum(results)} / {len(nums)} in {end-start:.2f}s")
  • Pool có thể tự động kiểm soát số lượng tiến trình công nhân, và map trả về kết quả theo thứ tự ban đầu.

Giao tiếp giữa các tiến trình: Mẫu Producer/Consumer sử dụng Queue

Queue là hàng đợi First-In-First-Out (FIFO) dùng để truyền đối tượng an toàn giữa các tiến trình. Dưới đây là một số mẫu điển hình.

 1# Explanation:
 2# Demonstrates a producer putting items into a Queue
 3# and consumer reading them.
 4# This is useful for task pipelines between processes.
 5
 6from multiprocessing import Process, Queue
 7import time
 8import random
 9
10def producer(q, n):
11    for i in range(n):
12        item = f"item-{i}"
13        print("Producer: putting", item)
14        q.put(item)
15        time.sleep(random.random() * 0.5)
16    q.put(None)  # sentinel to signal consumer to stop
17
18def consumer(q):
19    while True:
20        item = q.get()
21        if item is None:
22            break
23        print("Consumer: got", item)
24        time.sleep(0.2)
25
26if __name__ == "__main__":
27    q = Queue()
28    p = Process(target=producer, args=(q, 5))
29    c = Process(target=consumer, args=(q,))
30    p.start()
31    c.start()
32    p.join()
33    c.join()
34    print("Main: done")
  • Queue giúp bạn truyền dữ liệu giữa các tiến trình một cách an toàn. Việc sử dụng một giá trị đặc biệt như None để báo hiệu kết thúc là điều phổ biến.

Bộ nhớ chia sẻ: ValueArray

Bạn có thể sử dụng ValueArray khi muốn chia sẻ các số nhỏ hoặc mảng nhỏ giữa các tiến trình. Bạn cần sử dụng khóa (lock) để tránh xung đột.

 1# Explanation:
 2# Use Value to share a single integer counter
 3# and Array for a small numeric array.
 4# Show how to use a Lock to avoid race conditions.
 5
 6from multiprocessing import Process, Value, Array, Lock
 7import time
 8
 9def increment(counter, lock, times):
10    for _ in range(times):
11        with lock:
12            counter.value += 1
13
14def update_array(arr):
15    for i in range(len(arr)):
16        arr[i] = arr[i] + 1
17
18if __name__ == "__main__":
19    lock = Lock()
20    counter = Value('i', 0)  # 'i' = signed int
21    shared_arr = Array('i', [0, 0, 0])
22
23    p1 = Process(target=increment, args=(counter, lock, 1000))
24    p2 = Process(target=increment, args=(counter, lock, 1000))
25    a = Process(target=update_array, args=(shared_arr,))
26
27    p1.start(); p2.start(); a.start()
28    p1.join(); p2.join(); a.join()
29
30    print("Counter:", counter.value)
31    print("Array:", list(shared_arr))
  • ValueArray chia sẻ dữ liệu giữa các tiến trình bằng cơ chế cấp thấp (bộ nhớ chia sẻ ở cấp độ ngôn ngữ C), không phải ở chính Python. Do đó, nó phù hợp để đọc và ghi nhanh các lượng dữ liệu nhỏ, nhưng không phù hợp để xử lý các lượng dữ liệu lớn..

Chia sẻ nâng cao: Đối tượng chia sẻ (dict, list) với Manager

Nếu bạn muốn sử dụng đối tượng chia sẻ linh hoạt hơn như list hoặc dictionary, hãy sử dụng Manager().

 1# Explanation:
 2# Manager provides proxy objects like dict/list
 3# that can be shared across processes.
 4# Good for moderate-size shared state and easier programming model.
 5
 6from multiprocessing import Process, Manager
 7import time
 8
 9def worker(shared_dict, key, value):
10    shared_dict[key] = value
11
12if __name__ == "__main__":
13    with Manager() as manager:
14        d = manager.dict()
15        processes = []
16        for i in range(5):
17            p = Process(target=worker, args=(d, f"k{i}", i*i))
18            p.start()
19            processes.append(p)
20        for p in processes:
21            p.join()
22        print("Shared dict:", dict(d))
  • Manager rất tiện lợi để chia sẻ dictionary và list, nhưng mỗi lần truy cập đều truyền dữ liệu giữa các tiến trình và cần chuyển đổi bằng pickle. Do đó, việc cập nhật số lượng lớn dữ liệu thường xuyên sẽ làm chậm quá trình xử lý.

Cơ chế đồng bộ hóa: Cách sử dụng LockSemaphore

Sử dụng Lock hoặc Semaphore để kiểm soát việc truy cập đồng thời vào tài nguyên chia sẻ. Bạn có thể sử dụng chúng một cách ngắn gọn với câu lệnh with.

 1# Explanation:
 2# Demonstrates using Lock to prevent simultaneous access to a critical section.
 3# Locks are necessary when shared resources are not atomic.
 4
 5from multiprocessing import Process, Lock, Value
 6
 7def safe_add(counter, lock):
 8    for _ in range(10000):
 9        with lock:
10            counter.value += 1
11
12if __name__ == "__main__":
13    lock = Lock()
14    counter = Value('i', 0)
15    p1 = Process(target=safe_add, args=(counter, lock))
16    p2 = Process(target=safe_add, args=(counter, lock))
17    p1.start(); p2.start()
18    p1.join(); p2.join()
19    print("Counter:", counter.value)
  • Khóa giúp ngăn chặn tranh chấp dữ liệu, nhưng nếu vùng được khóa quá lớn thì hiệu suất xử lý song song sẽ giảm. Chỉ nên bảo vệ những phần cần thiết như là vùng quan trọng (critical section).

Sự khác biệt giữa fork trên UNIX và hành vi trên Windows

Trên hệ thống UNIX, các tiến trình được nhân bản bằng fork theo mặc định, giúp tiết kiệm bộ nhớ nhờ copy-on-write. Windows khởi tạo tiến trình bằng spawn (tái nhập các mô-đun), do đó bạn cần chú ý đến bảo vệ điểm khởi động và khởi tạo biến toàn cục.

 1# Explanation: Check start method (fork/spawn) and set it if needed.
 2# Useful for debugging platform-dependent behavior.
 3
 4from multiprocessing import get_start_method, set_start_method
 5
 6if __name__ == "__main__":
 7    print("Start method:", get_start_method())
 8
 9    # uncomment to force spawn on Unix for testing
10    # set_start_method('spawn')
  • set_start_method chỉ có thể được gọi một lần khi bắt đầu chương trình. An toàn hơn nếu không thay đổi tuỳ tiện giá trị này trong các thư viện.

Ví dụ thực tế: Đo hiệu năng các tác vụ tốn CPU (so sánh)

Dưới đây là một script đơn giản so sánh việc xử lý có thể nhanh hơn như thế nào khi sử dụng song song hóa với multiprocessing. Ở đây, chúng ta sử dụng Pool.

 1# Explanation:
 2# Compare sequential vs parallel execution times for CPU-bound task.
 3# Helps understand speedup and overhead.
 4
 5import time
 6from multiprocessing import Pool, cpu_count
 7import math
 8
 9def heavy_task(n):
10    s = 0
11    for i in range(1, n):
12        s += math.sqrt(i)
13    return s
14
15def run_sequential(nums):
16    return [heavy_task(n) for n in nums]
17
18def run_parallel(nums):
19    with Pool(processes=cpu_count()) as p:
20        return p.map(heavy_task, nums)
21
22if __name__ == "__main__":
23    nums = [2000000] * 8  # heavy tasks
24    t0 = time.time()
25    run_sequential(nums)
26    seq = time.time() - t0
27    t1 = time.time()
28    run_parallel(nums)
29    par = time.time() - t1
30    print(f"Sequential: {seq:.2f}s, Parallel: {par:.2f}s")
  • Ví dụ này cho thấy, tuỳ vào tải tác vụ và số lượng tiến trình, đôi khi song song hóa không hiệu quả do chi phí bổ sung. Các tác vụ càng lớn và độc lập thì lợi ích đạt được càng cao.

Những quy tắc cơ bản quan trọng

Dưới đây là những điểm cơ bản để sử dụng multiprocessing một cách an toàn và hiệu quả.

  • Trên Windows, các mô-đun sẽ được nhập lại khi tiến trình con khởi chạy, vì vậy bạn phải bảo vệ điểm khởi động script bằng if __name__ == "__main__":.
  • Giao tiếp giữa các tiến trình được tuần tự hóa (chuyển đổi bằng pickle), do đó việc truyền các đối tượng lớn sẽ tốn kém.
  • multiprocessing tạo ra các tiến trình, thông thường số lượng tiến trình sẽ dựa trên multiprocessing.cpu_count().
  • Việc tạo một Pool khác bên trong worker sẽ trở nên phức tạp, vì vậy bạn nên tránh lồng các instance Pool càng nhiều càng tốt.
  • Vì các ngoại lệ xảy ra trong tiến trình con khó được phát hiện từ tiến trình chính, nên cần phải thực hiện ghi log và xử lý lỗi một cách rõ ràng.
  • Hãy đặt số tiến trình phù hợp với số CPU, và cân nhắc sử dụng luồng cho các tác vụ tốn I/O.

Lời khuyên về thiết kế thực tế

Dưới đây là một số khái niệm và mẫu hữu ích để thiết kế xử lý song song.

  • Việc tách tiến trình thành các vai trò như đọc dữ liệu đầu vào (I/O), tiền xử lý (đa CPU) và tổng hợp (tuần tự) thông qua 'pipelining' sẽ hiệu quả hơn.
  • Để đơn giản hóa việc debug, trước tiên hãy kiểm tra hoạt động trong một tiến trình đơn trước khi song song hóa.
  • Đối với ghi nhật ký, hãy tách nhật ký theo từng tiến trình (ví dụ: thêm PID vào tên tệp) để dễ dàng khoanh vùng sự cố.
  • Chuẩn bị cơ chế thử lại và timeout để có thể khôi phục an toàn ngay cả khi tiến trình bị treo.

Tóm tắt (Những điểm chính bạn có thể áp dụng ngay)

Xử lý song song rất mạnh mẽ, nhưng điều quan trọng là phải đánh giá đúng bản chất tác vụ, kích thước dữ liệu và chi phí giao tiếp giữa các tiến trình. multiprocessing hiệu quả cho các tác vụ tốn CPU, nhưng nếu thiết kế kém hoặc đồng bộ hóa sai có thể làm giảm hiệu suất. Nếu bạn tuân theo các quy tắc và mẫu cơ bản, bạn có thể xây dựng chương trình song song an toàn và hiệu quả.

Bạn có thể làm theo bài viết trên bằng cách sử dụng Visual Studio Code trên kênh YouTube của chúng tôi. Vui lòng ghé thăm kênh YouTube.

YouTube Video