الإدخال/الإخراج غير المتزامن

الإدخال/الإخراج غير المتزامن

تشرح هذه المقالة الإدخال/الإخراج غير المتزامن۔

يوضح هذا الدليل، خطوة بخطوة، مفاهيم وأنماط الإدخال/الإخراج غير المتزامن المفيدة عمليًا في بايثون۔

YouTube Video

الإدخال/الإخراج غير المتزامن (I/O)

مفهوم الإدخال/الإخراج غير المتزامن

الإدخال/الإخراج غير المتزامن هو آلية تسمح بتنفيذ عمليات أخرى بالتوازي أثناء انتظار عمليات إدخال/إخراج تستغرق وقتاً طويلاً، مثل عمليات الملفات أو الاتصال بالشبكة۔ في بايثون، يتم توفير asyncio كإطار عمل قياسي للبرمجة غير المتزامنة، وقد تم تصميم العديد من المكتبات لتتبع هذه الآلية۔

الأساسيات: async / await وحلقة الأحداث

أولاً، إليك كيفية كتابة الكوروتينات الأساسية ومثال على تشغيل عدة كوروتينات في نفس الوقت باستخدام asyncio.gather۔

الكود أدناه هو مثال بسيط على تعريف وتشغيل الدوال غير المتزامنة بشكل متزامن۔ تُستخدم دالة sleep لتوضيح التنفيذ المتوازي۔

 1import asyncio
 2
 3async def worker(name: str, delay: float):
 4    # Simulate I/O-bound work by sleeping
 5    print(f"{name} started")
 6    await asyncio.sleep(delay)
 7    print(f"{name} finished after {delay}s")
 8    return name, delay
 9
