מודול ה-`multiprocessing` בפייתון

מודול ה-`multiprocessing` בפייתון

מאמר זה מסביר את מודול ה-multiprocessing בפייתון.

מאמר זה מציג טיפים מעשיים לכתיבת קוד עיבוד מקבילי ובטוח בשימוש ב־multiprocessing.

YouTube Video

מודול ה-multiprocessing בפייתון

יסודות: מדוע להשתמש ב־multiprocessing?

multiprocessing מאפשר הרצה מקבילית של תהליכים, כך שניתן להאיץ משימות תלויות־מעבד (CPU-bound) מבלי להיות מוגבלים על ידי GIL של פייתון. עבור משימות תלויות־קלט/פלט, ייתכן ש־threading או asyncio יהיו פשוטים ומתאימים יותר.

שימוש פשוט במחלקה Process

ראשית, דוגמה בסיסית להרצת פונקציה בתהליך נפרד בעזרת Process. זה מדגים כיצד להפעיל תהליך, להמתין לסיום שלו ולהעביר לו ארגומנטים.

 1# Explanation:
 2# This example starts a separate process to run `worker` which prints messages.
 3# It demonstrates starting, joining, and passing arguments.
 4
 5from multiprocessing import Process
 6import time
 7
 8def worker(name, delay):
 9    # English comment in code per user's preference
10    for i in range(3):
11        print(f"Worker {name}: iteration {i}")
12        time.sleep(delay)
13
14if __name__ == "__main__":
15    p = Process(target=worker, args=("A", 0.5))
16    p.start()
17    print("Main: waiting for worker to finish")
18    p.join()
19    print("Main: worker finished")
  • הקוד הזה מציג את התהליך שבו התהליך הראשי מפעיל תהליך משנה בשם worker וממתין לסיום שלו עם join(). ניתן להעביר ארגומנטים בעזרת הפרמטר args.

הרצה מקבילית פשוטה עם Pool (API ברמת־על)

Pool.map שימושי כאשר רוצים להפעיל את אותה פונקציה על משימות עצמאיות רבות. הוא מנהל עבורך את תהליכי־העבודה (workers) באופן פנימי.

 1# Explanation:
 2# Use Pool.map to parallelize a CPU-bound function across available processes.
 3# Good for "embarrassingly parallel" workloads.
 4
 5from multiprocessing import Pool, cpu_count
 6import math
 7import time
 8
 9def is_prime(n):
10    # Check primality (inefficient but CPU-heavy for demo)
11    if n < 2:
12        return False
13    for i in range(2, int(math.sqrt(n)) + 1):
14        if n % i == 0:
15            return False
16    return True
17
18if __name__ == "__main__":
19    nums = [10_000_000 + i for i in range(50)]
20    start = time.time()
21    with Pool(processes=cpu_count()) as pool:
22        results = pool.map(is_prime, nums)
23    end = time.time()
24    print(f"Found primes: {sum(results)} / {len(nums)} in {end-start:.2f}s")
  • Pool יכול לשלוט אוטומטית על מספר התהליכים, ו־map מחזיר את התוצאות בסדר המקורי.

תקשורת בין־תהליכית: תבנית יצרן/צרכן (Producer/Consumer) בעזרת Queue

תור הוא תור FIFO (ראשון נכנס, ראשון יוצא) שמעביר אובייקטים בין תהליכים בצורה בטוחה. להלן כמה דפוסים טיפוסיים.

 1# Explanation:
 2# Demonstrates a producer putting items into a Queue
 3# and consumer reading them.
 4# This is useful for task pipelines between processes.
 5
 6from multiprocessing import Process, Queue
 7import time
 8import random
 9
10def producer(q, n):
11    for i in range(n):
12        item = f"item-{i}"
13        print("Producer: putting", item)
14        q.put(item)
15        time.sleep(random.random() * 0.5)
16    q.put(None)  # sentinel to signal consumer to stop
17
18def consumer(q):
19    while True:
20        item = q.get()
21        if item is None:
22            break
23        print("Consumer: got", item)
24        time.sleep(0.2)
25
26if __name__ == "__main__":
27    q = Queue()
28    p = Process(target=producer, args=(q, 5))
29    c = Process(target=consumer, args=(q,))
30    p.start()
31    c.start()
32    p.join()
33    c.join()
34    print("Main: done")
  • Queue מאפשר לשלוח נתונים בצורה בטוחה בין תהליכים. נהוג להשתמש בערך מיוחד כמו None כדי לסמן סיום.

זיכרון משותף: Value ו־Array

