Asynchrones Ein-/Ausgabe (Input/Output)

Asynchrones Ein-/Ausgabe (Input/Output)

Dieser Artikel erklärt asynchrone Ein-/Ausgabe.

Diese Anleitung erklärt schrittweise und verständlich die Konzepte und Muster der asynchronen Ein-/Ausgabe, die in Python praktisch nützlich sind.

YouTube Video

Asynchrones Ein-/Ausgabe (I/O)

Konzept der asynchronen Ein-/Ausgabe

Asynchrone I/O ist ein Mechanismus, der es ermöglicht, andere Operationen parallel auszuführen, während auf zeitaufwändige I/O-Vorgänge wie Dateioperationen oder Netzwerkkommunikation gewartet wird. In Python steht asyncio als Standard-Framework für asynchrone Programmierung zur Verfügung, und viele Bibliotheken sind für diesen Mechanismus ausgelegt.

Grundlagen: async / await und die Eventschleife

Zunächst zeigen wir, wie Sie grundlegende Coroutinen schreiben und wie Sie mehrere Coroutinen gleichzeitig mit asyncio.gather ausführen können.

Der folgende Code ist ein minimales Beispiel dafür, wie asynchrone Funktionen parallel definiert und ausgeführt werden. Die sleep-Funktion wird verwendet, um die parallele Ausführung zu demonstrieren.

 1import asyncio
 2
 3async def worker(name: str, delay: float):
 4    # Simulate I/O-bound work by sleeping
 5    print(f"{name} started")
 6    await asyncio.sleep(delay)
 7    print(f"{name} finished after {delay}s")
 8    return name, delay
 9
