Controle de Sincronização no Processamento Assíncrono em Python

Controle de Sincronização no Processamento Assíncrono em Python

Este artigo explica o controle de sincronização no processamento assíncrono em Python.

Você aprenderá passo a passo, desde os fundamentos do asyncio até os padrões práticos comumente usados para controle de sincronização.

YouTube Video

Controle de Sincronização no Processamento Assíncrono em Python

No processamento assíncrono, você pode executar várias tarefas simultaneamente com facilidade. No entanto, na prática, são necessários ajustes mais avançados, como controle de concorrência, coordenação de tarefas, controle exclusivo de recursos compartilhados, tratamento de processos síncronos pesados e limpeza após cancelamentos.

Aqui, vamos aprender passo a passo desde o básico do asyncio até padrões práticos comumente utilizados para sincronização.

Introdução: Conceitos Básicos (async / await e create_task)

Vamos primeiro analisar um código assíncrono mínimo. await espera naquele ponto até que a coroutine chamada seja concluída, e asyncio.create_task agenda uma tarefa para execução concorrente.

 1import asyncio
 2
 3async def worker(name, delay):
 4    # Simulate async work
 5    await asyncio.sleep(delay)
 6    return f"{name} done after {delay}s"
 7
 8async def main():
 9    # Create two tasks that run concurrently.
10    t1 = asyncio.create_task(worker("A", 1))
11    t2 = asyncio.create_task(worker("B", 2))
12
13    # Await both results (this suspends until both are done).
14    result1 = await t1
15    result2 = await t2
16    print(result1, result2)
17
18if __name__ == "__main__":
19    asyncio.run(main())
  • Este código é um padrão típico no qual as tarefas são criadas explicitamente, executadas em paralelo e os resultados são recebidos ao final com await. create_task permite a execução concorrente.

Diferenças entre asyncio.gather, asyncio.wait e asyncio.as_completed

Ao executar múltiplas corrotinas simultaneamente, você escolhe qual usar dependendo de como deseja obter os resultados. gather espera todas terminarem e retorna os resultados na ordem de entrada, enquanto as_completed permite processar os resultados à medida que forem concluídos, independentemente da ordem.

 1import asyncio
 2import random
 3
 4async def job(i):
 5    delay = random.random() * 2
 6    await asyncio.sleep(delay)
 7    return (i, delay)
 8
 9async def gather_example():
10    # gather waits for all tasks and returns results in the same order as input
11    results = await asyncio.gather(*(job(i) for i in range(5)))
12    print("gather order:", results)
13
14async def as_completed_example():
15    # as_completed yields results as they finish (useful to process early results)
16    tasks = [asyncio.create_task(job(i)) for i in range(5)]
17    for coro in asyncio.as_completed(tasks):
18        res = await coro
19        print("completed:", res)
20
21async def main():
22    await gather_example()
23    await as_completed_example()
24
25if __name__ == "__main__":
26    asyncio.run(main())
  • Como mostrado neste código, gather retorna os resultados na ordem de entrada, sendo útil quando você deseja preservar a ordem. as_completed é usado quando você deseja processar os resultados assim que eles sejam concluídos.

Controlando a concorrência: Limitando execuções simultâneas com asyncio.Semaphore

Quando existem limites de taxa de APIs externas ou limites de conexões ao banco de dados, você pode controlar as execuções concorrentes com um Semaphore.

 1import asyncio
 2import random
 3
 4sem = asyncio.Semaphore(3)  # allow up to 3 concurrent workers
 5
 6async def limited_worker(i):
 7    async with sem:
 8        # Only 3 tasks can be inside this block concurrently
 9        delay = random.random() * 2
10        await asyncio.sleep(delay)
11        print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14    tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15    await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18    asyncio.run(main())
  • Usando Semaphore com async with, você pode facilmente limitar o número de execuções simultâneas. Isso é eficaz em situações com restrições externas.

Controle exclusivo de recursos compartilhados: asyncio.Lock