ניתן להשתמש ב־Value וב־Array כדי לשתף מספרים או מערכים קטנים בין תהליכים. יש צורך להשתמש במנעולים (locks) כדי להימנע מעימותים.

 1# Explanation:
 2# Use Value to share a single integer counter
 3# and Array for a small numeric array.
 4# Show how to use a Lock to avoid race conditions.
 5
 6from multiprocessing import Process, Value, Array, Lock
 7import time
 8
 9def increment(counter, lock, times):
10    for _ in range(times):
11        with lock:
12            counter.value += 1
13
14def update_array(arr):
15    for i in range(len(arr)):
16        arr[i] = arr[i] + 1
17
18if __name__ == "__main__":
19    lock = Lock()
20    counter = Value('i', 0)  # 'i' = signed int
21    shared_arr = Array('i', [0, 0, 0])
22
23    p1 = Process(target=increment, args=(counter, lock, 1000))
24    p2 = Process(target=increment, args=(counter, lock, 1000))
25    a = Process(target=update_array, args=(shared_arr,))
26
27    p1.start(); p2.start(); a.start()
28    p1.join(); p2.join(); a.join()
29
30    print("Counter:", counter.value)
31    print("Array:", list(shared_arr))
  • Value ו־Array משתפים נתונים בין תהליכים באמצעות מנגנונים נמוכים (shared memory ברמת שפת C) ולא ברמת פייתון עצמה. לכן, זה מתאים לקריאה וכתיבה מהירה של כמויות קטנות של נתונים, אך אינו מתאים לטיפול בכמויות גדולות של נתונים..

שיתוף מתקדם: אובייקטים משותפים (מילונים, רשימות) בעזרת Manager

אם אתם רוצים לשתף אובייקטים גמישים כמו רשימות או מילונים, השתמשו ב־Manager().

 1# Explanation:
 2# Manager provides proxy objects like dict/list
 3# that can be shared across processes.
 4# Good for moderate-size shared state and easier programming model.
 5
 6from multiprocessing import Process, Manager
 7import time
 8
 9def worker(shared_dict, key, value):
10    shared_dict[key] = value
11
12if __name__ == "__main__":
13    with Manager() as manager:
14        d = manager.dict()
15        processes = []
16        for i in range(5):
17            p = Process(target=worker, args=(d, f"k{i}", i*i))
18            p.start()
19            processes.append(p)
20        for p in processes:
21            p.join()
22        print("Shared dict:", dict(d))
  • Manager נוח לשיתוף מילונים ורשימות, אך כל גישה שולחת נתונים בין תהליכים ודורשת המרה בעזרת pickle. לכן, עדכון תדיר של כמויות גדולות של נתונים יאט את תהליך העיבוד.

מנגנוני סנכרון: כיצד להשתמש ב־Lock ו־Semaphore

השתמשו ב־Lock או Semaphore כדי לשלוט בגישה מקבילית למשאבים משותפים. ניתן להשתמש בהם בצורה תמציתית עם משפט with.

 1# Explanation:
 2# Demonstrates using Lock to prevent simultaneous access to a critical section.
 3# Locks are necessary when shared resources are not atomic.
 4
 5from multiprocessing import Process, Lock, Value
 6
 7def safe_add(counter, lock):
 8    for _ in range(10000):
 9        with lock:
10            counter.value += 1
11
12if __name__ == "__main__":
13    lock = Lock()
14    counter = Value('i', 0)
15    p1 = Process(target=safe_add, args=(counter, lock))
16    p2 = Process(target=safe_add, args=(counter, lock))
17    p1.start(); p2.start()
18    p1.join(); p2.join()
19    print("Counter:", counter.value)
  • מנעולים מונעים מירוצי נתונים, אך אם האזור הננעל גדול מדי, ביצועי העיבוד המקבילי ייפגעו. רק החלקים הדרושים צריכים להיות מוגנים כחלק קריטי (critical section).

הבדלים בין fork ב־UNIX לבין ההתנהגות ב־Windows

במערכות UNIX, התהליכים משוכפלים בברירת־מחדל על ידי fork, מה שמאפשר ניהול זיכרון יעיל בשיטת copy-on-write. Windows מפעילה תהליכים באמצעות spawn (שמבצע ייבוא־מחדש של מודולים), לכן יש להיזהר בהגנה על נקודת התחלה ואתחול משתנים גלובליים.

 1# Explanation: Check start method (fork/spawn) and set it if needed.
 2# Useful for debugging platform-dependent behavior.
 3
 4from multiprocessing import get_start_method, set_start_method
 5
 6if __name__ == "__main__":
 7    print("Start method:", get_start_method())
 8
 9    # uncomment to force spawn on Unix for testing
