Control de sincronización en el procesamiento asincrónico de Python
Este artículo explica el control de sincronización en el procesamiento asincrónico de Python.
Aprenderás paso a paso, desde los conceptos básicos de asyncio hasta patrones prácticos que se usan comúnmente para el control de sincronización.
YouTube Video
Control de sincronización en el procesamiento asincrónico de Python
En el procesamiento asincrónico, se pueden ejecutar fácilmente múltiples tareas simultáneamente. Sin embargo, en la práctica, se requieren ajustes más avanzados, como controlar la concurrencia, coordinar tareas, controlar en exclusiva los recursos compartidos, gestionar procesos síncronos pesados y realizar la limpieza tras cancelaciones.
Aquí aprenderemos paso a paso desde lo básico de asyncio hasta patrones prácticos comúnmente utilizados para la sincronización.
Introducción: Conceptos básicos (async / await y create_task)
Primero veamos un código asincrónico mínimo. await espera en ese punto hasta que la corrutina llamada termina, y asyncio.create_task programa una tarea para ejecución concurrente.
1import asyncio
2
3async def worker(name, delay):
4 # Simulate async work
5 await asyncio.sleep(delay)
6 return f"{name} done after {delay}s"
7
8async def main():
9 # Create two tasks that run concurrently.
10 t1 = asyncio.create_task(worker("A", 1))
11 t2 = asyncio.create_task(worker("B", 2))
12
13 # Await both results (this suspends until both are done).
14 result1 = await t1
15 result2 = await t2
16 print(result1, result2)
17
18if __name__ == "__main__":
19 asyncio.run(main())- Este código es un patrón típico donde las tareas se crean explícitamente, se ejecutan en paralelo y los resultados se reciben al final con
await.create_taskpermite la ejecución concurrente.
Diferencias entre asyncio.gather, asyncio.wait y asyncio.as_completed
Al ejecutar varias corrutinas simultáneamente, eliges cuál usar dependiendo de cómo deseas obtener los resultados. gather espera a que todas terminen y devuelve los resultados en el orden de entrada, mientras que as_completed permite procesar los resultados a medida que finalizan, sin importar el orden.
1import asyncio
2import random
3
4async def job(i):
5 delay = random.random() * 2
6 await asyncio.sleep(delay)
7 return (i, delay)
8
9async def gather_example():
10 # gather waits for all tasks and returns results in the same order as input
11 results = await asyncio.gather(*(job(i) for i in range(5)))
12 print("gather order:", results)
13
14async def as_completed_example():
15 # as_completed yields results as they finish (useful to process early results)
16 tasks = [asyncio.create_task(job(i)) for i in range(5)]
17 for coro in asyncio.as_completed(tasks):
18 res = await coro
19 print("completed:", res)
20
21async def main():
22 await gather_example()
23 await as_completed_example()
24
25if __name__ == "__main__":
26 asyncio.run(main())- Como se muestra en este código,
gatherdevuelve los resultados en el orden de entrada, por lo que es útil cuando deseas preservar el orden.as_completedse usa cuando quieres procesar los resultados apenas finalizan.
Control de concurrencia: Limitando ejecuciones simultáneas con asyncio.Semaphore
Cuando hay límites en la tasa de APIs externas o en las conexiones de base de datos, puedes controlar las ejecuciones concurrentes con un Semaphore.
1import asyncio
2import random
3
4sem = asyncio.Semaphore(3) # allow up to 3 concurrent workers
5
6async def limited_worker(i):
7 async with sem:
8 # Only 3 tasks can be inside this block concurrently
9 delay = random.random() * 2
10 await asyncio.sleep(delay)
11 print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14 tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15 await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18 asyncio.run(main())- Al usar
Semaphoreconasync with, puedes limitar fácilmente el número de ejecuciones simultáneas. Esto es efectivo en situaciones con restricciones externas.
Control exclusivo de recursos compartidos: asyncio.Lock
Se utiliza Lock para evitar actualizaciones simultáneas de datos compartidos. asyncio.Lock es una primitiva exclusiva para uso asincrónico.
1import asyncio
2
3counter = 0
4lock = asyncio.Lock()
5
6async def incrementer(n_times):
7 global counter
8 for _ in range(n_times):
9 # Acquire lock to update shared counter safely
10 async with lock:
11 tmp = counter
12 await asyncio.sleep(0) # yield control to increase race likelihood
13 counter = tmp + 1
14
15async def main():
16 tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17 await asyncio.gather(*tasks)
18 print("final counter:", counter)
19
20if __name__ == "__main__":
21 asyncio.run(main())- Si varias tareas actualizan una variable compartida como un
counterglobal, pueden ocurrir conflictos. Al envolver las operaciones con unLock, puedes mantener la coherencia.
Coordinación de tareas: asyncio.Event
Event se usa cuando una tarea señala que está lista y otras tareas esperan esa señal. Esta es una forma sencilla para que las tareas compartan señales y se sincronicen entre sí.
1import asyncio
2
3event = asyncio.Event()
4
5async def waiter(name):
6 print(f"{name} is waiting for event")
7 await event.wait()
8 print(f"{name} resumed after event set")
9
10async def setter():
11 print("setter will sleep and then set the event")
12 await asyncio.sleep(1)
13 event.set()
14 print("event set by setter")
15
16async def main():
17 tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18 await asyncio.create_task(setter())
19 await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22 asyncio.run(main())Eventtiene una bandera booleana, y al llamar aset()se reanudan todas las tareas en espera. Es útil para sincronización sencilla.
Patrón productor-consumidor: asyncio.Queue
Utilizando Queue, los productores (que generan datos) y los consumidores (que procesan datos) pueden coordinarse de forma fluida y asincrónica. Además, cuando la cola está llena, los productores esperan automáticamente, implementando de forma natural retro-presión para evitar la sobreproducción.
1import asyncio
2import random
3
4async def producer(q, n_items):
5 for i in range(n_items):
6 await asyncio.sleep(random.random() * 0.5)
7 item = f"item-{i}"
8 await q.put(item)
9 print("produced", item)
10 # signal consumers to stop
11 await q.put(None)
12
13async def consumer(q, name):
14 while True:
15 item = await q.get()
16 if item is None:
17 # put sentinel back for other consumers and break
18 await q.put(None)
19 break
20 await asyncio.sleep(random.random() * 1)
21 print(name, "consumed", item)
22 q.task_done()
23
24async def main():
25 q = asyncio.Queue(maxsize=5) # bounded queue to apply backpressure
26 prod = asyncio.create_task(producer(q, 10))
27 cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28 await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31 asyncio.run(main())Queueayuda a coordinar productores y consumidores de forma asincrónica. Además, al establecermaxsize, el productor espera enputcuando la cola está llena, evitando la sobreproducción.
Manejo de operaciones bloqueantes síncronas: run_in_executor
Para procesamiento intensivo de CPU o al usar librerías que no soportan async, utiliza run_in_executor para delegar el procesamiento a otro hilo o proceso. Haciendo esto, se evita que se bloquee el bucle principal de eventos, permitiendo que otras tareas asincrónicas se ejecuten sin problemas.
1import asyncio
2import time
3
4def blocking_io(x):
5 # simulate blocking I/O or CPU-bound work
6 time.sleep(2)
7 return x * x
8
9async def main():
10 loop = asyncio.get_running_loop()
11 # run blocking_io in default thread pool
12 result = await loop.run_in_executor(None, blocking_io, 3)
13 print("blocking result:", result)
14
15if __name__ == "__main__":
16 asyncio.run(main())- Llamar funciones síncronas directamente bloqueará el bucle de eventos. Con
run_in_executor, el código se ejecuta en un hilo separado y las tareas asincrónicas pueden seguir progresando concurrentemente.
Ejemplo: Llamadas a API con límite de tasa (combinando Semaphore + run_in_executor)
A continuación, un escenario de ejemplo donde las llamadas a la API tienen un límite de tasa y se realiza un procesamiento intensivo sobre los resultados. Combinando Semaphore y run_in_executor el procesamiento puede llevarse a cabo de manera segura y eficiente.
1import asyncio
2import time
3import random
4
5sem = asyncio.Semaphore(5)
6
7def heavy_sync_processing(data):
8 # simulate heavy CPU-bound work
9 time.sleep(1)
10 return f"processed-{data}"
11
12async def api_call(i):
13 await asyncio.sleep(random.random() * 0.5) # simulate network latency
14 return f"data-{i}"
15
16async def worker(i):
17 async with sem:
18 data = await api_call(i)
19 # offload CPU-bound work to threadpool
20 loop = asyncio.get_running_loop()
21 result = await loop.run_in_executor(None, heavy_sync_processing, data)
22 print(result)
23
24async def main():
25 tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26 await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29 asyncio.run(main())- Utilizamos un
Semaphorepara limitar el número de llamadas a la API en concurrencia, y el procesamiento intensivo de los datos resultantes se delega a un pool de hilos. Separar el procesamiento de red y CPU mejora la eficiencia.
Cancelación de tareas y limpieza
Cuando se cancela una tarea, es muy importante gestionar correctamente finally y asyncio.CancelledError. Esto garantiza que los archivos y conexiones se liberen y los estados intermedios se gestionen adecuadamente, manteniendo la coherencia en la aplicación.
1import asyncio
2
3async def long_running():
4 try:
5 print("started long_running")
6 await asyncio.sleep(10)
7 print("finished long_running")
8 except asyncio.CancelledError:
9 print("long_running was cancelled, cleaning up")
10 # perform cleanup here
11 raise
12
13async def main():
14 task = asyncio.create_task(long_running())
15 await asyncio.sleep(1)
16 task.cancel()
17 try:
18 await task
19 except asyncio.CancelledError:
20 print("task cancelled in main")
21
22if __name__ == "__main__":
23 asyncio.run(main())- La cancelación se transmite como una excepción (
CancelledError), por lo que debes realizar la limpieza necesaria en el bloqueexcepty volver a lanzar la excepción si es necesario.
Puntos clave para el diseño práctico
Los siguientes son puntos prácticos útiles para diseñar procesamiento asincrónico.
-
Controla la concurrencia explícitamente Cuando existen límites de recursos como APIs o bases de datos, puedes limitar el número de ejecuciones concurrentes con
Semaphore. -
Gestiona los recursos compartidos de forma segura Si necesitas actualizar el estado desde varias tareas, utiliza
Lock. Reducir el estado compartido y diseñar en torno a datos inmutables hace que todo sea más seguro. -
Elige cómo recibir los resultados Si deseas procesar las tareas a medida que terminan, usa
asyncio.as_completed; si quieres procesar los resultados en el orden de entrada, utilizagather. -
Aísla el procesamiento síncrono intensivo Para procesamiento intensivo de CPU o llamadas a librerías síncronas, utiliza
run_in_executoroProcessPoolExecutorpara evitar bloquear el bucle de eventos. -
Planifica para cancelaciones y excepciones Escribe una gestión adecuada de excepciones para limpiar recursos de manera segura incluso si una tarea es cancelada a mitad de camino.
-
Facilita las pruebas Abstrae efectos secundarios como E/S, tiempo y aleatoriedad para que puedan ser reemplazados, facilitando las pruebas de código asincrónico.
Resumen
asyncio es potente, pero si solo te enfocas en “ejecutar cosas en paralelo”, podrías encontrarte con problemas como contención de recursos compartidos, violaciones de límites de recursos o bloqueo del bucle de eventos. Combinando Semaphore, Lock, Event, Queue, run_in_executor y una correcta gestión de cancelaciones, puedes diseñar aplicaciones asincrónicas seguras y eficientes. Utilizando mecanismos como el patrón productor-consumidor, la limitación de concurrencia o separando el procesamiento asincrónico y bloqueante, pueden construirse flujos de trabajo asincrónicos de manera más segura y eficiente.
Puedes seguir el artículo anterior utilizando Visual Studio Code en nuestro canal de YouTube. Por favor, también revisa nuestro canal de YouTube.