10async def main():
11    # Run multiple coroutines concurrently
12    tasks = [
13        worker("A", 1.5),
14        worker("B", 1.0),
15        worker("C", 0.5),
16    ]
17    results = await asyncio.gather(*tasks)
18    print("Results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • يبدأ هذا الكود حلقة الأحداث باستخدام asyncio.run() ويقوم بتشغيل ثلاث كوروتينات بشكل متزامن۔

async with ومديرو السياق غير المتزامنين

في المعالجة غير المتزامنة، يمكن أن يصبح إدارة الموارد مثل فتح الاتصالات وإغلاق الملفات معقدًا بسهولة۔ هنا تصبح مديري السياق غير المتزامنين باستخدام async with مفيدين۔ يتم استخدام هذه البنية تمامًا مثل عبارة with المتزامنة، ولكن المعالجة الداخلية غير متزامنة، مما يجعلها تندمج بسلاسة مع تدفق async/await۔

هناك سببين رئيسيين لاستخدام async with

  • لتنظيف الموارد مثل الاتصالات، ومقابض الملفات، أو الجلسات بشكل موثوق۔ يمكنك أن تطمئن إلى أن الموارد سيتم تحريرها بشكل صحيح حتى في حالة حدوث إنهاء غير طبيعي۔
  • لأتمتة مهام التهيئة والتنظيف مثل إنشاء أو إغلاق الاتصالات وتفريغ البيانات بطريقة غير متزامنة۔ يوفر هذا عناء الترميز اليدوي ويجعل كودك أكثر وضوحًا۔

فيما يلي مثال على إنشاء مدير سياق غير متزامن بسيط من الصفر۔

 1import asyncio
 2
 3class AsyncResource:
 4    async def __aenter__(self):
 5        print("Opening resource...")
 6        await asyncio.sleep(0.5)
 7        print("Resource opened")
 8        return self
 9
10    async def __aexit__(self, exc_type, exc, tb):
11        print("Closing resource...")
12        await asyncio.sleep(0.5)
13        print("Resource closed")
14
15async def main():
16    async with AsyncResource() as r:
17        print("Using resource...")
18
19if __name__ == "__main__":
20    asyncio.run(main())
  • عن طريق تعريف __aenter__ و __aexit__، يمكنك استخدام async with۔
  • يتم تنفيذ المعالجة عند الدخول والخروج من كتلة async with بشكل غير متزامن وآمن۔

إدخال/إخراج الملفات غير المتزامن (aiofiles)

تعد عمليات الملفات مثالًا كلاسيكيًا على الحجب۔ باستخدام aiofiles، يمكنك التعامل مع عمليات الملفات بشكل غير متزامن وآمن۔ داخلياً، يستخدم مجموعة من مؤشرات الترابط ويضمن إغلاق الملفات بشكل صحيح باستخدام async with۔

يوضح المثال التالي قراءة ملفات متعددة بشكل غير متزامن ومتوازي۔ يجب عليك تثبيت aiofiles باستخدام pip install aiofiles قبل تشغيل هذا الكود۔

 1# pip install aiofiles
 2import asyncio
 3import aiofiles
 4from pathlib import Path
 5
 6async def read_file(path: Path):
 7    # Read file content asynchronously
 8    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
 9        contents = await f.read()
10    return path.name, len(contents)
11
12async def main():
13    files = [
14        Path("example1.txt"),
15        Path("example2.txt"),
16        Path("example3.txt")
17    ]
18    tasks = [read_file(p) for p in files]
19    results = await asyncio.gather(*tasks)
20    for name, size in results:
21        print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24    asyncio.run(main())
  • يقوم هذا الكود بموازاة قراءة كل ملف۔ في كثير من الأحيان يستخدم aiofiles مجموعة من مؤشرات الترابط داخليًا، مما يتيح لك معالجة إدخال/إخراج الملفات المحجوبة من خلال واجهة غير متزامنة۔

عميل HTTP غير متزامن (aiohttp)

كمثال كلاسيكي على إدخال/إخراج الشبكة، إليك كيفية تنفيذ طلبات HTTP بشكل غير متزامن۔ يكون قويًا بشكل خاص عند الحاجة لتنفيذ عدد كبير من طلبات HTTP بالتوازي۔

فيما يلي مثال على جلب عدة عناوين URL بالتوازي باستخدام aiohttp۔ سيتعين عليك تثبيت aiohttp باستخدام الأمر pip install aiohttp۔

 1# pip install aiohttp
 2import asyncio
 3import aiohttp
 4
 5async def fetch(session: aiohttp.ClientSession, url: str):
 6    # Fetch a URL asynchronously and return status and size
 7    async with session.get(url) as resp:
 8        text = await resp.text()
 9        return url, resp.status, len(text)
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [fetch(session, u) for u in urls]
14        for coro in asyncio.as_completed(tasks):
15            url, status, size = await coro
16            print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19    urls = [
20        "https://codesparklab.com/json/example1.json",
21        "https://codesparklab.com/json/example2.json",
22        "https://codesparklab.com/json/example3.json",
23    ]
24    asyncio.run(main(urls))
  • عن طريق استخدام asyncio.as_completed، يمكنك معالجة النتائج حسب ترتيب انتهاء المهام۔ هذا مفيد للتعامل بكفاءة مع العديد من الطلبات۔

التعايش مع الإدخال/الإخراج المحجوب: run_in_executor

عند التعامل مع المهام التي تستهلك وحدة المعالجة المركزية بكثرة أو واجهات البرمجة المعيقة الموجودة في الكود غير المتزامن، استخدم ThreadPoolExecutor أو ProcessPoolExecutor عبر loop.run_in_executor۔

الكود التالي هو مثال لتشغيل مهام تفترض وجود عملية إدخال/إخراج معيقة بشكل متزامن باستخدام مجموعة مؤشرات۔

 1import asyncio
 2from concurrent.futures import ThreadPoolExecutor
 3import time
 4
 5def blocking_io(n):
 6    # Simulate a blocking I/O or CPU-bound function
 7    time.sleep(n)
 8    return f"slept {n}s"
 9
10async def main():
11    loop = asyncio.get_running_loop()
12    with ThreadPoolExecutor() as pool:
13        tasks = [
14            loop.run_in_executor(pool, blocking_io, 1),
15            loop.run_in_executor(pool, blocking_io, 2),
16        ]
17        results = await asyncio.gather(*tasks)
18        print("Blocking results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • من خلال استخدام run_in_executor، يمكنك دمج الكود المتزامن الحالي في تدفقات غير متزامنة دون تعديلات كبيرة۔ ومع ذلك، يجب الانتباه إلى عدد مؤشرات الترابط وحمل المعالج۔
  • ProcessPoolExecutor مناسب للمهام التي تستهلك وحدة المعالجة المركزية۔

الخادم غير المتزامن: خادم Echo TCP يعتمد على asyncio

إذا كنت ترغب في التعامل مع المقابس (Sockets) مباشرةً، يمكنك بسهولة بناء خادم غير متزامن باستخدام asyncio.start_server۔

المثال التالي هو خادم echo بسيط يعيد البيانات كما استلمها من العميل۔

 1import asyncio
 2
 3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
 4    # Handle a single client: read data and echo it back
 5    addr = writer.get_extra_info('peername')
 6    print(f"Connection from {addr}")
 7    while True:
 8        data = await reader.read(1024)
 9        if not data:
10            break
11        writer.write(data)  # echo back
12        await writer.drain()
13    writer.close()
14    await writer.wait_closed()
15    print(f"Connection closed {addr}")
16
17async def main():
18    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19    addr = server.sockets[0].getsockname()
20    print(f"Serving on {addr}")
21    async with server:
22        await server.serve_forever()
23
24if __name__ == "__main__":
25    asyncio.run(main())
  • في الاتصالات عبر TCP باستخدام asyncio، يلعب كل من StreamReader وStreamWriter دورًا أساسيًا في عمليات الإدخال والإخراج غير المتزامنة.۔ StreamReader يقرأ البيانات المرسلة من العميل بشكل غير متزامن، بينما يتم استخدام StreamWriter لإرسال الردود من الخادم إلى العميل.۔

  • حتى من دون التعامل بنفسك مع العمليات التفصيلية للمقابس، يمكنك تشغيل خادم غير متزامن ببساطة وكفاءة باستخدام asyncio.start_server

  • عند تمرير دالة معالج إلى asyncio.start_server، تستقبل تلك الدالة كلاً من reader وwriter كوسيطين.۔ باستخدام هذه الأدوات، يمكنك تنفيذ عمليات الاتصال بطريقة أكثر أمانًا ووضوحًا من التعامل المباشر مع واجهات برمجة التطبيقات للمقابس منخفضة المستوى.۔ على سبيل المثال، من خلال استقبال البيانات باستخدام reader.read() ودمج writer.write() مع writer.drain()، يمكنك تنفيذ إرسال غير متزامن يضمن اكتمال النقل.۔

  • هذا الإعداد مناسب للتعامل مع عدد كبير من الاتصالات المتزامنة وهو مثالي للبروتوكولات البسيطة أو خدمات TCP ذات النطاق الصغير۔

التعامل مع بيانات البث الكبيرة

عند معالجة الملفات الكبيرة أو الاستجابات بشكل متتالٍ، اقرأ واكتب البيانات على شكل أجزاء (chunks) للحفاظ على انخفاض استخدام الذاكرة۔ فيما يلي مثال على قراءة البيانات بطريقة التدفق (streaming) باستخدام aiohttp۔

الكود التالي يعالج استجابات HTTP على شكل أجزاء ويكتبها إلى القرص مع وصول البيانات۔

 1import aiohttp
 2import asyncio
 3import aiofiles
 4
 5async def stream_download(url: str, dest: str):
 6    # Stream download and write to file in chunks
 7    async with aiohttp.ClientSession() as session:
 8        async with session.get(url) as resp:
 9            async with aiofiles.open(dest, 'wb') as f:
10                async for chunk in resp.content.iter_chunked(1024 * 64):
11                    await f.write(chunk)
12
13if __name__ == "__main__":
14    asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))
  • هذا الكود لا يحمل ملفًا كبيرًا دفعة واحدة؛ بل يستقبل البيانات على شكل أجزاء (قطع صغيرة) ويكتبها إلى ملف بشكل غير متزامن.۔ وبالتالي، يمكنه إجراء التحميلات بسرعة وكفاءة مع الحفاظ على استهلاك منخفض للذاكرة.۔ aiohttp يسترجع البيانات بشكل غير متزامن وaiofiles يكتب إلى الملف دون حجب، مما يجعل من السهل تشغيل العمليات إلى جانب عمليات أخرى.۔

  • هذا النمط مناسب لتنزيل وحفظ الملفات الكبيرة بكفاءة مع تقليل استخدام الذاكرة۔

