Asynkron in- och utmatning

Asynkron in- och utmatning

Den här artikeln förklarar asynkron in- och utmatning.

Den här guiden förklarar steg för steg de koncept och mönster inom asynkron in- och utmatning som är praktiskt användbara i Python.

YouTube Video

Asynkron in- och utdata (I/O)

Konceptet med asynkron I/O

Asynkron I/O är en mekanism som möjliggör att andra operationer kan köras parallellt medan man väntar på tidskrävande I/O, som filoperationer eller nätverkskommunikation. I Python tillhandahålls asyncio som det standard asynkrona ramverket, och många bibliotek är utformade för att följa denna mekanism.

Grunder: async / await och händelseloopen

Först, så här skriver du grundläggande korutiner och ett exempel på att köra flera korutiner samtidigt med hjälp av asyncio.gather.

Koden nedan är ett minimalt exempel på hur man definierar och kör asynkrona funktioner parallellt. sleep-funktionen används för att demonstrera parallell exekvering.

 1import asyncio
 2
 3async def worker(name: str, delay: float):
 4    # Simulate I/O-bound work by sleeping
 5    print(f"{name} started")
 6    await asyncio.sleep(delay)
 7    print(f"{name} finished after {delay}s")
 8    return name, delay
 9
10async def main():
11    # Run multiple coroutines concurrently
12    tasks = [
13        worker("A", 1.5),
14        worker("B", 1.0),
15        worker("C", 0.5),
16    ]
17    results = await asyncio.gather(*tasks)
18    print("Results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Denna kod startar händelseloopen med asyncio.run() och kör tre korutiner parallellt.

async with och asynkrona kontexthanterare

Vid asynkron bearbetning kan resurshantering som att öppna anslutningar och stänga filer lätt bli komplicerat. Det är här asynkrona kontexthanterare med async with blir användbara. Denna syntax används på samma sätt som det synkrona with-uttrycket, men den interna bearbetningen är asynkron och passar därmed naturligt in i async/await-flödet.

Det finns två huvudsakliga skäl att använda async with:.

  • För att tillförlitligt rensa upp resurser som anslutningar, filhanterare eller sessioner. Du kan vara säker på att resurser frigörs korrekt även om ett oväntat avslut sker.
  • För att automatisera initiering och städning, såsom att skapa eller stänga anslutningar och flusha, på ett asynkront sätt. Detta sparar manuellt kodarbete och gör din kod tydligare.

Nedan följer ett exempel på hur man från grunden skapar en enkel asynkron kontexthanterare.

 1import asyncio
 2
 3class AsyncResource:
 4    async def __aenter__(self):
 5        print("Opening resource...")
 6        await asyncio.sleep(0.5)
 7        print("Resource opened")
 8        return self
 9
10    async def __aexit__(self, exc_type, exc, tb):
11        print("Closing resource...")
12        await asyncio.sleep(0.5)
13        print("Resource closed")
14
15async def main():
16    async with AsyncResource() as r:
17        print("Using resource...")
18
19if __name__ == "__main__":
20    asyncio.run(main())
  • Genom att definiera __aenter__ och __aexit__ kan du använda async with.
  • Bearbetningen när du går in i och lämnar async with-blocket exekveras asynkront och säkert.

Asynkron fil-I/O (aiofiles)

Filoperationer är ett klassiskt exempel på blockering. Genom att använda aiofiles kan du säkert hantera filoperationer asynkront. Internt använder det en trådpool och säkerställer att filer stängs korrekt med async with.

Följande exempel demonstrerar parallell asynkron läsning av flera filer. Du måste installera aiofiles med pip install aiofiles innan du kör denna kod.

 1# pip install aiofiles
 2import asyncio
 3import aiofiles
 4from pathlib import Path
 5
 6async def read_file(path: Path):
 7    # Read file content asynchronously
 8    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
 9        contents = await f.read()
10    return path.name, len(contents)
11
12async def main():
13    files = [
14        Path("example1.txt"),
15        Path("example2.txt"),
16        Path("example3.txt")
17    ]
18    tasks = [read_file(p) for p in files]
19    results = await asyncio.gather(*tasks)
20    for name, size in results:
21        print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24    asyncio.run(main())
  • Den här koden parallelliserar läsningen av varje fil. aiofiles använder ofta en trådpool internt, vilket låter dig hantera blockerande fil-I/O via ett asynkront gränssnitt.

Asynkron HTTP-klient (aiohttp)

Som ett klassiskt exempel på nätverks-I/O, så här gör du HTTP-förfrågningar asynkront. Det är särskilt kraftfullt när du behöver göra många HTTP-förfrågningar parallellt.

Nedan finns ett exempel på att hämta flera URL:er parallellt med aiohttp. Du måste installera aiohttp med pip install aiohttp.

 1# pip install aiohttp
 2import asyncio
 3import aiohttp
 4
 5async def fetch(session: aiohttp.ClientSession, url: str):
 6    # Fetch a URL asynchronously and return status and size
 7    async with session.get(url) as resp:
 8        text = await resp.text()
 9        return url, resp.status, len(text)
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [fetch(session, u) for u in urls]
14        for coro in asyncio.as_completed(tasks):
15            url, status, size = await coro
16            print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19    urls = [
20        "https://codesparklab.com/json/example1.json",
21        "https://codesparklab.com/json/example2.json",
22        "https://codesparklab.com/json/example3.json",
23    ]
24    asyncio.run(main(urls))
  • Genom att använda asyncio.as_completed kan du bearbeta resultat i den ordning uppgifter slutförs. Detta är användbart för att effektivt hantera många förfrågningar.

Samsamexistens med blockerande I/O: run_in_executor

När du hanterar CPU-intensiva uppgifter eller existerande blockerande API:er i asynkron kod, använd ThreadPoolExecutor eller ProcessPoolExecutor via loop.run_in_executor.

Följande kod är ett exempel på att köra uppgifter som förutsätter blockerande I/O parallellt genom att använda en trådpool.

 1import asyncio
 2from concurrent.futures import ThreadPoolExecutor
 3import time
 4
 5def blocking_io(n):
 6    # Simulate a blocking I/O or CPU-bound function
 7    time.sleep(n)
 8    return f"slept {n}s"
 9
10async def main():
11    loop = asyncio.get_running_loop()
12    with ThreadPoolExecutor() as pool:
13        tasks = [
14            loop.run_in_executor(pool, blocking_io, 1),
15            loop.run_in_executor(pool, blocking_io, 2),
16        ]
17        results = await asyncio.gather(*tasks)
18        print("Blocking results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Genom att använda run_in_executor kan du integrera befintlig synkron kod i asynkrona flöden utan att behöva skriva om koden i stor omfattning. Dock bör du vara uppmärksam på antalet trådar och CPU-belastning.
  • ProcessPoolExecutor är lämplig för CPU-bundna uppgifter.

Asynkron server: TCP Echo-server baserad på asyncio

Om du vill hantera sockets direkt kan du enkelt bygga en asynkron server med hjälp av asyncio.start_server.

Följande exempel är en enkel echo-server som returnerar data precis som de tas emot från klienten.

 1import asyncio
 2
 3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
 4    # Handle a single client: read data and echo it back
 5    addr = writer.get_extra_info('peername')
 6    print(f"Connection from {addr}")
 7    while True:
 8        data = await reader.read(1024)
 9        if not data:
10            break
11        writer.write(data)  # echo back
12        await writer.drain()
13    writer.close()
14    await writer.wait_closed()
15    print(f"Connection closed {addr}")
16
17async def main():
18    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19    addr = server.sockets[0].getsockname()
20    print(f"Serving on {addr}")
21    async with server:
22        await server.serve_forever()
23
24if __name__ == "__main__":
25    asyncio.run(main())
  • Vid TCP-kommunikation med asyncio spelar StreamReader och StreamWriter en central roll i asynkron in- och utdata. StreamReader läser asynkront data som skickas från klienten, medan StreamWriter används för att skicka svar från servern tillbaka till klienten.

  • Även utan att hantera detaljerade socket-operationer själv kan du starta en asynkron server enkelt och effektivt med asyncio.start_server.

  • När du skickar en hanteringsfunktion till asyncio.start_server får den funktionen reader och writer som argument. Genom att använda dessa kan du implementera kommunikationsprocesser på ett säkrare och tydligare sätt än att hantera låg-nivå socket-API:er direkt. Till exempel, genom att ta emot data med reader.read() och kombinera writer.write() med writer.drain(), kan du implementera asynkron sändning som säkerställer att överföringen är slutförd.

  • Denna uppsättning är lämplig för att hantera många samtidiga anslutningar och är idealisk för enkla protokoll eller småskaliga TCP-tjänster.

Hantering av stora strömningsdata

Vid sekventiell bearbetning av stora filer eller svar, läs och skriv data i segment för att hålla minnesanvändningen låg. Nedan följer ett exempel på strömningsläsning med hjälp av aiohttp.

Följande kod behandlar HTTP-svar i segment och skriver till disk allteftersom data tas emot.

 1import aiohttp
 2import asyncio
 3import aiofiles
 4
 5async def stream_download(url: str, dest: str):
 6    # Stream download and write to file in chunks
 7    async with aiohttp.ClientSession() as session:
 8        async with session.get(url) as resp:
 9            async with aiofiles.open(dest, 'wb') as f:
10                async for chunk in resp.content.iter_chunked(1024 * 64):
11                    await f.write(chunk)
12
13if __name__ == "__main__":
14    asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))
  • Denna kod laddar inte en stor fil på en gång; istället tar den emot data i bitar (små delar) och skriver dem asynkront till en fil. Som resultat kan den genomföra nedladdningar snabbt och effektivt samtidigt som minnesanvändningen hålls låg. aiohttp hämtar data asynkront och aiofiles skriver till filen utan blockering, vilket gör det enkelt att köra parallellt med andra processer.

  • Detta mönster är lämpligt för att ladda ner och spara stora filer effektivt samtidigt som minnesanvändningen minimeras.

