קלט/פלט אסינכרוני

קלט/פלט אסינכרוני

מאמר זה מסביר קלט/פלט אסינכרוני.

מדריך זה מסביר בעדינות, שלב אחר שלב, את העקרונות והתבניות של קלט/פלט אסינכרוני שנמצאים בשימוש מעשי בפייתון.

YouTube Video

קלט/פלט אסינכרוני (I/O)

המושג של קלט/פלט אסינכרוני

קלט/פלט אסינכרוני הוא מנגנון שמאפשר לפעולות אחרות לרוץ במקביל בזמן שממתינים לפעולות I/O איטיות, כמו עבודה עם קבצים או תקשורת רשת. בפייתון, Asyncio הוא פריימוורק סטנדרטי לאסינכרוניות, ורבות מהספריות בנויו כך שיעבדו בשיטה זו.

יסודות: async/await ולולאת האירועים (Event Loop)

ראשית, כך כותבים קורוטינות בסיסיות ודוגמה להרצת כמה קורוטינות במקביל בעזרת asyncio.gather.

הקוד למטה הוא דוגמה מינימלית להגדרת והרצה מקבילה של פונקציות אסינכרוניות. הפונקציה sleep משמשת להדגמת ביצוע מקבילי.

 1import asyncio
 2
 3async def worker(name: str, delay: float):
 4    # Simulate I/O-bound work by sleeping
 5    print(f"{name} started")
 6    await asyncio.sleep(delay)
 7    print(f"{name} finished after {delay}s")
 8    return name, delay
 9
