Input/Output Asincrono
Questo articolo spiega l'input/output asincrono.
Questa guida spiega con calma, passo dopo passo, i concetti e i modelli di input/output asincrono praticamente utili in Python.
YouTube Video
Input/Output (I/O) Asincrono
Concetto di I/O Asincrono
L'I/O asincrono è un meccanismo che permette di eseguire altre operazioni in parallelo mentre si attendono I/O che richiedono tempo, come operazioni su file o comunicazioni di rete. In Python, asyncio è fornito come framework asincrono standard e molte librerie sono progettate per seguire questo meccanismo.
Basi: async / await e l'Event Loop
Per prima cosa, ecco come scrivere coroutine di base e un esempio di esecuzione simultanea di più coroutine usando asyncio.gather.
Il codice seguente è un esempio minimale di definizione ed esecuzione concorrente di funzioni asincrone. La funzione sleep viene utilizzata per dimostrare l'esecuzione in parallelo.
1import asyncio
2
3async def worker(name: str, delay: float):
4 # Simulate I/O-bound work by sleeping
5 print(f"{name} started")
6 await asyncio.sleep(delay)
7 print(f"{name} finished after {delay}s")
8 return name, delay
9
10async def main():
11 # Run multiple coroutines concurrently
12 tasks = [
13 worker("A", 1.5),
14 worker("B", 1.0),
15 worker("C", 0.5),
16 ]
17 results = await asyncio.gather(*tasks)
18 print("Results:", results)
19
20if __name__ == "__main__":
21 asyncio.run(main())- Questo codice avvia l'event loop usando
asyncio.run()ed esegue tre coroutine in parallelo.
async with e Gestori di Contesto Asincroni
Nel processamento asincrono, la gestione delle risorse come l'apertura di connessioni e la chiusura dei file può diventare facilmente complessa. Qui diventano utili i gestori di contesto asincroni usando async with. Questa sintassi viene utilizzata come il comando with sincrono, ma l'elaborazione interna è asincrona e si integra naturalmente nel flusso async/await.
Ci sono due principali motivi per utilizzare async with:.
- Per liberare in modo affidabile risorse come connessioni, handle di file o sessioni. Si può essere certi che le risorse vengano rilasciate correttamente anche in caso di terminazioni anomale.
- Per automatizzare le operazioni di inizializzazione e di pulizia, come l'apertura/chiusura di connessioni o lo svuotamento (flushing), in modo asincrono. Questo evita la necessità di gestire manualmente queste parti e rende il codice più leggibile.
Di seguito un esempio di creazione di un semplice gestore di contesto asincrono da zero.
1import asyncio
2
3class AsyncResource:
4 async def __aenter__(self):
5 print("Opening resource...")
6 await asyncio.sleep(0.5)
7 print("Resource opened")
8 return self
9
10 async def __aexit__(self, exc_type, exc, tb):
11 print("Closing resource...")
12 await asyncio.sleep(0.5)
13 print("Resource closed")
14
15async def main():
16 async with AsyncResource() as r:
17 print("Using resource...")
18
19if __name__ == "__main__":
20 asyncio.run(main())- Definendo
__aenter__e__aexit__, si può utilizzareasync with. - L'elaborazione all'entrata e uscita del blocco
async withviene eseguita in modo asincrono e sicuro.
I/O File Asincrono (aiofiles)
Le operazioni sui file sono un classico esempio di operazione bloccante. Utilizzando aiofiles, è possibile gestire le operazioni sui file in modo sicuro ed asincrono. Internamente utilizza un pool di thread e garantisce che i file vengano chiusi correttamente tramite async with.
L'esempio seguente mostra la lettura asincrona e parallela di più file. È necessario installare aiofiles con pip install aiofiles prima di eseguire questo codice.
1# pip install aiofiles
2import asyncio
3import aiofiles
4from pathlib import Path
5
6async def read_file(path: Path):
7 # Read file content asynchronously
8 async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
9 contents = await f.read()
10 return path.name, len(contents)
11
12async def main():
13 files = [
14 Path("example1.txt"),
15 Path("example2.txt"),
16 Path("example3.txt")
17 ]
18 tasks = [read_file(p) for p in files]
19 results = await asyncio.gather(*tasks)
20 for name, size in results:
21 print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24 asyncio.run(main())- Questo codice parallelizza la lettura di ogni file.
aiofilesspesso utilizza un pool di thread internamente, permettendo di gestire I/O file bloccanti attraverso un'interfaccia asincrona.
Client HTTP Asincrono (aiohttp)
Come esempio classico di I/O di rete, ecco come effettuare richieste HTTP in modo asincrono. È particolarmente potente quando occorre effettuare un gran numero di richieste HTTP in parallelo.
Di seguito un esempio di recupero di molteplici URL in parallelo usando aiohttp. Sarà necessario installare aiohttp con pip install aiohttp.
1# pip install aiohttp
2import asyncio
3import aiohttp
4
5async def fetch(session: aiohttp.ClientSession, url: str):
6 # Fetch a URL asynchronously and return status and size
7 async with session.get(url) as resp:
8 text = await resp.text()
9 return url, resp.status, len(text)
10
11async def main(urls):
12 async with aiohttp.ClientSession() as session:
13 tasks = [fetch(session, u) for u in urls]
14 for coro in asyncio.as_completed(tasks):
15 url, status, size = await coro
16 print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19 urls = [
20 "https://codesparklab.com/json/example1.json",
21 "https://codesparklab.com/json/example2.json",
22 "https://codesparklab.com/json/example3.json",
23 ]
24 asyncio.run(main(urls))- Utilizzando
asyncio.as_completed, è possibile elaborare i risultati nell'ordine in cui i task vengono completati. Questo è utile per gestire efficacemente molte richieste.
Coesistenza con I/O bloccante: run_in_executor
Quando si gestiscono attività che richiedono molta CPU o API bloccanti esistenti nel codice asincrono, utilizzare ThreadPoolExecutor o ProcessPoolExecutor tramite loop.run_in_executor.
Il codice seguente è un esempio di esecuzione simultanea di attività che presumono I/O bloccante utilizzando un thread pool.
1import asyncio
2from concurrent.futures import ThreadPoolExecutor
3import time
4
5def blocking_io(n):
6 # Simulate a blocking I/O or CPU-bound function
7 time.sleep(n)
8 return f"slept {n}s"
9
10async def main():
11 loop = asyncio.get_running_loop()
12 with ThreadPoolExecutor() as pool:
13 tasks = [
14 loop.run_in_executor(pool, blocking_io, 1),
15 loop.run_in_executor(pool, blocking_io, 2),
16 ]
17 results = await asyncio.gather(*tasks)
18 print("Blocking results:", results)
19
20if __name__ == "__main__":
21 asyncio.run(main())- Utilizzando
run_in_executor, è possibile integrare il codice sincrono esistente nei flussi asincroni senza riscritture significative. Tuttavia, occorre prestare attenzione al numero di thread e al carico della CPU. ProcessPoolExecutorè adatto per attività che richiedono molta CPU.
Server Asincrono: TCP Echo Server basato su asyncio
Se si desidera gestire direttamente i socket, si può facilmente costruire un server asincrono usando asyncio.start_server.
L'esempio seguente è un semplice echo server che restituisce i dati così come ricevuti dal client.
1import asyncio
2
3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
4 # Handle a single client: read data and echo it back
5 addr = writer.get_extra_info('peername')
6 print(f"Connection from {addr}")
7 while True:
8 data = await reader.read(1024)
9 if not data:
10 break
11 writer.write(data) # echo back
12 await writer.drain()
13 writer.close()
14 await writer.wait_closed()
15 print(f"Connection closed {addr}")
16
17async def main():
18 server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19 addr = server.sockets[0].getsockname()
20 print(f"Serving on {addr}")
21 async with server:
22 await server.serve_forever()
23
24if __name__ == "__main__":
25 asyncio.run(main())-
Nella comunicazione TCP con
asyncio,StreamReadereStreamWritersvolgono un ruolo centrale nell'input e output asincroni.StreamReaderlegge asincronamente i dati inviati dal client, mentreStreamWriterviene utilizzato per inviare risposte dal server al client. -
Anche senza gestire direttamente le operazioni dettagliate dei socket, puoi avviare un server asincrono in modo semplice ed efficiente utilizzando
asyncio.start_server. -
Quando passi una funzione handler a
asyncio.start_server, quella funzione ricevereaderewritercome argomenti. Utilizzando questi strumenti, puoi implementare processi di comunicazione in modo più sicuro e chiaro rispetto alla gestione diretta delle API socket di basso livello. Ad esempio, ricevendo dati conreader.read()e combinandowriter.write()conwriter.drain(), puoi implementare una trasmissione asincrona che garantisce il completamento dell'invio. -
Questa configurazione è adatta per gestire un gran numero di connessioni simultanee ed è ideale per protocolli semplici o servizi TCP di piccola scala.
Gestione di Grandi Flussi di Dati (Streaming)
Quando si elaborano file di grandi dimensioni o risposte in modo sequenziale, leggere e scrivere i dati a blocchi per mantenere basso il consumo di memoria. Ecco un esempio di lettura in streaming usando aiohttp.
Il seguente codice elabora le risposte HTTP a blocchi e scrive su disco man mano che i dati vengono ricevuti.
1import aiohttp
2import asyncio
3import aiofiles
4
5async def stream_download(url: str, dest: str):
6 # Stream download and write to file in chunks
7 async with aiohttp.ClientSession() as session:
8 async with session.get(url) as resp:
9 async with aiofiles.open(dest, 'wb') as f:
10 async for chunk in resp.content.iter_chunked(1024 * 64):
11 await f.write(chunk)
12
13if __name__ == "__main__":
14 asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))-
Questo codice non carica un file di grandi dimensioni tutto in una volta; invece, riceve i dati a blocchi (piccole parti) e li scrive su un file in modo asincrono. Di conseguenza, può eseguire download rapidamente ed efficientemente mantenendo basso l'uso della memoria.
aiohttprecupera i dati in modo asincrono eaiofilesscrive sul file senza bloccare, rendendo semplice l'esecuzione insieme ad altri processi. -
Questo pattern è adatto per scaricare e salvare grandi file in modo efficiente riducendo al minimo l'uso di memoria.
Esecuzione Asincrona di Sottoprocessi
Se si vuole eseguire comandi esterni in modo asincrono e leggere il loro output in tempo reale, asyncio.create_subprocess_exec è molto utile.
Ecco un esempio di avvio di un comando esterno e lettura in tempo reale dello standard output.
1import asyncio
2
3async def run_cmd(cmd):
4 # Run external command asynchronously and capture output line by line
5 proc = await asyncio.create_subprocess_exec(
6 *cmd,
7 stdout=asyncio.subprocess.PIPE,
8 stderr=asyncio.subprocess.PIPE
9 )
10
11 async def read_stream(stream, name):
12 while True:
13 line = await stream.readline()
14 if not line:
15 break
16 print(f"[{name}] {line.decode().rstrip()}")
17
18 await asyncio.gather(
19 read_stream(proc.stdout, "stdout"),
20 read_stream(proc.stderr, "stderr"),
21 )
22 await proc.wait()
23 return proc.returncode
24
25if __name__ == "__main__":
26 asyncio.run(run_cmd(["python", "--version"]))- Gestendo i sottoprocessi asincronamente, è possibile trattare i log di strumenti esterni in tempo reale o eseguire più processi in parallelo.
Gestione di Annullamento e Timeout
I task asincroni possono essere annullati. Per implementare un timeout, è semplice usare asyncio.wait_for.
Di seguito un esempio di esecuzione di un task con un timeout.
1import asyncio
2
3async def slow_task():
4 await asyncio.sleep(5)
5 return "done"
6
7async def main():
8 try:
9 result = await asyncio.wait_for(slow_task(), timeout=2.0)
10 print("Result:", result)
11 except asyncio.TimeoutError:
12 print("Task timed out")
13
14if __name__ == "__main__":
15 asyncio.run(main())wait_forsolleva unTimeoutErrorse si supera il tempo massimo e annulla il task se necessario. Fai attenzione alla propagazione della cancellazione dei task e alla pulizia.
Controllo della Concorrenza (Semaphore)
Poiché molte connessioni o richieste concorrenti possono esaurire le risorse, limita la concorrenza con asyncio.Semaphore.
Quanto segue è un esempio di limitazione dei download simultanei tramite un semaforo.
1import asyncio
2import aiohttp
3
4semaphore = asyncio.Semaphore(3) # allow up to 3 concurrent tasks
5
6async def limited_fetch(session, url):
7 async with semaphore:
8 async with session.get(url) as resp:
9 return url, resp.status
10
11async def main(urls):
12 async with aiohttp.ClientSession() as session:
13 tasks = [limited_fetch(session, u) for u in urls]
14 results = await asyncio.gather(*tasks)
15 print(results)
16
17if __name__ == "__main__":
18 urls = ["https://codesparklab.com/json/example.json"] * 10
19 asyncio.run(main(urls))- Con questo metodo puoi accedere delicatamente ai servizi esterni ed evitare di sovraccaricare il tuo processo.
Gestione degli Errori e Strategie di Retry
Gli errori si verificano inevitabilmente anche nell'elaborazione asincrona. Cattura le eccezioni in modo appropriato e implementa strategie di retry come l'exponential backoff.
Di seguito un esempio di implementazione di retry semplici fino ad N tentativi.
1import asyncio
2import aiohttp
3import random
4
5async def fetch_with_retry(session, url, retries=3):
6 for attempt in range(1, retries + 1):
7 try:
8 async with session.get(url) as resp:
9 resp.raise_for_status()
10 text = await resp.text()
11 return text
12 except Exception as e:
13 if attempt == retries:
14 raise
15 await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18 async with aiohttp.ClientSession() as session:
19 text = await fetch_with_retry(session, "https://codesparklab.com/")
20 print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23 asyncio.run(main())- Una corretta logica di retry è importante per bilanciare coerenza e controllo del traffico.
Consigli per il Debug e il Logging
Nel processamento asincrono, i task procedono contemporaneamente, il che può rendere difficile individuare la causa dei problemi. Per tracciare i problemi in modo efficiente, tenere presenti i seguenti punti renderà il debug più fluido.
- Le eccezioni da
asyncio.run()e daiTaskpossono passare inosservate, quindi assicurati di registrare le eccezioni non gestite. - Quando si utilizza
logging, includere il nome della coroutine o, in Python 3.8 e successivi,task.get_name()nei log facilita il tracciamento. - Puoi controllare lo stato attuale delle attività utilizzando
asyncio.Task.all_tasks(). Tuttavia, questa API è pensata per scopi di debugging e dovrebbe essere utilizzata con cautela in ambienti di produzione per evitare problemi di prestazioni o interferenze inattese.
Considerazioni sulle Prestazioni
Sebbene la programmazione asincrona sia eccellente nella gestione delle attese dell'I/O, un uso improprio può peggiorare le prestazioni. Ottimizza tenendo a mente i seguenti punti:.
- L'elaborazione asincrona è ottima per task I/O-bound ma non è adatta per task CPU-bound; in questi casi utilizza un process pool.
- Quando usi thread o process pool, considera la dimensione del pool e la natura dei task.
- Se avvii molti piccoli task contemporaneamente, l'overhead dell'event loop aumenta—usa quindi batch o semafori per bilanciare.
Riepilogo
L'I/O asincrono di Python è un meccanismo potente che sfrutta efficacemente i tempi di attesa I/O ed esegue contemporaneamente operazioni di rete e file in modo efficiente. Combinando tecniche come asyncio, aiohttp, aiofiles e run_in_executor, puoi costruire applicazioni asincrone pratiche in modo flessibile. Utilizzando async with per automatizzare acquisizione e rilascio delle risorse, si possono gestire in modo sicuro e affidabile risorse asincrone come file, sessioni HTTP e lock. Incorporando una corretta gestione degli errori e della concorrenza, puoi eseguire in sicurezza programmi asincroni affidabili.
Puoi seguire l'articolo sopra utilizzando Visual Studio Code sul nostro canale YouTube. Controlla anche il nostro canale YouTube.