Asynchrone invoer/uitvoer

Asynchrone invoer/uitvoer

Dit artikel legt asynchrone invoer/uitvoer uit.

Deze gids legt stap voor stap op een toegankelijke manier de concepten en patronen uit van asynchrone invoer/uitvoer die praktisch bruikbaar zijn in Python.

YouTube Video

Asynchrone invoer/uitvoer (I/O)

Concept van asynchrone I/O

Asynchrone I/O is een mechanisme dat toelaat dat andere bewerkingen parallel kunnen worden uitgevoerd terwijl er gewacht wordt op tijdrovende I/O, zoals bestandsbewerkingen of netwerkcommunicatie. In Python wordt asyncio geleverd als het standaard asynchrone framework, en veel libraries zijn ontworpen om dit mechanisme te volgen.

Basis: async / await en de event loop

Hier is eerst hoe je basis-coroutines schrijft en een voorbeeld van het gelijktijdig uitvoeren van meerdere coroutines met asyncio.gather.

De onderstaande code is een minimaal voorbeeld van het definiëren en gelijktijdig uitvoeren van asynchrone functies. De functie sleep wordt gebruikt om parallelle uitvoering te demonstreren.

 1import asyncio
 2
 3async def worker(name: str, delay: float):
 4    # Simulate I/O-bound work by sleeping
 5    print(f"{name} started")
 6    await asyncio.sleep(delay)
 7    print(f"{name} finished after {delay}s")
 8    return name, delay
 9
