Controllo della sincronizzazione nell'elaborazione asincrona in Python

Controllo della sincronizzazione nell'elaborazione asincrona in Python

Questo articolo spiega il controllo della sincronizzazione nell'elaborazione asincrona in Python.

Imparerai passo dopo passo, dalle basi di asyncio ai modelli pratici comunemente utilizzati per il controllo della sincronizzazione.

YouTube Video

Controllo della sincronizzazione nell'elaborazione asincrona in Python

Nell'elaborazione asincrona è possibile eseguire facilmente più attività contemporaneamente. Tuttavia, nella pratica, sono necessari aggiustamenti più avanzati, come il controllo della concorrenza, il coordinamento dei task, il controllo esclusivo delle risorse condivise, la gestione di processi sincroni pesanti e la pulizia dopo le cancellazioni.

Qui impareremo passo dopo passo dalle basi di asyncio ai modelli pratici comunemente usati per la sincronizzazione.

Introduzione: Basi (async / await e create_task)

Vediamo prima un esempio minimal di codice asincrono. await attende a quel punto fino al completamento della coroutine chiamata, e asyncio.create_task pianifica un task per l'esecuzione concorrente.

 1import asyncio
 2
 3async def worker(name, delay):
 4    # Simulate async work
 5    await asyncio.sleep(delay)
 6    return f"{name} done after {delay}s"
 7
 8async def main():
 9    # Create two tasks that run concurrently.
10    t1 = asyncio.create_task(worker("A", 1))
11    t2 = asyncio.create_task(worker("B", 2))
12
13    # Await both results (this suspends until both are done).
14    result1 = await t1
15    result2 = await t2
16    print(result1, result2)
17
18if __name__ == "__main__":
19    asyncio.run(main())
  • Questo codice rappresenta un modello tipico in cui i task sono creati esplicitamente, eseguiti in parallelo e i risultati vengono ricevuti alla fine con await. create_task consente l'esecuzione concorrente.

Differenze tra asyncio.gather, asyncio.wait e asyncio.as_completed

Quando si eseguono più coroutine contemporaneamente, si sceglie quale usare a seconda di come si desidera recuperare i risultati. gather attende che tutti terminino e restituisce i risultati nell'ordine di input, mentre as_completed consente di elaborare i risultati appena terminano, indipendentemente dall'ordine.

 1import asyncio
 2import random
 3
 4async def job(i):
 5    delay = random.random() * 2
 6    await asyncio.sleep(delay)
 7    return (i, delay)
 8
 9async def gather_example():
10    # gather waits for all tasks and returns results in the same order as input
11    results = await asyncio.gather(*(job(i) for i in range(5)))
12    print("gather order:", results)
13
14async def as_completed_example():
15    # as_completed yields results as they finish (useful to process early results)
16    tasks = [asyncio.create_task(job(i)) for i in range(5)]
17    for coro in asyncio.as_completed(tasks):
18        res = await coro
19        print("completed:", res)
20
21async def main():
22    await gather_example()
23    await as_completed_example()
24
25if __name__ == "__main__":
26    asyncio.run(main())
  • Come mostrato in questo codice, gather restituisce i risultati nell'ordine di input, rendendolo utile quando si desidera preservare l'ordine. as_completed viene utilizzato quando si desidera elaborare i risultati appena terminano.

Controllo della concorrenza: limitare le esecuzioni simultanee con asyncio.Semaphore

Quando ci sono limiti di richiesta API esterne o limiti di connessione al DB, puoi controllare le esecuzioni concorrenti con un Semaphore.

 1import asyncio
 2import random
 3
 4sem = asyncio.Semaphore(3)  # allow up to 3 concurrent workers
 5
 6async def limited_worker(i):
 7    async with sem:
 8        # Only 3 tasks can be inside this block concurrently
 9        delay = random.random() * 2
10        await asyncio.sleep(delay)
11        print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14    tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15    await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18    asyncio.run(main())
  • Utilizzando Semaphore con async with, puoi facilmente limitare il numero di esecuzioni simultanee. Ciò è efficace in situazioni con vincoli esterni.

Controllo esclusivo delle risorse condivise: asyncio.Lock