10async def main():
11    # Run multiple coroutines concurrently
12    tasks = [
13        worker("A", 1.5),
14        worker("B", 1.0),
15        worker("C", 0.5),
16    ]
17    results = await asyncio.gather(*tasks)
18    print("Results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Dieser Code startet die Eventschleife mit asyncio.run() und führt drei Coroutinen gleichzeitig aus.

async with und asynchrone Kontextmanager

Bei asynchroner Verarbeitung kann das Ressourcenmanagement, wie das Öffnen von Verbindungen oder das Schließen von Dateien, schnell komplex werden. Hier kommen asynchrone Kontextmanager mit async with zum Einsatz. Diese Syntax wird genauso verwendet wie das synchrone with-Statement, aber die interne Verarbeitung ist asynchron und fügt sich daher nahtlos in den async/await-Ablauf ein.

Es gibt zwei Hauptgründe für die Verwendung von async with:.

  • Um Ressourcen wie Verbindungen, Dateihandles oder Sitzungen zuverlässig freizugeben. Sie können sicher sein, dass Ressourcen auch bei einem unerwarteten Abbruch ordnungsgemäß freigegeben werden.
  • Um Initialisierungs- und Aufräumaufgaben wie das Herstellen oder Schließen von Verbindungen und das Leeren von Puffern asynchron zu automatisieren. Dadurch ersparen Sie sich manuelle Programmierung und Ihr Code wird übersichtlicher.

Nachfolgend finden Sie ein Beispiel für die Erstellung eines einfachen asynchronen Kontextmanagers von Grund auf.

 1import asyncio
 2
 3class AsyncResource:
 4    async def __aenter__(self):
 5        print("Opening resource...")
 6        await asyncio.sleep(0.5)
 7        print("Resource opened")
 8        return self
 9
10    async def __aexit__(self, exc_type, exc, tb):
11        print("Closing resource...")
12        await asyncio.sleep(0.5)
13        print("Resource closed")
14
15async def main():
16    async with AsyncResource() as r:
17        print("Using resource...")
18
19if __name__ == "__main__":
20    asyncio.run(main())
  • Durch die Definition von __aenter__ und __aexit__ können Sie async with verwenden.
  • Die Verarbeitung beim Betreten und Verlassen des async with-Blocks erfolgt asynchron und sicher.

Asynchrone Dateiein-/ausgabe (aiofiles)

Dateioperationen sind ein klassisches Beispiel für blockierende Vorgänge. Mit aiofiles können Sie Dateioperationen sicher asynchron durchführen. Intern wird ein Thread-Pool verwendet, und durch async with wird sichergestellt, dass Dateien ordnungsgemäß geschlossen werden.

Das folgende Beispiel zeigt das parallele, asynchrone Lesen mehrerer Dateien. Sie müssen aiofiles mit pip install aiofiles installieren, bevor Sie diesen Code ausführen.

 1# pip install aiofiles
 2import asyncio
 3import aiofiles
 4from pathlib import Path
 5
 6async def read_file(path: Path):
 7    # Read file content asynchronously
 8    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
 9        contents = await f.read()
10    return path.name, len(contents)
11
12async def main():
13    files = [
14        Path("example1.txt"),
15        Path("example2.txt"),
16        Path("example3.txt")
17    ]
18    tasks = [read_file(p) for p in files]
19    results = await asyncio.gather(*tasks)
20    for name, size in results:
21        print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24    asyncio.run(main())
  • Dieser Code liest jede Datei parallel ein. aiofiles verwendet intern häufig einen Thread-Pool, sodass blockierende Datei-I/O über eine asynchrone Schnittstelle verarbeitet werden kann.

Asynchroner HTTP-Client (aiohttp)

Hier sehen Sie am Beispiel der Netzwerk-I/O, wie Sie HTTP-Anfragen asynchron durchführen können. Dies ist besonders leistungsfähig, wenn Sie viele HTTP-Anfragen parallel ausführen müssen.

Nachfolgend finden Sie ein Beispiel, wie mit aiohttp mehrere URLs parallel abgerufen werden. Sie müssen aiohttp mit pip install aiohttp installieren.

 1# pip install aiohttp
 2import asyncio
 3import aiohttp
 4
 5async def fetch(session: aiohttp.ClientSession, url: str):
 6    # Fetch a URL asynchronously and return status and size
 7    async with session.get(url) as resp:
 8        text = await resp.text()
 9        return url, resp.status, len(text)
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [fetch(session, u) for u in urls]
14        for coro in asyncio.as_completed(tasks):
15            url, status, size = await coro
16            print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19    urls = [
20        "https://codesparklab.com/json/example1.json",
21        "https://codesparklab.com/json/example2.json",
22        "https://codesparklab.com/json/example3.json",
23    ]
24    asyncio.run(main(urls))
  • Mit asyncio.as_completed können Sie Ergebnisse in der Reihenfolge verarbeiten, in der die Aufgaben abgeschlossen werden. Dies ist nützlich, um viele Anfragen effizient zu bearbeiten.

Koexistenz mit blockierendem I/O: run_in_executor

Bei CPU-intensiven Aufgaben oder bestehenden blockierenden APIs im asynchronen Code sollten Sie ThreadPoolExecutor oder ProcessPoolExecutor über loop.run_in_executor verwenden.

Der folgende Code ist ein Beispiel dafür, wie Aufgaben, die blockierende Ein-/Ausgabe erfordern, gleichzeitig mit einem Thread-Pool ausgeführt werden können.

 1import asyncio
 2from concurrent.futures import ThreadPoolExecutor
 3import time
 4
 5def blocking_io(n):
 6    # Simulate a blocking I/O or CPU-bound function
 7    time.sleep(n)
 8    return f"slept {n}s"
 9
10async def main():
11    loop = asyncio.get_running_loop()
12    with ThreadPoolExecutor() as pool:
13        tasks = [
14            loop.run_in_executor(pool, blocking_io, 1),
15            loop.run_in_executor(pool, blocking_io, 2),
16        ]
17        results = await asyncio.gather(*tasks)
18        print("Blocking results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Durch die Verwendung von run_in_executor können Sie bestehenden synchronen Code ohne größere Umstrukturierungen in asynchrone Abläufe integrieren. Sie sollten jedoch auf die Anzahl der Threads und die CPU-Auslastung achten.
  • ProcessPoolExecutor ist für CPU-intensive Aufgaben geeignet.

Asynchroner Server: TCP Echo Server auf Basis von asyncio

Wenn Sie Sockets direkt verarbeiten möchten, können Sie mit asyncio.start_server einfach einen asynchronen Server erstellen.

Das folgende Beispiel ist ein einfacher Echo-Server, der die vom Client empfangenen Daten unverändert zurücksendet.

 1import asyncio
 2
 3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
 4    # Handle a single client: read data and echo it back
 5    addr = writer.get_extra_info('peername')
 6    print(f"Connection from {addr}")
 7    while True:
 8        data = await reader.read(1024)
 9        if not data:
10            break
11        writer.write(data)  # echo back
12        await writer.drain()
13    writer.close()
14    await writer.wait_closed()
15    print(f"Connection closed {addr}")
16
17async def main():
18    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19    addr = server.sockets[0].getsockname()
20    print(f"Serving on {addr}")
21    async with server:
22        await server.serve_forever()
23
24if __name__ == "__main__":
25    asyncio.run(main())
  • Bei der TCP-Kommunikation mit asyncio spielen StreamReader und StreamWriter eine zentrale Rolle bei der asynchronen Ein- und Ausgabe. StreamReader liest asynchron Daten, die vom Client gesendet werden, während StreamWriter verwendet wird, um Antworten vom Server zurück an den Client zu senden.

  • Auch ohne die detaillierten Operationen der Sockets selbst zu übernehmen, können Sie mit asyncio.start_server einfach und effizient einen asynchronen Server starten.

  • Wenn Sie eine Handler-Funktion an asyncio.start_server übergeben, erhält diese Funktion reader und writer als Argumente. Durch die Verwendung dieser können Sie Kommunikationsprozesse sicherer und übersichtlicher implementieren, als wenn Sie direkt mit Low-Level-Socket-APIs arbeiten würden. Zum Beispiel können Sie durch das Empfangen von Daten mit reader.read() und die Kombination von writer.write() mit writer.drain() ein asynchrones Senden umsetzen, das sicherstellt, dass die Übertragung abgeschlossen wird.

  • Diese Konfiguration eignet sich zur Verarbeitung vieler gleichzeitiger Verbindungen und ist ideal für einfache Protokolle oder kleine TCP-Dienste.

Verarbeitung großer Streaming-Daten

Beim sequenziellen Verarbeiten großer Dateien oder Antworten sollten Sie Daten in Blöcken lesen und schreiben, um den Speicherverbrauch gering zu halten. Nachfolgend finden Sie ein Beispiel für das Streaming-Lesen mit aiohttp.

Der folgende Code verarbeitet HTTP-Antworten blockweise und schreibt den empfangenen Datenstrom direkt auf die Festplatte.

 1import aiohttp
 2import asyncio
 3import aiofiles
 4
 5async def stream_download(url: str, dest: str):
 6    # Stream download and write to file in chunks
 7    async with aiohttp.ClientSession() as session:
 8        async with session.get(url) as resp:
 9            async with aiofiles.open(dest, 'wb') as f:
10                async for chunk in resp.content.iter_chunked(1024 * 64):
11                    await f.write(chunk)
12
13if __name__ == "__main__":
14    asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))
  • Dieser Code lädt eine große Datei nicht auf einmal, sondern empfängt die Daten in Blöcken (kleinen Teilen) und schreibt sie asynchron in eine Datei. Dadurch können Downloads schnell und effizient durchgeführt werden, während der Speicherverbrauch niedrig bleibt. aiohttp ruft Daten asynchron ab und aiofiles schreibt ohne Blockierung in die Datei, was die gleichzeitige Ausführung mit anderen Prozessen erleichtert.

  • Dieses Muster eignet sich, um große Dateien effizient herunterzuladen und zu speichern und gleichzeitig den Speicherverbrauch zu minimieren.

