Entrada/Saída Assíncrona

Entrada/Saída Assíncrona

Este artigo explica a entrada/saída assíncrona.

Este guia explica de forma simples, passo a passo, os conceitos e padrões de entrada/saída assíncrona que são praticamente úteis em Python.

YouTube Video

Entrada/Saída Assíncrona (I/O)

Conceito de I/O Assíncrono

A I/O assíncrona é um mecanismo que permite que outras operações sejam executadas em paralelo enquanto se espera por operações de I/O demoradas, como operações de arquivos ou comunicação de rede. Em Python, o asyncio é fornecido como o framework assíncrono padrão e muitas bibliotecas são projetadas para seguir esse mecanismo.

Básico: async / await e o Loop de Eventos

Primeiro, veja como escrever corrotinas básicas e um exemplo de execução simultânea de várias corrotinas usando asyncio.gather.

O código abaixo é um exemplo mínimo de definição e execução concorrente de funções assíncronas. A função sleep é usada para demonstrar a execução paralela.

 1import asyncio
 2
 3async def worker(name: str, delay: float):
 4    # Simulate I/O-bound work by sleeping
 5    print(f"{name} started")
 6    await asyncio.sleep(delay)
 7    print(f"{name} finished after {delay}s")
 8    return name, delay
 9
10async def main():
11    # Run multiple coroutines concurrently
12    tasks = [
13        worker("A", 1.5),
14        worker("B", 1.0),
15        worker("C", 0.5),
16    ]
17    results = await asyncio.gather(*tasks)
18    print("Results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Este código inicia o loop de eventos usando asyncio.run() e executa três corrotinas simultaneamente.

async with e Gerenciadores de Contexto Assíncronos

No processamento assíncrono, o gerenciamento de recursos como abrir conexões e fechar arquivos pode se tornar facilmente complexo. É aí que os gerenciadores de contexto assíncronos usando async with se tornam úteis. Esta sintaxe é usada da mesma forma que a instrução síncrona with, mas o processamento interno é assíncrono, integrando-se naturalmente ao fluxo async/await.

Existem dois motivos principais para usar async with:.

  • Para liberar com confiabilidade recursos como conexões, handles de arquivos ou sessões. Você pode ter certeza de que os recursos serão liberados corretamente mesmo se ocorrer uma finalização anormal.
  • Para automatizar tarefas de inicialização e limpeza, como abrir ou fechar conexões e flush, de forma assíncrona. Isso evita trabalho manual de codificação e torna seu código mais claro.

Abaixo está um exemplo de criação de um gerenciador de contexto assíncrono simples do zero.

 1import asyncio
 2
 3class AsyncResource:
 4    async def __aenter__(self):
 5        print("Opening resource...")
 6        await asyncio.sleep(0.5)
 7        print("Resource opened")
 8        return self
 9
10    async def __aexit__(self, exc_type, exc, tb):
11        print("Closing resource...")
12        await asyncio.sleep(0.5)
13        print("Resource closed")
14
15async def main():
16    async with AsyncResource() as r:
17        print("Using resource...")
18
19if __name__ == "__main__":
20    asyncio.run(main())
  • Definindo __aenter__ e __aexit__, você pode usar async with.
  • O processamento ao entrar e sair do bloco async with é executado de forma assíncrona e segura.

I/O de Arquivos Assíncrona (aiofiles)

Operações de arquivos são um exemplo clássico de bloqueio. Usando aiofiles, você pode manipular operações de arquivos de forma assíncrona e segura. Internamente, utiliza um pool de threads e garante que os arquivos sejam fechados corretamente utilizando async with.

O exemplo a seguir demonstra a leitura assíncrona e paralela de vários arquivos. Você precisa instalar o aiofiles com pip install aiofiles antes de executar este código.

 1# pip install aiofiles
 2import asyncio
 3import aiofiles
 4from pathlib import Path
 5
 6async def read_file(path: Path):
 7    # Read file content asynchronously
 8    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
 9        contents = await f.read()
10    return path.name, len(contents)
11
12async def main():
13    files = [
14        Path("example1.txt"),
15        Path("example2.txt"),
16        Path("example3.txt")
17    ]
18    tasks = [read_file(p) for p in files]
19    results = await asyncio.gather(*tasks)
20    for name, size in results:
21        print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24    asyncio.run(main())
  • Este código paraleliza a leitura de cada arquivo. aiofiles frequentemente utiliza internamente um pool de threads, permitindo manipular I/O de arquivos bloqueantes via interface assíncrona.

Cliente HTTP Assíncrono (aiohttp)

Como exemplo clássico de I/O de rede, aqui está como executar requisições HTTP de forma assíncrona. É particularmente poderoso quando há necessidade de fazer um grande número de requisições HTTP em paralelo.

Abaixo está um exemplo de obtenção de várias URLs em paralelo utilizando aiohttp. Você precisará instalar o aiohttp com pip install aiohttp.

 1# pip install aiohttp
 2import asyncio
 3import aiohttp
 4
 5async def fetch(session: aiohttp.ClientSession, url: str):
 6    # Fetch a URL asynchronously and return status and size
 7    async with session.get(url) as resp:
 8        text = await resp.text()
 9        return url, resp.status, len(text)
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [fetch(session, u) for u in urls]
14        for coro in asyncio.as_completed(tasks):
15            url, status, size = await coro
16            print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19    urls = [
20        "https://codesparklab.com/json/example1.json",
21        "https://codesparklab.com/json/example2.json",
22        "https://codesparklab.com/json/example3.json",
23    ]
24    asyncio.run(main(urls))
  • Usando asyncio.as_completed, você pode processar resultados na ordem em que as tarefas são concluídas. Isso é útil para lidar eficientemente com muitas requisições.

Coexistência com I/O bloqueante: run_in_executor

Ao lidar com tarefas intensivas de CPU ou APIs bloqueantes existentes em código assíncrono, use ThreadPoolExecutor ou ProcessPoolExecutor através de loop.run_in_executor.

O código a seguir é um exemplo de execução concorrente de tarefas que assumem E/S bloqueante usando um pool de threads.

 1import asyncio
 2from concurrent.futures import ThreadPoolExecutor
 3import time
 4
 5def blocking_io(n):
 6    # Simulate a blocking I/O or CPU-bound function
 7    time.sleep(n)
 8    return f"slept {n}s"
 9
10async def main():
11    loop = asyncio.get_running_loop()
12    with ThreadPoolExecutor() as pool:
13        tasks = [
14            loop.run_in_executor(pool, blocking_io, 1),
15            loop.run_in_executor(pool, blocking_io, 2),
16        ]
17        results = await asyncio.gather(*tasks)
18        print("Blocking results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Ao utilizar run_in_executor, você pode incorporar código síncrono existente em fluxos assíncronos sem grandes reescritas. No entanto, é importante prestar atenção ao número de threads e à carga da CPU.
  • ProcessPoolExecutor é adequado para tarefas que exigem muito da CPU.

Servidor Assíncrono: Servidor TCP Echo baseado em asyncio

Se você deseja manipular sockets diretamente, pode facilmente construir um servidor assíncrono usando asyncio.start_server.

O exemplo a seguir é um servidor echo simples que retorna exatamente os dados recebidos do cliente.

 1import asyncio
 2
 3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
 4    # Handle a single client: read data and echo it back
 5    addr = writer.get_extra_info('peername')
 6    print(f"Connection from {addr}")
 7    while True:
 8        data = await reader.read(1024)
 9        if not data:
10            break
11        writer.write(data)  # echo back
12        await writer.drain()
13    writer.close()
14    await writer.wait_closed()
15    print(f"Connection closed {addr}")
16
17async def main():
18    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19    addr = server.sockets[0].getsockname()
20    print(f"Serving on {addr}")
21    async with server:
22        await server.serve_forever()
23
24if __name__ == "__main__":
25    asyncio.run(main())
  • Na comunicação TCP com asyncio, StreamReader e StreamWriter desempenham um papel central na entrada e saída assíncronas. StreamReader lê dados enviados pelo cliente de forma assíncrona, enquanto StreamWriter é usado para enviar respostas do servidor de volta ao cliente.

  • Mesmo sem lidar diretamente com as operações detalhadas dos sockets, você pode lançar um servidor assíncrono de forma simples e eficiente usando asyncio.start_server.

  • Quando você passa uma função manipuladora para asyncio.start_server, essa função recebe reader e writer como seus argumentos. Usando esses recursos, é possível implementar processos de comunicação de maneira mais segura e clara do que lidando diretamente com APIs de socket de baixo nível. Por exemplo, ao receber dados com reader.read() e combinar writer.write() com writer.drain(), você pode implementar um envio assíncrono que garante que a transmissão seja concluída.

  • Esta configuração é adequada para lidar com grande número de conexões simultâneas e é ideal para protocolos simples ou serviços TCP de pequeno porte.

Manipulação de Grandes Fluxos de Dados (Streaming)

Ao processar arquivos ou respostas grandes de forma sequencial, leia e escreva os dados em blocos para manter baixo o uso de memória. Abaixo está um exemplo de leitura em streaming usando aiohttp.

O código a seguir processa respostas HTTP em blocos e escreve no disco à medida que os dados são recebidos.

 1import aiohttp
 2import asyncio
 3import aiofiles
 4
 5async def stream_download(url: str, dest: str):
 6    # Stream download and write to file in chunks
 7    async with aiohttp.ClientSession() as session:
 8        async with session.get(url) as resp:
 9            async with aiofiles.open(dest, 'wb') as f:
10                async for chunk in resp.content.iter_chunked(1024 * 64):
11                    await f.write(chunk)
12
13if __name__ == "__main__":
14    asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))
  • Este código não carrega um arquivo grande de uma só vez; em vez disso, recebe os dados em blocos (pequenos pedaços) e os grava em um arquivo de forma assíncrona. Como resultado, ele pode realizar downloads de forma rápida e eficiente, mantendo o uso de memória baixo. aiohttp recupera dados de forma assíncrona e aiofiles grava no arquivo sem bloquear, facilitando a execução junto com outros processos.

  • Esse padrão é adequado para download e salvamento eficiente de arquivos grandes, minimizando o uso de memória.

