Entrada/Salida Asíncrona

Entrada/Salida Asíncrona

Este artículo explica la entrada/salida asíncrona.

Esta guía explica de manera sencilla y paso a paso los conceptos y patrones de la entrada/salida asíncrona que son prácticamente útiles en Python.

YouTube Video

Entrada/Salida (E/S) asíncrona

Concepto de E/S asíncrona

La E/S asíncrona es un mecanismo que permite que otras operaciones se ejecuten en paralelo mientras se espera la finalización de tareas de E/S que consumen tiempo, como operaciones de archivos o comunicación de red. En Python, asyncio se proporciona como el framework asíncrono estándar y muchas bibliotecas están diseñadas para seguir este mecanismo.

Fundamentos: async / await y el bucle de eventos (event loop)

Primero, aquí tienes cómo escribir corutinas básicas y un ejemplo de ejecución simultánea de múltiples corutinas usando asyncio.gather.

El siguiente código es un ejemplo mínimo de cómo definir y ejecutar funciones asíncronas de manera concurrente. La función sleep se utiliza para demostrar la ejecución en paralelo.

 1import asyncio
 2
 3async def worker(name: str, delay: float):
 4    # Simulate I/O-bound work by sleeping
 5    print(f"{name} started")
 6    await asyncio.sleep(delay)
 7    print(f"{name} finished after {delay}s")
 8    return name, delay
 9
