Synkroniseringskontroll i asynkron behandling med Python

Synkroniseringskontroll i asynkron behandling med Python

Denne artikkelen forklarer synkroniseringskontroll i asynkron behandling med Python.

Du vil lære steg for steg, fra det grunnleggende i asyncio til praktiske mønstre som ofte brukes for synkroniseringskontroll.

YouTube Video

Synkroniseringskontroll i asynkron behandling med Python

I asynkron behandling kan man enkelt kjøre flere oppgaver samtidig. Men i praksis kreves det mer avanserte justeringer, som å kontrollere samtidighet, koordinere oppgaver, eksklusiv kontroll av delte ressurser, håndtere tunge synkrone prosesser og opprydning etter avbrudd.

Her skal vi lære steg for steg fra grunnleggende bruk av asyncio til praktiske mønstre som ofte brukes for synkronisering.

Introduksjon: Grunnleggende (async / await og create_task)

La oss først se på noe helt enkelt asynkront kodeeksempel. await venter på det tidspunktet til den kalte koroutinen er ferdig, og asyncio.create_task oppretter en oppgave for samtidig kjøring.

 1import asyncio
 2
 3async def worker(name, delay):
 4    # Simulate async work
 5    await asyncio.sleep(delay)
 6    return f"{name} done after {delay}s"
 7
 8async def main():
 9    # Create two tasks that run concurrently.
10    t1 = asyncio.create_task(worker("A", 1))
11    t2 = asyncio.create_task(worker("B", 2))
12
13    # Await both results (this suspends until both are done).
14    result1 = await t1
15    result2 = await t2
16    print(result1, result2)
17
18if __name__ == "__main__":
19    asyncio.run(main())
  • Denne koden er et typisk mønster hvor oppgaver blir eksplisitt opprettet, kjørt parallelt, og resultatene hentes til slutt med await. create_task muliggjør samtidig utførelse.

Forskjellen mellom asyncio.gather, asyncio.wait og asyncio.as_completed

Når du kjører flere korutiner parallelt, velger du hvilken metode du vil bruke avhengig av hvordan du ønsker å hente resultatene. gather venter til alle er ferdige og returnerer resultatene i den rekkefølgen de ble sendt inn, mens as_completed lar deg behandle resultatene etter hvert som de blir ferdige, uansett rekkefølge.

 1import asyncio
 2import random
 3
 4async def job(i):
 5    delay = random.random() * 2
 6    await asyncio.sleep(delay)
 7    return (i, delay)
 8
 9async def gather_example():
10    # gather waits for all tasks and returns results in the same order as input
11    results = await asyncio.gather(*(job(i) for i in range(5)))
12    print("gather order:", results)
13
14async def as_completed_example():
15    # as_completed yields results as they finish (useful to process early results)
16    tasks = [asyncio.create_task(job(i)) for i in range(5)]
17    for coro in asyncio.as_completed(tasks):
18        res = await coro
19        print("completed:", res)
20
21async def main():
22    await gather_example()
23    await as_completed_example()
24
25if __name__ == "__main__":
26    asyncio.run(main())
  • Som vist i denne koden, returnerer gather resultatene i innsendt rekkefølge, noe som er nyttig hvis du vil bevare rekkefølgen. as_completed brukes når du vil behandle resultatene så snart de er ferdige.

Kontrollere samtidighet: Begrens samtidig utførelse med asyncio.Semaphore

Når det er f.eks. begrensninger i eksterne API-er eller databasen, kan du kontrollere samtidige kjøringer med en Semaphore.

 1import asyncio
 2import random
 3
 4sem = asyncio.Semaphore(3)  # allow up to 3 concurrent workers
 5
 6async def limited_worker(i):
 7    async with sem:
 8        # Only 3 tasks can be inside this block concurrently
 9        delay = random.random() * 2
10        await asyncio.sleep(delay)
11        print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14    tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15    await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18    asyncio.run(main())
  • Ved å bruke Semaphore sammen med async with, kan du enkelt begrense antall samtidige kjøringer. Dette er effektivt i situasjoner med eksterne begrensninger.

Eksklusiv kontroll av delte ressurser: asyncio.Lock

