Asynchrones Ein-/Ausgabe (Input/Output)
Dieser Artikel erklärt asynchrone Ein-/Ausgabe.
Diese Anleitung erklärt schrittweise und verständlich die Konzepte und Muster der asynchronen Ein-/Ausgabe, die in Python praktisch nützlich sind.
YouTube Video
Asynchrones Ein-/Ausgabe (I/O)
Konzept der asynchronen Ein-/Ausgabe
Asynchrone I/O ist ein Mechanismus, der es ermöglicht, andere Operationen parallel auszuführen, während auf zeitaufwändige I/O-Vorgänge wie Dateioperationen oder Netzwerkkommunikation gewartet wird. In Python steht asyncio als Standard-Framework für asynchrone Programmierung zur Verfügung, und viele Bibliotheken sind für diesen Mechanismus ausgelegt.
Grundlagen: async / await und die Eventschleife
Zunächst zeigen wir, wie Sie grundlegende Coroutinen schreiben und wie Sie mehrere Coroutinen gleichzeitig mit asyncio.gather ausführen können.
Der folgende Code ist ein minimales Beispiel dafür, wie asynchrone Funktionen parallel definiert und ausgeführt werden. Die sleep-Funktion wird verwendet, um die parallele Ausführung zu demonstrieren.
1import asyncio
2
3async def worker(name: str, delay: float):
4 # Simulate I/O-bound work by sleeping
5 print(f"{name} started")
6 await asyncio.sleep(delay)
7 print(f"{name} finished after {delay}s")
8 return name, delay
9
10async def main():
11 # Run multiple coroutines concurrently
12 tasks = [
13 worker("A", 1.5),
14 worker("B", 1.0),
15 worker("C", 0.5),
16 ]
17 results = await asyncio.gather(*tasks)
18 print("Results:", results)
19
20if __name__ == "__main__":
21 asyncio.run(main())- Dieser Code startet die Eventschleife mit
asyncio.run()und führt drei Coroutinen gleichzeitig aus.
async with und asynchrone Kontextmanager
Bei asynchroner Verarbeitung kann das Ressourcenmanagement, wie das Öffnen von Verbindungen oder das Schließen von Dateien, schnell komplex werden. Hier kommen asynchrone Kontextmanager mit async with zum Einsatz. Diese Syntax wird genauso verwendet wie das synchrone with-Statement, aber die interne Verarbeitung ist asynchron und fügt sich daher nahtlos in den async/await-Ablauf ein.
Es gibt zwei Hauptgründe für die Verwendung von async with:.
- Um Ressourcen wie Verbindungen, Dateihandles oder Sitzungen zuverlässig freizugeben. Sie können sicher sein, dass Ressourcen auch bei einem unerwarteten Abbruch ordnungsgemäß freigegeben werden.
- Um Initialisierungs- und Aufräumaufgaben wie das Herstellen oder Schließen von Verbindungen und das Leeren von Puffern asynchron zu automatisieren. Dadurch ersparen Sie sich manuelle Programmierung und Ihr Code wird übersichtlicher.
Nachfolgend finden Sie ein Beispiel für die Erstellung eines einfachen asynchronen Kontextmanagers von Grund auf.
1import asyncio
2
3class AsyncResource:
4 async def __aenter__(self):
5 print("Opening resource...")
6 await asyncio.sleep(0.5)
7 print("Resource opened")
8 return self
9
10 async def __aexit__(self, exc_type, exc, tb):
11 print("Closing resource...")
12 await asyncio.sleep(0.5)
13 print("Resource closed")
14
15async def main():
16 async with AsyncResource() as r:
17 print("Using resource...")
18
19if __name__ == "__main__":
20 asyncio.run(main())- Durch die Definition von
__aenter__und__aexit__können Sieasync withverwenden. - Die Verarbeitung beim Betreten und Verlassen des
async with-Blocks erfolgt asynchron und sicher.
Asynchrone Dateiein-/ausgabe (aiofiles)
Dateioperationen sind ein klassisches Beispiel für blockierende Vorgänge. Mit aiofiles können Sie Dateioperationen sicher asynchron durchführen. Intern wird ein Thread-Pool verwendet, und durch async with wird sichergestellt, dass Dateien ordnungsgemäß geschlossen werden.
Das folgende Beispiel zeigt das parallele, asynchrone Lesen mehrerer Dateien. Sie müssen aiofiles mit pip install aiofiles installieren, bevor Sie diesen Code ausführen.
1# pip install aiofiles
2import asyncio
3import aiofiles
4from pathlib import Path
5
6async def read_file(path: Path):
7 # Read file content asynchronously
8 async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
9 contents = await f.read()
10 return path.name, len(contents)
11
12async def main():
13 files = [
14 Path("example1.txt"),
15 Path("example2.txt"),
16 Path("example3.txt")
17 ]
18 tasks = [read_file(p) for p in files]
19 results = await asyncio.gather(*tasks)
20 for name, size in results:
21 print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24 asyncio.run(main())- Dieser Code liest jede Datei parallel ein.
aiofilesverwendet intern häufig einen Thread-Pool, sodass blockierende Datei-I/O über eine asynchrone Schnittstelle verarbeitet werden kann.
Asynchroner HTTP-Client (aiohttp)
Hier sehen Sie am Beispiel der Netzwerk-I/O, wie Sie HTTP-Anfragen asynchron durchführen können. Dies ist besonders leistungsfähig, wenn Sie viele HTTP-Anfragen parallel ausführen müssen.
Nachfolgend finden Sie ein Beispiel, wie mit aiohttp mehrere URLs parallel abgerufen werden. Sie müssen aiohttp mit pip install aiohttp installieren.
1# pip install aiohttp
2import asyncio
3import aiohttp
4
5async def fetch(session: aiohttp.ClientSession, url: str):
6 # Fetch a URL asynchronously and return status and size
7 async with session.get(url) as resp:
8 text = await resp.text()
9 return url, resp.status, len(text)
10
11async def main(urls):
12 async with aiohttp.ClientSession() as session:
13 tasks = [fetch(session, u) for u in urls]
14 for coro in asyncio.as_completed(tasks):
15 url, status, size = await coro
16 print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19 urls = [
20 "https://codesparklab.com/json/example1.json",
21 "https://codesparklab.com/json/example2.json",
22 "https://codesparklab.com/json/example3.json",
23 ]
24 asyncio.run(main(urls))- Mit
asyncio.as_completedkönnen Sie Ergebnisse in der Reihenfolge verarbeiten, in der die Aufgaben abgeschlossen werden. Dies ist nützlich, um viele Anfragen effizient zu bearbeiten.
Koexistenz mit blockierendem I/O: run_in_executor
Bei CPU-intensiven Aufgaben oder bestehenden blockierenden APIs im asynchronen Code sollten Sie ThreadPoolExecutor oder ProcessPoolExecutor über loop.run_in_executor verwenden.
Der folgende Code ist ein Beispiel dafür, wie Aufgaben, die blockierende Ein-/Ausgabe erfordern, gleichzeitig mit einem Thread-Pool ausgeführt werden können.
1import asyncio
2from concurrent.futures import ThreadPoolExecutor
3import time
4
5def blocking_io(n):
6 # Simulate a blocking I/O or CPU-bound function
7 time.sleep(n)
8 return f"slept {n}s"
9
10async def main():
11 loop = asyncio.get_running_loop()
12 with ThreadPoolExecutor() as pool:
13 tasks = [
14 loop.run_in_executor(pool, blocking_io, 1),
15 loop.run_in_executor(pool, blocking_io, 2),
16 ]
17 results = await asyncio.gather(*tasks)
18 print("Blocking results:", results)
19
20if __name__ == "__main__":
21 asyncio.run(main())- Durch die Verwendung von
run_in_executorkönnen Sie bestehenden synchronen Code ohne größere Umstrukturierungen in asynchrone Abläufe integrieren. Sie sollten jedoch auf die Anzahl der Threads und die CPU-Auslastung achten. ProcessPoolExecutorist für CPU-intensive Aufgaben geeignet.
Asynchroner Server: TCP Echo Server auf Basis von asyncio
Wenn Sie Sockets direkt verarbeiten möchten, können Sie mit asyncio.start_server einfach einen asynchronen Server erstellen.
Das folgende Beispiel ist ein einfacher Echo-Server, der die vom Client empfangenen Daten unverändert zurücksendet.
1import asyncio
2
3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
4 # Handle a single client: read data and echo it back
5 addr = writer.get_extra_info('peername')
6 print(f"Connection from {addr}")
7 while True:
8 data = await reader.read(1024)
9 if not data:
10 break
11 writer.write(data) # echo back
12 await writer.drain()
13 writer.close()
14 await writer.wait_closed()
15 print(f"Connection closed {addr}")
16
17async def main():
18 server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19 addr = server.sockets[0].getsockname()
20 print(f"Serving on {addr}")
21 async with server:
22 await server.serve_forever()
23
24if __name__ == "__main__":
25 asyncio.run(main())-
Bei der TCP-Kommunikation mit
asynciospielenStreamReaderundStreamWritereine zentrale Rolle bei der asynchronen Ein- und Ausgabe.StreamReaderliest asynchron Daten, die vom Client gesendet werden, währendStreamWriterverwendet wird, um Antworten vom Server zurück an den Client zu senden. -
Auch ohne die detaillierten Operationen der Sockets selbst zu übernehmen, können Sie mit
asyncio.start_servereinfach und effizient einen asynchronen Server starten. -
Wenn Sie eine Handler-Funktion an
asyncio.start_serverübergeben, erhält diese Funktionreaderundwriterals Argumente. Durch die Verwendung dieser können Sie Kommunikationsprozesse sicherer und übersichtlicher implementieren, als wenn Sie direkt mit Low-Level-Socket-APIs arbeiten würden. Zum Beispiel können Sie durch das Empfangen von Daten mitreader.read()und die Kombination vonwriter.write()mitwriter.drain()ein asynchrones Senden umsetzen, das sicherstellt, dass die Übertragung abgeschlossen wird. -
Diese Konfiguration eignet sich zur Verarbeitung vieler gleichzeitiger Verbindungen und ist ideal für einfache Protokolle oder kleine TCP-Dienste.
Verarbeitung großer Streaming-Daten
Beim sequenziellen Verarbeiten großer Dateien oder Antworten sollten Sie Daten in Blöcken lesen und schreiben, um den Speicherverbrauch gering zu halten. Nachfolgend finden Sie ein Beispiel für das Streaming-Lesen mit aiohttp.
Der folgende Code verarbeitet HTTP-Antworten blockweise und schreibt den empfangenen Datenstrom direkt auf die Festplatte.
1import aiohttp
2import asyncio
3import aiofiles
4
5async def stream_download(url: str, dest: str):
6 # Stream download and write to file in chunks
7 async with aiohttp.ClientSession() as session:
8 async with session.get(url) as resp:
9 async with aiofiles.open(dest, 'wb') as f:
10 async for chunk in resp.content.iter_chunked(1024 * 64):
11 await f.write(chunk)
12
13if __name__ == "__main__":
14 asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))-
Dieser Code lädt eine große Datei nicht auf einmal, sondern empfängt die Daten in Blöcken (kleinen Teilen) und schreibt sie asynchron in eine Datei. Dadurch können Downloads schnell und effizient durchgeführt werden, während der Speicherverbrauch niedrig bleibt.
aiohttpruft Daten asynchron ab undaiofilesschreibt ohne Blockierung in die Datei, was die gleichzeitige Ausführung mit anderen Prozessen erleichtert. -
Dieses Muster eignet sich, um große Dateien effizient herunterzuladen und zu speichern und gleichzeitig den Speicherverbrauch zu minimieren.
Asynchrone Ausführung von Subprozessen
Wenn Sie externe Befehle asynchron ausführen und deren Ausgabe in Echtzeit lesen möchten, ist asyncio.create_subprocess_exec hilfreich.
Nachfolgend finden Sie ein Beispiel, wie Sie einen externen Befehl starten und dessen Standardausgabe in Echtzeit lesen.
1import asyncio
2
3async def run_cmd(cmd):
4 # Run external command asynchronously and capture output line by line
5 proc = await asyncio.create_subprocess_exec(
6 *cmd,
7 stdout=asyncio.subprocess.PIPE,
8 stderr=asyncio.subprocess.PIPE
9 )
10
11 async def read_stream(stream, name):
12 while True:
13 line = await stream.readline()
14 if not line:
15 break
16 print(f"[{name}] {line.decode().rstrip()}")
17
18 await asyncio.gather(
19 read_stream(proc.stdout, "stdout"),
20 read_stream(proc.stderr, "stderr"),
21 )
22 await proc.wait()
23 return proc.returncode
24
25if __name__ == "__main__":
26 asyncio.run(run_cmd(["python", "--version"]))- Durch die asynchrone Steuerung von Subprozessen können Sie Protokolle externer Tools in Echtzeit verarbeiten oder mehrere Prozesse parallel ausführen.
Umgang mit Abbruch und Zeitüberschreitung
Asynchrone Aufgaben können abgebrochen werden. Für die Implementierung eines Timeouts können Sie einfach asyncio.wait_for verwenden.
Nachfolgend finden Sie ein Beispiel für das Ausführen einer Aufgabe mit Zeitlimit.
1import asyncio
2
3async def slow_task():
4 await asyncio.sleep(5)
5 return "done"
6
7async def main():
8 try:
9 result = await asyncio.wait_for(slow_task(), timeout=2.0)
10 print("Result:", result)
11 except asyncio.TimeoutError:
12 print("Task timed out")
13
14if __name__ == "__main__":
15 asyncio.run(main())wait_forlöst einenTimeoutErroraus, wenn das Limit erreicht ist, und bricht die Aufgabe bei Bedarf ab. Achten Sie darauf, wie Auftragsabbrüche weitergegeben werden und sorgen Sie für eine ordnungsgemäße Bereinigung.
Steuerung der Parallelität (Semaphore)
Da viele gleichzeitige Verbindungen oder Anfragen Ressourcen erschöpfen können, begrenzen Sie die Parallelität mit asyncio.Semaphore.
Nachfolgend sehen Sie ein Beispiel für die Begrenzung gleichzeitiger Downloads mit einem Semaphore.
1import asyncio
2import aiohttp
3
4semaphore = asyncio.Semaphore(3) # allow up to 3 concurrent tasks
5
6async def limited_fetch(session, url):
7 async with semaphore:
8 async with session.get(url) as resp:
9 return url, resp.status
10
11async def main(urls):
12 async with aiohttp.ClientSession() as session:
13 tasks = [limited_fetch(session, u) for u in urls]
14 results = await asyncio.gather(*tasks)
15 print(results)
16
17if __name__ == "__main__":
18 urls = ["https://codesparklab.com/json/example.json"] * 10
19 asyncio.run(main(urls))- Mit dieser Methode schonen Sie externe Dienste und verhindern eine Überlastung Ihres eigenen Prozesses.
Fehlerbehandlung und Wiederholungsstrategien
Auch bei asynchroner Verarbeitung treten zwangsläufig Fehler auf. Fangen Sie Ausnahmen angemessen ab und implementieren Sie Wiederholungsstrategien wie exponentielles Backoff.
Nachfolgend finden Sie ein Beispiel für eine einfache Wiederholung bis zu N Versuchen.
1import asyncio
2import aiohttp
3import random
4
5async def fetch_with_retry(session, url, retries=3):
6 for attempt in range(1, retries + 1):
7 try:
8 async with session.get(url) as resp:
9 resp.raise_for_status()
10 text = await resp.text()
11 return text
12 except Exception as e:
13 if attempt == retries:
14 raise
15 await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18 async with aiohttp.ClientSession() as session:
19 text = await fetch_with_retry(session, "https://codesparklab.com/")
20 print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23 asyncio.run(main())- Eine geeignete Wiederholungslogik ist entscheidend für ein Gleichgewicht zwischen Konsistenz und Verkehrskontrolle.
Tipps für Debugging und Logging
Bei asynchroner Verarbeitung laufen Aufgaben gleichzeitig ab, was das Auffinden von Fehlerursachen erschweren kann. Beachten Sie die folgenden Punkte, um Probleme effizient nachzuverfolgen und das Debugging zu erleichtern.
- Ausnahmen aus
asyncio.run()undTaskwerden leicht übersehen – stellen Sie daher sicher, dass unbehandelte Ausnahmen geloggt werden. - Wenn Sie
loggingverwenden, erleichtert die Aufnahme des Coroutine-Namens oder (in Python 3.8 und höher) vontask.get_name()in Ihren Log-Ausgaben das Nachverfolgen. - Sie können den aktuellen Status von Tasks mit
asyncio.Task.all_tasks()überprüfen. Diese API ist jedoch für Debugging-Zwecke gedacht und sollte in Produktionsumgebungen mit Vorsicht verwendet werden, um Leistungsprobleme oder unerwartete Störungen zu vermeiden.
Leistungshinweise
Obwohl asynchrone Programmierung die I/O-Auslastung optimal nutzt, kann unsachgemäße Verwendung die Leistung verschlechtern. Optimieren Sie, indem Sie folgende Punkte beachten:.
- Asynchrone Verarbeitung glänzt bei I/O-gebundenen Aufgaben, ist aber für CPU-gebundene Aufgaben ungeeignet – nutzen Sie dafür einen Prozess-Pool.
- Berücksichtigen Sie bei Thread- oder Prozess-Pools die Pool-Größe und den Aufgabentyp.
- Wenn Sie viele kleine Aufgaben gleichzeitig starten, steigt der Overhead der Eventschleife – justieren Sie mit Batching oder Semaphoren.
Zusammenfassung
Pythons asynchrone I/O ist ein leistungsfähiger Mechanismus, der I/O-Wartezeiten effektiv nutzt und Netzwerk- sowie Dateioperationen effizient parallel ausführt. Durch die Kombination von Techniken wie asyncio, aiohttp, aiofiles und run_in_executor können Sie flexibel praxisnahe asynchrone Anwendungen erstellen. Durch die Verwendung von async with zur Automatisierung von Ressourcenverwaltung können Sie asynchrone Ressourcen wie Dateien, HTTP-Sitzungen und Sperren sicher und zuverlässig steuern. Durch die Einbindung einer passenden Fehlerbehandlung und Parallelitätssteuerung können Sie hochzuverlässige asynchrone Programme sicher betreiben.
Sie können den obigen Artikel mit Visual Studio Code auf unserem YouTube-Kanal verfolgen. Bitte schauen Sie sich auch den YouTube-Kanal an.