Synkroniseringskontroll i asynkron behandling med Python
Denne artikkelen forklarer synkroniseringskontroll i asynkron behandling med Python.
Du vil lære steg for steg, fra det grunnleggende i asyncio til praktiske mønstre som ofte brukes for synkroniseringskontroll.
YouTube Video
Synkroniseringskontroll i asynkron behandling med Python
I asynkron behandling kan man enkelt kjøre flere oppgaver samtidig. Men i praksis kreves det mer avanserte justeringer, som å kontrollere samtidighet, koordinere oppgaver, eksklusiv kontroll av delte ressurser, håndtere tunge synkrone prosesser og opprydning etter avbrudd.
Her skal vi lære steg for steg fra grunnleggende bruk av asyncio til praktiske mønstre som ofte brukes for synkronisering.
Introduksjon: Grunnleggende (async / await og create_task)
La oss først se på noe helt enkelt asynkront kodeeksempel. await venter på det tidspunktet til den kalte koroutinen er ferdig, og asyncio.create_task oppretter en oppgave for samtidig kjøring.
1import asyncio
2
3async def worker(name, delay):
4 # Simulate async work
5 await asyncio.sleep(delay)
6 return f"{name} done after {delay}s"
7
8async def main():
9 # Create two tasks that run concurrently.
10 t1 = asyncio.create_task(worker("A", 1))
11 t2 = asyncio.create_task(worker("B", 2))
12
13 # Await both results (this suspends until both are done).
14 result1 = await t1
15 result2 = await t2
16 print(result1, result2)
17
18if __name__ == "__main__":
19 asyncio.run(main())- Denne koden er et typisk mønster hvor oppgaver blir eksplisitt opprettet, kjørt parallelt, og resultatene hentes til slutt med
await.create_taskmuliggjør samtidig utførelse.
Forskjellen mellom asyncio.gather, asyncio.wait og asyncio.as_completed
Når du kjører flere korutiner parallelt, velger du hvilken metode du vil bruke avhengig av hvordan du ønsker å hente resultatene. gather venter til alle er ferdige og returnerer resultatene i den rekkefølgen de ble sendt inn, mens as_completed lar deg behandle resultatene etter hvert som de blir ferdige, uansett rekkefølge.
1import asyncio
2import random
3
4async def job(i):
5 delay = random.random() * 2
6 await asyncio.sleep(delay)
7 return (i, delay)
8
9async def gather_example():
10 # gather waits for all tasks and returns results in the same order as input
11 results = await asyncio.gather(*(job(i) for i in range(5)))
12 print("gather order:", results)
13
14async def as_completed_example():
15 # as_completed yields results as they finish (useful to process early results)
16 tasks = [asyncio.create_task(job(i)) for i in range(5)]
17 for coro in asyncio.as_completed(tasks):
18 res = await coro
19 print("completed:", res)
20
21async def main():
22 await gather_example()
23 await as_completed_example()
24
25if __name__ == "__main__":
26 asyncio.run(main())- Som vist i denne koden, returnerer
gatherresultatene i innsendt rekkefølge, noe som er nyttig hvis du vil bevare rekkefølgen.as_completedbrukes når du vil behandle resultatene så snart de er ferdige.
Kontrollere samtidighet: Begrens samtidig utførelse med asyncio.Semaphore
Når det er f.eks. begrensninger i eksterne API-er eller databasen, kan du kontrollere samtidige kjøringer med en Semaphore.
1import asyncio
2import random
3
4sem = asyncio.Semaphore(3) # allow up to 3 concurrent workers
5
6async def limited_worker(i):
7 async with sem:
8 # Only 3 tasks can be inside this block concurrently
9 delay = random.random() * 2
10 await asyncio.sleep(delay)
11 print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14 tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15 await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18 asyncio.run(main())- Ved å bruke
Semaphoresammen medasync with, kan du enkelt begrense antall samtidige kjøringer. Dette er effektivt i situasjoner med eksterne begrensninger.
Eksklusiv kontroll av delte ressurser: asyncio.Lock
Lock brukes for å hindre samtidige oppdateringer av delte data. asyncio.Lock er en eksklusiv primitiv spesielt for asynkron bruk.
1import asyncio
2
3counter = 0
4lock = asyncio.Lock()
5
6async def incrementer(n_times):
7 global counter
8 for _ in range(n_times):
9 # Acquire lock to update shared counter safely
10 async with lock:
11 tmp = counter
12 await asyncio.sleep(0) # yield control to increase race likelihood
13 counter = tmp + 1
14
15async def main():
16 tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17 await asyncio.gather(*tasks)
18 print("final counter:", counter)
19
20if __name__ == "__main__":
21 asyncio.run(main())- Hvis flere oppgaver oppdaterer en delt variabel, som for eksempel en global
counter, kan det oppstå konflikter. Ved å bruke enLockrundt operasjonene, kan du opprettholde konsistens.
Oppgavekoordinering: asyncio.Event
Event brukes når en oppgave signaliserer at den er klar, og andre venter på dette signalet. Dette er en enkel måte for oppgaver å dele signaler og synkronisere med hverandre.
1import asyncio
2
3event = asyncio.Event()
4
5async def waiter(name):
6 print(f"{name} is waiting for event")
7 await event.wait()
8 print(f"{name} resumed after event set")
9
10async def setter():
11 print("setter will sleep and then set the event")
12 await asyncio.sleep(1)
13 event.set()
14 print("event set by setter")
15
16async def main():
17 tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18 await asyncio.create_task(setter())
19 await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22 asyncio.run(main())Eventhar en boolsk flagg, og hvis du kallerset(), gjenopptas alle ventende oppgaver. Den er nyttig for enkel synkronisering.
Produsent-konsument-mønster: asyncio.Queue
Ved å bruke Queue kan produsenter (som lager data) og konsumenter (som prosesserer data) koordinere smidig og asynkront. Når køen er full, vil produsentene automatisk vente, noe som gir en naturlig backpressure for å unngå overproduksjon.
1import asyncio
2import random
3
4async def producer(q, n_items):
5 for i in range(n_items):
6 await asyncio.sleep(random.random() * 0.5)
7 item = f"item-{i}"
8 await q.put(item)
9 print("produced", item)
10 # signal consumers to stop
11 await q.put(None)
12
13async def consumer(q, name):
14 while True:
15 item = await q.get()
16 if item is None:
17 # put sentinel back for other consumers and break
18 await q.put(None)
19 break
20 await asyncio.sleep(random.random() * 1)
21 print(name, "consumed", item)
22 q.task_done()
23
24async def main():
25 q = asyncio.Queue(maxsize=5) # bounded queue to apply backpressure
26 prod = asyncio.create_task(producer(q, 10))
27 cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28 await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31 asyncio.run(main())Queuehjelper produsenter og konsumenter å koordinere asynkront. I tillegg, hvis du settermaxsize, vil produsenten vente påputnår køen er full, og dermed forhindre overproduksjon.
Håndtering av synkrone blokkerende operasjoner: run_in_executor
For CPU-intensive oppgaver eller bruk av biblioteker som ikke støtter async, bruk run_in_executor for å delegere behandlingen til en annen tråd eller prosess. Dette hindrer at hovedløkken stopper, slik at andre asynkrone oppgaver kan kjøre uforstyrret.
1import asyncio
2import time
3
4def blocking_io(x):
5 # simulate blocking I/O or CPU-bound work
6 time.sleep(2)
7 return x * x
8
9async def main():
10 loop = asyncio.get_running_loop()
11 # run blocking_io in default thread pool
12 result = await loop.run_in_executor(None, blocking_io, 3)
13 print("blocking result:", result)
14
15if __name__ == "__main__":
16 asyncio.run(main())- Hvis du kaller synkrone funksjoner direkte, vil du blokkere event-loopen. Med
run_in_executorkjører koden i en separat tråd, og asynkrone oppgaver kan fortsette parallelt.
Eksempel: API-kall med hastighetsbegrensning (kombinere Semaphore + run_in_executor)
Dette er et scenario der API-kall er ratebegrenset og det utføres tung prosessering på resultatene. Kombinering av Semaphore og run_in_executor gjør at behandlingen går trygt og effektivt.
1import asyncio
2import time
3import random
4
5sem = asyncio.Semaphore(5)
6
7def heavy_sync_processing(data):
8 # simulate heavy CPU-bound work
9 time.sleep(1)
10 return f"processed-{data}"
11
12async def api_call(i):
13 await asyncio.sleep(random.random() * 0.5) # simulate network latency
14 return f"data-{i}"
15
16async def worker(i):
17 async with sem:
18 data = await api_call(i)
19 # offload CPU-bound work to threadpool
20 loop = asyncio.get_running_loop()
21 result = await loop.run_in_executor(None, heavy_sync_processing, data)
22 print(result)
23
24async def main():
25 tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26 await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29 asyncio.run(main())- Vi bruker en
Semaphorefor å begrense antall samtidige API-kall, og tung prosessering på resultatene delegere vi til en trådpøl. Å skille mellom nettverks- og CPU-behandling øker effektiviteten.
Avbrytelse av oppgaver og opprydding
Når en oppgave avbrytes, er det veldig viktig å håndtere finally og asyncio.CancelledError riktig. Dette sikrer at filer og tilkoblinger frigjøres og mellomtilstander håndteres riktig, slik at applikasjonen forblir konsistent.
1import asyncio
2
3async def long_running():
4 try:
5 print("started long_running")
6 await asyncio.sleep(10)
7 print("finished long_running")
8 except asyncio.CancelledError:
9 print("long_running was cancelled, cleaning up")
10 # perform cleanup here
11 raise
12
13async def main():
14 task = asyncio.create_task(long_running())
15 await asyncio.sleep(1)
16 task.cancel()
17 try:
18 await task
19 except asyncio.CancelledError:
20 print("task cancelled in main")
21
22if __name__ == "__main__":
23 asyncio.run(main())- Avbrytelse leveres som et unntak (
CancelledError), så gjør nødvendig opprydding iexcept-blokken og kast unntaket på nytt om nødvendig.
Nøkkelpunkter for praktisk design
Følgende er praktiske tips som er nyttige når du skal designe asynkron behandling.
-
Kontroller samtidighet eksplisitt Når det er ressursbegrensninger som API-er eller databaser, kan du begrense samtidige kjøringer med
Semaphore. -
Håndter delte ressurser trygt Hvis flere oppgaver skal oppdatere tilstand, bruk
Lock. Å redusere delt tilstand og designe for uforanderlige data gjør ting sikrere. -
Velg hvordan du vil motta resultater Hvis du vil behandle oppgaver så snart de fullføres, bruk
asyncio.as_completed; hvis du vil behandle resultater i innsendt rekkefølge, brukgather. -
Isoler tunge synkrone prosesser Ved CPU-intensive oppgaver eller synkrone biblioteksfunksjoner, bruk
run_in_executorellerProcessPoolExecutorfor å unngå å blokkere event-loopen. -
Planlegg for avbrytelser og unntak Skriv god unntakshåndtering for trygg opprydning selv om en oppgave avbrytes underveis.
-
Gjør det enkelt å teste Abstraher sideeffekter som I/O, tid og tilfeldighet slik at de kan erstattes, noe som gjør det enklere å teste asynkron kode.
Sammendrag
asyncio er kraftig, men dersom du kun fokuserer på å «kjøre ting parallelt», kan du få problemer som kamp om delte ressurser, overskridelse av ressursgrenser eller blokkering av event-loopen. Ved å kombinere Semaphore, Lock, Event, Queue, run_in_executor og riktig håndtering av avbrytelser, kan du designe sikre og effektive asynkrone applikasjoner. Ved å bruke mekanismer som produsent-konsument-mønster, begrense samtidighet, eller å skille mellom asynkron og blokkerende behandling, kan asynkrone arbeidsflyter bygges tryggere og mer effektivt.
Du kan følge med på artikkelen ovenfor ved å bruke Visual Studio Code på vår YouTube-kanal. Vennligst sjekk ut YouTube-kanalen.