وحدة `multiprocessing` في Python
تشرح هذه المقالة وحدة multiprocessing في Python۔
تقدم هذه المقالة نصائح عملية لكتابة كود المعالجة المتوازية بشكل آمن وفعال باستخدام وحدة multiprocessing۔
YouTube Video
وحدة multiprocessing في Python
الأساسيات: لماذا نستخدم multiprocessing؟
تتيح وحدة multiprocessing التنفيذ المتوازي على مستوى العمليات، بحيث يمكنك تنفيذ المهام المعتمدة على المعالج بالتوازي دون قيود قفل المترجم العالمي (GIL) في بايثون۔ بالنسبة للمهام المعتمدة على الإدخال/الإخراج، قد تكون threading أو asyncio أبسط وأكثر ملاءمة۔
الاستخدام البسيط لـ Process
أولاً، هنا مثال أساسي لتشغيل دالة في عملية منفصلة باستخدام Process۔ يوضح هذا كيفية بدء عملية، الانتظار حتى تنتهي، وتمرير الوسائط۔
1# Explanation:
2# This example starts a separate process to run `worker` which prints messages.
3# It demonstrates starting, joining, and passing arguments.
4
5from multiprocessing import Process
6import time
7
8def worker(name, delay):
9 # English comment in code per user's preference
10 for i in range(3):
11 print(f"Worker {name}: iteration {i}")
12 time.sleep(delay)
13
14if __name__ == "__main__":
15 p = Process(target=worker, args=("A", 0.5))
16 p.start()
17 print("Main: waiting for worker to finish")
18 p.join()
19 print("Main: worker finished")- يوضح هذا الكود تدفق العملية الرئيسية التي تطلق عملية فرعية باسم
workerوتنتظر اكتمالها باستخدامjoin()۔ يمكنك تمرير الوسائط باستخدامargs۔
التوازي البسيط باستخدام Pool (واجهة برمجة التطبيقات عالية المستوى)
Pool.map مفيدة عند رغبتك في تطبيق نفس الدالة على العديد من المهام المستقلة۔ يقوم بإدارة العمليات العاملة داخلياً نيابة عنك۔
1# Explanation:
2# Use Pool.map to parallelize a CPU-bound function across available processes.
3# Good for "embarrassingly parallel" workloads.
4
5from multiprocessing import Pool, cpu_count
6import math
7import time
8
9def is_prime(n):
10 # Check primality (inefficient but CPU-heavy for demo)
11 if n < 2:
12 return False
13 for i in range(2, int(math.sqrt(n)) + 1):
14 if n % i == 0:
15 return False
16 return True
17
18if __name__ == "__main__":
19 nums = [10_000_000 + i for i in range(50)]
20 start = time.time()
21 with Pool(processes=cpu_count()) as pool:
22 results = pool.map(is_prime, nums)
23 end = time.time()
24 print(f"Found primes: {sum(results)} / {len(nums)} in {end-start:.2f}s")- يمكن لـ
Poolالتحكم تلقائياً في عدد العمليات العاملة، وتعيدmapالنتائج بنفس الترتيب الأصلي۔
الاتصال بين العمليات: نمط المنتج/المستهلك باستخدام Queue
قائمة الانتظار (Queue) هي قائمة انتظار الأول في الدخول، الأول في الخروج (FIFO) تقوم بنقل الكائنات بأمان بين العمليات۔ فيما يلي بعض الأنماط النموذجية۔
1# Explanation:
2# Demonstrates a producer putting items into a Queue
3# and consumer reading them.
4# This is useful for task pipelines between processes.
5
6from multiprocessing import Process, Queue
7import time
8import random
9
10def producer(q, n):
11 for i in range(n):
12 item = f"item-{i}"
13 print("Producer: putting", item)
14 q.put(item)
15 time.sleep(random.random() * 0.5)
16 q.put(None) # sentinel to signal consumer to stop
17
18def consumer(q):
19 while True:
20 item = q.get()
21 if item is None:
22 break
23 print("Consumer: got", item)
24 time.sleep(0.2)
25
26if __name__ == "__main__":
27 q = Queue()
28 p = Process(target=producer, args=(q, 5))
29 c = Process(target=consumer, args=(q,))
30 p.start()
31 c.start()
32 p.join()
33 c.join()
34 print("Main: done")Queueتتيح لك تمرير البيانات بين العمليات بشكل آمن۔ من الشائع استخدام قيمة خاصة مثلNoneللإشارة إلى انتهاء التنفيذ۔
الذاكرة المشتركة: Value و Array
يمكنك استخدام Value وArray عندما تريد مشاركة أرقام أو مصفوفات صغيرة بين العمليات۔ تحتاج إلى استخدام الأقفال لتجنب التعارضات۔
1# Explanation:
2# Use Value to share a single integer counter
3# and Array for a small numeric array.
4# Show how to use a Lock to avoid race conditions.
5
6from multiprocessing import Process, Value, Array, Lock
7import time
8
9def increment(counter, lock, times):
10 for _ in range(times):
11 with lock:
12 counter.value += 1
13
14def update_array(arr):
15 for i in range(len(arr)):
16 arr[i] = arr[i] + 1
17
18if __name__ == "__main__":
19 lock = Lock()
20 counter = Value('i', 0) # 'i' = signed int
21 shared_arr = Array('i', [0, 0, 0])
22
23 p1 = Process(target=increment, args=(counter, lock, 1000))
24 p2 = Process(target=increment, args=(counter, lock, 1000))
25 a = Process(target=update_array, args=(shared_arr,))
26
27 p1.start(); p2.start(); a.start()
28 p1.join(); p2.join(); a.join()
29
30 print("Counter:", counter.value)
31 print("Array:", list(shared_arr))- تستخدم
ValueوArrayآليات منخفضة المستوى (ذاكرة مشتركة على مستوى لغة C) لمشاركة البيانات بين العمليات، وليس بايثون نفسها۔ لذلك، فهو مناسب للقراءة والكتابة السريعة لكميات صغيرة من البيانات، ولكنه غير مناسب للتعامل مع كميات كبيرة من البيانات.۔
المشاركة المتقدمة: الكائنات المشتركة (القواميس والمصفوفات) باستخدام Manager
إذا أردت استخدام كائنات مشتركة أكثر مرونة مثل القوائم أو القواميس، استخدم Manager()۔
1# Explanation:
2# Manager provides proxy objects like dict/list
3# that can be shared across processes.
4# Good for moderate-size shared state and easier programming model.
5
6from multiprocessing import Process, Manager
7import time
8
9def worker(shared_dict, key, value):
10 shared_dict[key] = value
11
12if __name__ == "__main__":
13 with Manager() as manager:
14 d = manager.dict()
15 processes = []
16 for i in range(5):
17 p = Process(target=worker, args=(d, f"k{i}", i*i))
18 p.start()
19 processes.append(p)
20 for p in processes:
21 p.join()
22 print("Shared dict:", dict(d))Managerمناسب لمشاركة القواميس والقوائم، لكن كل عملية وصول ترسل البيانات بين العمليات وتتطلب تحويل بـpickle۔ لذلك، فإن تحديث كميات كبيرة من البيانات بشكل متكرر سيبطئ المعالجة۔
آليات التزامن: كيفية استخدام Lock وSemaphore
استخدم Lock أو Semaphore للتحكم في الوصول المتزامن للموارد المشتركة۔ يمكنك استخدامها بشكل مختصر مع عبارة with۔
1# Explanation:
2# Demonstrates using Lock to prevent simultaneous access to a critical section.
3# Locks are necessary when shared resources are not atomic.
4
5from multiprocessing import Process, Lock, Value
6
7def safe_add(counter, lock):
8 for _ in range(10000):
9 with lock:
10 counter.value += 1
11
12if __name__ == "__main__":
13 lock = Lock()
14 counter = Value('i', 0)
15 p1 = Process(target=safe_add, args=(counter, lock))
16 p2 = Process(target=safe_add, args=(counter, lock))
17 p1.start(); p2.start()
18 p1.join(); p2.join()
19 print("Counter:", counter.value)- الأقفال تمنع تضارب البيانات، ولكن إذا كانت المنطقة المقفلة كبيرة جدًا، فإن أداء المعالجة المتوازية سينخفض۔ يجب حماية الأجزاء الضرورية فقط كمناطق حرجة۔
الاختلافات بين fork على أنظمة UNIX والسلوك على ويندوز
في أنظمة UNIX، يتم تكرار العمليات افتراضيًا باستخدام fork ما يجعل تقنية النسخ عند الكتابة (copy-on-write) فعالة من حيث الذاكرة۔ يقوم نظام ويندوز ببدء العمليات باستخدام spawn (ما يؤدي إلى إعادة استيراد الوحدات)، لذا عليك الانتباه إلى حماية نقطة الدخول وتهيئة المتغيرات العامة۔
1# Explanation: Check start method (fork/spawn) and set it if needed.
2# Useful for debugging platform-dependent behavior.
3
4from multiprocessing import get_start_method, set_start_method
5
6if __name__ == "__main__":
7 print("Start method:", get_start_method())
8
9 # uncomment to force spawn on Unix for testing
10 # set_start_method('spawn')- يمكن استدعاء
set_start_methodمرة واحدة فقط في بداية برنامجك۔ من الأكثر أماناً عدم تغيير ذلك بشكل عشوائي داخل المكتبات۔
مثال عملي: قياس أداء المهام المعتمدة على وحدة المعالجة المركزية (مقارنة)
فيما يلي نص برمجي يقارن ببساطة مدى سرعة المعالجة عند استخدام التوازي مع multiprocessing۔ هنا نستخدم Pool۔
1# Explanation:
2# Compare sequential vs parallel execution times for CPU-bound task.
3# Helps understand speedup and overhead.
4
5import time
6from multiprocessing import Pool, cpu_count
7import math
8
9def heavy_task(n):
10 s = 0
11 for i in range(1, n):
12 s += math.sqrt(i)
13 return s
14
15def run_sequential(nums):
16 return [heavy_task(n) for n in nums]
17
18def run_parallel(nums):
19 with Pool(processes=cpu_count()) as p:
20 return p.map(heavy_task, nums)
21
22if __name__ == "__main__":
23 nums = [2000000] * 8 # heavy tasks
24 t0 = time.time()
25 run_sequential(nums)
26 seq = time.time() - t0
27 t1 = time.time()
28 run_parallel(nums)
29 par = time.time() - t1
30 print(f"Sequential: {seq:.2f}s, Parallel: {par:.2f}s")- يوضح هذا المثال أنه حسب حجم المهام وعدد العمليات، قد يكون التنفيذ المتوازي غير فعال أحيانًا بسبب الحمل الإضافي۔ كلما كانت المهام أكبر (أثقل) وأكثر استقلالية، زادت الفائدة۔
قواعد أساسية هامة
فيما يلي النقاط الأساسية لاستخدام multiprocessing بأمان وكفاءة۔
- على نظام ويندوز، يتم إعادة استيراد الوحدات عند بدء العمليات الفرعية، ولذلك يجب حماية نقطة دخول السكربت بـ
if __name__ == "__main__":۔ - يتم تسلسل الاتصال بين العمليات (مع تحويل بـ
pickle)، لذا تصبح عملية نقل الكائنات الكبيرة مكلفة۔ - نظرًا لأن
multiprocessingينشئ عمليات مستقلة، فمن الشائع تحديد عدد العمليات استنادًا إلىmultiprocessing.cpu_count()۔ - إنشاء
Poolأخرى داخل عامل يصبح معقدًا، لذا يجب تجنب تداخل مثيلاتPoolقدر الإمكان۔ - نظرًا لأن الاستثناءات التي تحدث في العمليات الفرعية يصعب اكتشافها من العملية الرئيسية، فمن الضروري تنفيذ التسجيل ومعالجة الأخطاء بشكل صريح۔
- اضبط عدد العمليات وفقًا للمعالج، وفكر في استخدام الخيوط (threads) للمهام المعتمدة على الإدخال/الإخراج۔
نصائح تصميم عملية
فيما يلي بعض المفاهيم والأنماط المفيدة لتصميم المعالجة المتوازية۔
- من الفعال تقسيم العمليات إلى أدوار مثل قراءة الإدخال (I/O)، المعالجة المسبقة (متعددة المعالجات)، والتجميع (تسلسلي) من خلال 'pipelining' (تسلسل التنفيذ)۔
- لتبسيط التصحيح، تحقق أولاً من عملية التنفيذ في عملية واحدة قبل بدء التوازي۔
- بالنسبة للسجلات، افصل نواتج السجل لكل عملية (مثلاً، ضمّن PID في أسماء الملفات) لتسهيل عزل المشكلات۔
- جهز آليات لإعادة المحاولة وتحديد المهلات الزمنية حتى تتمكن من الاسترداد بأمان إذا توقفت إحدى العمليات۔
الملخص (نقاط رئيسية يمكنك استخدامها فوراً)
المعالجة المتوازية قوية، ولكن من المهم الحكم بشكل صحيح على طبيعة المهام وحجم البيانات وتكلفة الاتصال بين العمليات۔ تعد وحدة multiprocessing فعالة لمعالجة المهام المعتمدة على المعالج، لكن التصميم السيء أو أخطاء التزامن قد تقلل الأداء۔ إذا اتبعت القواعد والأنماط الأساسية، يمكنك بناء برامج متوازية آمنة وفعالة۔
يمكنك متابعة المقالة أعلاه باستخدام Visual Studio Code على قناتنا على YouTube.۔ يرجى التحقق من القناة على YouTube أيضًا.۔