Control de sincronización en el procesamiento asincrónico de Python

Control de sincronización en el procesamiento asincrónico de Python

Este artículo explica el control de sincronización en el procesamiento asincrónico de Python.

Aprenderás paso a paso, desde los conceptos básicos de asyncio hasta patrones prácticos que se usan comúnmente para el control de sincronización.

YouTube Video

Control de sincronización en el procesamiento asincrónico de Python

En el procesamiento asincrónico, se pueden ejecutar fácilmente múltiples tareas simultáneamente. Sin embargo, en la práctica, se requieren ajustes más avanzados, como controlar la concurrencia, coordinar tareas, controlar en exclusiva los recursos compartidos, gestionar procesos síncronos pesados y realizar la limpieza tras cancelaciones.

Aquí aprenderemos paso a paso desde lo básico de asyncio hasta patrones prácticos comúnmente utilizados para la sincronización.

Introducción: Conceptos básicos (async / await y create_task)

Primero veamos un código asincrónico mínimo. await espera en ese punto hasta que la corrutina llamada termina, y asyncio.create_task programa una tarea para ejecución concurrente.

 1import asyncio
 2
 3async def worker(name, delay):
 4    # Simulate async work
 5    await asyncio.sleep(delay)
 6    return f"{name} done after {delay}s"
 7
 8async def main():
 9    # Create two tasks that run concurrently.
10    t1 = asyncio.create_task(worker("A", 1))
11    t2 = asyncio.create_task(worker("B", 2))
12
13    # Await both results (this suspends until both are done).
14    result1 = await t1
15    result2 = await t2
16    print(result1, result2)
17
18if __name__ == "__main__":
19    asyncio.run(main())
  • Este código es un patrón típico donde las tareas se crean explícitamente, se ejecutan en paralelo y los resultados se reciben al final con await. create_task permite la ejecución concurrente.

Diferencias entre asyncio.gather, asyncio.wait y asyncio.as_completed

Al ejecutar varias corrutinas simultáneamente, eliges cuál usar dependiendo de cómo deseas obtener los resultados. gather espera a que todas terminen y devuelve los resultados en el orden de entrada, mientras que as_completed permite procesar los resultados a medida que finalizan, sin importar el orden.

 1import asyncio
 2import random
 3
 4async def job(i):
 5    delay = random.random() * 2
 6    await asyncio.sleep(delay)
 7    return (i, delay)
 8
 9async def gather_example():
10    # gather waits for all tasks and returns results in the same order as input
11    results = await asyncio.gather(*(job(i) for i in range(5)))
12    print("gather order:", results)
13
14async def as_completed_example():
15    # as_completed yields results as they finish (useful to process early results)
16    tasks = [asyncio.create_task(job(i)) for i in range(5)]
17    for coro in asyncio.as_completed(tasks):
18        res = await coro
19        print("completed:", res)
20
21async def main():
22    await gather_example()
23    await as_completed_example()
24
25if __name__ == "__main__":
26    asyncio.run(main())
  • Como se muestra en este código, gather devuelve los resultados en el orden de entrada, por lo que es útil cuando deseas preservar el orden. as_completed se usa cuando quieres procesar los resultados apenas finalizan.

Control de concurrencia: Limitando ejecuciones simultáneas con asyncio.Semaphore

Cuando hay límites en la tasa de APIs externas o en las conexiones de base de datos, puedes controlar las ejecuciones concurrentes con un Semaphore.

 1import asyncio
 2import random
 3
 4sem = asyncio.Semaphore(3)  # allow up to 3 concurrent workers
 5
 6async def limited_worker(i):
 7    async with sem:
 8        # Only 3 tasks can be inside this block concurrently
 9        delay = random.random() * 2
10        await asyncio.sleep(delay)
11        print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14    tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15    await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18    asyncio.run(main())
  • Al usar Semaphore con async with, puedes limitar fácilmente el número de ejecuciones simultáneas. Esto es efectivo en situaciones con restricciones externas.

Control exclusivo de recursos compartidos: asyncio.Lock

