Contrôle de la synchronisation dans le traitement asynchrone Python
Cet article explique le contrôle de la synchronisation dans le traitement asynchrone en Python.
Vous apprendrez étape par étape, des bases de asyncio aux modèles pratiques couramment utilisés pour le contrôle de la synchronisation.
YouTube Video
Contrôle de la synchronisation dans le traitement asynchrone Python
Dans le traitement asynchrone, il est facile d'exécuter plusieurs tâches simultanément. Cependant, en pratique, des ajustements plus avancés sont nécessaires, tels que la gestion de la concurrence, la coordination des tâches, le contrôle exclusif des ressources partagées, la gestion des processus synchrones lourds et le nettoyage après les annulations.
Ici, nous allons apprendre étape par étape, des bases d'asyncio aux modèles pratiques couramment utilisés pour la synchronisation.
Introduction : Bases (async / await et create_task)
Jetons d'abord un coup d'œil à un code asynchrone minimal. await attend à ce point jusqu'à ce que la coroutine appelée se termine, et asyncio.create_task planifie une tâche pour une exécution concurrente.
1import asyncio
2
3async def worker(name, delay):
4 # Simulate async work
5 await asyncio.sleep(delay)
6 return f"{name} done after {delay}s"
7
8async def main():
9 # Create two tasks that run concurrently.
10 t1 = asyncio.create_task(worker("A", 1))
11 t2 = asyncio.create_task(worker("B", 2))
12
13 # Await both results (this suspends until both are done).
14 result1 = await t1
15 result2 = await t2
16 print(result1, result2)
17
18if __name__ == "__main__":
19 asyncio.run(main())- Ce code suit un schéma typique où les tâches sont créées explicitement, exécutées en parallèle et les résultats sont récupérés à la fin avec
await.create_taskpermet l'exécution concurrente.
Différences entre asyncio.gather, asyncio.wait et asyncio.as_completed
Lorsque vous exécutez plusieurs coroutines concurremment, vous choisissez laquelle utiliser en fonction de la manière dont vous souhaitez récupérer les résultats. gather attend que toutes se terminent et retourne les résultats dans l'ordre d'entrée, tandis que as_completed permet de traiter les résultats au fur et à mesure qu'ils se terminent, indépendamment de l'ordre.
1import asyncio
2import random
3
4async def job(i):
5 delay = random.random() * 2
6 await asyncio.sleep(delay)
7 return (i, delay)
8
9async def gather_example():
10 # gather waits for all tasks and returns results in the same order as input
11 results = await asyncio.gather(*(job(i) for i in range(5)))
12 print("gather order:", results)
13
14async def as_completed_example():
15 # as_completed yields results as they finish (useful to process early results)
16 tasks = [asyncio.create_task(job(i)) for i in range(5)]
17 for coro in asyncio.as_completed(tasks):
18 res = await coro
19 print("completed:", res)
20
21async def main():
22 await gather_example()
23 await as_completed_example()
24
25if __name__ == "__main__":
26 asyncio.run(main())- Comme le montre ce code,
gatherretourne les résultats dans l'ordre d'entrée, ce qui est utile lorsque vous souhaitez préserver l'ordre.as_completedest utilisé lorsque vous souhaitez traiter les résultats dès qu'ils sont terminés.
Contrôler la concurrence : Limiter les exécutions simultanées avec asyncio.Semaphore
Lorsqu'il existe des limites de débit API externes ou de connexions à la base de données, vous pouvez contrôler l'exécution concurrente avec un Semaphore.
1import asyncio
2import random
3
4sem = asyncio.Semaphore(3) # allow up to 3 concurrent workers
5
6async def limited_worker(i):
7 async with sem:
8 # Only 3 tasks can be inside this block concurrently
9 delay = random.random() * 2
10 await asyncio.sleep(delay)
11 print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14 tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15 await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18 asyncio.run(main())- En utilisant
Semaphoreavecasync with, vous pouvez facilement limiter le nombre d'exécutions simultanées. Ceci est efficace dans des situations avec des contraintes externes.
Contrôle exclusif des ressources partagées : asyncio.Lock
Lock est utilisé pour empêcher les mises à jour simultanées des données partagées. asyncio.Lock est une primitive exclusive à usage asynchrone.
1import asyncio
2
3counter = 0
4lock = asyncio.Lock()
5
6async def incrementer(n_times):
7 global counter
8 for _ in range(n_times):
9 # Acquire lock to update shared counter safely
10 async with lock:
11 tmp = counter
12 await asyncio.sleep(0) # yield control to increase race likelihood
13 counter = tmp + 1
14
15async def main():
16 tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17 await asyncio.gather(*tasks)
18 print("final counter:", counter)
19
20if __name__ == "__main__":
21 asyncio.run(main())- Si plusieurs tâches mettent à jour une variable partagée comme un
counterglobal, des conflits peuvent se produire. En entourant les opérations d'unLock, vous pouvez maintenir la cohérence.
Coordination des tâches : asyncio.Event
Event est utilisé lorsqu'une tâche signale qu'elle est prête et que d'autres tâches attendent ce signal. Il s'agit d'un moyen simple pour les tâches de partager des signaux et de se synchroniser les unes avec les autres.
1import asyncio
2
3event = asyncio.Event()
4
5async def waiter(name):
6 print(f"{name} is waiting for event")
7 await event.wait()
8 print(f"{name} resumed after event set")
9
10async def setter():
11 print("setter will sleep and then set the event")
12 await asyncio.sleep(1)
13 event.set()
14 print("event set by setter")
15
16async def main():
17 tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18 await asyncio.create_task(setter())
19 await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22 asyncio.run(main())Eventpossède un indicateur booléen, et l'appel àset()réveille toutes les tâches en attente. C'est utile pour une synchronisation simple.
Modèle producteur-consommateur : asyncio.Queue
En utilisant Queue, les producteurs (qui créent les données) et les consommateurs (qui traitent les données) peuvent se coordonner facilement et de manière asynchrone. De plus, lorsque la file d'attente est pleine, les producteurs attendent automatiquement, mettant en œuvre une contre-pression pour éviter la surproduction.
1import asyncio
2import random
3
4async def producer(q, n_items):
5 for i in range(n_items):
6 await asyncio.sleep(random.random() * 0.5)
7 item = f"item-{i}"
8 await q.put(item)
9 print("produced", item)
10 # signal consumers to stop
11 await q.put(None)
12
13async def consumer(q, name):
14 while True:
15 item = await q.get()
16 if item is None:
17 # put sentinel back for other consumers and break
18 await q.put(None)
19 break
20 await asyncio.sleep(random.random() * 1)
21 print(name, "consumed", item)
22 q.task_done()
23
24async def main():
25 q = asyncio.Queue(maxsize=5) # bounded queue to apply backpressure
26 prod = asyncio.create_task(producer(q, 10))
27 cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28 await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31 asyncio.run(main())Queueaide à coordonner producteurs et consommateurs de manière asynchrone. En outre, le paramétrage demaxsizefait patienter le producteur lors d'unputsi la file est pleine, évitant ainsi la surproduction.
Gestion des opérations bloquantes synchrones : run_in_executor
Pour le traitement intensif CPU ou lors de l'utilisation de bibliothèques non compatibles avec l'asynchrone, utilisez run_in_executor pour déléguer le traitement à un autre thread ou processus. Cela permet d'éviter le blocage de la boucle d'événement principale, ce qui autorise l'exécution fluide des autres tâches asynchrones.
1import asyncio
2import time
3
4def blocking_io(x):
5 # simulate blocking I/O or CPU-bound work
6 time.sleep(2)
7 return x * x
8
9async def main():
10 loop = asyncio.get_running_loop()
11 # run blocking_io in default thread pool
12 result = await loop.run_in_executor(None, blocking_io, 3)
13 print("blocking result:", result)
14
15if __name__ == "__main__":
16 asyncio.run(main())- Appeler directement des fonctions synchrones bloquera la boucle d'événements. Avec
run_in_executor, le code s'exécute dans un thread séparé et les tâches asynchrones peuvent continuer à progresser simultanément.
Exemple : Appels API soumis à des limitations de débit (combinaison Semaphore + run_in_executor)
Voici un exemple de scénario où les appels d'API sont limités en débit et où un traitement lourd est effectué sur les résultats. La combinaison de Semaphore et run_in_executor permet au traitement de se poursuivre de manière sûre et efficace.
1import asyncio
2import time
3import random
4
5sem = asyncio.Semaphore(5)
6
7def heavy_sync_processing(data):
8 # simulate heavy CPU-bound work
9 time.sleep(1)
10 return f"processed-{data}"
11
12async def api_call(i):
13 await asyncio.sleep(random.random() * 0.5) # simulate network latency
14 return f"data-{i}"
15
16async def worker(i):
17 async with sem:
18 data = await api_call(i)
19 # offload CPU-bound work to threadpool
20 loop = asyncio.get_running_loop()
21 result = await loop.run_in_executor(None, heavy_sync_processing, data)
22 print(result)
23
24async def main():
25 tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26 await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29 asyncio.run(main())- Nous utilisons un
Semaphorepour limiter le nombre d'appels API concurrents, et le traitement lourd est délégué à un pool de threads. Séparer le traitement réseau du traitement CPU améliore l'efficacité.
Annulation de tâche et nettoyage
Lorsqu'une tâche est annulée, il est très important de gérer correctement les blocs finally et asyncio.CancelledError. Cela garantit la libération des fichiers et des connexions ainsi que la gestion correcte des états intermédiaires, maintenant la cohérence de l'application.
1import asyncio
2
3async def long_running():
4 try:
5 print("started long_running")
6 await asyncio.sleep(10)
7 print("finished long_running")
8 except asyncio.CancelledError:
9 print("long_running was cancelled, cleaning up")
10 # perform cleanup here
11 raise
12
13async def main():
14 task = asyncio.create_task(long_running())
15 await asyncio.sleep(1)
16 task.cancel()
17 try:
18 await task
19 except asyncio.CancelledError:
20 print("task cancelled in main")
21
22if __name__ == "__main__":
23 asyncio.run(main())- L'annulation se produit sous forme d'exception (
CancelledError), effectuez donc le nettoyage nécessaire dans le blocexceptet relancez l'exception si besoin.
Points clés pour la conception pratique
Voici des points pratiques utiles pour la conception de traitements asynchrones.
-
Contrôlez explicitement la concurrence Lorsque des limites de ressources existent (API, base de données), vous pouvez limiter le nombre d'exécutions simultanées avec un
Semaphore. -
Gérez les ressources partagées de manière sécurisée Si vous devez mettre à jour un état depuis plusieurs tâches, utilisez
Lock. Réduire l’état partagé et privilégier les données immuables renforce la sécurité. -
Choisissez comment recevoir les résultats Pour traiter les tâches dès qu'elles se terminent, utilisez
asyncio.as_completed; pour traiter les résultats dans l'ordre d'entrée, utilisezgather. -
Isolez le traitement synchrone lourd Pour les traitements gourmands en CPU ou les appels de bibliothèques synchrones, utilisez
run_in_executorouProcessPoolExecutorafin de ne pas bloquer la boucle d'événement. -
Préparez-vous à l'annulation et à la gestion des exceptions Écrivez une gestion des exceptions appropriée afin de nettoyer correctement les ressources même si une tâche est annulée en cours d'exécution.
-
Facilitez les tests Isolez les effets de bord comme l'I/O, le temps et l'aléatoire pour qu'ils puissent être remplacés, facilitant ainsi le test du code asynchrone.
Résumé
asyncio est puissant, mais si vous vous concentrez seulement sur « exécuter des tâches en parallèle », vous risquez de rencontrer des problèmes tels que la contention sur les ressources partagées, le dépassement des limites de ressources ou le blocage de la boucle d'événement. En combinant Semaphore, Lock, Event, Queue, run_in_executor et une gestion correcte de l’annulation, vous pouvez concevoir des applications asynchrones sûres et efficaces. En utilisant des mécanismes tels que le modèle producteur-consommateur, la limitation de la concurrence ou la séparation du traitement asynchrone et bloquant, les workflows asynchrones peuvent être construits de manière plus sûre et plus efficace.
Vous pouvez suivre l'article ci-dessus avec Visual Studio Code sur notre chaîne YouTube. Veuillez également consulter la chaîne YouTube.