Execução Assíncrona de Subprocessos

Se você deseja executar comandos externos de forma assíncrona e ler sua saída em tempo real, asyncio.create_subprocess_exec é útil.

Abaixo está um exemplo de inicializar um comando externo e ler sua saída padrão em tempo real.

 1import asyncio
 2
 3async def run_cmd(cmd):
 4    # Run external command asynchronously and capture output line by line
 5    proc = await asyncio.create_subprocess_exec(
 6        *cmd,
 7        stdout=asyncio.subprocess.PIPE,
 8        stderr=asyncio.subprocess.PIPE
 9    )
10
11    async def read_stream(stream, name):
12        while True:
13            line = await stream.readline()
14            if not line:
15                break
16            print(f"[{name}] {line.decode().rstrip()}")
17
18    await asyncio.gather(
19        read_stream(proc.stdout, "stdout"),
20        read_stream(proc.stderr, "stderr"),
21    )
22    await proc.wait()
23    return proc.returncode
24
25if __name__ == "__main__":
26    asyncio.run(run_cmd(["python", "--version"]))
  • Controlando subprocessos de forma assíncrona, você pode lidar com logs de ferramentas externas em tempo real ou executar múltiplos processos em paralelo.

Tratamento de Cancelamento e Timeout

Tarefas assíncronas podem ser canceladas. Ao implementar um timeout, é simples usar asyncio.wait_for.

