Synkroniseringskontrol i asynkron Python-behandling
Denne artikel forklarer synkroniseringskontrol i asynkron Python-behandling.
Du vil lære trin for trin, fra grundlæggende om asyncio til praktiske mønstre, der almindeligvis bruges til synkroniseringskontrol.
YouTube Video
Synkroniseringskontrol i asynkron Python-behandling
I asynkron behandling kan du nemt køre flere opgaver samtidig. Men i praksis kræves mere avancerede justeringer, såsom styring af samtidighed, koordinering af opgaver, eksklusiv kontrol af delte ressourcer, håndtering af tunge synkrone processer og oprydning efter annulleringer.
Her vil vi trin for trin lære fra grundlæggende asyncio til praktiske mønstre, der ofte bruges til synkronisering.
Introduktion: Grundlæggende (async / await og create_task)
Lad os først se på noget minimalt asynkront kode. await venter på dette tidspunkt, indtil den kaldte koroutine afslutter, og asyncio.create_task planlægger en opgave til samtidig udførelse.
1import asyncio
2
3async def worker(name, delay):
4 # Simulate async work
5 await asyncio.sleep(delay)
6 return f"{name} done after {delay}s"
7
8async def main():
9 # Create two tasks that run concurrently.
10 t1 = asyncio.create_task(worker("A", 1))
11 t2 = asyncio.create_task(worker("B", 2))
12
13 # Await both results (this suspends until both are done).
14 result1 = await t1
15 result2 = await t2
16 print(result1, result2)
17
18if __name__ == "__main__":
19 asyncio.run(main())- Denne kode er et typisk mønster, hvor opgaver eksplicit oprettes, køres parallelt, og resultater modtages til sidst med
await.create_taskmuliggør samtidig udførelse.
Forskelle mellem asyncio.gather, asyncio.wait og asyncio.as_completed
Når du kører flere koroutiner samtidig, vælger du, hvilken du vil bruge, afhængigt af hvordan du vil hente resultaterne. gather venter på, at alle er færdige og returnerer resultaterne i den rækkefølge, de blev givet, mens as_completed tillader at behandle resultaterne, så snart de er færdige, uanset rækkefølge.
1import asyncio
2import random
3
4async def job(i):
5 delay = random.random() * 2
6 await asyncio.sleep(delay)
7 return (i, delay)
8
9async def gather_example():
10 # gather waits for all tasks and returns results in the same order as input
11 results = await asyncio.gather(*(job(i) for i in range(5)))
12 print("gather order:", results)
13
14async def as_completed_example():
15 # as_completed yields results as they finish (useful to process early results)
16 tasks = [asyncio.create_task(job(i)) for i in range(5)]
17 for coro in asyncio.as_completed(tasks):
18 res = await coro
19 print("completed:", res)
20
21async def main():
22 await gather_example()
23 await as_completed_example()
24
25if __name__ == "__main__":
26 asyncio.run(main())- Som vist i denne kode returnerer
gatherresultaterne i inddata-rækkefølge, hvilket er nyttigt, når du vil bevare rækkefølgen.as_completedbruges, når du vil behandle resultater, så snart de er færdige.
Styring af samtidighed: Begrænsning af samtidige udførelser med asyncio.Semaphore
Når der er eksterne API-ratebegrænsninger eller databaseforbindelsesgrænser, kan du styre samtidige udførelser med en Semaphore.
1import asyncio
2import random
3
4sem = asyncio.Semaphore(3) # allow up to 3 concurrent workers
5
6async def limited_worker(i):
7 async with sem:
8 # Only 3 tasks can be inside this block concurrently
9 delay = random.random() * 2
10 await asyncio.sleep(delay)
11 print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14 tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15 await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18 asyncio.run(main())- Ved at bruge
Semaphoremedasync withkan du nemt begrænse antallet af samtidige udførelser. Dette er effektivt i situationer med eksterne begrænsninger.
Eksklusiv kontrol af delte ressourcer: asyncio.Lock
Lock bruges til at forhindre samtidige opdateringer af delte data. asyncio.Lock er en eksklusiv primitive til asynkron brug.
1import asyncio
2
3counter = 0
4lock = asyncio.Lock()
5
6async def incrementer(n_times):
7 global counter
8 for _ in range(n_times):
9 # Acquire lock to update shared counter safely
10 async with lock:
11 tmp = counter
12 await asyncio.sleep(0) # yield control to increase race likelihood
13 counter = tmp + 1
14
15async def main():
16 tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17 await asyncio.gather(*tasks)
18 print("final counter:", counter)
19
20if __name__ == "__main__":
21 asyncio.run(main())- Hvis flere opgaver opdaterer en delt variabel såsom en global
counter, kan der opstå konflikter. Ved at omslutte handlinger med enLockkan du bevare konsistens.
Opgavekoordinering: asyncio.Event
Event bruges, når én opgave signalerer, at den er klar, og andre opgaver venter på dette signal. Dette er en enkel måde for opgaver at dele signaler og synkronisere med hinanden.
1import asyncio
2
3event = asyncio.Event()
4
5async def waiter(name):
6 print(f"{name} is waiting for event")
7 await event.wait()
8 print(f"{name} resumed after event set")
9
10async def setter():
11 print("setter will sleep and then set the event")
12 await asyncio.sleep(1)
13 event.set()
14 print("event set by setter")
15
16async def main():
17 tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18 await asyncio.create_task(setter())
19 await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22 asyncio.run(main())Eventhar et boolean-flag, og ved at kaldeset()genoptages alle ventende opgaver. Det er nyttigt til enkel synkronisering.
Producer-forbruger-mønster: asyncio.Queue
Ved at bruge Queue kan producenter (der opretter data) og forbrugere (der behandler data) koordinere glat og asynkront. Når køen også er fuld, venter producenterne automatisk, hvilket naturligt indfører backpressure for at forhindre overproduktion.
1import asyncio
2import random
3
4async def producer(q, n_items):
5 for i in range(n_items):
6 await asyncio.sleep(random.random() * 0.5)
7 item = f"item-{i}"
8 await q.put(item)
9 print("produced", item)
10 # signal consumers to stop
11 await q.put(None)
12
13async def consumer(q, name):
14 while True:
15 item = await q.get()
16 if item is None:
17 # put sentinel back for other consumers and break
18 await q.put(None)
19 break
20 await asyncio.sleep(random.random() * 1)
21 print(name, "consumed", item)
22 q.task_done()
23
24async def main():
25 q = asyncio.Queue(maxsize=5) # bounded queue to apply backpressure
26 prod = asyncio.create_task(producer(q, 10))
27 cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28 await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31 asyncio.run(main())Queuehjælper med at koordinere producenter og forbrugere asynkront. Derudover får indstilling afmaxsizeproducenten til at vente påput, når køen er fuld, hvilket forhindrer overproduktion.
Håndtering af synkrone blokerende operationer: run_in_executor
For CPU-intensive behandlinger eller når du bruger biblioteker, der ikke understøtter async, skal du bruge run_in_executor til at delegere behandlingen til en anden tråd eller proces. Dette forhindrer den primære event-loop i at stoppe, så andre asynkrone opgaver kan køre problemfrit.
1import asyncio
2import time
3
4def blocking_io(x):
5 # simulate blocking I/O or CPU-bound work
6 time.sleep(2)
7 return x * x
8
9async def main():
10 loop = asyncio.get_running_loop()
11 # run blocking_io in default thread pool
12 result = await loop.run_in_executor(None, blocking_io, 3)
13 print("blocking result:", result)
14
15if __name__ == "__main__":
16 asyncio.run(main())- Direkte kald af synkrone funktioner vil blokere event-loop'en. Med
run_in_executorkører koden i en separat tråd, og asynkrone opgaver kan fortsætte samtidig.
Eksempel: API-kald med ratebegrænsning (kombination af Semaphore + run_in_executor)
Følgende er et eksempelscenarie, hvor API-kald er ratebegrænsede, og der udføres tung behandling på resultaterne. Kombinationen af Semaphore og run_in_executor gør det muligt at behandle sikkert og effektivt.
1import asyncio
2import time
3import random
4
5sem = asyncio.Semaphore(5)
6
7def heavy_sync_processing(data):
8 # simulate heavy CPU-bound work
9 time.sleep(1)
10 return f"processed-{data}"
11
12async def api_call(i):
13 await asyncio.sleep(random.random() * 0.5) # simulate network latency
14 return f"data-{i}"
15
16async def worker(i):
17 async with sem:
18 data = await api_call(i)
19 # offload CPU-bound work to threadpool
20 loop = asyncio.get_running_loop()
21 result = await loop.run_in_executor(None, heavy_sync_processing, data)
22 print(result)
23
24async def main():
25 tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26 await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29 asyncio.run(main())- Vi bruger en
Semaphoretil at begrænse antallet af samtidige API-kald, og tung behandling af de resulterende data videregives til en trådpulje. Adskillelse af netværks- og CPU-behandling øger effektiviteten.
Opgaveannullering og oprydning
Når en opgave annulleres, er korrekt håndtering af finally og asyncio.CancelledError meget vigtig. Dette sikrer, at filer og forbindelser frigives, og at mellemliggende tilstande håndteres korrekt, hvilket opretholder konsistensen i applikationen.
1import asyncio
2
3async def long_running():
4 try:
5 print("started long_running")
6 await asyncio.sleep(10)
7 print("finished long_running")
8 except asyncio.CancelledError:
9 print("long_running was cancelled, cleaning up")
10 # perform cleanup here
11 raise
12
13async def main():
14 task = asyncio.create_task(long_running())
15 await asyncio.sleep(1)
16 task.cancel()
17 try:
18 await task
19 except asyncio.CancelledError:
20 print("task cancelled in main")
21
22if __name__ == "__main__":
23 asyncio.run(main())- Annullering leveres som en undtagelse (
CancelledError), så udfør den nødvendige oprydning iexcept-blokken og genudløs undtagelsen, hvis det er nødvendigt.
Vigtige punkter for praktisk design
Følgende er praktiske punkter, der er nyttige til at designe asynkron behandling.
-
Styr samtidighed eksplicit Når der er ressourceloft såsom API'er eller databaser, kan du begrænse antallet af samtidige udførelser med
Semaphore. -
Håndter delte ressourcer sikkert Hvis du har brug for at opdatere tilstanden fra flere opgaver, skal du bruge
Lock. Reduktion af delt tilstand og design omkring uforanderlige data gør det sikrere. -
Vælg hvordan du vil modtage resultater Hvis du vil bearbejde opgaver, så snart de fuldføres, brug
asyncio.as_completed; hvis du vil behandle resultaterne i inddata-rækkefølge, bruggather. -
Isolér tung synkron behandling Til CPU-tunge eller synkrone biblioteksopkald, brug
run_in_executorellerProcessPoolExecutorfor at undgå blokering af event-loop'en. -
Planlæg for annullering og undtagelser Skriv korrekt undtagelseshåndtering for sikkert at rydde op i ressourcer, selv hvis en opgave annulleres undervejs.
-
Gør testning nem Abstraher sideeffekter som I/O, tid og tilfældighed, så de kan erstattes og det bliver lettere at teste asynkron kode.
Sammendrag
asyncio er kraftfuldt, men hvis du kun fokuserer på at "køre ting parallelt", kan du støde på problemer såsom kamp om delte ressourcer, overskridelse af ressourceloft eller blokering af event-loop'en. Ved at kombinere Semaphore, Lock, Event, Queue, run_in_executor og korrekt annulleringshåndtering kan du designe sikre og effektive asynkrone applikationer. Ved at udnytte mekanismer som producer-forbruger-mønsteret, samtidighedsbegrænsning eller adskillelse af asynkron og blokerende behandling kan asynkrone arbejdsgange konstrueres mere sikkert og effektivt.
Du kan følge med i ovenstående artikel ved hjælp af Visual Studio Code på vores YouTube-kanal. Husk også at tjekke YouTube-kanalen.