10async def main():
11    # Run multiple coroutines concurrently
12    tasks = [
13        worker("A", 1.5),
14        worker("B", 1.0),
15        worker("C", 0.5),
16    ]
17    results = await asyncio.gather(*tasks)
18    print("Results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Deze code start de event loop met asyncio.run() en voert drie coroutines gelijktijdig uit.

async with en asynchrone contextmanagers

Bij asynchrone verwerking kan het beheer van bronnen zoals het openen van verbindingen en het sluiten van bestanden al snel complex worden. Hier komen asynchrone contextmanagers met async with van pas. Deze syntaxis wordt net als het synchrone with-statement gebruikt, maar de interne verwerking is asynchroon en past dus natuurlijk in de async/await-flow.

Er zijn twee belangrijke redenen om async with te gebruiken:.

  • Om bronnen zoals verbindingen, bestandshandvatten of sessies betrouwbaar op te ruimen. Je kunt erop vertrouwen dat bronnen correct worden vrijgegeven, zelfs als er een abnormale beëindiging optreedt.
  • Om initialisatie en opruimtaken, zoals het opzetten of sluiten van verbindingen en flushen, asynchroon te automatiseren. Dit bespaart de moeite van handmatig coderen en maakt je code overzichtelijker.

Hieronder staat een voorbeeld van het maken van een eenvoudige asynchrone contextmanager vanaf nul.

 1import asyncio
 2
 3class AsyncResource:
 4    async def __aenter__(self):
 5        print("Opening resource...")
 6        await asyncio.sleep(0.5)
 7        print("Resource opened")
 8        return self
 9
10    async def __aexit__(self, exc_type, exc, tb):
11        print("Closing resource...")
12        await asyncio.sleep(0.5)
13        print("Resource closed")
14
15async def main():
16    async with AsyncResource() as r:
17        print("Using resource...")
18
19if __name__ == "__main__":
20    asyncio.run(main())
  • Door __aenter__ en __aexit__ te definiëren kun je async with gebruiken.
  • Het verwerken bij het binnenkomen en verlaten van het async with-blok wordt asynchroon en veilig uitgevoerd.

Asynchrone bestands-I/O (aiofiles)

Bestandsoperaties zijn een klassiek voorbeeld van blocking. Met aiofiles kun je bestandsoperaties veilig asynchroon uitvoeren. Intern gebruikt het een threadpool en zorgt ervoor dat bestanden correct worden gesloten met async with.

Het volgende voorbeeld laat het parallel asynchroon lezen van meerdere bestanden zien. Je moet aiofiles installeren met pip install aiofiles voordat je deze code uitvoert.

 1# pip install aiofiles
 2import asyncio
 3import aiofiles
 4from pathlib import Path
 5
 6async def read_file(path: Path):
 7    # Read file content asynchronously
 8    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
 9        contents = await f.read()
10    return path.name, len(contents)
11
12async def main():
13    files = [
14        Path("example1.txt"),
15        Path("example2.txt"),
16        Path("example3.txt")
17    ]
18    tasks = [read_file(p) for p in files]
19    results = await asyncio.gather(*tasks)
20    for name, size in results:
21        print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24    asyncio.run(main())
  • Deze code paralleliseert het lezen van elk bestand. aiofiles gebruikt intern vaak een threadpool, waardoor je blocking bestands-I/O via een asynchrone interface kunt behandelen.

Asynchrone HTTP-client (aiohttp)

Als een klassiek voorbeeld van netwerk-I/O, volgt hier hoe je HTTP-verzoeken asynchroon uitvoert. Het is vooral krachtig als je veel HTTP-verzoeken parallel moet uitvoeren.

Hieronder vind je een voorbeeld van het parallel ophalen van meerdere URL's met aiohttp. Je moet aiohttp installeren met pip install aiohttp.

 1# pip install aiohttp
 2import asyncio
 3import aiohttp
 4
 5async def fetch(session: aiohttp.ClientSession, url: str):
 6    # Fetch a URL asynchronously and return status and size
 7    async with session.get(url) as resp:
 8        text = await resp.text()
 9        return url, resp.status, len(text)
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [fetch(session, u) for u in urls]
14        for coro in asyncio.as_completed(tasks):
15            url, status, size = await coro
16            print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19    urls = [
20        "https://codesparklab.com/json/example1.json",
21        "https://codesparklab.com/json/example2.json",
22        "https://codesparklab.com/json/example3.json",
23    ]
24    asyncio.run(main(urls))
  • Met asyncio.as_completed kun je resultaten verwerken in de volgorde waarin de taken klaar zijn. Dit is handig om efficiënt met veel verzoeken om te gaan.

Coëxistentie met blokkerende I/O: run_in_executor

Wanneer je te maken hebt met CPU-intensieve taken of bestaande blokkerende API’s in asynchrone code, gebruik dan ThreadPoolExecutor of ProcessPoolExecutor via loop.run_in_executor.

De volgende code is een voorbeeld van het gelijktijdig uitvoeren van taken die uitgaan van blokkerende I/O met behulp van een thread pool.

 1import asyncio
 2from concurrent.futures import ThreadPoolExecutor
 3import time
 4
 5def blocking_io(n):
 6    # Simulate a blocking I/O or CPU-bound function
 7    time.sleep(n)
 8    return f"slept {n}s"
 9
10async def main():
11    loop = asyncio.get_running_loop()
12    with ThreadPoolExecutor() as pool:
13        tasks = [
14            loop.run_in_executor(pool, blocking_io, 1),
15            loop.run_in_executor(pool, blocking_io, 2),
16        ]
17        results = await asyncio.gather(*tasks)
18        print("Blocking results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Door gebruik te maken van run_in_executor kun je bestaande synchrone code opnemen in asynchrone processen zonder grote herschrijvingen. Let echter op het aantal threads en de CPU-belasting.
  • ProcessPoolExecutor is geschikt voor CPU-intensieve taken.

Asynchrone server: TCP Echo-server gebaseerd op asyncio

Als je direct sockets wilt beheren, kun je eenvoudig een asynchrone server bouwen met asyncio.start_server.

Het volgende voorbeeld is een eenvoudige echo-server die de ontvangen gegevens exact terugstuurt naar de client.

 1import asyncio
 2
 3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
 4    # Handle a single client: read data and echo it back
 5    addr = writer.get_extra_info('peername')
 6    print(f"Connection from {addr}")
 7    while True:
 8        data = await reader.read(1024)
 9        if not data:
10            break
11        writer.write(data)  # echo back
12        await writer.drain()
13    writer.close()
14    await writer.wait_closed()
15    print(f"Connection closed {addr}")
16
17async def main():
18    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19    addr = server.sockets[0].getsockname()
20    print(f"Serving on {addr}")
21    async with server:
22        await server.serve_forever()
23
24if __name__ == "__main__":
25    asyncio.run(main())
  • Bij TCP-communicatie met asyncio spelen StreamReader en StreamWriter een centrale rol bij asynchrone invoer en uitvoer. StreamReader leest asynchroon gegevens die van de cliënt worden verzonden, terwijl StreamWriter wordt gebruikt om reacties van de server terug naar de cliënt te sturen.

  • Zelfs zonder de gedetailleerde bewerkingen van sockets zelf af te handelen, kun je eenvoudig en efficiënt een asynchrone server opzetten met asyncio.start_server.

  • Wanneer je een handlerfunctie aan asyncio.start_server doorgeeft, ontvangt die functie reader en writer als argumenten. Door deze te gebruiken kun je communicatieprocessen veiliger en duidelijker implementeren dan door direct low-level socket-API's te behandelen. Door bijvoorbeeld gegevens te ontvangen met reader.read() en writer.write() te combineren met writer.drain(), kun je een asynchrone verzending implementeren die garandeert dat de overdracht volledig is.

  • Deze setup is geschikt voor het verwerken van veel gelijktijdige verbindingen en ideaal voor eenvoudige protocollen of kleinschalige TCP-diensten.

Grote streamingdata verwerken

Bij het sequentieel verwerken van grote bestanden of responses, lees en schrijf je gegevens in stukken om het geheugenverbruik laag te houden. Hieronder staat een voorbeeld van streaming lezen met aiohttp.

De volgende code verwerkt HTTP-responses in stukken en schrijft naar de schijf zodra de data binnenkomt.

 1import aiohttp
 2import asyncio
 3import aiofiles
 4
 5async def stream_download(url: str, dest: str):
 6    # Stream download and write to file in chunks
 7    async with aiohttp.ClientSession() as session:
 8        async with session.get(url) as resp:
 9            async with aiofiles.open(dest, 'wb') as f:
10                async for chunk in resp.content.iter_chunked(1024 * 64):
11                    await f.write(chunk)
12
13if __name__ == "__main__":
14    asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))
  • Deze code laadt een groot bestand niet ineens; in plaats daarvan ontvangt hij de gegevens in stukken (kleine pakketjes) en schrijft ze asynchroon naar een bestand. Hierdoor kunnen downloads snel en efficiënt worden uitgevoerd terwijl het geheugenverbruik laag blijft. aiohttp haalt asynchroon gegevens op en aiofiles schrijft deze naar het bestand zonder te blokkeren, waardoor het gemakkelijk is om naast andere processen te draaien.

  • Dit patroon is geschikt voor het efficiënt downloaden en opslaan van grote bestanden met minimaal geheugenverbruik.

