Synkroniseringskontroll i asynkrona processer i Python
Den här artikeln förklarar synkroniseringskontroll i asynkron bearbetning med Python.
Du kommer att lära dig steg för steg, från grunderna i asyncio till praktiska mönster som ofta används för synkroniseringskontroll.
YouTube Video
Synkroniseringskontroll i asynkrona processer i Python
I asynkron bearbetning kan du enkelt köra flera uppgifter samtidigt. Men i praktiken krävs mer avancerade anpassningar, såsom styrning av samtidighet, samordning av uppgifter, exklusiv kontroll över delade resurser, hantering av tunga synkrona processer och städning efter avbrott.
Här lär vi oss steg för steg från grunderna i asyncio till praktiska mönster som vanligtvis används för synkronisering.
Introduktion: Grunderna (async / await och create_task)
Låt oss först titta på lite minimal asynkron kod. await väntar vid den punkten tills den anropade koroutinen är klar och asyncio.create_task schemalägger en uppgift för samtidig körning.
1import asyncio
2
3async def worker(name, delay):
4 # Simulate async work
5 await asyncio.sleep(delay)
6 return f"{name} done after {delay}s"
7
8async def main():
9 # Create two tasks that run concurrently.
10 t1 = asyncio.create_task(worker("A", 1))
11 t2 = asyncio.create_task(worker("B", 2))
12
13 # Await both results (this suspends until both are done).
14 result1 = await t1
15 result2 = await t2
16 print(result1, result2)
17
18if __name__ == "__main__":
19 asyncio.run(main())- Den här koden är ett typiskt mönster där uppgifter explicit skapas, körs parallellt och resultatet tas emot i slutet med
await.create_taskmöjliggör samtidig körning.
Skillnader mellan asyncio.gather, asyncio.wait och asyncio.as_completed
När du kör flera koroutiner samtidigt väljer du vilken metod du använder beroende på hur du vill hämta resultat. gather väntar tills alla är färdiga och returnerar resultaten i inmatningsordning, medan as_completed gör att du kan bearbeta resultat vartefter de blir klara, oavsett ordning.
1import asyncio
2import random
3
4async def job(i):
5 delay = random.random() * 2
6 await asyncio.sleep(delay)
7 return (i, delay)
8
9async def gather_example():
10 # gather waits for all tasks and returns results in the same order as input
11 results = await asyncio.gather(*(job(i) for i in range(5)))
12 print("gather order:", results)
13
14async def as_completed_example():
15 # as_completed yields results as they finish (useful to process early results)
16 tasks = [asyncio.create_task(job(i)) for i in range(5)]
17 for coro in asyncio.as_completed(tasks):
18 res = await coro
19 print("completed:", res)
20
21async def main():
22 await gather_example()
23 await as_completed_example()
24
25if __name__ == "__main__":
26 asyncio.run(main())- Som visas i denna kod returnerar
gatherresultaten i inmatningsordning, vilket är användbart när du vill bevara ordningen.as_completedanvänds när du vill bearbeta resultat direkt när de är klara.
Styrning av samtidighet: Begränsning av samtidiga körningar med asyncio.Semaphore
När det finns gränser för externa API-anrop eller databasanslutningar kan du kontrollera samtidiga körningar med en Semaphore.
1import asyncio
2import random
3
4sem = asyncio.Semaphore(3) # allow up to 3 concurrent workers
5
6async def limited_worker(i):
7 async with sem:
8 # Only 3 tasks can be inside this block concurrently
9 delay = random.random() * 2
10 await asyncio.sleep(delay)
11 print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14 tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15 await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18 asyncio.run(main())- Genom att använda
Semaphoretillsammans medasync withkan du enkelt begränsa antalet samtidiga körningar. Detta är effektivt i situationer med externa begränsningar.
Exklusiv kontroll över delade resurser: asyncio.Lock
Lock används för att förhindra samtidiga uppdateringar av delade data. asyncio.Lock är en exklusiv primitiv för asynkront bruk.
1import asyncio
2
3counter = 0
4lock = asyncio.Lock()
5
6async def incrementer(n_times):
7 global counter
8 for _ in range(n_times):
9 # Acquire lock to update shared counter safely
10 async with lock:
11 tmp = counter
12 await asyncio.sleep(0) # yield control to increase race likelihood
13 counter = tmp + 1
14
15async def main():
16 tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17 await asyncio.gather(*tasks)
18 print("final counter:", counter)
19
20if __name__ == "__main__":
21 asyncio.run(main())- Om flera uppgifter uppdaterar en delad variabel som en global
counterkan konflikter uppstå. Genom att omge operationer med enLockkan du bibehålla konsistens.
Samordning mellan uppgifter: asyncio.Event
Event används när en uppgift signalerar att den är klar, och andra uppgifter väntar på denna signal. Detta är ett enkelt sätt för uppgifter att dela signaler och synkronisera med varandra.
1import asyncio
2
3event = asyncio.Event()
4
5async def waiter(name):
6 print(f"{name} is waiting for event")
7 await event.wait()
8 print(f"{name} resumed after event set")
9
10async def setter():
11 print("setter will sleep and then set the event")
12 await asyncio.sleep(1)
13 event.set()
14 print("event set by setter")
15
16async def main():
17 tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18 await asyncio.create_task(setter())
19 await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22 asyncio.run(main())Eventhar en boolesk flagga, och närset()anropas återupptas alla väntande uppgifter. Det är användbart för enkel synkronisering.
Producent-konsument-mönster: asyncio.Queue
Genom att använda Queue kan producenter (som skapar data) och konsumenter (som behandlar data) samverka smidigt och asynkront. När kön är full väntar producenterna automatiskt, vilket naturligt inför backpressure för att förhindra överproduktion.
1import asyncio
2import random
3
4async def producer(q, n_items):
5 for i in range(n_items):
6 await asyncio.sleep(random.random() * 0.5)
7 item = f"item-{i}"
8 await q.put(item)
9 print("produced", item)
10 # signal consumers to stop
11 await q.put(None)
12
13async def consumer(q, name):
14 while True:
15 item = await q.get()
16 if item is None:
17 # put sentinel back for other consumers and break
18 await q.put(None)
19 break
20 await asyncio.sleep(random.random() * 1)
21 print(name, "consumed", item)
22 q.task_done()
23
24async def main():
25 q = asyncio.Queue(maxsize=5) # bounded queue to apply backpressure
26 prod = asyncio.create_task(producer(q, 10))
27 cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28 await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31 asyncio.run(main())Queuehjälper till att samordna producenter och konsumenter asynkront. Genom att angemaxsizefår du producenten att vänta påputnär kön är full, vilket förhindrar överproduktion.
Hantering av synkrona blockerande operationer: run_in_executor
För processer som kräver mycket CPU eller när du använder bibliotek som inte stöder async, använd run_in_executor för att delegera bearbetningen till en annan tråd eller process. Detta förhindrar att huvudloopen blockeras, så att andra asynkrona uppgifter kan köras smidigt.
1import asyncio
2import time
3
4def blocking_io(x):
5 # simulate blocking I/O or CPU-bound work
6 time.sleep(2)
7 return x * x
8
9async def main():
10 loop = asyncio.get_running_loop()
11 # run blocking_io in default thread pool
12 result = await loop.run_in_executor(None, blocking_io, 3)
13 print("blocking result:", result)
14
15if __name__ == "__main__":
16 asyncio.run(main())- Om du anropar synkrona funktioner direkt kommer händelseloopen att blockeras. Med
run_in_executorkörs koden i en separat tråd och asynkrona uppgifter kan fortsätta samtidigt.
Exempel: API-anrop med hastighetsbegränsning (kombinera Semaphore + run_in_executor)
Följande är ett exempel där API-anrop är hastighetsbegränsade och tung bearbetning sker på resultaten. Att kombinera Semaphore och run_in_executor låter bearbetningen fortsätta säkert och effektivt.
1import asyncio
2import time
3import random
4
5sem = asyncio.Semaphore(5)
6
7def heavy_sync_processing(data):
8 # simulate heavy CPU-bound work
9 time.sleep(1)
10 return f"processed-{data}"
11
12async def api_call(i):
13 await asyncio.sleep(random.random() * 0.5) # simulate network latency
14 return f"data-{i}"
15
16async def worker(i):
17 async with sem:
18 data = await api_call(i)
19 # offload CPU-bound work to threadpool
20 loop = asyncio.get_running_loop()
21 result = await loop.run_in_executor(None, heavy_sync_processing, data)
22 print(result)
23
24async def main():
25 tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26 await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29 asyncio.run(main())- Vi använder en
Semaphoreför att begränsa antalet samtidiga API-anrop, och tung bearbetning av data delegeras till en trådpool. Att separera nätverks- och CPU-bearbetning förbättrar effektiviteten.
Avbryt uppgifter och städning
När en uppgift avbryts är det mycket viktigt att rätt hantera finally och asyncio.CancelledError. Detta säkerställer att filer och anslutningar frigörs och mellanliggande tillstånd hanteras korrekt, vilket bevarar applikationens konsistens.
1import asyncio
2
3async def long_running():
4 try:
5 print("started long_running")
6 await asyncio.sleep(10)
7 print("finished long_running")
8 except asyncio.CancelledError:
9 print("long_running was cancelled, cleaning up")
10 # perform cleanup here
11 raise
12
13async def main():
14 task = asyncio.create_task(long_running())
15 await asyncio.sleep(1)
16 task.cancel()
17 try:
18 await task
19 except asyncio.CancelledError:
20 print("task cancelled in main")
21
22if __name__ == "__main__":
23 asyncio.run(main())- Avbrott levereras som ett undantag (
CancelledError), så utför nödvändig städning iexcept-blocket och kasta undantaget på nytt om det behövs.
Viktiga punkter för praktisk design
Följande är praktiska punkter som är användbara vid design av asynkron bearbetning.
-
Styr samtidighet explicit När det finns resursbegränsningar som API:er eller databaser kan du begränsa antalet samtidiga körningar med
Semaphore. -
Hantera delade resurser säkert Om du behöver uppdatera tillståndet från flera uppgifter, använd
Lock. Att minska delat tillstånd och utforma kring oföränderliga data gör allt säkrare. -
Välj hur du vill ta emot resultat Vill du bearbeta uppgifter när de blir klara, använd
asyncio.as_completed; vill du bearbeta resultaten i inmatningsordning, användgather. -
Isolera tung synkron bearbetning För CPU-intensiva eller synkrona biblioteksanrop, använd
run_in_executorellerProcessPoolExecutorför att undvika att blockera händelseloopen. -
Planera för avbrott och undantag Skriv ordentlig undantagshantering för att säkert städa upp resurser även om en uppgift avbryts mitt i.
-
Gör testning enkelt Abstrahera bieffekter som I/O, tid och slumpmässighet så att de kan ersättas, vilket gör det enklare att testa asynkron kod.
Sammanfattning
asyncio är kraftfullt, men om du bara fokuserar på att 'köra saker parallellt', kan du stöta på problem som konkurrens om resurser, överträdelse av resursbegränsningar eller blockering av händelseloopen. Genom att kombinera Semaphore, Lock, Event, Queue, run_in_executor och rätt avbrottshantering kan du utforma säkra och effektiva asynkrona applikationer. Genom att använda mekanismer som producent–konsument-mönster, begränsning av samtidighet eller uppdelning av asynkron och blockerande bearbetning, kan asynkrona arbetsflöden konstrueras mer säkert och effektivt.
Du kan följa med i artikeln ovan med hjälp av Visual Studio Code på vår YouTube-kanal. Vänligen kolla även in YouTube-kanalen.