Synkroniseringskontrol i asynkron Python-behandling

Synkroniseringskontrol i asynkron Python-behandling

Denne artikel forklarer synkroniseringskontrol i asynkron Python-behandling.

Du vil lære trin for trin, fra grundlæggende om asyncio til praktiske mønstre, der almindeligvis bruges til synkroniseringskontrol.

YouTube Video

Synkroniseringskontrol i asynkron Python-behandling

I asynkron behandling kan du nemt køre flere opgaver samtidig. Men i praksis kræves mere avancerede justeringer, såsom styring af samtidighed, koordinering af opgaver, eksklusiv kontrol af delte ressourcer, håndtering af tunge synkrone processer og oprydning efter annulleringer.

Her vil vi trin for trin lære fra grundlæggende asyncio til praktiske mønstre, der ofte bruges til synkronisering.

Introduktion: Grundlæggende (async / await og create_task)

Lad os først se på noget minimalt asynkront kode. await venter på dette tidspunkt, indtil den kaldte koroutine afslutter, og asyncio.create_task planlægger en opgave til samtidig udførelse.

 1import asyncio
 2
 3async def worker(name, delay):
 4    # Simulate async work
 5    await asyncio.sleep(delay)
 6    return f"{name} done after {delay}s"
 7
 8async def main():
 9    # Create two tasks that run concurrently.
10    t1 = asyncio.create_task(worker("A", 1))
11    t2 = asyncio.create_task(worker("B", 2))
12
13    # Await both results (this suspends until both are done).
14    result1 = await t1
15    result2 = await t2
16    print(result1, result2)
17
18if __name__ == "__main__":
19    asyncio.run(main())
  • Denne kode er et typisk mønster, hvor opgaver eksplicit oprettes, køres parallelt, og resultater modtages til sidst med await. create_task muliggør samtidig udførelse.

Forskelle mellem asyncio.gather, asyncio.wait og asyncio.as_completed

Når du kører flere koroutiner samtidig, vælger du, hvilken du vil bruge, afhængigt af hvordan du vil hente resultaterne. gather venter på, at alle er færdige og returnerer resultaterne i den rækkefølge, de blev givet, mens as_completed tillader at behandle resultaterne, så snart de er færdige, uanset rækkefølge.

 1import asyncio
 2import random
 3
 4async def job(i):
 5    delay = random.random() * 2
 6    await asyncio.sleep(delay)
 7    return (i, delay)
 8
 9async def gather_example():
10    # gather waits for all tasks and returns results in the same order as input
11    results = await asyncio.gather(*(job(i) for i in range(5)))
12    print("gather order:", results)
13
14async def as_completed_example():
15    # as_completed yields results as they finish (useful to process early results)
16    tasks = [asyncio.create_task(job(i)) for i in range(5)]
17    for coro in asyncio.as_completed(tasks):
18        res = await coro
19        print("completed:", res)
20
21async def main():
22    await gather_example()
23    await as_completed_example()
24
25if __name__ == "__main__":
26    asyncio.run(main())
  • Som vist i denne kode returnerer gather resultaterne i inddata-rækkefølge, hvilket er nyttigt, når du vil bevare rækkefølgen. as_completed bruges, når du vil behandle resultater, så snart de er færdige.

Styring af samtidighed: Begrænsning af samtidige udførelser med asyncio.Semaphore

Når der er eksterne API-ratebegrænsninger eller databaseforbindelsesgrænser, kan du styre samtidige udførelser med en Semaphore.

 1import asyncio
 2import random
 3
 4sem = asyncio.Semaphore(3)  # allow up to 3 concurrent workers
 5
 6async def limited_worker(i):
 7    async with sem:
 8        # Only 3 tasks can be inside this block concurrently
 9        delay = random.random() * 2
10        await asyncio.sleep(delay)
11        print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14    tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15    await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18    asyncio.run(main())
  • Ved at bruge Semaphore med async with kan du nemt begrænse antallet af samtidige udførelser. Dette er effektivt i situationer med eksterne begrænsninger.

Eksklusiv kontrol af delte ressourcer: asyncio.Lock

