Synchronisatiebeheer in asynchrone verwerking met Python
Dit artikel legt het synchronisatiebeheer uit in asynchrone verwerking met Python.
Je leert stap voor stap, van de basis van asyncio tot praktische patronen die vaak worden gebruikt voor synchronisatiecontrole.
YouTube Video
Synchronisatiebeheer in asynchrone verwerking met Python
Bij asynchrone verwerking kun je eenvoudig meerdere taken gelijktijdig uitvoeren. In de praktijk zijn echter geavanceerdere aanpassingen nodig, zoals het beheersen van gelijktijdigheid, het coördineren van taken, exclusieve controle over gedeelde resources, afhandelen van zware synchrone processen en opruimen na annuleringen.
Hier leren we stap voor stap van de basisprincipes van asyncio tot aan praktische patronen die vaak gebruikt worden voor synchronisatie.
Introductie: Basisprincipes (async / await en create_task)
Laten we eerst naar wat minimale asynchrone code kijken. await wacht op dat punt tot de aangeroepen coroutine is voltooid, en asyncio.create_task plant een taak voor gelijktijdige uitvoering.
1import asyncio
2
3async def worker(name, delay):
4 # Simulate async work
5 await asyncio.sleep(delay)
6 return f"{name} done after {delay}s"
7
8async def main():
9 # Create two tasks that run concurrently.
10 t1 = asyncio.create_task(worker("A", 1))
11 t2 = asyncio.create_task(worker("B", 2))
12
13 # Await both results (this suspends until both are done).
14 result1 = await t1
15 result2 = await t2
16 print(result1, result2)
17
18if __name__ == "__main__":
19 asyncio.run(main())- Deze code is een typisch patroon waarbij taken expliciet worden aangemaakt, parallel worden uitgevoerd en de resultaten aan het einde met
awaitworden opgehaald.create_taskmaakt gelijktijdige uitvoering mogelijk.
Verschillen tussen asyncio.gather, asyncio.wait en asyncio.as_completed
Bij het gelijktijdig uitvoeren van meerdere coroutines kies je welke functie je gebruikt, afhankelijk van hoe je de resultaten wilt ophalen. gather wacht tot alles klaar is en retourneert de resultaten in de volgorde van invoer, terwijl as_completed het mogelijk maakt resultaten te verwerken zodra ze klaar zijn, ongeacht de volgorde.
1import asyncio
2import random
3
4async def job(i):
5 delay = random.random() * 2
6 await asyncio.sleep(delay)
7 return (i, delay)
8
9async def gather_example():
10 # gather waits for all tasks and returns results in the same order as input
11 results = await asyncio.gather(*(job(i) for i in range(5)))
12 print("gather order:", results)
13
14async def as_completed_example():
15 # as_completed yields results as they finish (useful to process early results)
16 tasks = [asyncio.create_task(job(i)) for i in range(5)]
17 for coro in asyncio.as_completed(tasks):
18 res = await coro
19 print("completed:", res)
20
21async def main():
22 await gather_example()
23 await as_completed_example()
24
25if __name__ == "__main__":
26 asyncio.run(main())- Zoals in deze code te zien is, retourneert
gatherde resultaten in invoervolgorde, wat handig is wanneer je die volgorde wilt behouden.as_completedwordt gebruikt als je resultaten direct wilt verwerken zodra ze klaar zijn.
Gelijktijdigheid regelen: Gelijktijdige uitvoering beperken met asyncio.Semaphore
Als er limieten zijn op externe API-aanroepen of databankverbindingen, kun je het aantal gelijktijdige uitvoeringen regelen met een Semaphore.
1import asyncio
2import random
3
4sem = asyncio.Semaphore(3) # allow up to 3 concurrent workers
5
6async def limited_worker(i):
7 async with sem:
8 # Only 3 tasks can be inside this block concurrently
9 delay = random.random() * 2
10 await asyncio.sleep(delay)
11 print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14 tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15 await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18 asyncio.run(main())- Door
Semaphorete gebruiken metasync with, kun je eenvoudig het aantal gelijktijdige uitvoeringen beperken. Dit is effectief in situaties met externe beperkingen.
Exclusieve controle van gedeelde resources: asyncio.Lock
Lock wordt gebruikt om te voorkomen dat gedeelde data gelijktijdig worden bijgewerkt. asyncio.Lock is een exclusief primitive voor asynchroon gebruik.
1import asyncio
2
3counter = 0
4lock = asyncio.Lock()
5
6async def incrementer(n_times):
7 global counter
8 for _ in range(n_times):
9 # Acquire lock to update shared counter safely
10 async with lock:
11 tmp = counter
12 await asyncio.sleep(0) # yield control to increase race likelihood
13 counter = tmp + 1
14
15async def main():
16 tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17 await asyncio.gather(*tasks)
18 print("final counter:", counter)
19
20if __name__ == "__main__":
21 asyncio.run(main())- Als meerdere taken een gedeelde variabele zoals een globale
counterbijwerken, kunnen er conflicten optreden. Door bewerkingen binnen eenLockte plaatsen, kun je consistentie waarborgen.
Taakcoördinatie: asyncio.Event
Event wordt gebruikt wanneer één taak aangeeft klaar te zijn en andere taken op dit signaal wachten. Dit is een eenvoudige manier voor taken om signalen te delen en met elkaar te synchroniseren.
1import asyncio
2
3event = asyncio.Event()
4
5async def waiter(name):
6 print(f"{name} is waiting for event")
7 await event.wait()
8 print(f"{name} resumed after event set")
9
10async def setter():
11 print("setter will sleep and then set the event")
12 await asyncio.sleep(1)
13 event.set()
14 print("event set by setter")
15
16async def main():
17 tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18 await asyncio.create_task(setter())
19 await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22 asyncio.run(main())Eventheeft een booleaanse vlag, en het aanroepen vanset()hervat alle wachtende taken. Het is handig voor eenvoudige synchronisatie.
Producer-consumer patroon: asyncio.Queue
Door gebruik te maken van een Queue kunnen producers (die data aanmaken) en consumers (die data verwerken) soepel en asynchroon samenwerken. Ook als de wachtrij vol is, wachten producers automatisch, waardoor vanzelf backpressure ontstaat om overproductie te voorkomen.
1import asyncio
2import random
3
4async def producer(q, n_items):
5 for i in range(n_items):
6 await asyncio.sleep(random.random() * 0.5)
7 item = f"item-{i}"
8 await q.put(item)
9 print("produced", item)
10 # signal consumers to stop
11 await q.put(None)
12
13async def consumer(q, name):
14 while True:
15 item = await q.get()
16 if item is None:
17 # put sentinel back for other consumers and break
18 await q.put(None)
19 break
20 await asyncio.sleep(random.random() * 1)
21 print(name, "consumed", item)
22 q.task_done()
23
24async def main():
25 q = asyncio.Queue(maxsize=5) # bounded queue to apply backpressure
26 prod = asyncio.create_task(producer(q, 10))
27 cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28 await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31 asyncio.run(main())Queuehelpt producenten en consumenten asynchroon te coördineren. Daarnaast zorgt het instellen vanmaxsizeervoor dat de producer bijputwacht als de wachtrij vol is, waarmee overproductie wordt voorkomen.
Omgaan met synchrone blokkerende operaties: run_in_executor
Voor CPU-intensieve bewerkingen of wanneer je bibliotheken gebruikt die geen async ondersteunen, gebruik je run_in_executor om de verwerking uit te besteden aan een andere thread of proces. Hierdoor wordt voorkomen dat de hoofdevenlus vastloopt, zodat andere asynchrone taken soepel blijven verlopen.
1import asyncio
2import time
3
4def blocking_io(x):
5 # simulate blocking I/O or CPU-bound work
6 time.sleep(2)
7 return x * x
8
9async def main():
10 loop = asyncio.get_running_loop()
11 # run blocking_io in default thread pool
12 result = await loop.run_in_executor(None, blocking_io, 3)
13 print("blocking result:", result)
14
15if __name__ == "__main__":
16 asyncio.run(main())- Het direct aanroepen van synchrone functies zal de event loop blokkeren. Met
run_in_executordraait de code in een aparte thread en kunnen asynchrone taken gelijktijdig verdergaan.
Voorbeeld: API-aanroepen met snelheidslimiet (combinatie van Semaphore + run_in_executor)
Hieronder volgt een voorbeeldscenario waarin API-aanroepen een snelheidslimiet hebben en zware verwerking wordt uitgevoerd op de resultaten. Door Semaphore en run_in_executor te combineren, kan de verwerking veilig en efficiënt verlopen.
1import asyncio
2import time
3import random
4
5sem = asyncio.Semaphore(5)
6
7def heavy_sync_processing(data):
8 # simulate heavy CPU-bound work
9 time.sleep(1)
10 return f"processed-{data}"
11
12async def api_call(i):
13 await asyncio.sleep(random.random() * 0.5) # simulate network latency
14 return f"data-{i}"
15
16async def worker(i):
17 async with sem:
18 data = await api_call(i)
19 # offload CPU-bound work to threadpool
20 loop = asyncio.get_running_loop()
21 result = await loop.run_in_executor(None, heavy_sync_processing, data)
22 print(result)
23
24async def main():
25 tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26 await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29 asyncio.run(main())- We gebruiken een
Semaphoreom het aantal gelijktijdige API-aanroepen te beperken en laten de zware verwerking van de resultaten uitvoeren door een thread pool. Door netwerk- en CPU-verwerking te scheiden, verbeter je de efficiëntie.
Taakannulering en opruimen
Wanneer een taak wordt geannuleerd, is het belangrijk finally en asyncio.CancelledError correct af te handelen. Hierdoor worden bestanden en connecties vrijgegeven en worden tussentijdse staten goed afgehandeld, zodat de consistentie van de applicatie behouden blijft.
1import asyncio
2
3async def long_running():
4 try:
5 print("started long_running")
6 await asyncio.sleep(10)
7 print("finished long_running")
8 except asyncio.CancelledError:
9 print("long_running was cancelled, cleaning up")
10 # perform cleanup here
11 raise
12
13async def main():
14 task = asyncio.create_task(long_running())
15 await asyncio.sleep(1)
16 task.cancel()
17 try:
18 await task
19 except asyncio.CancelledError:
20 print("task cancelled in main")
21
22if __name__ == "__main__":
23 asyncio.run(main())- Annulering wordt geleverd als een uitzondering (
CancelledError), dus voer de benodigde opruimacties uit in hetexcept-blok en gooi de uitzondering opnieuw als dat nodig is.
Belangrijke punten voor praktisch ontwerp
De volgende punten zijn praktisch en nuttig bij het ontwerpen van asynchrone verwerking.
-
Beheer gelijktijdigheid expliciet Als er resource-limieten zijn, zoals bij APIs of databases, kun je het aantal gelijktijdige uitvoeringen met een
Semaphorebeperken. -
Behandel gedeelde resources veilig Als je uit meerdere taken de status moet bijwerken, gebruik dan een
Lock. Het verminderen van gedeelde status en ontwerpen rond onveranderlijke data maakt het veiliger. -
Kies hoe je de resultaten ontvangt Als je taken wilt verwerken zodra ze klaar zijn, gebruik
asyncio.as_completed; als je resultaten in invoervolgorde wilt verwerken, gebruikgather. -
Isoleer zware synchrone verwerking Voor CPU-intensieve of synchrone bibliotheekoproepen gebruik je
run_in_executorofProcessPoolExecutorom blokkering van de event loop te voorkomen. -
Voorzie in annulering en uitzonderingen Schrijf goede foutafhandeling om resources veilig op te ruimen, zelfs als een taak halverwege geannuleerd wordt.
-
Maak testen eenvoudig Abstraheer bijwerkingen zoals I/O, tijd en willekeurigheid zodat ze kunnen worden vervangen, waardoor testen van asynchrone code makkelijker wordt.
Samenvatting
asyncio is krachtig, maar als je je alleen richt op “zaken parallel uitvoeren”, kun je problemen ondervinden zoals wedijver om gedeelde resources, overschrijding van resource-limieten of blokkering van de event loop. Door Semaphore, Lock, Event, Queue, run_in_executor en een goede afhandeling van annuleringen te combineren, kun je veilige en efficiënte asynchrone applicaties ontwerpen. Door mechanismen te gebruiken zoals het producer-consumer patroon, gelijktijdigheidslimiet of het scheiden van asynchrone en blokkerende verwerking, kunnen asynchrone workflows veiliger en efficiënter worden opgebouwd.
Je kunt het bovenstaande artikel volgen met Visual Studio Code op ons YouTube-kanaal. Bekijk ook het YouTube-kanaal.