10async def main():
11    # Run multiple coroutines concurrently
12    tasks = [
13        worker("A", 1.5),
14        worker("B", 1.0),
15        worker("C", 0.5),
16    ]
17    results = await asyncio.gather(*tasks)
18    print("Results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • קוד זה מתחיל את לולאת האירועים עם asyncio.run() ומריץ שלוש קורוטינות במקביל.

async with ומנהלי הקשר אסינכרוניים

בעיבוד אסינכרוני, ניהול משאבים כמו פתיחת חיבורים וסגירת קבצים עלול להסתבך בקלות. כאן נכנסים לתמונה מנהלי הקשר אסינכרוניים באמצעות async with. תחביר זה דומה ל-with הסינכרוני, אך הביצועים בו אסינכרוניים ומתאימים לזרימת הקוד ב-async/await.

ישנן שתי סיבות עיקריות לשימוש ב-async with:.

  • לנקות באופן אמין משאבים כמו חיבורים, ידיות קבצים או סשנים. ניתן להיות בטוחים שהמשאבים ישוחררו גם במקרה של סיום חריג.
  • לבצע אוטומציה של פעולות אתחול וסיום, כמו פתיחה/סגירת חיבורים וניקוי זיכרון, בצורה אסינכרונית. זה חוסך קוד ידני ומפשט את הקוד שלך.

להלן דוגמה ליצירת מנהל הקשר אסינכרוני פשוט מהתחלה.

 1import asyncio
 2
 3class AsyncResource:
 4    async def __aenter__(self):
 5        print("Opening resource...")
 6        await asyncio.sleep(0.5)
 7        print("Resource opened")
 8        return self
 9
10    async def __aexit__(self, exc_type, exc, tb):
11        print("Closing resource...")
12        await asyncio.sleep(0.5)
13        print("Resource closed")
14
15async def main():
16    async with AsyncResource() as r:
17        print("Using resource...")
18
19if __name__ == "__main__":
20    asyncio.run(main())
  • על ידי הגדרת __aenter__ ו-__aexit__ ניתן להשתמש ב-async with.
  • הפעולות בכניסה ויציאה מבלוק async with יבוצעו באופן אסינכרוני ובטוח.

קלט/פלט קבצים אסינכרוני (aiofiles)

עבודה עם קבצים היא דוגמה קלאסית לפעולה שחוסמת. באמצעות aiofiles תוכל לבצע פעולות קבצים בצורה אסינכרונית ובטוחה. ברקע, הספרייה משתמשת ב-thread pool ודואגת לסגור קבצים נכון עם async with.

הדוגמה הבאה מתארת קריאה אסינכרונית ומשולבת של כמה קבצים במקביל. יש להתקין את aiofiles באמצעות pip install aiofiles לפני הרצת קוד זה.

 1# pip install aiofiles
 2import asyncio
 3import aiofiles
 4from pathlib import Path
 5
 6async def read_file(path: Path):
 7    # Read file content asynchronously
 8    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
 9        contents = await f.read()
10    return path.name, len(contents)
11
12async def main():
13    files = [
14        Path("example1.txt"),
15        Path("example2.txt"),
16        Path("example3.txt")
17    ]
18    tasks = [read_file(p) for p in files]
19    results = await asyncio.gather(*tasks)
20    for name, size in results:
21        print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24    asyncio.run(main())
  • קוד זה מפרק לקריאות מקבילות לכל קובץ בנפרד. בפנים, aiofiles עושה שימוש ב-thread pool, ומאפשר גישה אסינכרונית לפעולות קבצים שחוסמות.

לקוח HTTP אסינכרוני (aiohttp)

כדוגמה קלאסית ל-I/O ברשת, כך מבצעים בקשות HTTP אסינכרונית. הכלי הזה שימושי במיוחד כאשר צריך לבצע מספר רב של בקשות HTTP במקביל.

להלן דוגמה לאיסוף מספר URL-ים במקביל באמצעות aiohttp. יש להתקין את aiohttp באמצעות pip install aiohttp.

 1# pip install aiohttp
 2import asyncio
 3import aiohttp
 4
 5async def fetch(session: aiohttp.ClientSession, url: str):
 6    # Fetch a URL asynchronously and return status and size
 7    async with session.get(url) as resp:
 8        text = await resp.text()
 9        return url, resp.status, len(text)
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [fetch(session, u) for u in urls]
14        for coro in asyncio.as_completed(tasks):
15            url, status, size = await coro
16            print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19    urls = [
20        "https://codesparklab.com/json/example1.json",
21        "https://codesparklab.com/json/example2.json",
22        "https://codesparklab.com/json/example3.json",
23    ]
24    asyncio.run(main(urls))
  • באמצעות asyncio.as_completed ניתן לעבד תוצאות בסדר סיום המשימות. זה יעיל במיוחד עבור ניהול של מספר רב של בקשות.

קיום משותף עם קלט/פלט חוסם: run_in_executor

כאשר מתמודדים עם משימות שדורשות עיבוד כבד או ממשקי API חוסמים קיימים בקוד אסינכרוני, יש להשתמש ב-ThreadPoolExecutor או ProcessPoolExecutor דרך loop.run_in_executor.

הקוד הבא הוא דוגמה להרצת משימות שמניחות קלט/פלט חוסם במקביל באמצעות סליל תהליכונים (thread pool).

 1import asyncio
 2from concurrent.futures import ThreadPoolExecutor
 3import time
 4
 5def blocking_io(n):
 6    # Simulate a blocking I/O or CPU-bound function
 7    time.sleep(n)
 8    return f"slept {n}s"
 9
10async def main():
11    loop = asyncio.get_running_loop()
12    with ThreadPoolExecutor() as pool:
13        tasks = [
14            loop.run_in_executor(pool, blocking_io, 1),
15            loop.run_in_executor(pool, blocking_io, 2),
16        ]
17        results = await asyncio.gather(*tasks)
18        print("Blocking results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • על ידי שימוש ב-run_in_executor, ניתן לשלב קוד סינכרוני קיים בתוך זרימות אסינכרוניות ללא צורך בשכתוב משמעותי. אך יש לשים לב למספר התהליכים ולעומס על ה-CPU.
  • ProcessPoolExecutor מתאים למשימות תלויות CPU.

שרת אסינכרוני: שרת ה-TCP Echo מבוסס asyncio

אם תרצה לנהל סוקטים ישירות, תוכל לבנות שרת אסינכרוני בקלות עם asyncio.start_server.

הדוגמה הבאה היא שרת echo פשוט שמחזיר בדיוק את מה שקיבל מהלקוח.

 1import asyncio
 2
 3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
 4    # Handle a single client: read data and echo it back
 5    addr = writer.get_extra_info('peername')
 6    print(f"Connection from {addr}")
 7    while True:
 8        data = await reader.read(1024)
 9        if not data:
10            break
11        writer.write(data)  # echo back
12        await writer.drain()
13    writer.close()
14    await writer.wait_closed()
15    print(f"Connection closed {addr}")
16
17async def main():
18    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19    addr = server.sockets[0].getsockname()
20    print(f"Serving on {addr}")
21    async with server:
22        await server.serve_forever()
23
24if __name__ == "__main__":
25    asyncio.run(main())
  • בתקשורת TCP עם asyncio, StreamReader ו-StreamWriter ממלאים תפקיד מרכזי בקלט ופלט אסינכרוניים. StreamReader קורא מידע שנשלח מהלקוח באופן אסינכרוני, בעוד ש-StreamWriter משמש לשליחת תשובות מהשרת בחזרה ללקוח.

  • גם מבלי לטפל בפעולות המפורטות של סוקטים בעצמך, ניתן להפעיל שרת אסינכרוני בצורה פשוטה ויעילה באמצעות asyncio.start_server.

  • כאשר מועברת פונקציית מטפל ל-asyncio.start_server, פונקציה זו מקבלת את reader ו-writer כארגומנטים. באמצעות השימוש בהם, ניתן לממש תהליכי תקשורת בצורה בטוחה וברורה יותר מאשר טיפול ישיר ב-API-ים של סוקטים ברמה נמוכה. לדוגמה, על ידי קבלת נתונים באמצעות reader.read() ושילוב של writer.write() עם writer.drain(), ניתן לממש שליחה אסינכרונית שמבטיחה שהשידור הושלם.

  • הגדרה זו מתאימה לטיפול במספר רב של חיבורים בו זמנית, מומלצת לפרוטוקולים פשוטים או שירותי TCP קטנים.

ניהול נתונים בזרימה (Streaming) גדולים

בעת עיבוד קבצים גדולים או תגובות רצופות, יש לקרוא ולכתוב נתונים בגושים (chunks) כדי לחסוך בזיכרון. להלן דוגמת קריאה בזרימה עם aiohttp.

הקוד הבא מעבד תגובות HTTP בגושים וכותב לדיסק בזמן אמת.

 1import aiohttp
 2import asyncio
 3import aiofiles
 4
 5async def stream_download(url: str, dest: str):
 6    # Stream download and write to file in chunks
 7    async with aiohttp.ClientSession() as session:
 8        async with session.get(url) as resp:
 9            async with aiofiles.open(dest, 'wb') as f:
10                async for chunk in resp.content.iter_chunked(1024 * 64):
11                    await f.write(chunk)
12
13if __name__ == "__main__":
14    asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))
  • קוד זה אינו טוען קובץ גדול בבת אחת; אלא, הוא מקבל את הנתונים במקטעים (חתיכות קטנות) וכותב אותם לקובץ באופן אסינכרוני. כתוצאה מכך, ניתן לבצע הורדות במהירות וביעילות תוך שמירה על שימוש נמוך בזיכרון. aiohttp מאחזר נתונים באופן אסינכרוני ו-aiofiles כותב לקובץ ללא חסימה, מה שמאפשר לפעול בקלות לצד תהליכים נוספים.

  • תבנית זו מתאימה להורדה ושמירה של קבצים גדולים באופן יעיל וחסכוני בזיכרון.