Asynkron körning av underprocesser

Om du vill köra externa kommandon asynkront och läsa deras utdata i realtid är asyncio.create_subprocess_exec användbart.

Nedan följer ett exempel på att starta ett externt kommando och läsa dess standardutdata i realtid.

 1import asyncio
 2
 3async def run_cmd(cmd):
 4    # Run external command asynchronously and capture output line by line
 5    proc = await asyncio.create_subprocess_exec(
 6        *cmd,
 7        stdout=asyncio.subprocess.PIPE,
 8        stderr=asyncio.subprocess.PIPE
 9    )
10
11    async def read_stream(stream, name):
12        while True:
13            line = await stream.readline()
14            if not line:
15                break
16            print(f"[{name}] {line.decode().rstrip()}")
17
18    await asyncio.gather(
19        read_stream(proc.stdout, "stdout"),
20        read_stream(proc.stderr, "stderr"),
21    )
22    await proc.wait()
23    return proc.returncode
24
25if __name__ == "__main__":
26    asyncio.run(run_cmd(["python", "--version"]))
  • Genom att styra underprocesser asynkront kan du hantera loggar från externa verktyg i realtid eller köra flera processer parallellt.

Hantering av avbrott och timeout

Asynkrona uppgifter kan avbrytas. Vid implementation av timeout är det enkelt att använda asyncio.wait_for.