Lock brukes for å hindre samtidige oppdateringer av delte data. asyncio.Lock er en eksklusiv primitiv spesielt for asynkron bruk.

 1import asyncio
 2
 3counter = 0
 4lock = asyncio.Lock()
 5
 6async def incrementer(n_times):
 7    global counter
 8    for _ in range(n_times):
 9        # Acquire lock to update shared counter safely
10        async with lock:
11            tmp = counter
12            await asyncio.sleep(0)  # yield control to increase race likelihood
13            counter = tmp + 1
14
15async def main():
16    tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17    await asyncio.gather(*tasks)
18    print("final counter:", counter)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Hvis flere oppgaver oppdaterer en delt variabel, som for eksempel en global counter, kan det oppstå konflikter. Ved å bruke en Lock rundt operasjonene, kan du opprettholde konsistens.

Oppgavekoordinering: asyncio.Event

Event brukes når en oppgave signaliserer at den er klar, og andre venter på dette signalet. Dette er en enkel måte for oppgaver å dele signaler og synkronisere med hverandre.

 1import asyncio
 2
 3event = asyncio.Event()
 4
 5async def waiter(name):
 6    print(f"{name} is waiting for event")
 7    await event.wait()
 8    print(f"{name} resumed after event set")
 9
10async def setter():
11    print("setter will sleep and then set the event")
12    await asyncio.sleep(1)
13    event.set()
14    print("event set by setter")
15
16async def main():
17    tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18    await asyncio.create_task(setter())
19    await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22    asyncio.run(main())
  • Event har en boolsk flagg, og hvis du kaller set(), gjenopptas alle ventende oppgaver. Den er nyttig for enkel synkronisering.

Produsent-konsument-mønster: asyncio.Queue

Ved å bruke Queue kan produsenter (som lager data) og konsumenter (som prosesserer data) koordinere smidig og asynkront. Når køen er full, vil produsentene automatisk vente, noe som gir en naturlig backpressure for å unngå overproduksjon.

 1import asyncio
 2import random
 3
 4async def producer(q, n_items):
 5    for i in range(n_items):
 6        await asyncio.sleep(random.random() * 0.5)
 7        item = f"item-{i}"
 8        await q.put(item)
 9        print("produced", item)
10    # signal consumers to stop
11    await q.put(None)
12
13async def consumer(q, name):
14    while True:
15        item = await q.get()
16        if item is None:
17            # put sentinel back for other consumers and break
18            await q.put(None)
19            break
20        await asyncio.sleep(random.random() * 1)
21        print(name, "consumed", item)
22        q.task_done()
23
24async def main():
25    q = asyncio.Queue(maxsize=5)  # bounded queue to apply backpressure
26    prod = asyncio.create_task(producer(q, 10))
27    cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28    await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31    asyncio.run(main())
  • Queue hjelper produsenter og konsumenter å koordinere asynkront. I tillegg, hvis du setter maxsize, vil produsenten vente på put når køen er full, og dermed forhindre overproduksjon.

Håndtering av synkrone blokkerende operasjoner: run_in_executor

For CPU-intensive oppgaver eller bruk av biblioteker som ikke støtter async, bruk run_in_executor for å delegere behandlingen til en annen tråd eller prosess. Dette hindrer at hovedløkken stopper, slik at andre asynkrone oppgaver kan kjøre uforstyrret.

 1import asyncio
 2import time
 3
 4def blocking_io(x):
 5    # simulate blocking I/O or CPU-bound work
 6    time.sleep(2)
 7    return x * x
 8
 9async def main():
10    loop = asyncio.get_running_loop()
11    # run blocking_io in default thread pool
12    result = await loop.run_in_executor(None, blocking_io, 3)
13    print("blocking result:", result)
14
15if __name__ == "__main__":
16    asyncio.run(main())
  • Hvis du kaller synkrone funksjoner direkte, vil du blokkere event-loopen. Med run_in_executor kjører koden i en separat tråd, og asynkrone oppgaver kan fortsette parallelt.

Eksempel: API-kall med hastighetsbegrensning (kombinere Semaphore + run_in_executor)