Lock bruges til at forhindre samtidige opdateringer af delte data. asyncio.Lock er en eksklusiv primitive til asynkron brug.

 1import asyncio
 2
 3counter = 0
 4lock = asyncio.Lock()
 5
 6async def incrementer(n_times):
 7    global counter
 8    for _ in range(n_times):
 9        # Acquire lock to update shared counter safely
10        async with lock:
11            tmp = counter
12            await asyncio.sleep(0)  # yield control to increase race likelihood
13            counter = tmp + 1
14
15async def main():
16    tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17    await asyncio.gather(*tasks)
18    print("final counter:", counter)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Hvis flere opgaver opdaterer en delt variabel såsom en global counter, kan der opstå konflikter. Ved at omslutte handlinger med en Lock kan du bevare konsistens.

Opgavekoordinering: asyncio.Event

Event bruges, når én opgave signalerer, at den er klar, og andre opgaver venter på dette signal. Dette er en enkel måde for opgaver at dele signaler og synkronisere med hinanden.

 1import asyncio
 2
 3event = asyncio.Event()
 4
 5async def waiter(name):
 6    print(f"{name} is waiting for event")
 7    await event.wait()
 8    print(f"{name} resumed after event set")
 9
10async def setter():
11    print("setter will sleep and then set the event")
12    await asyncio.sleep(1)
13    event.set()
14    print("event set by setter")
15
16async def main():
17    tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18    await asyncio.create_task(setter())
19    await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22    asyncio.run(main())
  • Event har et boolean-flag, og ved at kalde set() genoptages alle ventende opgaver. Det er nyttigt til enkel synkronisering.

Producer-forbruger-mønster: asyncio.Queue

Ved at bruge Queue kan producenter (der opretter data) og forbrugere (der behandler data) koordinere glat og asynkront. Når køen også er fuld, venter producenterne automatisk, hvilket naturligt indfører backpressure for at forhindre overproduktion.

 1import asyncio
 2import random
 3
 4async def producer(q, n_items):
 5    for i in range(n_items):
 6        await asyncio.sleep(random.random() * 0.5)
 7        item = f"item-{i}"
 8        await q.put(item)
 9        print("produced", item)
10    # signal consumers to stop
11    await q.put(None)
12
13async def consumer(q, name):
14    while True:
15        item = await q.get()
16        if item is None:
17            # put sentinel back for other consumers and break
18            await q.put(None)
19            break
20        await asyncio.sleep(random.random() * 1)
21        print(name, "consumed", item)
22        q.task_done()
23
24async def main():
25    q = asyncio.Queue(maxsize=5)  # bounded queue to apply backpressure
26    prod = asyncio.create_task(producer(q, 10))
27    cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28    await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31    asyncio.run(main())
  • Queue hjælper med at koordinere producenter og forbrugere asynkront. Derudover får indstilling af maxsize producenten til at vente på put, når køen er fuld, hvilket forhindrer overproduktion.

Håndtering af synkrone blokerende operationer: run_in_executor

For CPU-intensive behandlinger eller når du bruger biblioteker, der ikke understøtter async, skal du bruge run_in_executor til at delegere behandlingen til en anden tråd eller proces. Dette forhindrer den primære event-loop i at stoppe, så andre asynkrone opgaver kan køre problemfrit.

 1import asyncio
 2import time
 3
 4def blocking_io(x):
 5    # simulate blocking I/O or CPU-bound work
 6    time.sleep(2)
 7    return x * x
 8
 9async def main():
10    loop = asyncio.get_running_loop()
11    # run blocking_io in default thread pool
12    result = await loop.run_in_executor(None, blocking_io, 3)
13    print("blocking result:", result)
14
15if __name__ == "__main__":
16    asyncio.run(main())
  • Direkte kald af synkrone funktioner vil blokere event-loop'en. Med run_in_executor kører koden i en separat tråd, og asynkrone opgaver kan fortsætte samtidig.

Eksempel: API-kald med ratebegrænsning (kombination af Semaphore + run_in_executor)