تنفيذ العمليات الفرعية (subprocesses) بشكل غير متزامن

إذا كنت ترغب في تشغيل أوامر خارجية بشكل غير متزامن وقراءة مخرجاتها في الوقت الحقيقي، فإن asyncio.create_subprocess_exec مفيد۔

فيما يلي مثال على تشغيل أمر خارجي وقراءة مخرجاته القياسية في الوقت الحقيقي۔

 1import asyncio
 2
 3async def run_cmd(cmd):
 4    # Run external command asynchronously and capture output line by line
 5    proc = await asyncio.create_subprocess_exec(
 6        *cmd,
 7        stdout=asyncio.subprocess.PIPE,
 8        stderr=asyncio.subprocess.PIPE
 9    )
10
11    async def read_stream(stream, name):
12        while True:
13            line = await stream.readline()
14            if not line:
15                break
16            print(f"[{name}] {line.decode().rstrip()}")
17
18    await asyncio.gather(
19        read_stream(proc.stdout, "stdout"),
20        read_stream(proc.stderr, "stderr"),
21    )
22    await proc.wait()
23    return proc.returncode
24
25if __name__ == "__main__":
26    asyncio.run(run_cmd(["python", "--version"]))
  • من خلال التحكم في العمليات الفرعية بشكل غير متزامن، يمكنك التعامل مع سجلات الأدوات الخارجية في الوقت الحقيقي أو تشغيل عدة عمليات بالتوازي۔