הרצת תתי-תהליכים (Subprocesses) בצורה אסינכרונית

אם ברצונך להריץ פקודות חיצוניות בצורה אסינכרונית ולקרוא את הפלט בזמן אמת, asyncio.create_subprocess_exec הוא הכלי המתאים.

להלן דוגמה להרצת פקודה חיצונית וקריאת הפלט הסטנדרטי שלה בזמן אמת.

 1import asyncio
 2
 3async def run_cmd(cmd):
 4    # Run external command asynchronously and capture output line by line
 5    proc = await asyncio.create_subprocess_exec(
 6        *cmd,
 7        stdout=asyncio.subprocess.PIPE,
 8        stderr=asyncio.subprocess.PIPE
 9    )
10
11    async def read_stream(stream, name):
12        while True:
13            line = await stream.readline()
14            if not line:
15                break
16            print(f"[{name}] {line.decode().rstrip()}")
17
18    await asyncio.gather(
19        read_stream(proc.stdout, "stdout"),
20        read_stream(proc.stderr, "stderr"),
21    )
22    await proc.wait()
23    return proc.returncode
24
25if __name__ == "__main__":
26    asyncio.run(run_cmd(["python", "--version"]))
  • באמצעות ניהול אסינכרוני של תתי תהליכים, תוכל לנהל לוגים מכלים חיצוניים בזמן אמת או להריץ מספר תהליכים במקביל.

ניהול ביטול (Cancel) ופג תוקף (Timeout)

משימות אסינכרוניות יכולות להתבטל. כשרוצים ליישם פג תוקף (timeout), ניתן להשתמש בפשטות ב-asyncio.wait_for.

להלן דוגמה להרצת משימה עם תזמון פג תוקף.

 1import asyncio
 2
 3async def slow_task():
 4    await asyncio.sleep(5)
 5    return "done"
 6
 7async def main():
 8    try:
 9        result = await asyncio.wait_for(slow_task(), timeout=2.0)
10        print("Result:", result)
11    except asyncio.TimeoutError:
12        print("Task timed out")
13
14if __name__ == "__main__":
15    asyncio.run(main())
  • wait_for יזרוק חריגת TimeoutError כאשר הזמן נגמר, ויבטל את המשימה במידת הצורך. יש להיזהר עם הפצת ביטול המשימה וניקוי משאבים לאחריה.

