Entrée/Sortie asynchrone

Entrée/Sortie asynchrone

Cet article explique l'entrée/sortie asynchrone.

Ce guide explique en douceur, étape par étape, les concepts et les modèles de l’entrée/sortie asynchrone qui sont pratiquement utiles en Python.

YouTube Video

Entrée/Sortie (E/S) asynchrone

Concept de l’E/S asynchrone

L’E/S asynchrone est un mécanisme qui permet à d'autres opérations de s'exécuter en parallèle pendant que l'on attend des opérations d’E/S gourmandes en temps, telles que les manipulations de fichiers ou la communication réseau. En Python, asyncio est fourni comme framework asynchrone standard, et de nombreuses bibliothèques sont conçues pour utiliser ce mécanisme.

Bases : async / await et la boucle d'événements

Tout d'abord, voici comment écrire des coroutines de base et un exemple d’exécution simultanée de plusieurs coroutines à l’aide de asyncio.gather.

Le code ci-dessous montre un exemple minimal de définition et d’exécution simultanée de fonctions asynchrones. La fonction sleep est utilisée pour démontrer l'exécution parallèle.

 1import asyncio
 2
 3async def worker(name: str, delay: float):
 4    # Simulate I/O-bound work by sleeping
 5    print(f"{name} started")
 6    await asyncio.sleep(delay)
 7    print(f"{name} finished after {delay}s")
 8    return name, delay
 9
