Asynkron inn-/utdata
Denne artikkelen forklarer asynkron inn-/utdata.
Denne veiledningen forklarer på en enkel måte, trinn for trinn, konseptene og mønstrene for asynkron inn-/utdata som er praktisk nyttige i Python.
YouTube Video
Asynkron inn-/utdata (I/O)
Konseptet med asynkron I/O
Asynkron I/O er en mekanisme som gjør det mulig for andre operasjoner å kjøre parallelt mens man venter på tidkrevende I/O, som filoperasjoner eller nettverkskommunikasjon. I Python tilbys asyncio som det standard asynkrone rammeverket, og mange biblioteker er designet for å følge denne mekanismen.
Grunnleggende: async / await og event loop
Først, slik skriver du grunnleggende korutiner og et eksempel på å kjøre flere korutiner samtidig med asyncio.gather.
Koden under er et minimalt eksempel på å definere og kjøre asynkrone funksjoner parallelt. sleep-funksjonen brukes for å demonstrere parallell kjøring.
1import asyncio
2
3async def worker(name: str, delay: float):
4 # Simulate I/O-bound work by sleeping
5 print(f"{name} started")
6 await asyncio.sleep(delay)
7 print(f"{name} finished after {delay}s")
8 return name, delay
9
10async def main():
11 # Run multiple coroutines concurrently
12 tasks = [
13 worker("A", 1.5),
14 worker("B", 1.0),
15 worker("C", 0.5),
16 ]
17 results = await asyncio.gather(*tasks)
18 print("Results:", results)
19
20if __name__ == "__main__":
21 asyncio.run(main())- Denne koden starter event loop med
asyncio.run()og kjører tre korutiner samtidig.
async with og asynkrone kontekstadministratorer
I asynkron behandling kan ressursstyring som åpning av tilkoblinger og lukking av filer lett bli komplisert. Dette er hvor asynkrone kontekstadministratorer med async with blir nyttige. Denne syntaksen brukes akkurat som den synkrone with-setningen, men den interne behandlingen er asynkron, så den passer naturlig inn i async/await-flyten.
Det er to hovedgrunner til å bruke async with:.
- For å rydde opp i ressurser som tilkoblinger, filhåndtak eller økter på en pålitelig måte. Du kan være trygg på at ressurser blir riktig frigitt selv om en unormal avslutning oppstår.
- For å automatisere initialisering og oppryddingsoppgaver, som å opprette eller lukke tilkoblinger og tømme buffere, på en asynkron måte. Dette sparer bryet med manuell koding og gjør koden din tydeligere.
Under er et eksempel på å lage en enkel asynkron kontekstadministrator fra bunnen av.
1import asyncio
2
3class AsyncResource:
4 async def __aenter__(self):
5 print("Opening resource...")
6 await asyncio.sleep(0.5)
7 print("Resource opened")
8 return self
9
10 async def __aexit__(self, exc_type, exc, tb):
11 print("Closing resource...")
12 await asyncio.sleep(0.5)
13 print("Resource closed")
14
15async def main():
16 async with AsyncResource() as r:
17 print("Using resource...")
18
19if __name__ == "__main__":
20 asyncio.run(main())- Ved å definere
__aenter__og__aexit__, kan du brukeasync with. - Behandling ved inn- og utgang fra
async with-blokken kjøres asynkront og trygt.
Asynkron fil-I/O (aiofiles)
Filoperasjoner er et klassisk eksempel på blokkering. Ved å bruke aiofiles kan du trygt håndtere filoperasjoner asynkront. Internt bruker det en trådpool og sikrer at filer blir ordentlig lukket med async with.
Følgende eksempel demonstrerer parallell asynkron lesing av flere filer. Du må installere aiofiles med pip install aiofiles før du kjører denne koden.
1# pip install aiofiles
2import asyncio
3import aiofiles
4from pathlib import Path
5
6async def read_file(path: Path):
7 # Read file content asynchronously
8 async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
9 contents = await f.read()
10 return path.name, len(contents)
11
12async def main():
13 files = [
14 Path("example1.txt"),
15 Path("example2.txt"),
16 Path("example3.txt")
17 ]
18 tasks = [read_file(p) for p in files]
19 results = await asyncio.gather(*tasks)
20 for name, size in results:
21 print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24 asyncio.run(main())- Denne koden parallelliserer lesingen av hver fil.
aiofilesbruker ofte en trådpool internt, som lar deg håndtere blokkering av fil-I/O via et asynkront grensesnitt.
Asynkron HTTP-klient (aiohttp)
Som et klassisk eksempel på nettverks-I/O, her er hvordan du utfører HTTP-forespørsler asynkront. Det er spesielt kraftig når du må utføre mange HTTP-forespørsler parallelt.
Under er et eksempel på å hente flere URL-er parallelt med aiohttp. Du må installere aiohttp med pip install aiohttp.
1# pip install aiohttp
2import asyncio
3import aiohttp
4
5async def fetch(session: aiohttp.ClientSession, url: str):
6 # Fetch a URL asynchronously and return status and size
7 async with session.get(url) as resp:
8 text = await resp.text()
9 return url, resp.status, len(text)
10
11async def main(urls):
12 async with aiohttp.ClientSession() as session:
13 tasks = [fetch(session, u) for u in urls]
14 for coro in asyncio.as_completed(tasks):
15 url, status, size = await coro
16 print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19 urls = [
20 "https://codesparklab.com/json/example1.json",
21 "https://codesparklab.com/json/example2.json",
22 "https://codesparklab.com/json/example3.json",
23 ]
24 asyncio.run(main(urls))- Ved å bruke
asyncio.as_completedkan du behandle resultater i rekkefølgen oppgaver fullføres. Dette er nyttig for å håndtere mange forespørsler effektivt.
Sammenkobling med blokkerende I/O: run_in_executor
Når du arbeider med CPU-intensive oppgaver eller eksisterende blokkerende API-er i asynkront kode, bruk ThreadPoolExecutor eller ProcessPoolExecutor via loop.run_in_executor.
Følgende kode er et eksempel på å kjøre oppgaver som forutsetter blokkerende I/O samtidig ved bruk av en trådpool.
1import asyncio
2from concurrent.futures import ThreadPoolExecutor
3import time
4
5def blocking_io(n):
6 # Simulate a blocking I/O or CPU-bound function
7 time.sleep(n)
8 return f"slept {n}s"
9
10async def main():
11 loop = asyncio.get_running_loop()
12 with ThreadPoolExecutor() as pool:
13 tasks = [
14 loop.run_in_executor(pool, blocking_io, 1),
15 loop.run_in_executor(pool, blocking_io, 2),
16 ]
17 results = await asyncio.gather(*tasks)
18 print("Blocking results:", results)
19
20if __name__ == "__main__":
21 asyncio.run(main())- Ved å bruke
run_in_executorkan du inkludere eksisterende synkron kode i asynkrone flyter uten store omskrivinger. Du bør imidlertid være oppmerksom på antall tråder og CPU-belastning. ProcessPoolExecutorer egnet for CPU-krevende oppgaver.
Asynkron server: TCP Echo-server basert på asyncio
Hvis du vil håndtere sokler direkte, kan du enkelt bygge en asynkron server med asyncio.start_server.
Følgende eksempel er en enkel ekko-server som returnerer data nøyaktig slik de mottas fra klienten.
1import asyncio
2
3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
4 # Handle a single client: read data and echo it back
5 addr = writer.get_extra_info('peername')
6 print(f"Connection from {addr}")
7 while True:
8 data = await reader.read(1024)
9 if not data:
10 break
11 writer.write(data) # echo back
12 await writer.drain()
13 writer.close()
14 await writer.wait_closed()
15 print(f"Connection closed {addr}")
16
17async def main():
18 server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19 addr = server.sockets[0].getsockname()
20 print(f"Serving on {addr}")
21 async with server:
22 await server.serve_forever()
23
24if __name__ == "__main__":
25 asyncio.run(main())-
I TCP-kommunikasjon med
asynciospillerStreamReaderogStreamWriteren sentral rolle i asynkrone inn- og utdata.StreamReaderleser data sendt fra klienten asynkront, mensStreamWriterbrukes til å sende svar fra serveren tilbake til klienten. -
Selv uten å håndtere de detaljerte operasjonene til sockets selv, kan du starte en asynkron server enkelt og effektivt ved å bruke
asyncio.start_server. -
Når du gir en handler-funksjon til
asyncio.start_server, mottar den funksjonenreaderogwritersom argumenter. Ved å bruke disse kan du implementere kommunikasjonsprosesser på en tryggere og tydeligere måte enn ved å håndtere lavnivå socket-API-er direkte. For eksempel, ved å motta data medreader.read()og kombinerewriter.write()medwriter.drain(), kan du implementere asynkron sending som sikrer at overføringen blir fullført. -
Denne oppsettet passer for å håndtere store mengder samtidige tilkoblinger og er ideell for enkle protokoller eller småskala TCP-tjenester.
Håndtering av store datastrømmer
Ved behandling av store filer eller responser sekvensielt, les og skriv data i blokker for å holde minnebruken lav. Under er et eksempel på strømmende lesing med aiohttp.
Koden under behandler HTTP-responser i blokker og skriver til disk etter hvert som data mottas.
1import aiohttp
2import asyncio
3import aiofiles
4
5async def stream_download(url: str, dest: str):
6 # Stream download and write to file in chunks
7 async with aiohttp.ClientSession() as session:
8 async with session.get(url) as resp:
9 async with aiofiles.open(dest, 'wb') as f:
10 async for chunk in resp.content.iter_chunked(1024 * 64):
11 await f.write(chunk)
12
13if __name__ == "__main__":
14 asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))-
Denne koden laster ikke inn en stor fil på en gang; i stedet mottar den dataene i biter (små deler) og skriver dem til en fil asynkront. Som et resultat kan den utføre nedlastinger raskt og effektivt samtidig som minnebruken holdes lav.
aiohttphenter data asynkront, ogaiofilesskriver til filen uten å blokkere, noe som gjør det enkelt å kjøre dette sammen med andre prosesser. -
Dette mønsteret passer for å laste ned og lagre store filer effektivt og samtidig minimere minnebruk.
Asynkron kjøring av underprosesser
Hvis du vil kjøre eksterne kommandoer asynkront og lese utdata i sanntid, er asyncio.create_subprocess_exec nyttig.
Under er et eksempel på å starte en ekstern kommando og lese dens standardutdata i sanntid.
1import asyncio
2
3async def run_cmd(cmd):
4 # Run external command asynchronously and capture output line by line
5 proc = await asyncio.create_subprocess_exec(
6 *cmd,
7 stdout=asyncio.subprocess.PIPE,
8 stderr=asyncio.subprocess.PIPE
9 )
10
11 async def read_stream(stream, name):
12 while True:
13 line = await stream.readline()
14 if not line:
15 break
16 print(f"[{name}] {line.decode().rstrip()}")
17
18 await asyncio.gather(
19 read_stream(proc.stdout, "stdout"),
20 read_stream(proc.stderr, "stderr"),
21 )
22 await proc.wait()
23 return proc.returncode
24
25if __name__ == "__main__":
26 asyncio.run(run_cmd(["python", "--version"]))- Ved å kontrollere underprosesser asynkront kan du håndtere logger fra eksterne verktøy i sanntid eller kjøre flere prosesser parallelt.
Håndtering av avbrytelser og tidsavbrudd
Asynkrone oppgaver kan avbrytes. Når du implementerer et tidsavbrudd, er det enkelt å bruke asyncio.wait_for.
Under er et eksempel på å kjøre en oppgave med tidsavbrudd.
1import asyncio
2
3async def slow_task():
4 await asyncio.sleep(5)
5 return "done"
6
7async def main():
8 try:
9 result = await asyncio.wait_for(slow_task(), timeout=2.0)
10 print("Result:", result)
11 except asyncio.TimeoutError:
12 print("Task timed out")
13
14if __name__ == "__main__":
15 asyncio.run(main())wait_forkaster enTimeoutErrorhvis tidsavbruddet nås og avbryter oppgaven om nødvendig. Vær forsiktig med spredning og opprydding av oppgaveavbrytelser.
Styring av samtidighet (Semaphore)
Fordi mange samtidige tilkoblinger eller forespørsler kan tømme ressursene, bør samtidigheten begrenses med asyncio.Semaphore.
Følgende er et eksempel på å begrense samtidige nedlastinger med en semafor.
1import asyncio
2import aiohttp
3
4semaphore = asyncio.Semaphore(3) # allow up to 3 concurrent tasks
5
6async def limited_fetch(session, url):
7 async with semaphore:
8 async with session.get(url) as resp:
9 return url, resp.status
10
11async def main(urls):
12 async with aiohttp.ClientSession() as session:
13 tasks = [limited_fetch(session, u) for u in urls]
14 results = await asyncio.gather(*tasks)
15 print(results)
16
17if __name__ == "__main__":
18 urls = ["https://codesparklab.com/json/example.json"] * 10
19 asyncio.run(main(urls))- Med denne metoden kan du forsiktig få tilgang til eksterne tjenester og unngå å overbelaste din egen prosess.
Feilhåndtering og retry-strategier
Feil oppstår uunngåelig også i asynkron behandling. Fang unntak på riktig måte og implementer retry-strategier som eksponentiell backoff.
Under er et eksempel på enkel retry-implementering opptil N ganger.
1import asyncio
2import aiohttp
3import random
4
5async def fetch_with_retry(session, url, retries=3):
6 for attempt in range(1, retries + 1):
7 try:
8 async with session.get(url) as resp:
9 resp.raise_for_status()
10 text = await resp.text()
11 return text
12 except Exception as e:
13 if attempt == retries:
14 raise
15 await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18 async with aiohttp.ClientSession() as session:
19 text = await fetch_with_retry(session, "https://codesparklab.com/")
20 print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23 asyncio.run(main())- Riktig retry-logikk er viktig for balanse mellom konsistens og trafikkontroll.
Tips for feilsøking og logging
I asynkron behandling skjer oppgaver parallelt, noe som kan gjøre det vanskelig å identifisere årsaken til problemer. For å spore problemer effektivt, vil det å huske på følgende punkter gjøre feilsøking enklere.
- Unntak fra
asyncio.run()ogTasker lett å overse, så sørg for å logge ubehandlede unntak. - Når du bruker
logging, gjør det sporing enklere hvis du inkluderer navnet på koroutinen eller, i Python 3.8 og nyere,task.get_name()i loggene dine. - Du kan sjekke den nåværende statusen til oppgaver ved å bruke
asyncio.Task.all_tasks(). Denne API-en er imidlertid ment for feilsøkingsformål og bør brukes med forsiktighet i produksjonsmiljøer for å unngå ytelsesproblemer eller uventet innblanding.
Ytelseshensyn
Selv om asynkron programmering er utmerket for å håndtere I/O-venting, kan feil bruk faktisk redusere ytelsen. Optimaliser ved å huske på følgende punkter:.
- Asynkron behandling er utmerket for I/O-bundne oppgaver, men ikke egnet for CPU-bundne oppgaver; bruk en prosesspool i slike tilfeller.
- Når du bruker tråd- eller prosesspool, vurder størrelsen på poolen og oppgavens art.
- Star du mange små oppgaver på en gang, økes overhead for event loop—så bruk batching eller semaforer for å justere.
Sammendrag
Pythons asynkrone I/O er en kraftig mekanisme som gjør effektiv bruk av I/O-ventetid og utfører nettverks- og filoperasjoner effektivt parallelt. Ved å kombinere teknikker som asyncio, aiohttp, aiofiles og run_in_executor, kan du fleksibelt bygge praktiske asynkrone applikasjoner. Ved å bruke async with for å automatisere ressursanskaffelse og -frigjøring, kan du trygt og pålitelig administrere asynkrone ressurser som filer, HTTP-økter og låser. Ved å innføre riktig feilhåndtering og styring av samtidighet, kan du trygt kjøre asynkrone programmer med høy pålitelighet.
Du kan følge med på artikkelen ovenfor ved å bruke Visual Studio Code på vår YouTube-kanal. Vennligst sjekk ut YouTube-kanalen.