التعامل مع الإلغاء وحدود الوقت

يمكن إلغاء المهام غير المتزامنة۔ عند تنفيذ حد وقت، من السهل استخدام asyncio.wait_for۔

فيما يلي مثال على تشغيل مهمة بحد وقت۔

 1import asyncio
 2
 3async def slow_task():
 4    await asyncio.sleep(5)
 5    return "done"
 6
 7async def main():
 8    try:
 9        result = await asyncio.wait_for(slow_task(), timeout=2.0)
10        print("Result:", result)
11    except asyncio.TimeoutError:
12        print("Task timed out")
13
14if __name__ == "__main__":
15    asyncio.run(main())
  • ترمي wait_for استثناء TimeoutError إذا تم تجاوز الحد الزمني وتلغي المهمة إذا لزم الأمر۔ كن حذرًا بشأن انتشار الإلغاء وتنظيف المهام۔

التحكم في التزامن (Semaphore)

نظرًا لأن إجراء العديد من الاتصالات أو الطلبات المتزامنة قد يستهلك الموارد، حدد التزامن باستخدام asyncio.Semaphore۔

فيما يلي مثال على تحديد عدد التنزيلات المتزامنة باستخدام semaphore۔

 1import asyncio
 2import aiohttp
 3
 4semaphore = asyncio.Semaphore(3)  # allow up to 3 concurrent tasks
 5
 6async def limited_fetch(session, url):
 7    async with semaphore:
 8        async with session.get(url) as resp:
 9            return url, resp.status
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [limited_fetch(session, u) for u in urls]
14        results = await asyncio.gather(*tasks)
15        print(results)
16
17if __name__ == "__main__":
18    urls = ["https://codesparklab.com/json/example.json"] * 10
19    asyncio.run(main(urls))
  • بهذه الطريقة، يمكنك الوصول إلى الخدمات الخارجية بلطف وتجنب تحميل عمليتك بشكل زائد۔

معالجة الأخطاء واستراتيجيات إعادة المحاولة