10async def main():
11    # Run multiple coroutines concurrently
12    tasks = [
13        worker("A", 1.5),
14        worker("B", 1.0),
15        worker("C", 0.5),
16    ]
17    results = await asyncio.gather(*tasks)
18    print("Results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Este código inicia el bucle de eventos usando asyncio.run() y ejecuta tres corutinas de manera concurrente.

async with y gestores de contexto asíncronos

En el procesamiento asíncrono, la gestión de recursos como abrir conexiones y cerrar archivos puede volverse fácilmente compleja. Es aquí donde los gestores de contexto asíncronos con async with resultan útiles. Esta sintaxis se utiliza igual que la sentencia with síncrona, pero el procesamiento interno es asíncrono, por lo que encaja perfectamente en el flujo de async/await.

Hay dos razones principales para usar async with:.

  • Para limpiar de forma fiable recursos como conexiones, manejadores de archivos o sesiones. Puedes estar seguro de que los recursos se liberarán correctamente incluso si ocurre una terminación anormal.
  • Para automatizar tareas de inicialización y limpieza, como establecer o cerrar conexiones y vaciar datos (flushing), de manera asíncrona. Esto ahorra la molestia de codificar manualmente y hace que tu código sea más claro.

A continuación se muestra un ejemplo de cómo crear un gestor de contexto asíncrono simple desde cero.

 1import asyncio
 2
 3class AsyncResource:
 4    async def __aenter__(self):
 5        print("Opening resource...")
 6        await asyncio.sleep(0.5)
 7        print("Resource opened")
 8        return self
 9
10    async def __aexit__(self, exc_type, exc, tb):
11        print("Closing resource...")
12        await asyncio.sleep(0.5)
13        print("Resource closed")
14
15async def main():
16    async with AsyncResource() as r:
17        print("Using resource...")
18
19if __name__ == "__main__":
20    asyncio.run(main())
  • Definiendo __aenter__ y __aexit__, puedes usar async with.
  • El procesamiento al entrar y salir del bloque async with se ejecuta de forma asíncrona y segura.

E/S de archivos asíncrona (aiofiles)

Las operaciones de archivos son un ejemplo clásico de operación bloqueante. Usando aiofiles, puedes manejar las operaciones de archivos de forma asíncrona y segura. Internamente, utiliza un pool de hilos y asegura que los archivos se cierren adecuadamente usando async with.

El siguiente ejemplo demuestra la lectura asíncrona y paralela de múltiples archivos. Debes instalar aiofiles con pip install aiofiles antes de ejecutar este código.

 1# pip install aiofiles
 2import asyncio
 3import aiofiles
 4from pathlib import Path
 5
 6async def read_file(path: Path):
 7    # Read file content asynchronously
 8    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
 9        contents = await f.read()
10    return path.name, len(contents)
11
12async def main():
13    files = [
14        Path("example1.txt"),
15        Path("example2.txt"),
16        Path("example3.txt")
17    ]
18    tasks = [read_file(p) for p in files]
19    results = await asyncio.gather(*tasks)
20    for name, size in results:
21        print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24    asyncio.run(main())
  • Este código paraleliza la lectura de cada archivo. aiofiles suele usar internamente un pool de hilos, permitiendo manejar la E/S de archivos bloqueante a través de una interfaz asíncrona.

Cliente HTTP asíncrono (aiohttp)

Como ejemplo clásico de E/S de red, aquí se muestra cómo realizar solicitudes HTTP de forma asíncrona. Es especialmente potente cuando necesitas hacer gran cantidad de solicitudes HTTP en paralelo.

A continuación se muestra un ejemplo de cómo obtener múltiples URLs en paralelo usando aiohttp. Necesitarás instalar aiohttp con pip install aiohttp.

 1# pip install aiohttp
 2import asyncio
 3import aiohttp
 4
 5async def fetch(session: aiohttp.ClientSession, url: str):
 6    # Fetch a URL asynchronously and return status and size
 7    async with session.get(url) as resp:
 8        text = await resp.text()
 9        return url, resp.status, len(text)
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [fetch(session, u) for u in urls]
14        for coro in asyncio.as_completed(tasks):
15            url, status, size = await coro
16            print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19    urls = [
20        "https://codesparklab.com/json/example1.json",
21        "https://codesparklab.com/json/example2.json",
22        "https://codesparklab.com/json/example3.json",
23    ]
24    asyncio.run(main(urls))
  • Usando asyncio.as_completed, puedes procesar resultados en el orden en que las tareas finalizan. Esto es útil para manejar eficientemente un gran número de solicitudes.

Convivencia con E/S bloqueante: run_in_executor

Cuando se trate de tareas que requieren mucha CPU o APIs bloqueantes existentes en código asíncrono, utilice ThreadPoolExecutor o ProcessPoolExecutor mediante loop.run_in_executor.

El siguiente código es un ejemplo de ejecutar tareas que suponen una E/S bloqueante de forma concurrente usando un conjunto de hilos.

 1import asyncio
 2from concurrent.futures import ThreadPoolExecutor
 3import time
 4
 5def blocking_io(n):
 6    # Simulate a blocking I/O or CPU-bound function
 7    time.sleep(n)
 8    return f"slept {n}s"
 9
10async def main():
11    loop = asyncio.get_running_loop()
12    with ThreadPoolExecutor() as pool:
13        tasks = [
14            loop.run_in_executor(pool, blocking_io, 1),
15            loop.run_in_executor(pool, blocking_io, 2),
16        ]
17        results = await asyncio.gather(*tasks)
18        print("Blocking results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Al utilizar run_in_executor, se puede incorporar código sincrónico existente en flujos asíncronos sin tener que reescribirlo significativamente. Sin embargo, debes prestar atención al número de hilos y a la carga de CPU.
  • ProcessPoolExecutor es adecuado para tareas que consumen mucha CPU.

Servidor asíncrono: servidor eco TCP basado en asyncio

Si deseas manejar sockets directamente, puedes construir fácilmente un servidor asíncrono usando asyncio.start_server.

El siguiente ejemplo es un servidor eco simple que devuelve los datos tal como los recibe del cliente.

 1import asyncio
 2
 3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
 4    # Handle a single client: read data and echo it back
 5    addr = writer.get_extra_info('peername')
 6    print(f"Connection from {addr}")
 7    while True:
 8        data = await reader.read(1024)
 9        if not data:
10            break
11        writer.write(data)  # echo back
12        await writer.drain()
13    writer.close()
14    await writer.wait_closed()
15    print(f"Connection closed {addr}")
16
17async def main():
18    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19    addr = server.sockets[0].getsockname()
20    print(f"Serving on {addr}")
21    async with server:
22        await server.serve_forever()
23
24if __name__ == "__main__":
25    asyncio.run(main())
  • En la comunicación TCP con asyncio, StreamReader y StreamWriter desempeñan un papel central en la entrada y salida asincrónica. StreamReader lee de forma asíncrona los datos enviados por el cliente, mientras que StreamWriter se utiliza para enviar respuestas del servidor de vuelta al cliente.

  • Incluso sin manejar personalmente las operaciones detalladas de los sockets, puedes lanzar un servidor asíncrono de manera simple y eficiente usando asyncio.start_server.

  • Cuando pasas una función manejadora a asyncio.start_server, esa función recibe reader y writer como sus argumentos. Usando estos, puedes implementar procesos de comunicación de manera más segura y clara que manejando directamente las API de sockets de bajo nivel. Por ejemplo, al recibir datos con reader.read() y combinar writer.write() con writer.drain(), puedes implementar el envío asíncrono que asegura que la transmisión se complete.

  • Esta configuración es adecuada para manejar grandes cantidades de conexiones simultáneas y es ideal para protocolos simples o servicios TCP de pequeña escala.

Manejo de grandes flujos de datos (streaming)

Cuando proceses archivos grandes o respuestas secuenciales, lee y escribe datos en fragmentos (chunks) para mantener bajo el uso de memoria. A continuación se muestra un ejemplo de lectura por streaming usando aiohttp.

El siguiente código procesa respuestas HTTP en fragmentos y los escribe en el disco a medida que llegan los datos.

 1import aiohttp
 2import asyncio
 3import aiofiles
 4
 5async def stream_download(url: str, dest: str):
 6    # Stream download and write to file in chunks
 7    async with aiohttp.ClientSession() as session:
 8        async with session.get(url) as resp:
 9            async with aiofiles.open(dest, 'wb') as f:
10                async for chunk in resp.content.iter_chunked(1024 * 64):
11                    await f.write(chunk)
12
13if __name__ == "__main__":
14    asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))
  • Este código no carga un archivo grande de una sola vez; en cambio, recibe los datos en fragmentos (pequeñas piezas) y los escribe en un archivo de forma asíncrona. Como resultado, puede realizar descargas rápida y eficientemente, manteniendo bajo el uso de memoria. aiohttp recupera datos de forma asíncrona y aiofiles escribe en el archivo sin bloquear, lo que facilita la ejecución junto con otros procesos.

  • Este patrón es adecuado para descargar y guardar archivos grandes de manera eficiente, minimizando el uso de memoria.