Lock viene utilizzato per prevenire aggiornamenti simultanei su dati condivisi. asyncio.Lock è una primitiva esclusiva per l'uso asincrono.

 1import asyncio
 2
 3counter = 0
 4lock = asyncio.Lock()
 5
 6async def incrementer(n_times):
 7    global counter
 8    for _ in range(n_times):
 9        # Acquire lock to update shared counter safely
10        async with lock:
11            tmp = counter
12            await asyncio.sleep(0)  # yield control to increase race likelihood
13            counter = tmp + 1
14
15async def main():
16    tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17    await asyncio.gather(*tasks)
18    print("final counter:", counter)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Se più task aggiornano una variabile condivisa come un counter globale, possono verificarsi conflitti. Racchiudendo le operazioni in un Lock, puoi mantenere la coerenza.

Coordinamento dei task: asyncio.Event

Event viene utilizzato quando un task segnala che è pronto e altri task attendono questo segnale. Questo è un modo semplice per i task di condividere segnali e sincronizzarsi tra loro.

 1import asyncio
 2
 3event = asyncio.Event()
 4
 5async def waiter(name):
 6    print(f"{name} is waiting for event")
 7    await event.wait()
 8    print(f"{name} resumed after event set")
 9
10async def setter():
11    print("setter will sleep and then set the event")
12    await asyncio.sleep(1)
13    event.set()
14    print("event set by setter")
15
16async def main():
17    tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18    await asyncio.create_task(setter())
19    await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22    asyncio.run(main())
  • Event ha un flag booleano e chiamando set() si risvegliano tutti i task in attesa. È utile per una sincronizzazione semplice.

Modello produttore-consumatore: asyncio.Queue

Utilizzando Queue, i produttori (che creano dati) e i consumatori (che elaborano dati) possono coordinarsi in modo fluido e asincrono. Inoltre, quando la coda è piena, i produttori attendono automaticamente, implementando una contropressione per prevenire la sovrapproduzione.

 1import asyncio
 2import random
 3
 4async def producer(q, n_items):
 5    for i in range(n_items):
 6        await asyncio.sleep(random.random() * 0.5)
 7        item = f"item-{i}"
 8        await q.put(item)
 9        print("produced", item)
10    # signal consumers to stop
11    await q.put(None)
12
13async def consumer(q, name):
14    while True:
15        item = await q.get()
16        if item is None:
17            # put sentinel back for other consumers and break
18            await q.put(None)
19            break
20        await asyncio.sleep(random.random() * 1)
21        print(name, "consumed", item)
22        q.task_done()
23
24async def main():
25    q = asyncio.Queue(maxsize=5)  # bounded queue to apply backpressure
26    prod = asyncio.create_task(producer(q, 10))
27    cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28    await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31    asyncio.run(main())
  • Queue aiuta a coordinare produttori e consumatori in modo asincrono. Inoltre, impostando maxsize, il produttore attenderà su put quando la coda è piena, prevenendo la sovrapproduzione.

Gestione delle operazioni sincrone bloccanti: run_in_executor

Per elaborazioni intensive di CPU o quando si utilizzano librerie che non supportano l'asincronia, usa run_in_executor per delegare l'elaborazione a un altro thread o processo. Facendo ciò si evita che il loop degli eventi principale si blocchi, consentendo agli altri task asincroni di funzionare senza problemi.

 1import asyncio
 2import time
 3
 4def blocking_io(x):
 5    # simulate blocking I/O or CPU-bound work
 6    time.sleep(2)
 7    return x * x
 8
 9async def main():
10    loop = asyncio.get_running_loop()
11    # run blocking_io in default thread pool
12    result = await loop.run_in_executor(None, blocking_io, 3)
13    print("blocking result:", result)
14
15if __name__ == "__main__":
16    asyncio.run(main())
  • Chiamare direttamente funzioni sincrone bloccherà il loop degli eventi. Con run_in_executor, il codice viene eseguito in un thread separato e i task asincroni possono continuare a progredire contemporaneamente.

Esempio: chiamate API con limitazione di frequenza (combinazione di Semaphore + run_in_executor)