Nedan följer ett exempel på att köra en uppgift med timeout.

 1import asyncio
 2
 3async def slow_task():
 4    await asyncio.sleep(5)
 5    return "done"
 6
 7async def main():
 8    try:
 9        result = await asyncio.wait_for(slow_task(), timeout=2.0)
10        print("Result:", result)
11    except asyncio.TimeoutError:
12        print("Task timed out")
13
14if __name__ == "__main__":
15    asyncio.run(main())
  • wait_for kastar ett TimeoutError om timeout uppnås och avbryter uppgiften vid behov. Var försiktig med spridning av avbrott och avslutning av uppgifter.

Styrning av samtidighet (Semaphore)

Eftersom många samtidiga anslutningar eller förfrågningar kan förbruka resurser, begränsa samtidigt antal med asyncio.Semaphore.

Följande är ett exempel på att begränsa samtidiga nedladdningar med hjälp av en semafor.

 1import asyncio
 2import aiohttp
 3
 4semaphore = asyncio.Semaphore(3)  # allow up to 3 concurrent tasks
 5
 6async def limited_fetch(session, url):
 7    async with semaphore:
 8        async with session.get(url) as resp:
 9            return url, resp.status
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [limited_fetch(session, u) for u in urls]
14        results = await asyncio.gather(*tasks)
15        print(results)
16
17if __name__ == "__main__":
18    urls = ["https://codesparklab.com/json/example.json"] * 10
19    asyncio.run(main(urls))
  • Med denna metod kan du komma åt externa tjänster försiktigt och undvika att överbelasta din egen process.