Ejecución asíncrona de subprocesos

Si quieres ejecutar comandos externos de manera asíncrona y leer su salida en tiempo real, asyncio.create_subprocess_exec es útil.

A continuación se muestra un ejemplo de cómo iniciar un comando externo y leer su salida estándar en tiempo real.

 1import asyncio
 2
 3async def run_cmd(cmd):
 4    # Run external command asynchronously and capture output line by line
 5    proc = await asyncio.create_subprocess_exec(
 6        *cmd,
 7        stdout=asyncio.subprocess.PIPE,
 8        stderr=asyncio.subprocess.PIPE
 9    )
10
11    async def read_stream(stream, name):
12        while True:
13            line = await stream.readline()
14            if not line:
15                break
16            print(f"[{name}] {line.decode().rstrip()}")
17
18    await asyncio.gather(
19        read_stream(proc.stdout, "stdout"),
20        read_stream(proc.stderr, "stderr"),
21    )
22    await proc.wait()
23    return proc.returncode
24
25if __name__ == "__main__":
26    asyncio.run(run_cmd(["python", "--version"]))
  • Al controlar subprocesos de manera asíncrona, puedes gestionar logs de herramientas externas en tiempo real o ejecutar múltiples procesos en paralelo.

Manejo de cancelaciones y timeouts

Las tareas asíncronas pueden ser canceladas. Al implementar un timeout, es sencillo usar asyncio.wait_for.

A continuación se muestra un ejemplo de ejecutar una tarea con un timeout.

 1import asyncio
 2
 3async def slow_task():
 4    await asyncio.sleep(5)
 5    return "done"
 6
 7async def main():
 8    try:
 9        result = await asyncio.wait_for(slow_task(), timeout=2.0)
10        print("Result:", result)
11    except asyncio.TimeoutError:
12        print("Task timed out")
13
14if __name__ == "__main__":
15    asyncio.run(main())
  • wait_for lanza un TimeoutError si se alcanza el timeout y cancela la tarea si es necesario. Ten cuidado con la propagación de la cancelación de tareas y con la limpieza posterior.