Asynchrone Ausführung von Subprozessen

Wenn Sie externe Befehle asynchron ausführen und deren Ausgabe in Echtzeit lesen möchten, ist asyncio.create_subprocess_exec hilfreich.

Nachfolgend finden Sie ein Beispiel, wie Sie einen externen Befehl starten und dessen Standardausgabe in Echtzeit lesen.

 1import asyncio
 2
 3async def run_cmd(cmd):
 4    # Run external command asynchronously and capture output line by line
 5    proc = await asyncio.create_subprocess_exec(
 6        *cmd,
 7        stdout=asyncio.subprocess.PIPE,
 8        stderr=asyncio.subprocess.PIPE
 9    )
10
11    async def read_stream(stream, name):
12        while True:
13            line = await stream.readline()
14            if not line:
15                break
16            print(f"[{name}] {line.decode().rstrip()}")
17
18    await asyncio.gather(
19        read_stream(proc.stdout, "stdout"),
20        read_stream(proc.stderr, "stderr"),
21    )
22    await proc.wait()
23    return proc.returncode
24
25if __name__ == "__main__":
26    asyncio.run(run_cmd(["python", "--version"]))
  • Durch die asynchrone Steuerung von Subprozessen können Sie Protokolle externer Tools in Echtzeit verarbeiten oder mehrere Prozesse parallel ausführen.

Umgang mit Abbruch und Zeitüberschreitung

Asynchrone Aufgaben können abgebrochen werden. Für die Implementierung eines Timeouts können Sie einfach asyncio.wait_for verwenden.

Nachfolgend finden Sie ein Beispiel für das Ausführen einer Aufgabe mit Zeitlimit.

 1import asyncio
 2
 3async def slow_task():
 4    await asyncio.sleep(5)
 5    return "done"
 6
 7async def main():
 8    try:
 9        result = await asyncio.wait_for(slow_task(), timeout=2.0)
10        print("Result:", result)
11    except asyncio.TimeoutError:
12        print("Task timed out")
13
14if __name__ == "__main__":
15    asyncio.run(main())
  • wait_for löst einen TimeoutError aus, wenn das Limit erreicht ist, und bricht die Aufgabe bei Bedarf ab. Achten Sie darauf, wie Auftragsabbrüche weitergegeben werden und sorgen Sie für eine ordnungsgemäße Bereinigung.