10async def main():
11    # Run multiple coroutines concurrently
12    tasks = [
13        worker("A", 1.5),
14        worker("B", 1.0),
15        worker("C", 0.5),
16    ]
17    results = await asyncio.gather(*tasks)
18    print("Results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Ce code démarre la boucle d’événements avec asyncio.run() et exécute trois coroutines en parallèle.

async with et les gestionnaires de contexte asynchrones

Dans le traitement asynchrone, la gestion des ressources comme l’ouverture de connexions ou la fermeture de fichiers peut rapidement devenir complexe. C’est là que les gestionnaires de contexte asynchrones avec async with deviennent utiles. Cette syntaxe s’utilise comme l'instruction with synchrone, mais le traitement interne est asynchrone, ce qui l’intègre naturellement dans le flux async/await.

Il y a deux raisons principales d’utiliser async with :.

  • Pour nettoyer de manière fiable les ressources comme les connexions, les descripteurs de fichiers ou les sessions. Vous pouvez être sûr que les ressources seront correctement libérées même en cas d’arrêt anormal.
  • Pour automatiser les tâches d’initialisation et de nettoyage (comme l'ouverture/la fermeture de connexions, le flush, etc.) de manière asynchrone. Cela évite le codage manuel répétitif et rend votre code plus clair.

Voici un exemple de création d’un gestionnaire de contexte asynchrone simple à partir de zéro :.

 1import asyncio
 2
 3class AsyncResource:
 4    async def __aenter__(self):
 5        print("Opening resource...")
 6        await asyncio.sleep(0.5)
 7        print("Resource opened")
 8        return self
 9
10    async def __aexit__(self, exc_type, exc, tb):
11        print("Closing resource...")
12        await asyncio.sleep(0.5)
13        print("Resource closed")
14
15async def main():
16    async with AsyncResource() as r:
17        print("Using resource...")
18
19if __name__ == "__main__":
20    asyncio.run(main())
  • En définissant __aenter__ et __aexit__, vous pouvez utiliser async with.
  • Le traitement lors de l’entrée et de la sortie du bloc async with s’effectue de manière asynchrone et sûre.

E/S de fichiers asynchrone (aiofiles)

Les opérations sur les fichiers sont un exemple classique de blocage. En utilisant aiofiles, vous pouvez gérer les opérations sur les fichiers de manière asynchrone en toute sécurité. En interne, il utilise un pool de threads et garantit que les fichiers sont correctement fermés grâce à async with.

L’exemple suivant montre la lecture asynchrone et parallèle de plusieurs fichiers. Vous devez installer aiofiles (pip install aiofiles) avant d’exécuter ce code.

 1# pip install aiofiles
 2import asyncio
 3import aiofiles
 4from pathlib import Path
 5
 6async def read_file(path: Path):
 7    # Read file content asynchronously
 8    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
 9        contents = await f.read()
10    return path.name, len(contents)
11
12async def main():
13    files = [
14        Path("example1.txt"),
15        Path("example2.txt"),
16        Path("example3.txt")
17    ]
18    tasks = [read_file(p) for p in files]
19    results = await asyncio.gather(*tasks)
20    for name, size in results:
21        print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24    asyncio.run(main())
  • Ce code parallélise la lecture de chaque fichier. aiofiles utilise souvent un pool de threads, ce qui permet de traiter une E/S de fichiers bloquante via une interface asynchrone.

Client HTTP asynchrone (aiohttp)

Voici, à titre d’exemple classique d’E/S réseau, comment effectuer des requêtes HTTP de manière asynchrone. C’est particulièrement puissant lorsque vous devez effectuer un grand nombre de requêtes HTTP en parallèle.

Voici un exemple de récupération de plusieurs URLs en parallèle avec aiohttp. Vous devrez installer aiohttp avec pip install aiohttp.

 1# pip install aiohttp
 2import asyncio
 3import aiohttp
 4
 5async def fetch(session: aiohttp.ClientSession, url: str):
 6    # Fetch a URL asynchronously and return status and size
 7    async with session.get(url) as resp:
 8        text = await resp.text()
 9        return url, resp.status, len(text)
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [fetch(session, u) for u in urls]
14        for coro in asyncio.as_completed(tasks):
15            url, status, size = await coro
16            print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19    urls = [
20        "https://codesparklab.com/json/example1.json",
21        "https://codesparklab.com/json/example2.json",
22        "https://codesparklab.com/json/example3.json",
23    ]
24    asyncio.run(main(urls))
  • Avec asyncio.as_completed, vous pouvez traiter les résultats dans l’ordre d’achèvement des tâches. C’est utile pour traiter efficacement de nombreuses requêtes.

Cohabitation avec les E/S bloquantes : run_in_executor

Lors du traitement de tâches intensives en CPU ou d'API bloquantes existantes dans un code asynchrone, utilisez ThreadPoolExecutor ou ProcessPoolExecutor via loop.run_in_executor.

Le code suivant est un exemple d'exécution concurrente de tâches supposant une entrée/sortie bloquante, à l'aide d'un pool de threads.

 1import asyncio
 2from concurrent.futures import ThreadPoolExecutor
 3import time
 4
 5def blocking_io(n):
 6    # Simulate a blocking I/O or CPU-bound function
 7    time.sleep(n)
 8    return f"slept {n}s"
 9
10async def main():
11    loop = asyncio.get_running_loop()
12    with ThreadPoolExecutor() as pool:
13        tasks = [
14            loop.run_in_executor(pool, blocking_io, 1),
15            loop.run_in_executor(pool, blocking_io, 2),
16        ]
17        results = await asyncio.gather(*tasks)
18        print("Blocking results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • En utilisant run_in_executor, vous pouvez intégrer du code synchrone existant dans des flux asynchrones sans réécriture majeure. Toutefois, il faut faire attention au nombre de threads et à la charge CPU.
  • ProcessPoolExecutor est adapté aux tâches nécessitant beaucoup de ressources CPU.

Serveur asynchrone : serveur TCP Echo basé sur asyncio

Si vous souhaitez manipuler les sockets directement, vous pouvez facilement créer un serveur asynchrone avec asyncio.start_server.

L’exemple suivant est un serveur echo simple qui renvoie exactement les données reçues du client.

 1import asyncio
 2
 3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
 4    # Handle a single client: read data and echo it back
 5    addr = writer.get_extra_info('peername')
 6    print(f"Connection from {addr}")
 7    while True:
 8        data = await reader.read(1024)
 9        if not data:
10            break
11        writer.write(data)  # echo back
12        await writer.drain()
13    writer.close()
14    await writer.wait_closed()
15    print(f"Connection closed {addr}")
16
17async def main():
18    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19    addr = server.sockets[0].getsockname()
20    print(f"Serving on {addr}")
21    async with server:
22        await server.serve_forever()
23
24if __name__ == "__main__":
25    asyncio.run(main())
  • Dans la communication TCP avec asyncio, StreamReader et StreamWriter jouent un rôle central dans les entrées et sorties asynchrones. StreamReader lit de manière asynchrone les données envoyées par le client, tandis que StreamWriter est utilisé pour renvoyer les réponses du serveur au client.

  • Même sans gérer vous-même les opérations détaillées des sockets, vous pouvez lancer un serveur asynchrone simplement et efficacement en utilisant asyncio.start_server.

  • Lorsque vous passez une fonction de gestion à asyncio.start_server, cette fonction reçoit reader et writer comme arguments. En les utilisant, vous pouvez implémenter les processus de communication de manière plus sûre et plus claire que si vous manipuliez directement les APIs socket de bas niveau. Par exemple, en recevant les données avec reader.read() et en combinant writer.write() avec writer.drain(), vous pouvez implémenter un envoi asynchrone qui garantit la fin de la transmission.

  • Cette configuration est adaptée au traitement d’un grand nombre de connexions simultanées et idéale pour les protocoles simples ou les petits services TCP.

Gestion de grands flux de données

Lorsque vous traitez de gros fichiers ou des réponses volumineuses, lisez et écrivez les données par blocs pour limiter la consommation mémoire. Voici un exemple de lecture en streaming avec aiohttp.

Le code suivant traite la réponse HTTP par blocs et écrit sur le disque à mesure que les données sont reçues.

 1import aiohttp
 2import asyncio
 3import aiofiles
 4
 5async def stream_download(url: str, dest: str):
 6    # Stream download and write to file in chunks
 7    async with aiohttp.ClientSession() as session:
 8        async with session.get(url) as resp:
 9            async with aiofiles.open(dest, 'wb') as f:
10                async for chunk in resp.content.iter_chunked(1024 * 64):
11                    await f.write(chunk)
12
13if __name__ == "__main__":
14    asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))
  • Ce code ne charge pas un gros fichier en une seule fois ; au lieu de cela, il reçoit les données par morceaux (petits blocs) et les écrit dans un fichier de façon asynchrone. En conséquence, il peut effectuer les téléchargements rapidement et efficacement tout en maintenant une faible consommation de mémoire. aiohttp récupère les données de façon asynchrone et aiofiles écrit dans le fichier sans blocage, ce qui facilite l'exécution parallèle avec d'autres processus.

  • Ce modèle est adapté au téléchargement et à l’enregistrement efficace de gros fichiers tout en minimisant l’utilisation de la mémoire.

Exécution asynchrone de sous-processus

Si vous souhaitez exécuter des commandes externes de façon asynchrone et lire leur sortie en temps réel, asyncio.create_subprocess_exec est utile.

Voici un exemple de lancement d'une commande externe et de lecture de sa sortie standard en temps réel.

 1import asyncio
 2
 3async def run_cmd(cmd):
 4    # Run external command asynchronously and capture output line by line
 5    proc = await asyncio.create_subprocess_exec(
 6        *cmd,
 7        stdout=asyncio.subprocess.PIPE,
 8        stderr=asyncio.subprocess.PIPE
 9    )
10
11    async def read_stream(stream, name):
12        while True:
13            line = await stream.readline()
14            if not line:
15                break
16            print(f"[{name}] {line.decode().rstrip()}")
17
18    await asyncio.gather(
19        read_stream(proc.stdout, "stdout"),
20        read_stream(proc.stderr, "stderr"),
21    )
22    await proc.wait()
23    return proc.returncode
24
25if __name__ == "__main__":
26    asyncio.run(run_cmd(["python", "--version"]))
  • En contrôlant les sous-processus de manière asynchrone, vous pouvez traiter les journaux d’outils externes en temps réel ou exécuter plusieurs processus en parallèle.

Gestion de l’annulation et du délai d’attente

Les tâches asynchrones peuvent être annulées. Pour implémenter un délai d’attente, il est simple d’utiliser asyncio.wait_for.

Voici un exemple d’exécution d’une tâche avec un délai d’attente.

 1import asyncio
 2
 3async def slow_task():
 4    await asyncio.sleep(5)
 5    return "done"
 6
 7async def main():
 8    try:
 9        result = await asyncio.wait_for(slow_task(), timeout=2.0)
10        print("Result:", result)
11    except asyncio.TimeoutError:
12        print("Task timed out")
13
14if __name__ == "__main__":
15    asyncio.run(main())
  • wait_for déclenche une TimeoutError si le délai est atteint et annule la tâche si besoin. Soyez prudent avec la propagation et le nettoyage lors de l’annulation d’une tâche.

Contrôle de la concurrence (Semaphore)

Étant donné que de nombreuses connexions ou requêtes concurrentes peuvent épuiser les ressources, limitez la concurrence avec asyncio.Semaphore.

Voici un exemple de limitation des téléchargements simultanés à l’aide d’un sémaphore.

 1import asyncio
 2import aiohttp
 3
 4semaphore = asyncio.Semaphore(3)  # allow up to 3 concurrent tasks
 5
 6async def limited_fetch(session, url):
 7    async with semaphore:
 8        async with session.get(url) as resp:
 9            return url, resp.status
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [limited_fetch(session, u) for u in urls]
14        results = await asyncio.gather(*tasks)
15        print(results)
16
17if __name__ == "__main__":
18    urls = ["https://codesparklab.com/json/example.json"] * 10
19    asyncio.run(main(urls))
  • Avec cette méthode, vous accédez aux services externes de façon raisonnée et évitez de surcharger votre propre processus.

Gestion des erreurs et stratégies de nouvelle tentative

Des erreurs surviennent inévitablement, même dans les traitements asynchrones. Interceptez les exceptions de manière appropriée et mettez en œuvre des stratégies de nouvelle tentative comme l’attente exponentielle (exponential backoff).

Voici un exemple d’implémentation de tentatives répétées jusqu’à N fois.

 1import asyncio
 2import aiohttp
 3import random
 4
 5async def fetch_with_retry(session, url, retries=3):
 6    for attempt in range(1, retries + 1):
 7        try:
 8            async with session.get(url) as resp:
 9                resp.raise_for_status()
10                text = await resp.text()
11                return text
12        except Exception as e:
13            if attempt == retries:
14                raise
15            await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18    async with aiohttp.ClientSession() as session:
19        text = await fetch_with_retry(session, "https://codesparklab.com/")
20        print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • Une bonne logique de nouvelle tentative est importante pour équilibrer la cohérence et la régulation du trafic.

Astuces pour le débogage et la journalisation

Dans le traitement asynchrone, les tâches s’exécutent en parallèle, ce qui peut compliquer l’identification de la cause des problèmes. Pour suivre les problèmes efficacement, garder à l’esprit les points suivants simplifiera le débogage.

  • Les exceptions provenant de asyncio.run() et des objets Task peuvent facilement passer inaperçues, veillez donc à journaliser les exceptions non gérées.
  • Lorsque vous utilisez logging, inclure le nom de la coroutine ou, à partir de Python 3.8, task.get_name() dans vos logs facilite le suivi.
  • Vous pouvez vérifier l'état actuel des tâches à l'aide de asyncio.Task.all_tasks(). Cependant, cette API est destinée à des fins de débogage et doit être utilisée avec précaution en environnement de production afin d'éviter des problèmes de performance ou des interférences inattendues.

Considérations de performance

Si la programmation asynchrone excelle dans le traitement des attentes d'E/S, une mauvaise utilisation peut dégrader les performances. Optimisez en gardant à l’esprit les points suivants :.

  • Le traitement asynchrone est idéal pour les tâches limitées par l’E/S, mais inadapté pour les tâches CPU-bound ; utilisez alors un pool de processus.
  • Lorsque vous utilisez des pools de threads ou de processus, tenez compte de leur taille et de la nature des tâches.
  • Si vous lancez un grand nombre de petites tâches simultanément, la surcharge de la boucle d’événements augmente ; utilisez donc le batching ou les sémaphores pour réguler.

Résumé

L’E/S asynchrone de Python est un mécanisme puissant qui exploite efficacement les temps d’attente d’E/S et exécute en parallèle les opérations réseau et fichier. En combinant des techniques telles que asyncio, aiohttp, aiofiles ou run_in_executor, vous pouvez construire des applications asynchrones pratiques et flexibles. L’utilisation de async with pour automatiser l’acquisition et la libération des ressources vous permet de gérer de façon sécurisée et fiable les ressources asynchrones comme les fichiers, sessions HTTP ou verrous. En intégrant une gestion appropriée des erreurs et de la concurrence, vous pouvez exécuter des programmes asynchrones très fiables en toute sécurité.

Vous pouvez suivre l'article ci-dessus avec Visual Studio Code sur notre chaîne YouTube. Veuillez également consulter la chaîne YouTube.

YouTube Video