Asynkron ind-/uddata

Asynkron ind-/uddata

Denne artikel forklarer asynkron ind-/uddata.

Denne vejledning forklarer trin-for-trin de begreber og mønstre inden for asynkron ind-/uddata, som er praktisk anvendelige i Python.

YouTube Video

Asynkron input/output (I/O)

Begrebet asynkron I/O

Asynkron I/O er en mekanisme, som tillader andre operationer at køre parallelt, mens der ventes på tidskrævende I/O, såsom filoperationer eller netværkskommunikation. I Python leveres asyncio som det standard asynkrone framework, og mange biblioteker er designet til at følge denne mekanisme.

Grundlæggende: async / await og event loopet

Først forklares, hvordan man skriver grundlæggende korutiner og et eksempel på at køre flere korutiner samtidigt med asyncio.gather.

Koden nedenfor er et minimalt eksempel på at definere og køre asynkrone funktioner samtidigt. sleep-funktionen bruges til at demonstrere parallel eksekvering.

 1import asyncio
 2
 3async def worker(name: str, delay: float):
 4    # Simulate I/O-bound work by sleeping
 5    print(f"{name} started")
 6    await asyncio.sleep(delay)
 7    print(f"{name} finished after {delay}s")
 8    return name, delay
 9
10async def main():
11    # Run multiple coroutines concurrently
12    tasks = [
13        worker("A", 1.5),
14        worker("B", 1.0),
15        worker("C", 0.5),
16    ]
17    results = await asyncio.gather(*tasks)
18    print("Results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Denne kode starter event loopet med asyncio.run() og kører tre korutiner samtidigt.

async with og asynkrone kontekstadministratorer

I asynkron behandling kan ressourcestyring som at åbne forbindelser og lukke filer let blive komplekst. Her bliver asynkrone kontekstadministratorer med async with nyttige. Denne syntaks bruges på samme måde som den synkrone with-sætning, men den interne behandling er asynkron, så det passer naturligt ind i async/await-flowet.

Der er to hovedårsager til at bruge async with:.

  • For pålideligt at oprydde ressourcer som forbindelser, filhåndtag eller sessioner. Du kan være sikker på, at ressourcer frigives korrekt, selv hvis en unormal afbrydelse opstår.
  • For at automatisere initialiserings- og oprydningsopgaver, som at oprette eller lukke forbindelser og flush, på asynkron vis. Dette sparer dig for manuelt kodearbejde og gør din kode tydeligere.

Nedenfor er et eksempel på at oprette en simpel asynkron kontekstadministrator fra bunden.

 1import asyncio
 2
 3class AsyncResource:
 4    async def __aenter__(self):
 5        print("Opening resource...")
 6        await asyncio.sleep(0.5)
 7        print("Resource opened")
 8        return self
 9
10    async def __aexit__(self, exc_type, exc, tb):
11        print("Closing resource...")
12        await asyncio.sleep(0.5)
13        print("Resource closed")
14
15async def main():
16    async with AsyncResource() as r:
17        print("Using resource...")
18
19if __name__ == "__main__":
20    asyncio.run(main())
  • Ved at definere __aenter__ og __aexit__ kan du bruge async with.
  • Behandling ved indtræden og udtræden af async with-blokken udføres asynkront og sikkert.

Asynkron fil-I/O (aiofiles)

Filoperationer er et klassisk eksempel på blokering. Ved at bruge aiofiles kan du sikkert håndtere filoperationer asynkront. Internt bruges en tråd-pool, og det sikres, at filer lukkes korrekt med async with.

Følgende eksempel demonstrerer parallel asynkron læsning af flere filer. Du skal installere aiofiles med pip install aiofiles før du kører denne kode.

 1# pip install aiofiles
 2import asyncio
 3import aiofiles
 4from pathlib import Path
 5
 6async def read_file(path: Path):
 7    # Read file content asynchronously
 8    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
 9        contents = await f.read()
10    return path.name, len(contents)
11
12async def main():
13    files = [
14        Path("example1.txt"),
15        Path("example2.txt"),
16        Path("example3.txt")
17    ]
18    tasks = [read_file(p) for p in files]
19    results = await asyncio.gather(*tasks)
20    for name, size in results:
21        print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24    asyncio.run(main())
  • Denne kode paralleliserer læsningen af hver fil. aiofiles bruger ofte en tråd-pool internt, hvilket gør det muligt at håndtere blokerende fil-I/O via et asynkront interface.

Asynkron HTTP-klient (aiohttp)

Som et klassisk eksempel på netværks-I/O vises her, hvordan HTTP-forespørgsler udføres asynkront. Det er særligt kraftfuldt, når du skal foretage mange HTTP-forespørgsler parallelt.

Nedenfor er et eksempel på at hente flere URLs parallelt med aiohttp. Du skal installere aiohttp med pip install aiohttp.

 1# pip install aiohttp
 2import asyncio
 3import aiohttp
 4
 5async def fetch(session: aiohttp.ClientSession, url: str):
 6    # Fetch a URL asynchronously and return status and size
 7    async with session.get(url) as resp:
 8        text = await resp.text()
 9        return url, resp.status, len(text)
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [fetch(session, u) for u in urls]
14        for coro in asyncio.as_completed(tasks):
15            url, status, size = await coro
16            print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19    urls = [
20        "https://codesparklab.com/json/example1.json",
21        "https://codesparklab.com/json/example2.json",
22        "https://codesparklab.com/json/example3.json",
23    ]
24    asyncio.run(main(urls))
  • Ved at bruge asyncio.as_completed kan du behandle resultater i den rækkefølge, opgaverne fuldføres. Dette er nyttigt for effektivt at håndtere mange forespørgsler.

Sammenkobling med blokerende I/O: run_in_executor

Når du arbejder med CPU-intensitive opgaver eller eksisterende blokerende API'er i asynkront kode, skal du bruge ThreadPoolExecutor eller ProcessPoolExecutor via loop.run_in_executor.

Følgende kode er et eksempel på at køre opgaver, der antager blokerende I/O, samtidigt ved hjælp af en trådpool.

 1import asyncio
 2from concurrent.futures import ThreadPoolExecutor
 3import time
 4
 5def blocking_io(n):
 6    # Simulate a blocking I/O or CPU-bound function
 7    time.sleep(n)
 8    return f"slept {n}s"
 9
10async def main():
11    loop = asyncio.get_running_loop()
12    with ThreadPoolExecutor() as pool:
13        tasks = [
14            loop.run_in_executor(pool, blocking_io, 1),
15            loop.run_in_executor(pool, blocking_io, 2),
16        ]
17        results = await asyncio.gather(*tasks)
18        print("Blocking results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Ved at anvende run_in_executor kan du integrere eksisterende synkron kode i asynkrone flows uden større omskrivning. Du bør dog være opmærksom på antal tråde og CPU-belastning.
  • ProcessPoolExecutor er velegnet til CPU-krævende opgaver.

Asynkron server: TCP echo-server baseret på asyncio

Hvis du vil håndtere sockets direkte, kan du nemt bygge en asynkron server med asyncio.start_server.

Følgende eksempel er en simpel echo-server, der returnerer data nøjagtigt, som de modtages fra klienten.

 1import asyncio
 2
 3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
 4    # Handle a single client: read data and echo it back
 5    addr = writer.get_extra_info('peername')
 6    print(f"Connection from {addr}")
 7    while True:
 8        data = await reader.read(1024)
 9        if not data:
10            break
11        writer.write(data)  # echo back
12        await writer.drain()
13    writer.close()
14    await writer.wait_closed()
15    print(f"Connection closed {addr}")
16
17async def main():
18    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19    addr = server.sockets[0].getsockname()
20    print(f"Serving on {addr}")
21    async with server:
22        await server.serve_forever()
23
24if __name__ == "__main__":
25    asyncio.run(main())
  • StreamReader og StreamWriter spiller en central rolle i asynkron ind- og uddata under TCP-kommunikation med asyncio. StreamReader læser asynkront data sendt fra klienten, mens StreamWriter bruges til at sende svar fra serveren tilbage til klienten.

  • Selv uden at håndtere de detaljerede socket-operationer selv, kan du nemt og effektivt starte en asynkron server ved hjælp af asyncio.start_server.

  • Når du videregiver en handlerfunktion til asyncio.start_server, modtager denne funktion reader og writer som dens argumenter. Ved at bruge disse kan du implementere kommunikationsprocesser på en mere sikker og tydelig måde end ved at håndtere low-level socket-API'er direkte. For eksempel kan du ved at modtage data med reader.read() og kombinere writer.write() med writer.drain() implementere asynkron sending, der sikrer, at overførslen er fuldført.

  • Denne opsætning er velegnet til at håndtere mange samtidige forbindelser og er ideel til simple protokoller eller mindre TCP-tjenester.

Håndtering af store streaming-data

Når store filer eller svar behandles sekventielt, bør data læses og skrives i bidder for at holde hukommelsesforbruget lavt. Nedenfor er et eksempel på streaming-læsninger med aiohttp.

Følgende kode behandler HTTP-svar i bidder og skriver til disk, efterhånden som data modtages.

 1import aiohttp
 2import asyncio
 3import aiofiles
 4
 5async def stream_download(url: str, dest: str):
 6    # Stream download and write to file in chunks
 7    async with aiohttp.ClientSession() as session:
 8        async with session.get(url) as resp:
 9            async with aiofiles.open(dest, 'wb') as f:
10                async for chunk in resp.content.iter_chunked(1024 * 64):
11                    await f.write(chunk)
12
13if __name__ == "__main__":
14    asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))
  • Denne kode indlæser ikke en stor fil på én gang; i stedet modtager den data i stykker (små dele) og skriver det til en fil asynkront. Som følge heraf kan den udføre downloads hurtigt og effektivt, mens den holder hukommelsesforbruget lavt. aiohttp henter data asynkront, og aiofiles skriver til filen uden at blokere, hvilket gør det nemt at køre sammen med andre processer.

  • Dette mønster er velegnet til at hente og gemme store filer effektivt, mens hukommelsesforbruget minimeres.