Se utiliza Lock para evitar actualizaciones simultáneas de datos compartidos. asyncio.Lock es una primitiva exclusiva para uso asincrónico.

 1import asyncio
 2
 3counter = 0
 4lock = asyncio.Lock()
 5
 6async def incrementer(n_times):
 7    global counter
 8    for _ in range(n_times):
 9        # Acquire lock to update shared counter safely
10        async with lock:
11            tmp = counter
12            await asyncio.sleep(0)  # yield control to increase race likelihood
13            counter = tmp + 1
14
15async def main():
16    tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17    await asyncio.gather(*tasks)
18    print("final counter:", counter)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Si varias tareas actualizan una variable compartida como un counter global, pueden ocurrir conflictos. Al envolver las operaciones con un Lock, puedes mantener la coherencia.

Coordinación de tareas: asyncio.Event

Event se usa cuando una tarea señala que está lista y otras tareas esperan esa señal. Esta es una forma sencilla para que las tareas compartan señales y se sincronicen entre sí.

 1import asyncio
 2
 3event = asyncio.Event()
 4
 5async def waiter(name):
 6    print(f"{name} is waiting for event")
 7    await event.wait()
 8    print(f"{name} resumed after event set")
 9
10async def setter():
11    print("setter will sleep and then set the event")
12    await asyncio.sleep(1)
13    event.set()
14    print("event set by setter")
15
16async def main():
17    tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18    await asyncio.create_task(setter())
19    await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22    asyncio.run(main())
  • Event tiene una bandera booleana, y al llamar a set() se reanudan todas las tareas en espera. Es útil para sincronización sencilla.

Patrón productor-consumidor: asyncio.Queue

Utilizando Queue, los productores (que generan datos) y los consumidores (que procesan datos) pueden coordinarse de forma fluida y asincrónica. Además, cuando la cola está llena, los productores esperan automáticamente, implementando de forma natural retro-presión para evitar la sobreproducción.

 1import asyncio
 2import random
 3
 4async def producer(q, n_items):
 5    for i in range(n_items):
 6        await asyncio.sleep(random.random() * 0.5)
 7        item = f"item-{i}"
 8        await q.put(item)
 9        print("produced", item)
10    # signal consumers to stop
11    await q.put(None)
12
13async def consumer(q, name):
14    while True:
15        item = await q.get()
16        if item is None:
17            # put sentinel back for other consumers and break
18            await q.put(None)
19            break
20        await asyncio.sleep(random.random() * 1)
21        print(name, "consumed", item)
22        q.task_done()
23
24async def main():
25    q = asyncio.Queue(maxsize=5)  # bounded queue to apply backpressure
26    prod = asyncio.create_task(producer(q, 10))
27    cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28    await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31    asyncio.run(main())
  • Queue ayuda a coordinar productores y consumidores de forma asincrónica. Además, al establecer maxsize, el productor espera en put cuando la cola está llena, evitando la sobreproducción.

Manejo de operaciones bloqueantes síncronas: run_in_executor

Para procesamiento intensivo de CPU o al usar librerías que no soportan async, utiliza run_in_executor para delegar el procesamiento a otro hilo o proceso. Haciendo esto, se evita que se bloquee el bucle principal de eventos, permitiendo que otras tareas asincrónicas se ejecuten sin problemas.

 1import asyncio
 2import time
 3
 4def blocking_io(x):
 5    # simulate blocking I/O or CPU-bound work
 6    time.sleep(2)
 7    return x * x
 8
 9async def main():
10    loop = asyncio.get_running_loop()
11    # run blocking_io in default thread pool
12    result = await loop.run_in_executor(None, blocking_io, 3)
13    print("blocking result:", result)
14
15if __name__ == "__main__":
16    asyncio.run(main())
  • Llamar funciones síncronas directamente bloqueará el bucle de eventos. Con run_in_executor, el código se ejecuta en un hilo separado y las tareas asincrónicas pueden seguir progresando concurrentemente.

Ejemplo: Llamadas a API con límite de tasa (combinando Semaphore + run_in_executor)