Dette er et scenario der API-kall er ratebegrenset og det utføres tung prosessering på resultatene. Kombinering av Semaphore og run_in_executor gjør at behandlingen går trygt og effektivt.

 1import asyncio
 2import time
 3import random
 4
 5sem = asyncio.Semaphore(5)
 6
 7def heavy_sync_processing(data):
 8    # simulate heavy CPU-bound work
 9    time.sleep(1)
10    return f"processed-{data}"
11
12async def api_call(i):
13    await asyncio.sleep(random.random() * 0.5)  # simulate network latency
14    return f"data-{i}"
15
16async def worker(i):
17    async with sem:
18        data = await api_call(i)
19        # offload CPU-bound work to threadpool
20        loop = asyncio.get_running_loop()
21        result = await loop.run_in_executor(None, heavy_sync_processing, data)
22        print(result)
23
24async def main():
25    tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26    await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29    asyncio.run(main())
  • Vi bruker en Semaphore for å begrense antall samtidige API-kall, og tung prosessering på resultatene delegere vi til en trådpøl. Å skille mellom nettverks- og CPU-behandling øker effektiviteten.

Avbrytelse av oppgaver og opprydding

Når en oppgave avbrytes, er det veldig viktig å håndtere finally og asyncio.CancelledError riktig. Dette sikrer at filer og tilkoblinger frigjøres og mellomtilstander håndteres riktig, slik at applikasjonen forblir konsistent.

 1import asyncio
 2
 3async def long_running():
 4    try:
 5        print("started long_running")
 6        await asyncio.sleep(10)
 7        print("finished long_running")
 8    except asyncio.CancelledError:
 9        print("long_running was cancelled, cleaning up")
10        # perform cleanup here
11        raise
12
13async def main():
14    task = asyncio.create_task(long_running())
15    await asyncio.sleep(1)
16    task.cancel()
17    try:
18        await task
19    except asyncio.CancelledError:
20        print("task cancelled in main")
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • Avbrytelse leveres som et unntak (CancelledError), så gjør nødvendig opprydding i except-blokken og kast unntaket på nytt om nødvendig.

Nøkkelpunkter for praktisk design

Følgende er praktiske tips som er nyttige når du skal designe asynkron behandling.

  • Kontroller samtidighet eksplisitt Når det er ressursbegrensninger som API-er eller databaser, kan du begrense samtidige kjøringer med Semaphore.

  • Håndter delte ressurser trygt Hvis flere oppgaver skal oppdatere tilstand, bruk Lock. Å redusere delt tilstand og designe for uforanderlige data gjør ting sikrere.

  • Velg hvordan du vil motta resultater Hvis du vil behandle oppgaver så snart de fullføres, bruk asyncio.as_completed; hvis du vil behandle resultater i innsendt rekkefølge, bruk gather.

  • Isoler tunge synkrone prosesser Ved CPU-intensive oppgaver eller synkrone biblioteksfunksjoner, bruk run_in_executor eller ProcessPoolExecutor for å unngå å blokkere event-loopen.

  • Planlegg for avbrytelser og unntak Skriv god unntakshåndtering for trygg opprydning selv om en oppgave avbrytes underveis.

  • Gjør det enkelt å teste Abstraher sideeffekter som I/O, tid og tilfeldighet slik at de kan erstattes, noe som gjør det enklere å teste asynkron kode.

Sammendrag

asyncio er kraftig, men dersom du kun fokuserer på å «kjøre ting parallelt», kan du få problemer som kamp om delte ressurser, overskridelse av ressursgrenser eller blokkering av event-loopen. Ved å kombinere Semaphore, Lock, Event, Queue, run_in_executor og riktig håndtering av avbrytelser, kan du designe sikre og effektive asynkrone applikasjoner. Ved å bruke mekanismer som produsent-konsument-mønster, begrense samtidighet, eller å skille mellom asynkron og blokkerende behandling, kan asynkrone arbeidsflyter bygges tryggere og mer effektivt.

Du kan følge med på artikkelen ovenfor ved å bruke Visual Studio Code på vår YouTube-kanal. Vennligst sjekk ut YouTube-kanalen.

YouTube Video