Asynchrone uitvoering van subprocessen

Als je externe commando's asynchroon wilt uitvoeren en hun uitvoer in realtime wilt lezen, is asyncio.create_subprocess_exec handig.

Hieronder volgt een voorbeeld van het starten van een extern commando en het lezen van de standaarduitvoer in realtime.

 1import asyncio
 2
 3async def run_cmd(cmd):
 4    # Run external command asynchronously and capture output line by line
 5    proc = await asyncio.create_subprocess_exec(
 6        *cmd,
 7        stdout=asyncio.subprocess.PIPE,
 8        stderr=asyncio.subprocess.PIPE
 9    )
10
11    async def read_stream(stream, name):
12        while True:
13            line = await stream.readline()
14            if not line:
15                break
16            print(f"[{name}] {line.decode().rstrip()}")
17
18    await asyncio.gather(
19        read_stream(proc.stdout, "stdout"),
20        read_stream(proc.stderr, "stderr"),
21    )
22    await proc.wait()
23    return proc.returncode
24
25if __name__ == "__main__":
26    asyncio.run(run_cmd(["python", "--version"]))
  • Door subprocessen asynchroon aan te sturen kun je logs van externe tools realtime verwerken of meerdere processen parallel uitvoeren.

Omgaan met annuleren en time-out

Asynchrone taken kunnen worden geannuleerd. Bij het implementeren van een time-out kun je eenvoudig asyncio.wait_for gebruiken.

Hieronder staat een voorbeeld van het uitvoeren van een taak met een time-out.

 1import asyncio
 2
 3async def slow_task():
 4    await asyncio.sleep(5)
 5    return "done"
 6
 7async def main():
 8    try:
 9        result = await asyncio.wait_for(slow_task(), timeout=2.0)
10        print("Result:", result)
11    except asyncio.TimeoutError:
12        print("Task timed out")
13
14if __name__ == "__main__":
15    asyncio.run(main())
  • wait_for geeft een TimeoutError als de time-out wordt bereikt en annuleert indien nodig de taak. Wees voorzichtig met het doorgeven van taakannuleringen en het opruimen ervan.