Lock é usado para evitar atualizações simultâneas de dados compartilhados. asyncio.Lock é uma primitiva exclusiva para uso assíncrono.

 1import asyncio
 2
 3counter = 0
 4lock = asyncio.Lock()
 5
 6async def incrementer(n_times):
 7    global counter
 8    for _ in range(n_times):
 9        # Acquire lock to update shared counter safely
10        async with lock:
11            tmp = counter
12            await asyncio.sleep(0)  # yield control to increase race likelihood
13            counter = tmp + 1
14
15async def main():
16    tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17    await asyncio.gather(*tasks)
18    print("final counter:", counter)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Se várias tarefas atualizarem uma variável compartilhada como um counter global, podem ocorrer conflitos. Ao envolver operações com um Lock, você pode manter a consistência.

Coordenação de tarefas: asyncio.Event

Event é usado quando uma tarefa sinaliza que está pronta e outras tarefas aguardam este sinal. Esta é uma maneira simples para que as tarefas compartilhem sinais e se sincronizem entre si.

 1import asyncio
 2
 3event = asyncio.Event()
 4
 5async def waiter(name):
 6    print(f"{name} is waiting for event")
 7    await event.wait()
 8    print(f"{name} resumed after event set")
 9
10async def setter():
11    print("setter will sleep and then set the event")
12    await asyncio.sleep(1)
13    event.set()
14    print("event set by setter")
15
16async def main():
17    tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18    await asyncio.create_task(setter())
19    await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22    asyncio.run(main())
  • Event possui uma flag booleana, e ao chamar set(), todas as tarefas em espera são retomadas. É útil para sincronizações simples.

Padrão produtor-consumidor: asyncio.Queue

Usando Queue, produtores (que criam dados) e consumidores (que processam os dados) podem se coordenar de forma fluida e assíncrona. Além disso, quando a fila está cheia, os produtores esperam automaticamente, implementando backpressure naturalmente para evitar a superprodução.

 1import asyncio
 2import random
 3
 4async def producer(q, n_items):
 5    for i in range(n_items):
 6        await asyncio.sleep(random.random() * 0.5)
 7        item = f"item-{i}"
 8        await q.put(item)
 9        print("produced", item)
10    # signal consumers to stop
11    await q.put(None)
12
13async def consumer(q, name):
14    while True:
15        item = await q.get()
16        if item is None:
17            # put sentinel back for other consumers and break
18            await q.put(None)
19            break
20        await asyncio.sleep(random.random() * 1)
21        print(name, "consumed", item)
22        q.task_done()
23
24async def main():
25    q = asyncio.Queue(maxsize=5)  # bounded queue to apply backpressure
26    prod = asyncio.create_task(producer(q, 10))
27    cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28    await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31    asyncio.run(main())
  • Queue ajuda a coordenar produtores e consumidores de forma assíncrona. Além disso, ao definir maxsize, faz com que o produtor espere em put quando a fila estiver cheia, prevenindo a superprodução.

Lidando com Operações Bloqueantes Síncronas: run_in_executor

Para processamento intensivo de CPU ou ao usar bibliotecas que não suportam async, use run_in_executor para delegar o processamento para outra thread ou processo. Fazendo isso, previne que o loop principal de eventos pare, permitindo que outras tarefas assíncronas ocorram normalmente.

 1import asyncio
 2import time
 3
 4def blocking_io(x):
 5    # simulate blocking I/O or CPU-bound work
 6    time.sleep(2)
 7    return x * x
 8
 9async def main():
10    loop = asyncio.get_running_loop()
11    # run blocking_io in default thread pool
12    result = await loop.run_in_executor(None, blocking_io, 3)
13    print("blocking result:", result)
14
15if __name__ == "__main__":
16    asyncio.run(main())
  • Chamar funções síncronas diretamente bloqueará o event loop. Com run_in_executor, o código é executado em uma thread separada e as tarefas assíncronas podem continuar progredindo simultaneamente.

Exemplo: Chamadas de API com limite de taxa (combinando Semaphore + run_in_executor)

