التحكم في التزامن في المعالجة غير المتزامنة في بايثون

التحكم في التزامن في المعالجة غير المتزامنة في بايثون

تشرح هذه المقالة التحكم في التزامن في المعالجة غير المتزامنة في بايثون۔

سوف تتعلم خطوة بخطوة، من أساسيات asyncio إلى الأنماط العملية الشائعة للتحكم في التزامن.۔

YouTube Video

التحكم في التزامن في المعالجة غير المتزامنة في بايثون

في المعالجة غير المتزامنة، يمكنك تشغيل عدة مهام في نفس الوقت بسهولة۔ ومع ذلك، في التطبيق العملي، هناك حاجة إلى تعديلات أكثر تقدماً مثل التحكم في التوازي، تنسيق المهام، التحكم الحصري في الموارد المشتركة، التعامل مع العمليات المتزامنة الثقيلة، والتنظيف بعد الإلغاءات۔

هنا، سنتعلم خطوة بخطوة من أساسيات asyncio إلى الأنماط العملية الشائعة المستخدمة في التحكم في التزامن۔

مقدمة: الأساسيات (async / await و create_task)

دعونا أولاً نلقي نظرة على بعض الشيفرات البسيطة غير المتزامنة۔ await تنتظر في تلك النقطة حتى تنتهي coroutine المستدعاة، و asyncio.create_task يحدد مهمة للتنفيذ المتوازي۔

 1import asyncio
 2
 3async def worker(name, delay):
 4    # Simulate async work
 5    await asyncio.sleep(delay)
 6    return f"{name} done after {delay}s"
 7
 8async def main():
 9    # Create two tasks that run concurrently.
10    t1 = asyncio.create_task(worker("A", 1))
11    t2 = asyncio.create_task(worker("B", 2))
12
13    # Await both results (this suspends until both are done).
14    result1 = await t1
15    result2 = await t2
16    print(result1, result2)
17
18if __name__ == "__main__":
19    asyncio.run(main())
  • هذه الشيفرة هي نمط نموذجي حيث يتم إنشاء المهام بشكل صريح وتشغيلها بالتوازي، ويتم استلام النتائج في النهاية باستخدام await۔ create_task تمكّن التنفيذ المتوازي۔

الاختلافات بين asyncio.gather و asyncio.wait و asyncio.as_completed

عند تشغيل عدة coroutines في وقت واحد، تختار أيها تستخدم حسب كيفية جلب النتائج التي تريدها۔ gather ينتظر حتى تنتهي كل المهام ويعيد النتائج بالترتيب المدخل، بينما as_completed يسمح بمعالجة النتائج فور انتهاءها بغض النظر عن الترتيب۔

 1import asyncio
 2import random
 3
 4async def job(i):
 5    delay = random.random() * 2
 6    await asyncio.sleep(delay)
 7    return (i, delay)
 8
 9async def gather_example():
10    # gather waits for all tasks and returns results in the same order as input
11    results = await asyncio.gather(*(job(i) for i in range(5)))
12    print("gather order:", results)
13
14async def as_completed_example():
15    # as_completed yields results as they finish (useful to process early results)
16    tasks = [asyncio.create_task(job(i)) for i in range(5)]
17    for coro in asyncio.as_completed(tasks):
18        res = await coro
19        print("completed:", res)
20
21async def main():
22    await gather_example()
23    await as_completed_example()
24
25if __name__ == "__main__":
26    asyncio.run(main())
  • كما هو موضح في هذه الشيفرة، يعيد gather النتائج وفق ترتيب الإدخال، مما يجعله مفيداً عندما ترغب بالحفاظ على الترتيب۔ يُستخدم as_completed عندما تريد معالجة النتائج بمجرد الانتهاء۔

التحكم في التوازي: تحديد عدد العمليات المتزامنة باستخدام asyncio.Semaphore

عندما توجد حدود لمعدل استدعاء واجهة برمجة التطبيقات أو حدود اتصال قاعدة البيانات، يمكنك التحكم في عدد العمليات المتزامنة باستخدام Semaphore۔

 1import asyncio
 2import random
 3
 4sem = asyncio.Semaphore(3)  # allow up to 3 concurrent workers
 5
 6async def limited_worker(i):
 7    async with sem:
 8        # Only 3 tasks can be inside this block concurrently
 9        delay = random.random() * 2
10        await asyncio.sleep(delay)
11        print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14    tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15    await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18    asyncio.run(main())
  • عند استخدام Semaphore مع async with، يمكنك بسهولة تحديد عدد العمليات المتزامنة۔ وهذا فعال في الحالات التي بها قيود خارجية۔

التحكم الحصري في الموارد المشتركة: asyncio.Lock