שליטה בריבוי משימות (Concurrency) עם Semaphore

כיוון שחיבורים ובקשות במקביל רבים עלולים לכלות משאבים, יש להגביל ריבוי משימות בעזרת asyncio.Semaphore.

הדוגמה הבאה מציגה כיצד מגבילים הורדות סימולטניות עם סמפור (Semaphore).

 1import asyncio
 2import aiohttp
 3
 4semaphore = asyncio.Semaphore(3)  # allow up to 3 concurrent tasks
 5
 6async def limited_fetch(session, url):
 7    async with semaphore:
 8        async with session.get(url) as resp:
 9            return url, resp.status
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [limited_fetch(session, u) for u in urls]
14        results = await asyncio.gather(*tasks)
15        print(results)
16
17if __name__ == "__main__":
18    urls = ["https://codesparklab.com/json/example.json"] * 10
19    asyncio.run(main(urls))
  • בעזרת שיטה זו ניתן לגשת לשירותים חיצוניים בזהירות ולמנוע עומס יתר על התהליך שלך.

טיפול בשגיאות ואסטרטגיות נסיונות חוזרים (Retry)

טעויות יתרחשו גם בעיבוד אסינכרוני. יש לטפל בחריגות כראוי וליישם מנגנוני נסיונות חוזרים (כגון exponential backoff).

להלן דוגמה ליישום נסיונות חוזרים פשוטים עד N פעמים.

 1import asyncio
 2import aiohttp
 3import random
 4
 5async def fetch_with_retry(session, url, retries=3):
 6    for attempt in range(1, retries + 1):
 7        try:
 8            async with session.get(url) as resp:
 9                resp.raise_for_status()
10                text = await resp.text()
11                return text
12        except Exception as e:
13            if attempt == retries:
14                raise
15            await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18    async with aiohttp.ClientSession() as session:
19        text = await fetch_with_retry(session, "https://codesparklab.com/")
20        print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • לוגיקה נכונה של נסיונות חוזרים חיונית לאיזון בין עקביות לניהול תעבורה.

טיפים לדיבאג ולניהול רישום (לוגים)

בפיתוח אסינכרוני המשימות רצות במקביל, מה שמסרבל את איתור הסיבה לתקלות. לאיתור תקלות, שמירה על הנקודות הבאות תאפשר דיבאג נוח יותר.

  • שגיאות מ-asyncio.run() ומ-Task עלולות להיעלם; דאגו לרשום חריגות שלא טופלו.
  • בעת שימוש ב-logging, הכללת שם הקורוטינה או, בפייתון 3.8 ומעלה, task.get_name() ביומני הרישום מקלה על המעקב.
  • ניתן לבדוק את המצב הנוכחי של משימות באמצעות asyncio.Task.all_tasks(). ואולם, API זה מיועד למטרות דיבאג בלבד ויש להשתמש בו בזהירות בסביבות ייצור כדי להימנע מבעיות ביצועים או הפרעות לא צפויות.

היבטי ביצועים

אף שתכנות אסינכרוני מצטיין בטיפול בזמני המתנה ל-I/O, שימוש לא נכון עלול דווקא להאט. יש לאופטם תוך שמירה על הנקודות הבאות:.

  • עיבוד אסינכרוני קל לטיפול במשימות תלויות קלט/פלט, אך פחות למטלות כבדות ל-CPU; עבורן השתמשו בתהליכי pool.
  • בשימוש ב-thread או process pools יש לבחור את גודל ה-pool בהתאם למשימה.
  • אם מתחילים הרבה משימות קטנות יחד, עלויות ניהול הלולאה גדלות; יש להשתמש בקיבוץ (batching) או semaphores להגבלה.

סיכום

קלט/פלט אסינכרוני בפייתון הוא מנגנון עוצמתי שמנצל זמני המתנה ל-I/O ומבצע פעולות רשת וקבצים ביעילות ובמקביל. בעזרת שילוב כלים כמו asyncio,‏ aiohttp,‏ aiofiles ו-run_in_executor תוכל לבנות יישומים אסינכרוניים מעשיים וגמישים. שימוש ב-async with לאוטומציה של קבלת ושחרור משאבים מאפשר לנהל קבצים, סשנים ו-locks אסינכרוניים בביטחה. באמצעות ניהול נכון של טעויות ושליטה בריבוי משימות, תוכל להריץ תוכניות אסינכרוניות אמינות ובטוחות.

תוכלו לעקוב אחר המאמר שלמעלה באמצעות Visual Studio Code בערוץ היוטיוב שלנו. נא לבדוק גם את ערוץ היוטיוב.

YouTube Video