Asynkron eksekvering af underprocesser

Hvis du vil køre eksterne kommandoer asynkront og læse deres output i realtid, er asyncio.create_subprocess_exec nyttig.

Nedenfor er et eksempel på at starte en ekstern kommando og læse dens standardoutput i realtid.

 1import asyncio
 2
 3async def run_cmd(cmd):
 4    # Run external command asynchronously and capture output line by line
 5    proc = await asyncio.create_subprocess_exec(
 6        *cmd,
 7        stdout=asyncio.subprocess.PIPE,
 8        stderr=asyncio.subprocess.PIPE
 9    )
10
11    async def read_stream(stream, name):
12        while True:
13            line = await stream.readline()
14            if not line:
15                break
16            print(f"[{name}] {line.decode().rstrip()}")
17
18    await asyncio.gather(
19        read_stream(proc.stdout, "stdout"),
20        read_stream(proc.stderr, "stderr"),
21    )
22    await proc.wait()
23    return proc.returncode
24
25if __name__ == "__main__":
26    asyncio.run(run_cmd(["python", "--version"]))
  • Ved at styre underprocesser asynkront kan du håndtere logs fra eksterne værktøjer i realtid eller køre flere processer parallelt.

Håndtering af annullering og timeout

Asynkrone opgaver kan annulleres. Ved implementering af timeout er det nemt at bruge asyncio.wait_for.