Lock تُستخدم لمنع التحديثات المتزامنة للبيانات المشتركة۔ asyncio.Lock هي أداة حصرية مصممة للاستخدام غير المتزامن۔

 1import asyncio
 2
 3counter = 0
 4lock = asyncio.Lock()
 5
 6async def incrementer(n_times):
 7    global counter
 8    for _ in range(n_times):
 9        # Acquire lock to update shared counter safely
10        async with lock:
11            tmp = counter
12            await asyncio.sleep(0)  # yield control to increase race likelihood
13            counter = tmp + 1
14
15async def main():
16    tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17    await asyncio.gather(*tasks)
18    print("final counter:", counter)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • إذا قامت عدة مهام بتحديث متغير مشترك مثل counter عام، فقد تحدث تعارضات۔ بتغليف العمليات مع Lock، يمكنك الحفاظ على الاتساق۔

تنسيق المهام: asyncio.Event

يتم استخدام Event عندما تشير مهمة واحدة بأنها جاهزة وتنتظر المهام الأخرى هذا الإشارة۔ هذه طريقة بسيطة لجعل المهام تتشارك الإشارات وتتزامن فيما بينها۔

 1import asyncio
 2
 3event = asyncio.Event()
 4
 5async def waiter(name):
 6    print(f"{name} is waiting for event")
 7    await event.wait()
 8    print(f"{name} resumed after event set")
 9
10async def setter():
11    print("setter will sleep and then set the event")
12    await asyncio.sleep(1)
13    event.set()
14    print("event set by setter")
15
16async def main():
17    tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18    await asyncio.create_task(setter())
19    await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22    asyncio.run(main())
  • يحتوي Event على علم منطقي، وعند استدعاء set() تُستأنف جميع المهام المنتظرة۔ وهي مفيدة للتزامن البسيط۔

نمط المنتج-المستهلك: asyncio.Queue

من خلال استخدام Queue، يمكن للمنتجين (الذين ينشئون البيانات) والمستهلكين (الذين يعالجون البيانات) التنسيق بسلاسة وبشكل غير متزامن۔ أيضاً، عندما يكون الطابور ممتلئاً، ينتظر المنتجون تلقائياً، مما يقدم آلية ضغط عكسي طبيعية لمنع الإفراط في الإنتاج۔

 1import asyncio
 2import random
 3
 4async def producer(q, n_items):
 5    for i in range(n_items):
 6        await asyncio.sleep(random.random() * 0.5)
 7        item = f"item-{i}"
 8        await q.put(item)
 9        print("produced", item)
10    # signal consumers to stop
11    await q.put(None)
12
13async def consumer(q, name):
14    while True:
15        item = await q.get()
16        if item is None:
17            # put sentinel back for other consumers and break
18            await q.put(None)
19            break
20        await asyncio.sleep(random.random() * 1)
21        print(name, "consumed", item)
22        q.task_done()
23
24async def main():
25    q = asyncio.Queue(maxsize=5)  # bounded queue to apply backpressure
26    prod = asyncio.create_task(producer(q, 10))
27    cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28    await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31    asyncio.run(main())
  • Queue تساعد في تنسيق المنتجين والمستهلكين بطريقة غير متزامنة۔ بالإضافة إلى ذلك، عند ضبط maxsize، يجعل المنتج ينتظر عند تنفيذ put إذا كان الطابور ممتلئاً، مما يمنع الإفراط في الإنتاج۔

التعامل مع العمليات المتزامنة التي تعيق التنفيذ: run_in_executor

للمعالجة المكثفة للمعالج أو عند استخدام مكتبات لا تدعم async، استخدم run_in_executor لتفويض المعالجة إلى سلسلة أو عملية أخرى۔ يمنع ذلك حلقة الأحداث الرئيسية من التوقف، ويسمح للمهام غير المتزامنة الأخرى بالعمل بسلاسة۔

 1import asyncio
 2import time
 3
 4def blocking_io(x):
 5    # simulate blocking I/O or CPU-bound work
 6    time.sleep(2)
 7    return x * x
 8
 9async def main():
10    loop = asyncio.get_running_loop()
11    # run blocking_io in default thread pool
12    result = await loop.run_in_executor(None, blocking_io, 3)
13    print("blocking result:", result)
14
15if __name__ == "__main__":
16    asyncio.run(main())
  • استدعاء الدوال المتزامنة مباشرة سيؤدي إلى حجب حلقة الأحداث۔ باستخدام run_in_executor، يتم تشغيل الشيفرة في سلسلة منفصلة ويمكن للمهام غير المتزامنة الاستمرار بالتقدم بالتوازي۔

مثال: استدعاءات API محددة المعدل (الجمع بين Semaphore و run_in_executor)