Abaixo está um exemplo de execução de tarefa com timeout.

 1import asyncio
 2
 3async def slow_task():
 4    await asyncio.sleep(5)
 5    return "done"
 6
 7async def main():
 8    try:
 9        result = await asyncio.wait_for(slow_task(), timeout=2.0)
10        print("Result:", result)
11    except asyncio.TimeoutError:
12        print("Task timed out")
13
14if __name__ == "__main__":
15    asyncio.run(main())
  • wait_for lança um TimeoutError se o tempo limite for atingido e cancela a tarefa, se necessário. Cuidado com a propagação do cancelamento de tarefas e a limpeza de recursos.

Controlando Concorrência (Semaphore)

Como muitas conexões ou requisições simultâneas podem esgotar recursos, limite a concorrência com asyncio.Semaphore.

O seguinte é um exemplo de limitação de downloads simultâneos usando um semáforo.

 1import asyncio
 2import aiohttp
 3
 4semaphore = asyncio.Semaphore(3)  # allow up to 3 concurrent tasks
 5
 6async def limited_fetch(session, url):
 7    async with semaphore:
 8        async with session.get(url) as resp:
 9            return url, resp.status
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [limited_fetch(session, u) for u in urls]
14        results = await asyncio.gather(*tasks)
15        print(results)
16
17if __name__ == "__main__":
18    urls = ["https://codesparklab.com/json/example.json"] * 10
19    asyncio.run(main(urls))
  • Com esse método, você pode acessar serviços externos de forma controlada e evitar sobrecarregar seu próprio processo.