Control de concurrencia (Semaphore)

Debido a que muchas conexiones o solicitudes simultáneas pueden agotar los recursos, limita la concurrencia con asyncio.Semaphore.

A continuación se muestra un ejemplo de cómo limitar descargas simultáneas utilizando un semáforo (semaphore).

 1import asyncio
 2import aiohttp
 3
 4semaphore = asyncio.Semaphore(3)  # allow up to 3 concurrent tasks
 5
 6async def limited_fetch(session, url):
 7    async with semaphore:
 8        async with session.get(url) as resp:
 9            return url, resp.status
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [limited_fetch(session, u) for u in urls]
14        results = await asyncio.gather(*tasks)
15        print(results)
16
17if __name__ == "__main__":
18    urls = ["https://codesparklab.com/json/example.json"] * 10
19    asyncio.run(main(urls))
  • Con este método, puedes acceder a servicios externos de forma controlada y evitar sobrecargar tu propio proceso.

Manejo de errores y estrategias de reintentos

Es inevitable que ocurran errores incluso en el procesamiento asíncrono. Captura las excepciones de forma adecuada e implementa estrategias de reintentos como el backoff exponencial.

A continuación se muestra una implementación de ejemplo de reintentos simples hasta N veces.

 1import asyncio
 2import aiohttp
 3import random
 4
 5async def fetch_with_retry(session, url, retries=3):
 6    for attempt in range(1, retries + 1):
 7        try:
 8            async with session.get(url) as resp:
 9                resp.raise_for_status()
10                text = await resp.text()
11                return text
12        except Exception as e:
13            if attempt == retries:
14                raise
15            await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18    async with aiohttp.ClientSession() as session:
19        text = await fetch_with_retry(session, "https://codesparklab.com/")
20        print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • Una lógica adecuada de reintentos es importante para balancear la consistencia y el control del tráfico.

Consejos para depuración (debugging) y registro de logs

En el procesamiento asíncrono, las tareas avanzan concurrentemente, lo que puede dificultar identificar el origen de los problemas. Para rastrear problemas de manera eficiente, tener en cuenta los siguientes puntos hará que la depuración sea más fluida.

  • Las excepciones de asyncio.run() y Task son fáciles de perder, así que asegúrate de registrar (log) las excepciones no controladas.
  • Al usar logging, incluir el nombre de la corrutina o, en Python 3.8 y superiores, task.get_name() en tus registros facilita el seguimiento.
  • Puedes comprobar el estado actual de las tareas utilizando asyncio.Task.all_tasks(). Sin embargo, esta API está destinada a fines de depuración y debe usarse con precaución en entornos de producción para evitar problemas de rendimiento o interferencias inesperadas.

Consideraciones de rendimiento

Si bien la programación asíncrona se destaca en la gestión de esperas de E/S, un uso inadecuado puede degradar el rendimiento. Optimiza teniendo en cuenta los siguientes puntos:.

  • El procesamiento asíncrono es excelente para tareas dependientes de E/S, pero no es adecuado para tareas dependientes de CPU; en estos casos, utiliza un pool de procesos.
  • Al usar pools de hilos o procesos, ten en cuenta el tamaño del pool y la naturaleza de las tareas.
  • Si inicias muchas tareas pequeñas al mismo tiempo, la sobrecarga del bucle de eventos aumenta—utiliza agrupamiento (batching) o semáforos para ajustarlo.

Resumen

La E/S asíncrona de Python es un mecanismo poderoso que aprovecha eficazmente los tiempos de espera de E/S y ejecuta operaciones de red y archivos de manera eficiente y concurrente. Combinando técnicas como asyncio, aiohttp, aiofiles y run_in_executor, puedes construir aplicaciones asíncronas prácticas y flexibles. Utilizando async with para automatizar la adquisición y liberación de recursos, puedes gestionar de manera segura y confiable recursos asíncronos como archivos, sesiones HTTP y locks. Al incorporar un manejo adecuado de errores y gestión de concurrencia, puedes ejecutar programas asíncronos de alta fiabilidad de manera segura.

Puedes seguir el artículo anterior utilizando Visual Studio Code en nuestro canal de YouTube. Por favor, también revisa nuestro canal de YouTube.

YouTube Video