فيما يلي سيناريو نموذجي يتم فيه تحديد معدل استدعاءات API ويتم تنفيذ معالجة ثقيلة على النتائج۔ الجمع بين Semaphore و run_in_executor يسمح باستمرار المعالجة بشكل آمن وفعال۔

 1import asyncio
 2import time
 3import random
 4
 5sem = asyncio.Semaphore(5)
 6
 7def heavy_sync_processing(data):
 8    # simulate heavy CPU-bound work
 9    time.sleep(1)
10    return f"processed-{data}"
11
12async def api_call(i):
13    await asyncio.sleep(random.random() * 0.5)  # simulate network latency
14    return f"data-{i}"
15
16async def worker(i):
17    async with sem:
18        data = await api_call(i)
19        # offload CPU-bound work to threadpool
20        loop = asyncio.get_running_loop()
21        result = await loop.run_in_executor(None, heavy_sync_processing, data)
22        print(result)
23
24async def main():
25    tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26    await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29    asyncio.run(main())
  • نستخدم Semaphore للحد من عدد استدعاءات API المتزامنة، ويتم تفويض المعالجة الثقيلة للبيانات الناتجة إلى تجمع سلاسل العمليات۔ فصل معالجة الشبكة ومعالجة المعالج يحسن الكفاءة۔

إلغاء المهام والتنظيف

عند إلغاء مهمة ما، من المهم جداً التعامل بشكل صحيح مع finally و asyncio.CancelledError۔ وهذا يضمن تحرير الملفات والاتصالات والتعامل بشكل سليم مع الحالات الوسيطة، مما يحافظ على الاتساق داخل التطبيق۔

 1import asyncio
 2
 3async def long_running():
 4    try:
 5        print("started long_running")
 6        await asyncio.sleep(10)
 7        print("finished long_running")
 8    except asyncio.CancelledError:
 9        print("long_running was cancelled, cleaning up")
10        # perform cleanup here
11        raise
12
13async def main():
14    task = asyncio.create_task(long_running())
15    await asyncio.sleep(1)
16    task.cancel()
17    try:
18        await task
19    except asyncio.CancelledError:
20        print("task cancelled in main")
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • يتم تسليم الإلغاء على شكل استثناء (CancelledError)، لذا قم بالتنظيف اللازم داخل كتلة except وأعد إطلاق الاستثناء إذا لزم الأمر۔

نقاط رئيسية للتصميم العملي

النقاط التالية عملية ومفيدة لتصميم المعالجة غير المتزامنة۔

  • تحكم في التوازي بشكل صريح عندما توجد حدود للموارد مثل واجهات برمجة التطبيقات أو قواعد البيانات، يمكنك تحديد عدد العمليات المتزامنة باستخدام Semaphore۔

  • تعامل بأمان مع الموارد المشتركة إذا كنت بحاجة لتحديث الحالة من عدة مهام، استخدم Lock۔ تقليل حالة الموارد المشتركة وتصميم النظام حول البيانات غير القابلة للتغيير يزيد الأمان۔

  • اختر كيف تستقبل النتائج إذا كنت تريد معالجة المهام حال اكتمالها، استخدم asyncio.as_completed؛ وإذا كنت تريد معالجة النتائج حسب ترتيب الإدخال، استخدم gather۔

  • اعزل عمليات المعالجة المتزامنة الثقيلة للمكتبات المتزامنة أو المعالجة الكثيفة للمعالج، استخدم run_in_executor أو ProcessPoolExecutor لتجنب حجب حلقة الأحداث۔

  • خطط للإلغاء والاستثناءات اكتب معالجة استثناءات مناسبة لتنظيف الموارد بأمان حتى لو تم إلغاء المهمة أثناء التنفيذ۔

  • سهل عملية الاختبار قم بتجريد التأثيرات الجانبية مثل الإدخال/الإخراج، والوقت، والعشوائية بحيث يمكن استبدالها، لتسهيل اختبار الشيفرة غير المتزامنة۔

الملخص

asyncio قوية، ولكن إذا ركزت فقط على "تشغيل الأشياء بالتوازي"، فقد تواجه مشاكل مثل التنافس على الموارد المشتركة، تجاوز حدود الموارد، أو حجب حلقة الأحداث۔ عند الجمع بين Semaphore و Lock و Event و Queue و run_in_executor مع التعامل الصحيح مع الإلغاء، يمكنك تصميم تطبيقات غير متزامنة آمنة وفعالة۔ من خلال استخدام آليات مثل نمط المنتج-المستهلك، تحديد التوازي، أو فصل المعالجة غير المتزامنة عن المعالجة التي تحجب التنفيذ، يمكن بناء تدفقات عمل غير متزامنة بشكل أكثر أماناً وكفاءة۔

يمكنك متابعة المقالة أعلاه باستخدام Visual Studio Code على قناتنا على YouTube.۔ يرجى التحقق من القناة على YouTube أيضًا.۔

YouTube Video