10    # set_start_method('spawn')
  • ניתן לקרוא ל־set_start_method רק פעם אחת בתחילת התוכנית. בטוח יותר לא לשנות זאת באופן שרירותי בתוך ספריות.

דוגמה מעשית: השוואת ביצועים (benchmarking) של עומסי עבודה תלויי־מעבד (CPU-bound)

להלן סקריפט שמשווה בפשטות כמה עיבוד יכול להיות מהיר יותר בעזרת עיבוד מקבילי באמצעות multiprocessing. כאן אנו משתמשים ב־Pool.

 1# Explanation:
 2# Compare sequential vs parallel execution times for CPU-bound task.
 3# Helps understand speedup and overhead.
 4
 5import time
 6from multiprocessing import Pool, cpu_count
 7import math
 8
 9def heavy_task(n):
10    s = 0
11    for i in range(1, n):
12        s += math.sqrt(i)
13    return s
14
15def run_sequential(nums):
16    return [heavy_task(n) for n in nums]
17
18def run_parallel(nums):
19    with Pool(processes=cpu_count()) as p:
20        return p.map(heavy_task, nums)
21
22if __name__ == "__main__":
23    nums = [2000000] * 8  # heavy tasks
24    t0 = time.time()
25    run_sequential(nums)
26    seq = time.time() - t0
27    t1 = time.time()
28    run_parallel(nums)
29    par = time.time() - t1
30    print(f"Sequential: {seq:.2f}s, Parallel: {par:.2f}s")
  • הדוגמה מראה שלפעמים, תלוי בעומס המשימה ובמספר התהליכים, הרצה מקבילית איננה יעילה בגלל overhead. ככל שהמשימות גדולות (״כבדות״) ועצמאיות יותר, כך התועלת רבה יותר.

כללי יסוד חשובים

להלן הנקודות הבסיסיות לשימוש בטוח ויעיל ב-multiprocessing.

  • ב־Windows, מופעל ייבוא־מחדש של מודולים כאשר תהליכים צאצאים מתחילים, לכן חובה להגן על נקודת ההתחלה של הסקריפט באמצעות if __name__ == "__main__":.
  • תקשורת בין תהליכים עוברת סידור (serialization) עם המרה של pickle, ולכן העברת אובייקטים גדולים הופכת יקרה.
  • מאחר ש־multiprocessing יוצר תהליכים, נהוג להחליט על מספר התהליכים לפי multiprocessing.cpu_count().
  • יצירת Pool נוסף בתוך תהליך עובד מסבכת את העניינים, לכן יש להימנע מנעיצת מופעים של Pool ככל האפשר.
  • מאחר שקשה לזהות חריגות בתהליכי משנה מתוך התהליך הראשי, יש ליישם לוגינג וטיפול בשגיאות בצורה מפורשת.
  • קבע את מספר התהליכים לפי מספר המעבדים, ושקול להשתמש בתהליכי משנה (threads) עבור משימות תלויות I/O.

טיפים לעיצוב מעשי

להלן כמה מושגים ודפוסים שימושיים לתכנון עיבוד מקבילי.

  • יעיל להפריד תהליכים לפי תפקיד: קריאת קלט (I/O), עיבוד מקדים (במקביל), ואגרגציה (בשרשור), באמצעות 'צנרת' (pipelining).
  • לפני ריצה מקבילית, בדוק קודם כל את תקינות הקוד בתהליך יחיד לטובת דיבוג פשוט.
  • למטרות logging, הפרד את קבצי־הלוג בין תהליכים (כגון: הוסף את מזהה התהליך (PID) בשם הקובץ) כדי להקל על בדיקת תקלות.
  • הכן מנגנוני retry ו־timeout כדי לאפשר שחזור בטוח במקרה שתהליך נתקע.

סיכום (נקודות מפתח לשימוש מיידי)

עיבוד מקבילי הוא כלי חזק, אך חשוב לאפיין נכון את סוגי המשימות, גודל הנתונים ועלות התקשורת בין התהליכים. multiprocessing יעיל לעיבוד תלווי־מעבד, אך תכנון לא נכון או טעויות בסנכרון עלולים להקטין את הביצועים. אם תקפיד על הכללים והדפוסים הבסיסיים, תוכל לבנות תוכניות מקביליות בטוחות ויעילות.

תוכלו לעקוב אחר המאמר שלמעלה באמצעות Visual Studio Code בערוץ היוטיוב שלנו. נא לבדוק גם את ערוץ היוטיוב.

YouTube Video