A seguir está um cenário de exemplo em que chamadas de API têm limite de taxa e há processamento pesado dos resultados. Combinar Semaphore e run_in_executor permite que o processamento ocorra de forma segura e eficiente.

 1import asyncio
 2import time
 3import random
 4
 5sem = asyncio.Semaphore(5)
 6
 7def heavy_sync_processing(data):
 8    # simulate heavy CPU-bound work
 9    time.sleep(1)
10    return f"processed-{data}"
11
12async def api_call(i):
13    await asyncio.sleep(random.random() * 0.5)  # simulate network latency
14    return f"data-{i}"
15
16async def worker(i):
17    async with sem:
18        data = await api_call(i)
19        # offload CPU-bound work to threadpool
20        loop = asyncio.get_running_loop()
21        result = await loop.run_in_executor(None, heavy_sync_processing, data)
22        print(result)
23
24async def main():
25    tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26    await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29    asyncio.run(main())
  • Usamos um Semaphore para limitar o número de chamadas de API simultâneas e o processamento pesado dos dados resultantes é delegado para um pool de threads. Separar o processamento de rede e de CPU melhora a eficiência.

Cancelamento de tarefas e limpeza

Quando uma tarefa é cancelada, é muito importante lidar corretamente com finally e asyncio.CancelledError. Isso garante que arquivos e conexões sejam liberados e estados intermediários sejam tratados corretamente, mantendo a consistência da aplicação.

 1import asyncio
 2
 3async def long_running():
 4    try:
 5        print("started long_running")
 6        await asyncio.sleep(10)
 7        print("finished long_running")
 8    except asyncio.CancelledError:
 9        print("long_running was cancelled, cleaning up")
10        # perform cleanup here
11        raise
12
13async def main():
14    task = asyncio.create_task(long_running())
15    await asyncio.sleep(1)
16    task.cancel()
17    try:
18        await task
19    except asyncio.CancelledError:
20        print("task cancelled in main")
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • O cancelamento é lançado como uma exceção (CancelledError), portanto, faça a limpeza necessária no bloco except e relance a exceção se necessário.

Pontos-chave para um design prático

A seguir, pontos práticos úteis para projetar processamento assíncrono.

  • Controle explicitamente a concorrência Quando há limites de recursos como APIs ou bancos de dados, é possível limitar o número de execuções concorrentes com Semaphore.

  • Lide com recursos compartilhados com segurança Se for necessário atualizar o estado de múltiplas tarefas, use Lock. Reduzir o estado compartilhado e projetar com dados imutáveis torna as coisas mais seguras.

  • Escolha como receber os resultados Se quiser processar tarefas à medida que terminam, use asyncio.as_completed; se quiser resultados na ordem de entrada, use gather.

  • Isole processamentos síncronos pesados Para chamadas a bibliotecas síncronas ou processamento intensivo de CPU, use run_in_executor ou ProcessPoolExecutor para evitar bloquear o event loop.

  • Planeje para cancelamentos e exceções Escreva tratamento de exceções apropriado para limpar recursos com segurança mesmo se uma tarefa for interrompida no meio do caminho.

  • Facilite os testes Abstraia efeitos colaterais como I/O, tempo e aleatoriedade para que possam ser substituídos, facilitando o teste de código assíncrono.

Resumo

asyncio é poderoso, mas ao focar apenas em “executar coisas em paralelo”, podem surgir problemas como disputa por recursos compartilhados, violação de limites de recursos ou bloqueio do event loop. Ao combinar Semaphore, Lock, Event, Queue, run_in_executor e tratamento adequado de cancelamentos, é possível projetar aplicações assíncronas seguras e eficientes. Utilizando mecanismos como o padrão produtor-consumidor, limitação de concorrência ou separação entre processamento assíncrono e bloqueante, fluxos de trabalho assíncronos podem ser construídos de forma mais segura e eficiente.

Você pode acompanhar o artigo acima usando o Visual Studio Code em nosso canal do YouTube. Por favor, confira também o canal do YouTube.

YouTube Video