Tratamento de Erros e Estratégias de Retentativa

Erros inevitavelmente ocorrem mesmo em processamento assíncrono. Capture exceções adequadamente e implemente estratégias de retentativa como backoff exponencial.

Abaixo está uma implementação de retentativa simples até N vezes.

 1import asyncio
 2import aiohttp
 3import random
 4
 5async def fetch_with_retry(session, url, retries=3):
 6    for attempt in range(1, retries + 1):
 7        try:
 8            async with session.get(url) as resp:
 9                resp.raise_for_status()
10                text = await resp.text()
11                return text
12        except Exception as e:
13            if attempt == retries:
14                raise
15            await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18    async with aiohttp.ClientSession() as session:
19        text = await fetch_with_retry(session, "https://codesparklab.com/")
20        print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • A lógica correta de retentativa é importante para equilibrar consistência e controle de tráfego.

Dicas para Depuração e Logging

No processamento assíncrono, as tarefas prosseguem simultaneamente, o que pode dificultar a identificação da causa de problemas. Para rastrear problemas de forma eficiente, manter os seguintes pontos em mente facilitará a depuração.

  • Exceções de asyncio.run() e Task são fáceis de passar despercebidas, portanto, registre exceções não tratadas.
  • Ao usar o logging, incluir o nome da coroutine ou, no Python 3.8 e superior, task.get_name() em seus logs facilita o rastreamento.
  • Você pode verificar o status atual das tarefas usando asyncio.Task.all_tasks(). No entanto, esta API se destina a fins de depuração e deve ser usada com cautela em ambientes de produção para evitar problemas de desempenho ou interferências inesperadas.

Considerações de Desempenho

Embora a programação assíncrona seja ótima para lidar com esperas de I/O, o uso inadequado pode reduzir o desempenho. Otimize mantendo os seguintes pontos em mente:.

  • O processamento assíncrono é excelente para tarefas limitadas por I/O, mas não adequado para tarefas limitadas por CPU; utilize um pool de processos nesses casos.
  • Ao usar pools de threads ou processos, considere o tamanho do pool e a natureza das tarefas.
  • Se você iniciar muitas tarefas pequenas de uma vez, a sobrecarga do loop de eventos aumenta — portanto utilize batching ou semáforos para ajustar.

Resumo

A I/O assíncrona do Python é um mecanismo poderoso que utiliza de forma eficaz os tempos de espera de I/O e executa operações de rede e arquivos de maneira eficiente e concorrente. Combinando técnicas como asyncio, aiohttp, aiofiles e run_in_executor, você pode construir aplicações assíncronas práticas e flexíveis. Utilizar async with para automatizar a aquisição e liberação de recursos permite gerenciar com segurança e confiabilidade recursos assíncronos como arquivos, sessões HTTP e locks. Incorporando tratamento de erros apropriado e gestão de concorrência, você pode executar programas assíncronos de alta confiabilidade com segurança.

Você pode acompanhar o artigo acima usando o Visual Studio Code em nosso canal do YouTube. Por favor, confira também o canal do YouTube.

YouTube Video