Controle van gelijktijdigheid (Semaphore)

Omdat veel gelijktijdige verbindingen of verzoeken je bronnen kunnen uitputten, beperk je de gelijktijdigheid met asyncio.Semaphore.

Het volgende is een voorbeeld van het beperken van gelijktijdige downloads met een semafoor.

 1import asyncio
 2import aiohttp
 3
 4semaphore = asyncio.Semaphore(3)  # allow up to 3 concurrent tasks
 5
 6async def limited_fetch(session, url):
 7    async with semaphore:
 8        async with session.get(url) as resp:
 9            return url, resp.status
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [limited_fetch(session, u) for u in urls]
14        results = await asyncio.gather(*tasks)
15        print(results)
16
17if __name__ == "__main__":
18    urls = ["https://codesparklab.com/json/example.json"] * 10
19    asyncio.run(main(urls))
  • Met deze methode kun je externe diensten voorzichtig benaderen en voorkom je overbelasting van je eigen proces.

Foutherstel en retry-strategieën

Fouten komen onvermijdelijk voor, ook bij asynchrone verwerking. Vang uitzonderingen correct af en implementeer retry-strategieën zoals exponentiële backoff.

Hieronder staat een voorbeeldimplementatie van eenvoudige herhalingspogingen tot N keer.

 1import asyncio
 2import aiohttp
 3import random
 4
 5async def fetch_with_retry(session, url, retries=3):
 6    for attempt in range(1, retries + 1):
 7        try:
 8            async with session.get(url) as resp:
 9                resp.raise_for_status()
10                text = await resp.text()
11                return text
12        except Exception as e:
13            if attempt == retries:
14                raise
15            await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18    async with aiohttp.ClientSession() as session:
19        text = await fetch_with_retry(session, "https://codesparklab.com/")
20        print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • Een goede retry-logica is belangrijk voor het balanceren van consistentie en verkeersregeling.

Tips voor debuggen en loggen

Bij asynchrone verwerking verlopen taken gelijktijdig, wat het lastig kan maken om de oorzaak van problemen te achterhalen. Om problemen efficiënt te traceren, houd je het beste rekening met de volgende punten voor soepel debuggen.

  • Uitzonderingen van asyncio.run() en Task zijn makkelijk te missen, dus zorg ervoor dat je niet-afgehandelde uitzonderingen logt.
  • Bij gebruik van logging maakt het opnemen van de coroutinemaam of, in Python 3.8 en hoger, task.get_name() in je logs het volgen eenvoudiger.
  • Je kunt de huidige status van taken controleren met behulp van asyncio.Task.all_tasks(). Deze API is echter bedoeld voor foutopsporingsdoeleinden en moet met voorzichtigheid worden gebruikt in productieomgevingen om prestatieproblemen of onverwachte interferentie te voorkomen.

Overwegingen rond prestaties

Hoewel asynchroon programmeren uitblinkt in het verwerken van I/O-wachttijden, kan onjuist gebruik de prestaties juist verslechteren. Optimaliseer door met de volgende punten rekening te houden:.

  • Asynchrone verwerking is uitstekend voor I/O-gebonden taken, maar niet geschikt voor CPU-gebonden taken; gebruik daarvoor een processpool.
  • Denk bij het gebruik van thread- of processpools aan de poolgrootte en de aard van de taken.
  • Als je veel kleine taken tegelijk start, neemt de overhead van de event loop toe — gebruik daarom batching of semaforen om bij te stellen.

Samenvatting

De asynchrone I/O van Python is een krachtig mechanisme dat efficiënt gebruikmaakt van wachttijden bij I/O en netwerk- en bestandsbewerkingen gelijktijdig uitvoert. Door technieken als asyncio, aiohttp, aiofiles en run_in_executor te combineren kun je flexibel praktische asynchrone applicaties bouwen. Door async with te gebruiken om het verkrijgen en vrijgeven van bronnen te automatiseren, kun je asynchrone bronnen zoals bestanden, HTTP-sessies en locks veilig en betrouwbaar beheren. Door correcte foutafhandeling en beheersing van gelijktijdigheid toe te passen kun je asynchrone programma's met hoge betrouwbaarheid veilig uitvoeren.

Je kunt het bovenstaande artikel volgen met Visual Studio Code op ons YouTube-kanaal. Bekijk ook het YouTube-kanaal.

YouTube Video