Steuerung der Parallelität (Semaphore)

Da viele gleichzeitige Verbindungen oder Anfragen Ressourcen erschöpfen können, begrenzen Sie die Parallelität mit asyncio.Semaphore.

Nachfolgend sehen Sie ein Beispiel für die Begrenzung gleichzeitiger Downloads mit einem Semaphore.

 1import asyncio
 2import aiohttp
 3
 4semaphore = asyncio.Semaphore(3)  # allow up to 3 concurrent tasks
 5
 6async def limited_fetch(session, url):
 7    async with semaphore:
 8        async with session.get(url) as resp:
 9            return url, resp.status
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [limited_fetch(session, u) for u in urls]
14        results = await asyncio.gather(*tasks)
15        print(results)
16
17if __name__ == "__main__":
18    urls = ["https://codesparklab.com/json/example.json"] * 10
19    asyncio.run(main(urls))
  • Mit dieser Methode schonen Sie externe Dienste und verhindern eine Überlastung Ihres eigenen Prozesses.

Fehlerbehandlung und Wiederholungsstrategien

Auch bei asynchroner Verarbeitung treten zwangsläufig Fehler auf. Fangen Sie Ausnahmen angemessen ab und implementieren Sie Wiederholungsstrategien wie exponentielles Backoff.

Nachfolgend finden Sie ein Beispiel für eine einfache Wiederholung bis zu N Versuchen.

 1import asyncio
 2import aiohttp
 3import random
 4
 5async def fetch_with_retry(session, url, retries=3):
 6    for attempt in range(1, retries + 1):
 7        try:
 8            async with session.get(url) as resp:
 9                resp.raise_for_status()
10                text = await resp.text()
11                return text
12        except Exception as e:
13            if attempt == retries:
14                raise
15            await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18    async with aiohttp.ClientSession() as session:
19        text = await fetch_with_retry(session, "https://codesparklab.com/")
20        print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • Eine geeignete Wiederholungslogik ist entscheidend für ein Gleichgewicht zwischen Konsistenz und Verkehrskontrolle.

Tipps für Debugging und Logging

Bei asynchroner Verarbeitung laufen Aufgaben gleichzeitig ab, was das Auffinden von Fehlerursachen erschweren kann. Beachten Sie die folgenden Punkte, um Probleme effizient nachzuverfolgen und das Debugging zu erleichtern.

  • Ausnahmen aus asyncio.run() und Task werden leicht übersehen – stellen Sie daher sicher, dass unbehandelte Ausnahmen geloggt werden.
  • Wenn Sie logging verwenden, erleichtert die Aufnahme des Coroutine-Namens oder (in Python 3.8 und höher) von task.get_name() in Ihren Log-Ausgaben das Nachverfolgen.
  • Sie können den aktuellen Status von Tasks mit asyncio.Task.all_tasks() überprüfen. Diese API ist jedoch für Debugging-Zwecke gedacht und sollte in Produktionsumgebungen mit Vorsicht verwendet werden, um Leistungsprobleme oder unerwartete Störungen zu vermeiden.

Leistungshinweise

Obwohl asynchrone Programmierung die I/O-Auslastung optimal nutzt, kann unsachgemäße Verwendung die Leistung verschlechtern. Optimieren Sie, indem Sie folgende Punkte beachten:.

  • Asynchrone Verarbeitung glänzt bei I/O-gebundenen Aufgaben, ist aber für CPU-gebundene Aufgaben ungeeignet – nutzen Sie dafür einen Prozess-Pool.
  • Berücksichtigen Sie bei Thread- oder Prozess-Pools die Pool-Größe und den Aufgabentyp.
  • Wenn Sie viele kleine Aufgaben gleichzeitig starten, steigt der Overhead der Eventschleife – justieren Sie mit Batching oder Semaphoren.

Zusammenfassung

Pythons asynchrone I/O ist ein leistungsfähiger Mechanismus, der I/O-Wartezeiten effektiv nutzt und Netzwerk- sowie Dateioperationen effizient parallel ausführt. Durch die Kombination von Techniken wie asyncio, aiohttp, aiofiles und run_in_executor können Sie flexibel praxisnahe asynchrone Anwendungen erstellen. Durch die Verwendung von async with zur Automatisierung von Ressourcenverwaltung können Sie asynchrone Ressourcen wie Dateien, HTTP-Sitzungen und Sperren sicher und zuverlässig steuern. Durch die Einbindung einer passenden Fehlerbehandlung und Parallelitätssteuerung können Sie hochzuverlässige asynchrone Programme sicher betreiben.

Sie können den obigen Artikel mit Visual Studio Code auf unserem YouTube-Kanal verfolgen. Bitte schauen Sie sich auch den YouTube-Kanal an.

YouTube Video