Nedenfor er et eksempel på at køre en opgave med timeout.

 1import asyncio
 2
 3async def slow_task():
 4    await asyncio.sleep(5)
 5    return "done"
 6
 7async def main():
 8    try:
 9        result = await asyncio.wait_for(slow_task(), timeout=2.0)
10        print("Result:", result)
11    except asyncio.TimeoutError:
12        print("Task timed out")
13
14if __name__ == "__main__":
15    asyncio.run(main())
  • wait_for kaster en TimeoutError hvis timeout opnås og annullerer opgaven om nødvendigt. Vær opmærksom på opgaveannullering og oprydning.

Styring af konkurrence (Semaphore)

Da mange samtidige forbindelser eller forespørgsler kan udtømme ressourcer, begrænses samtidighed med asyncio.Semaphore.

Følgende er et eksempel på at begrænse samtidige downloads med en semaphore.

 1import asyncio
 2import aiohttp
 3
 4semaphore = asyncio.Semaphore(3)  # allow up to 3 concurrent tasks
 5
 6async def limited_fetch(session, url):
 7    async with semaphore:
 8        async with session.get(url) as resp:
 9            return url, resp.status
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [limited_fetch(session, u) for u in urls]
14        results = await asyncio.gather(*tasks)
15        print(results)
16
17if __name__ == "__main__":
18    urls = ["https://codesparklab.com/json/example.json"] * 10
19    asyncio.run(main(urls))
  • Med denne metode kan du tilgå eksterne tjenester skånsomt og undgå overbelastning af din egen proces.