Di seguito è riportato uno scenario di esempio in cui le chiamate API sono limitate e viene eseguita un'elaborazione pesante sui risultati. Combinando Semaphore e run_in_executor, l'elaborazione può procedere in modo sicuro ed efficiente.

 1import asyncio
 2import time
 3import random
 4
 5sem = asyncio.Semaphore(5)
 6
 7def heavy_sync_processing(data):
 8    # simulate heavy CPU-bound work
 9    time.sleep(1)
10    return f"processed-{data}"
11
12async def api_call(i):
13    await asyncio.sleep(random.random() * 0.5)  # simulate network latency
14    return f"data-{i}"
15
16async def worker(i):
17    async with sem:
18        data = await api_call(i)
19        # offload CPU-bound work to threadpool
20        loop = asyncio.get_running_loop()
21        result = await loop.run_in_executor(None, heavy_sync_processing, data)
22        print(result)
23
24async def main():
25    tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26    await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29    asyncio.run(main())
  • Usiamo un Semaphore per limitare il numero di chiamate API concorrenti e l'elaborazione pesante dei dati risultanti viene delegata a un thread pool. Separare l'elaborazione di rete e quella della CPU migliora l'efficienza.

Cancellazione dei task e pulizia

Quando un task viene annullato, è molto importante gestire correttamente finally e asyncio.CancelledError. Questo garantisce che file e connessioni vengano rilasciati e che gli stati intermedi siano gestiti correttamente, mantenendo la coerenza dell'applicazione.

 1import asyncio
 2
 3async def long_running():
 4    try:
 5        print("started long_running")
 6        await asyncio.sleep(10)
 7        print("finished long_running")
 8    except asyncio.CancelledError:
 9        print("long_running was cancelled, cleaning up")
10        # perform cleanup here
11        raise
12
13async def main():
14    task = asyncio.create_task(long_running())
15    await asyncio.sleep(1)
16    task.cancel()
17    try:
18        await task
19    except asyncio.CancelledError:
20        print("task cancelled in main")
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • La cancellazione viene notificata come eccezione (CancelledError), quindi eseguire la pulizia necessaria nel blocco except e rilanciare l'eccezione se necessario.

Punti chiave per una progettazione pratica

Di seguito sono riportati punti pratici utili per la progettazione dell'elaborazione asincrona.

  • Controlla la concorrenza in modo esplicito Quando vi sono limiti alle risorse come API o DB, puoi limitare il numero di esecuzioni concorrenti con Semaphore.

  • Gestisci in modo sicuro le risorse condivise Se devi aggiornare lo stato da più task, usa Lock. Ridurre lo stato condiviso e progettare attorno a dati immutabili rende il sistema più sicuro.

  • Scegli come ricevere i risultati Se vuoi elaborare i task non appena terminano, usa asyncio.as_completed; se vuoi processare i risultati nell'ordine di input, usa gather.

  • Isola l'elaborazione sincrona pesante Per chiamate a librerie sincrone o operazioni pesanti sulla CPU, usa run_in_executor o ProcessPoolExecutor per evitare il blocco del loop degli eventi.

  • Prevedi cancellazioni ed eccezioni Scrivi una corretta gestione delle eccezioni per liberare le risorse in sicurezza anche se un task viene annullato a metà.

  • Rendi facile il testing Astrea effetti collaterali come I/O, tempo e casualità per poterli sostituire, rendendo più semplice il test del codice asincrono.

Riepilogo

asyncio è potente, ma se ti concentri solo sull'esecuzione in parallelo, potresti incontrare problemi come contese sulle risorse condivise, violazione dei limiti delle risorse o blocco del loop degli eventi. Combinando Semaphore, Lock, Event, Queue, run_in_executor e una corretta gestione delle cancellazioni, puoi progettare applicazioni asincrone sicure ed efficienti. Utilizzando meccanismi come il modello produttore-consumatore, la limitazione della concorrenza o la separazione dell'elaborazione asincrona da quella bloccante, i flussi di lavoro asincroni possono essere costruiti in modo più sicuro ed efficiente.

Puoi seguire l'articolo sopra utilizzando Visual Studio Code sul nostro canale YouTube. Controlla anche il nostro canale YouTube.

YouTube Video