Følgende er et eksempelscenarie, hvor API-kald er ratebegrænsede, og der udføres tung behandling på resultaterne. Kombinationen af Semaphore og run_in_executor gør det muligt at behandle sikkert og effektivt.

 1import asyncio
 2import time
 3import random
 4
 5sem = asyncio.Semaphore(5)
 6
 7def heavy_sync_processing(data):
 8    # simulate heavy CPU-bound work
 9    time.sleep(1)
10    return f"processed-{data}"
11
12async def api_call(i):
13    await asyncio.sleep(random.random() * 0.5)  # simulate network latency
14    return f"data-{i}"
15
16async def worker(i):
17    async with sem:
18        data = await api_call(i)
19        # offload CPU-bound work to threadpool
20        loop = asyncio.get_running_loop()
21        result = await loop.run_in_executor(None, heavy_sync_processing, data)
22        print(result)
23
24async def main():
25    tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26    await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29    asyncio.run(main())
  • Vi bruger en Semaphore til at begrænse antallet af samtidige API-kald, og tung behandling af de resulterende data videregives til en trådpulje. Adskillelse af netværks- og CPU-behandling øger effektiviteten.

Opgaveannullering og oprydning

Når en opgave annulleres, er korrekt håndtering af finally og asyncio.CancelledError meget vigtig. Dette sikrer, at filer og forbindelser frigives, og at mellemliggende tilstande håndteres korrekt, hvilket opretholder konsistensen i applikationen.

 1import asyncio
 2
 3async def long_running():
 4    try:
 5        print("started long_running")
 6        await asyncio.sleep(10)
 7        print("finished long_running")
 8    except asyncio.CancelledError:
 9        print("long_running was cancelled, cleaning up")
10        # perform cleanup here
11        raise
12
13async def main():
14    task = asyncio.create_task(long_running())
15    await asyncio.sleep(1)
16    task.cancel()
17    try:
18        await task
19    except asyncio.CancelledError:
20        print("task cancelled in main")
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • Annullering leveres som en undtagelse (CancelledError), så udfør den nødvendige oprydning i except-blokken og genudløs undtagelsen, hvis det er nødvendigt.

Vigtige punkter for praktisk design

Følgende er praktiske punkter, der er nyttige til at designe asynkron behandling.

  • Styr samtidighed eksplicit Når der er ressourceloft såsom API'er eller databaser, kan du begrænse antallet af samtidige udførelser med Semaphore.

  • Håndter delte ressourcer sikkert Hvis du har brug for at opdatere tilstanden fra flere opgaver, skal du bruge Lock. Reduktion af delt tilstand og design omkring uforanderlige data gør det sikrere.

  • Vælg hvordan du vil modtage resultater Hvis du vil bearbejde opgaver, så snart de fuldføres, brug asyncio.as_completed; hvis du vil behandle resultaterne i inddata-rækkefølge, brug gather.

  • Isolér tung synkron behandling Til CPU-tunge eller synkrone biblioteksopkald, brug run_in_executor eller ProcessPoolExecutor for at undgå blokering af event-loop'en.

  • Planlæg for annullering og undtagelser Skriv korrekt undtagelseshåndtering for sikkert at rydde op i ressourcer, selv hvis en opgave annulleres undervejs.

  • Gør testning nem Abstraher sideeffekter som I/O, tid og tilfældighed, så de kan erstattes og det bliver lettere at teste asynkron kode.

Sammendrag

asyncio er kraftfuldt, men hvis du kun fokuserer på at "køre ting parallelt", kan du støde på problemer såsom kamp om delte ressourcer, overskridelse af ressourceloft eller blokering af event-loop'en. Ved at kombinere Semaphore, Lock, Event, Queue, run_in_executor og korrekt annulleringshåndtering kan du designe sikre og effektive asynkrone applikationer. Ved at udnytte mekanismer som producer-forbruger-mønsteret, samtidighedsbegrænsning eller adskillelse af asynkron og blokerende behandling kan asynkrone arbejdsgange konstrueres mere sikkert og effektivt.

Du kan følge med i ovenstående artikel ved hjælp af Visual Studio Code på vores YouTube-kanal. Husk også at tjekke YouTube-kanalen.

YouTube Video