A continuación, un escenario de ejemplo donde las llamadas a la API tienen un límite de tasa y se realiza un procesamiento intensivo sobre los resultados. Combinando Semaphore y run_in_executor el procesamiento puede llevarse a cabo de manera segura y eficiente.

 1import asyncio
 2import time
 3import random
 4
 5sem = asyncio.Semaphore(5)
 6
 7def heavy_sync_processing(data):
 8    # simulate heavy CPU-bound work
 9    time.sleep(1)
10    return f"processed-{data}"
11
12async def api_call(i):
13    await asyncio.sleep(random.random() * 0.5)  # simulate network latency
14    return f"data-{i}"
15
16async def worker(i):
17    async with sem:
18        data = await api_call(i)
19        # offload CPU-bound work to threadpool
20        loop = asyncio.get_running_loop()
21        result = await loop.run_in_executor(None, heavy_sync_processing, data)
22        print(result)
23
24async def main():
25    tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26    await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29    asyncio.run(main())
  • Utilizamos un Semaphore para limitar el número de llamadas a la API en concurrencia, y el procesamiento intensivo de los datos resultantes se delega a un pool de hilos. Separar el procesamiento de red y CPU mejora la eficiencia.

Cancelación de tareas y limpieza

Cuando se cancela una tarea, es muy importante gestionar correctamente finally y asyncio.CancelledError. Esto garantiza que los archivos y conexiones se liberen y los estados intermedios se gestionen adecuadamente, manteniendo la coherencia en la aplicación.

 1import asyncio
 2
 3async def long_running():
 4    try:
 5        print("started long_running")
 6        await asyncio.sleep(10)
 7        print("finished long_running")
 8    except asyncio.CancelledError:
 9        print("long_running was cancelled, cleaning up")
10        # perform cleanup here
11        raise
12
13async def main():
14    task = asyncio.create_task(long_running())
15    await asyncio.sleep(1)
16    task.cancel()
17    try:
18        await task
19    except asyncio.CancelledError:
20        print("task cancelled in main")
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • La cancelación se transmite como una excepción (CancelledError), por lo que debes realizar la limpieza necesaria en el bloque except y volver a lanzar la excepción si es necesario.

Puntos clave para el diseño práctico

Los siguientes son puntos prácticos útiles para diseñar procesamiento asincrónico.

  • Controla la concurrencia explícitamente Cuando existen límites de recursos como APIs o bases de datos, puedes limitar el número de ejecuciones concurrentes con Semaphore.

  • Gestiona los recursos compartidos de forma segura Si necesitas actualizar el estado desde varias tareas, utiliza Lock. Reducir el estado compartido y diseñar en torno a datos inmutables hace que todo sea más seguro.

  • Elige cómo recibir los resultados Si deseas procesar las tareas a medida que terminan, usa asyncio.as_completed; si quieres procesar los resultados en el orden de entrada, utiliza gather.

  • Aísla el procesamiento síncrono intensivo Para procesamiento intensivo de CPU o llamadas a librerías síncronas, utiliza run_in_executor o ProcessPoolExecutor para evitar bloquear el bucle de eventos.

  • Planifica para cancelaciones y excepciones Escribe una gestión adecuada de excepciones para limpiar recursos de manera segura incluso si una tarea es cancelada a mitad de camino.

  • Facilita las pruebas Abstrae efectos secundarios como E/S, tiempo y aleatoriedad para que puedan ser reemplazados, facilitando las pruebas de código asincrónico.

Resumen

asyncio es potente, pero si solo te enfocas en “ejecutar cosas en paralelo”, podrías encontrarte con problemas como contención de recursos compartidos, violaciones de límites de recursos o bloqueo del bucle de eventos. Combinando Semaphore, Lock, Event, Queue, run_in_executor y una correcta gestión de cancelaciones, puedes diseñar aplicaciones asincrónicas seguras y eficientes. Utilizando mecanismos como el patrón productor-consumidor, la limitación de concurrencia o separando el procesamiento asincrónico y bloqueante, pueden construirse flujos de trabajo asincrónicos de manera más segura y eficiente.

Puedes seguir el artículo anterior utilizando Visual Studio Code en nuestro canal de YouTube. Por favor, también revisa nuestro canal de YouTube.

YouTube Video