تحدث الأخطاء بالضرورة حتى في المعالجة غير المتزامنة۔ التقط الاستثناءات بشكل مناسب وطبق استراتيجيات إعادة المحاولة مثل التراجع الأسي (exponential backoff)۔

فيما يلي مثال تطبيقي على إعادة المحاولة البسيطة حتى N مرات۔

 1import asyncio
 2import aiohttp
 3import random
 4
 5async def fetch_with_retry(session, url, retries=3):
 6    for attempt in range(1, retries + 1):
 7        try:
 8            async with session.get(url) as resp:
 9                resp.raise_for_status()
10                text = await resp.text()
11                return text
12        except Exception as e:
13            if attempt == retries:
14                raise
15            await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18    async with aiohttp.ClientSession() as session:
19        text = await fetch_with_retry(session, "https://codesparklab.com/")
20        print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • من المهم وجود منطق إعادة محاولة جيد من أجل تحقيق التوازن بين الاتساق والتحكم في حركة المرور۔

نصائح لتصحيح الأخطاء وتسجيل السجلات (Logging)

في المعالجة غير المتزامنة، تتقدم المهام بشكل متزامن، مما قد يصعب تحديد سبب المشكلات۔ لتتبع المشكلات بكفاءة، سيساعد مراعاة النقاط التالية في تسهيل عملية التصحيح۔

  • من السهل تفويت الاستثناءات الصادرة عن asyncio.run() و Task، لذا تأكد من تسجيل الاستثناءات غير المعالجة۔
  • عند استخدام logging، يساعد تضمين اسم الدالة التعاونية أو، في بايثون 3.8 وما فوق، task.get_name() في السجلات على سهولة تتبع العمليات.۔
  • يمكنك التحقق من الحالة الحالية للمهام باستخدام asyncio.Task.all_tasks().۔ ومع ذلك، فإن هذه الواجهة البرمجية مخصصة لأغراض تصحيح الأخطاء ويجب استخدامها بحذر في بيئات الإنتاج لتجنب مشاكل الأداء أو التداخل غير المتوقع.۔

الاعتبارات المتعلقة بالأداء

بينما تتفوق البرمجة غير المتزامنة في التعامل مع انتظار الإدخال/الإخراج، إلا أن الاستخدام غير السليم يمكن أن يقلل من الأداء فعلاً۔ حسّن الأداء عبر مراعاة النقاط التالية:۔

  • تتفوق المعالجة غير المتزامنة في المهام المرتبطة بالإدخال/الإخراج، لكنها غير مناسبة للمهام المرتبطة بالمعالج (CPU)؛ استخدم مجموعة العمليات لهذه الحالات۔
  • عند استخدام مجموعات مؤشرات الترابط أو العمليات، ضع في الاعتبار حجم المجموعة وطبيعة المهمة۔
  • إذا بدأت العديد من المهام الصغيرة دفعة واحدة، يزيد الحمل على حلقة الأحداث؛ لذا استخدم التجميع (batching) أو semaphores للتعديل۔

الملخص

الإدخال/الإخراج غير المتزامن في بايثون هو آلية قوية تستغل أوقات الانتظار في الإدخال/الإخراج بشكل فعّال وتنفذ عمليات الشبكة والملفات بكفاءة وبشكل متزامن۔ من خلال دمج تقنيات مثل asyncio و aiohttp و aiofiles و run_in_executor، يمكنك بناء تطبيقات عملية غير متزامنة بمرونة۔ استخدام async with لأتمتة اكتساب وتحرير الموارد يمكّنك من إدارة الموارد غير المتزامنة مثل الملفات وجلسات HTTP وأقفال البيانات بأمان وموثوقية۔ من خلال دمج معالجة أخطاء وإدارة تزامن مناسبة، يمكنك تشغيل برامج غير متزامنة عالية الموثوقية بأمان۔

يمكنك متابعة المقالة أعلاه باستخدام Visual Studio Code على قناتنا على YouTube.۔ يرجى التحقق من القناة على YouTube أيضًا.۔

YouTube Video