Felfångst och återförsöksstrategier

Fel uppstår oundvikligen även vid asynkron bearbetning. Fånga undantag på rätt sätt och implementera återförsöksstrategier såsom exponentiell backoff.

Nedan följer ett exempel på en enkel implementation av återförsök upp till N gånger.

 1import asyncio
 2import aiohttp
 3import random
 4
 5async def fetch_with_retry(session, url, retries=3):
 6    for attempt in range(1, retries + 1):
 7        try:
 8            async with session.get(url) as resp:
 9                resp.raise_for_status()
10                text = await resp.text()
11                return text
12        except Exception as e:
13            if attempt == retries:
14                raise
15            await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18    async with aiohttp.ClientSession() as session:
19        text = await fetch_with_retry(session, "https://codesparklab.com/")
20        print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • Rätt återförsökslogik är viktig för att balansera mellan konsistens och trafikstyrning.

Tips för felsökning och loggning

Vid asynkron bearbetning fortskrider uppbyggnaden parallellt, vilket kan göra det svårt att identifiera orsaken till problem. För att effektivt följa upp problem, tänk på följande punkter så blir felsökningen smidigare.

  • Undantag från asyncio.run() och Task är lätta att missa, så se till att logga icke-hanterade undantag.
  • När du använder logging, gör det det enklare att spåra om du inkluderar koroutinnamnet eller, i Python 3.8 och senare, task.get_name() i dina loggar.
  • Du kan kontrollera den aktuella statusen för uppgifter genom att använda asyncio.Task.all_tasks(). Denna API är dock avsedd för felsökningsändamål och bör användas med försiktighet i produktionsmiljöer för att undvika prestandaproblem eller oväntade störningar.

Prestandaöverväganden

Även om asynkron programmering utmärker sig vid hantering av I/O-väntan kan felaktig användning faktiskt försämra prestandan. Optimera genom att tänka på följande punkter:.

  • Asynkron bearbetning är utmärkt för I/O-bundna uppgifter men är inte lämplig för CPU-bundna uppgifter; använd en processpool i sådana fall.
  • Vid användning av tråd- eller processpooler, tänk på poolens storlek och uppgiftens karaktär.
  • Om du startar många små uppgifter samtidigt ökar overhead för händelseloopen—använd därför batchning eller semaforer för att justera.

Sammanfattning

Pythons asynkrona I/O är en kraftfull mekanism som utnyttjar I/O-väntetider effektivt och kör nätverks- och filoperationer parallellt. Genom att kombinera tekniker som asyncio, aiohttp, aiofiles och run_in_executor kan du flexibelt bygga praktiska asynkrona applikationer. Genom att använda async with för att automatisera anskaffning och frigöring av resurser kan du säkert och pålitligt hantera asynkrona resurser som filer, HTTP-sessioner och lås. Genom att inkludera korrekt felfångst och hantering av samtidighet kan du köra asynkrona program med hög tillförlitlighet på ett säkert sätt.

Du kan följa med i artikeln ovan med hjälp av Visual Studio Code på vår YouTube-kanal. Vänligen kolla även in YouTube-kanalen.

YouTube Video