Fejlhåndtering og retry-strategier

Fejl opstår uundgåeligt selv i asynkron behandling. Håndter undtagelser passende og implementer retry-strategier som eksponentiel backoff.

Nedenfor er et eksempel på simpel retry-implementering op til N gange.

 1import asyncio
 2import aiohttp
 3import random
 4
 5async def fetch_with_retry(session, url, retries=3):
 6    for attempt in range(1, retries + 1):
 7        try:
 8            async with session.get(url) as resp:
 9                resp.raise_for_status()
10                text = await resp.text()
11                return text
12        except Exception as e:
13            if attempt == retries:
14                raise
15            await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18    async with aiohttp.ClientSession() as session:
19        text = await fetch_with_retry(session, "https://codesparklab.com/")
20        print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • Korrekt retry-logik er vigtig for at balancere konsistens og trafikstyring.

Tips til fejlfinding og logging

Ved asynkron behandling forløber opgaver samtidigt, hvilket kan gøre det svært at identificere årsagen til problemer. For effektiv sporing af problemer vil det være lettere at fejlfinde, hvis du husker følgende punkter.

  • Undtagelser fra asyncio.run() og Task overses let, så sørg for at logge ikke-håndterede fejl.
  • Når du bruger logging, gør det sporing lettere at inkludere koroutinens navn eller, i Python 3.8 og nyere, task.get_name() i dine logs.
  • Du kan kontrollere den nuværende status for opgaver ved hjælp af asyncio.Task.all_tasks(). Dog er dette API beregnet til fejlsøgningsformål og bør bruges med forsigtighed i produktionsmiljøer for at undgå præstationsproblemer eller uventet forstyrrelse.

Ydelsesmæssige overvejelser

Selvom asynkron programmering er fremragende til at håndtere I/O-ventetider, kan forkert brug forringe ydelsen. Optimer ved at huske følgende:.

  • Asynkron behandling er bedst til I/O-relaterede opgaver og ikke til CPU-tunge opgaver; brug en process-pool i sådanne tilfælde.
  • Overvej pool-størrelse og opgavens art ved brug af tråd- eller process-pools.
  • Starter du mange små opgaver på én gang, øges overhead i event loopet – brug batching eller semaforer for at justere.

Sammendrag

Pythons asynkrone I/O er en kraftfuld mekanisme, som udnytter I/O-ventetider effektivt og udfører netværks- og filoperationer samtidigt. Ved at kombinere teknikker som asyncio, aiohttp, aiofiles og run_in_executor kan du fleksibelt bygge praktiske, asynkrone applikationer. Ved at bruge async with til at automatisere ressourcetildeling og -frigivelse kan du sikkert og pålideligt administrere asynkrone ressourcer som filer, HTTP-sessioner og locks. Ved at indarbejde korrekt fejlhåndtering og styring af samtidighed kan du køre asynkrone programmer med høj pålidelighed sikkert.

Du kan følge med i ovenstående artikel ved hjælp af Visual Studio Code på vores YouTube-kanal. Husk også at tjekke YouTube-kanalen.

YouTube Video