Asynchroniczne wejście/wyjście

Asynchroniczne wejście/wyjście

Ten artykuł wyjaśnia asynchroniczne wejście/wyjście.

Ten przewodnik łagodnie, krok po kroku, wyjaśnia koncepcje i wzorce asynchronicznego wejścia/wyjścia, które są praktycznie przydatne w Pythonie.

YouTube Video

Asynchroniczny wejście/wyjście (I/O)

Koncepcja asynchronicznego wejścia/wyjścia

Asynchroniczne I/O to mechanizm pozwalający na wykonywanie innych operacji równolegle w czasie oczekiwania na czasochłonne operacje wejścia/wyjścia, takie jak operacje na plikach czy komunikacja sieciowa. W Pythonie standardowym asynchronicznym frameworkiem jest asyncio, a wiele bibliotek jest zaprojektowanych w oparciu o ten mechanizm.

Podstawy: async / await i pętla zdarzeń

Najpierw przedstawimy, jak napisać podstawowe korutyny oraz przykład równoczesnego uruchamiania kilku korutyn za pomocą asyncio.gather.

Poniższy kod to minimalny przykład definiowania i równoczesnego uruchamiania funkcji asynchronicznych. Funkcja sleep służy do zilustrowania równoległego wykonywania.

 1import asyncio
 2
 3async def worker(name: str, delay: float):
 4    # Simulate I/O-bound work by sleeping
 5    print(f"{name} started")
 6    await asyncio.sleep(delay)
 7    print(f"{name} finished after {delay}s")
 8    return name, delay
 9
10async def main():
11    # Run multiple coroutines concurrently
12    tasks = [
13        worker("A", 1.5),
14        worker("B", 1.0),
15        worker("C", 0.5),
16    ]
17    results = await asyncio.gather(*tasks)
18    print("Results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Kod uruchamia pętlę zdarzeń za pomocą asyncio.run() i jednocześnie uruchamia trzy korutyny.

async with i asynchroniczne menedżery kontekstu

W przetwarzaniu asynchronicznym zarządzanie zasobami, takimi jak otwieranie połączeń czy zamykanie plików, może stać się skomplikowane. Właśnie tu przydają się asynchroniczne menedżery kontekstu z wykorzystaniem async with. Ta składnia jest używana podobnie jak synchroniczne with, ale przetwarzanie wewnątrz jest asynchroniczne, dzięki czemu naturalnie pasuje do przepływu async/await.

Istnieją dwa główne powody, aby używać async with:.

  • Aby niezawodnie zwalniać zasoby, takie jak połączenia, uchwyty plików czy sesje. Możesz mieć pewność, że zasoby zostaną poprawnie zwolnione nawet w przypadku nieoczekiwanego zakończenia.
  • Aby zautomatyzować inicjalizację i czyszczenie zasobów, np. ustanawianie lub zamykanie połączeń i opróżnianie buforów, w sposób asynchroniczny. Oszczędza to konieczność ręcznego kodowania i sprawia, że Twój kod jest bardziej czytelny.

Poniżej znajduje się przykład stworzenia prostego asynchronicznego menedżera kontekstu od podstaw.

 1import asyncio
 2
 3class AsyncResource:
 4    async def __aenter__(self):
 5        print("Opening resource...")
 6        await asyncio.sleep(0.5)
 7        print("Resource opened")
 8        return self
 9
10    async def __aexit__(self, exc_type, exc, tb):
11        print("Closing resource...")
12        await asyncio.sleep(0.5)
13        print("Resource closed")
14
15async def main():
16    async with AsyncResource() as r:
17        print("Using resource...")
18
19if __name__ == "__main__":
20    asyncio.run(main())
  • Definiując metody __aenter__ i __aexit__, możesz używać async with.
  • Operacje podczas wchodzenia i wychodzenia z bloku async with są wykonywane asynchronicznie i bezpiecznie.

Asynchroniczne operacje na plikach (aiofiles)

Operacje na plikach to klasyczny przykład blokowania. Dzięki aiofiles możesz bezpiecznie obsługiwać operacje na plikach w sposób asynchroniczny. Wewnątrz wykorzystywana jest pula wątków, a zamykanie plików jest zapewnione przez async with.

Poniższy przykład demonstruje równoległe, asynchroniczne odczytywanie wielu plików. Przed uruchomieniem tego kodu należy zainstalować aiofiles za pomocą pip install aiofiles.

 1# pip install aiofiles
 2import asyncio
 3import aiofiles
 4from pathlib import Path
 5
 6async def read_file(path: Path):
 7    # Read file content asynchronously
 8    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
 9        contents = await f.read()
10    return path.name, len(contents)
11
12async def main():
13    files = [
14        Path("example1.txt"),
15        Path("example2.txt"),
16        Path("example3.txt")
17    ]
18    tasks = [read_file(p) for p in files]
19    results = await asyncio.gather(*tasks)
20    for name, size in results:
21        print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24    asyncio.run(main())
  • Ten kod równolegle odczytuje każdy plik. aiofiles często korzysta wewnętrznie z puli wątków, umożliwiając obsługę blokujących operacji plikowych poprzez interfejs asynchroniczny.

Asynchroniczny klient HTTP (aiohttp)

Klasycznym przykładem operacji sieciowych jest asynchroniczne wykonywanie zapytań HTTP. Jest to szczególnie skuteczne, gdy musisz wykonać wiele zapytań HTTP równocześnie.

Poniżej przedstawiono przykład równoległego pobierania wielu adresów URL przy użyciu aiohttp. Musisz zainstalować aiohttp za pomocą pip install aiohttp.

 1# pip install aiohttp
 2import asyncio
 3import aiohttp
 4
 5async def fetch(session: aiohttp.ClientSession, url: str):
 6    # Fetch a URL asynchronously and return status and size
 7    async with session.get(url) as resp:
 8        text = await resp.text()
 9        return url, resp.status, len(text)
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [fetch(session, u) for u in urls]
14        for coro in asyncio.as_completed(tasks):
15            url, status, size = await coro
16            print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19    urls = [
20        "https://codesparklab.com/json/example1.json",
21        "https://codesparklab.com/json/example2.json",
22        "https://codesparklab.com/json/example3.json",
23    ]
24    asyncio.run(main(urls))
  • Dzięki asyncio.as_completed możesz przetwarzać wyniki w kolejności ukończenia zadań. Jest to przydatne do wydajnej obsługi wielu żądań.

Współistnienie z blokującymi operacjami I/O: run_in_executor

Podczas pracy z zadaniami wymagającymi dużych zasobów CPU lub istniejącymi blokującymi API w kodzie asynchronicznym, używaj ThreadPoolExecutor lub ProcessPoolExecutor za pomocą loop.run_in_executor.

Poniższy kod jest przykładem jednoczesnego uruchamiania zadań zakładających blokujące operacje wejścia/wyjścia z użyciem puli wątków.

 1import asyncio
 2from concurrent.futures import ThreadPoolExecutor
 3import time
 4
 5def blocking_io(n):
 6    # Simulate a blocking I/O or CPU-bound function
 7    time.sleep(n)
 8    return f"slept {n}s"
 9
10async def main():
11    loop = asyncio.get_running_loop()
12    with ThreadPoolExecutor() as pool:
13        tasks = [
14            loop.run_in_executor(pool, blocking_io, 1),
15            loop.run_in_executor(pool, blocking_io, 2),
16        ]
17        results = await asyncio.gather(*tasks)
18        print("Blocking results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Wykorzystując run_in_executor, możesz włączyć istniejący kod synchroniczny do asynchronicznych przepływów bez znacznych przeróbek. Należy jednak zwrócić uwagę na liczbę wątków i obciążenie CPU.
  • ProcessPoolExecutor nadaje się do zadań wymagających dużej mocy obliczeniowej CPU.

Asynchroniczny serwer: serwer TCP Echo oparty na asyncio

Jeśli chcesz obsługiwać gniazda bezpośrednio, możesz łatwo zbudować asynchroniczny serwer za pomocą asyncio.start_server.

Poniższy przykład to prosty serwer echo, który zwraca dane dokładnie w takiej formie, w jakiej otrzymał je od klienta.

 1import asyncio
 2
 3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
 4    # Handle a single client: read data and echo it back
 5    addr = writer.get_extra_info('peername')
 6    print(f"Connection from {addr}")
 7    while True:
 8        data = await reader.read(1024)
 9        if not data:
10            break
11        writer.write(data)  # echo back
12        await writer.drain()
13    writer.close()
14    await writer.wait_closed()
15    print(f"Connection closed {addr}")
16
17async def main():
18    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19    addr = server.sockets[0].getsockname()
20    print(f"Serving on {addr}")
21    async with server:
22        await server.serve_forever()
23
24if __name__ == "__main__":
25    asyncio.run(main())
  • W komunikacji TCP z użyciem asyncio, StreamReader i StreamWriter odgrywają kluczową rolę w asynchronicznym wejściu i wyjściu. StreamReader asynchronicznie odczytuje dane wysyłane przez klienta, podczas gdy StreamWriter służy do wysyłania odpowiedzi z serwera do klienta.

  • Nawet bez samodzielnej obsługi szczegółowych operacji na gniazdkach, możesz uruchomić asynchroniczny serwer w prosty i wydajny sposób za pomocą asyncio.start_server.

  • Gdy przekażesz funkcję obsługi do asyncio.start_server, ta funkcja otrzymuje reader i writer jako swoje argumenty. Korzystając z tych narzędzi, możesz zaimplementować procesy komunikacji w bezpieczniejszy i bardziej przejrzysty sposób niż poprzez bezpośrednią obsługę niskopoziomowych API gniazdek. Na przykład, odbierając dane za pomocą reader.read() i łącząc writer.write() z writer.drain(), możesz zaimplementować asynchroniczne wysyłanie, które gwarantuje zakończenie transmisji.

  • To rozwiązanie nadaje się do obsługi wielu jednoczesnych połączeń i jest idealne dla prostych protokołów lub małych usług TCP.

Obsługa dużych strumieni danych

Podczas przetwarzania dużych plików lub odpowiedzi sekwencyjnie, odczytuj i zapisuj dane w kawałkach, aby zużycie pamięci było niskie. Poniżej znajduje się przykład czytania strumieniowego przy użyciu aiohttp.

Poniższy kod przetwarza odpowiedzi HTTP w kawałkach i zapisuje na dysk w miarę otrzymywania danych.

 1import aiohttp
 2import asyncio
 3import aiofiles
 4
 5async def stream_download(url: str, dest: str):
 6    # Stream download and write to file in chunks
 7    async with aiohttp.ClientSession() as session:
 8        async with session.get(url) as resp:
 9            async with aiofiles.open(dest, 'wb') as f:
10                async for chunk in resp.content.iter_chunked(1024 * 64):
11                    await f.write(chunk)
12
13if __name__ == "__main__":
14    asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))
  • Ten kod nie ładuje dużego pliku naraz; zamiast tego odbiera dane w kawałkach (małych częściach) i zapisuje je do pliku asynchronicznie. W rezultacie może pobierać dane szybko i wydajnie, utrzymując niskie zużycie pamięci. aiohttp asynchronicznie pobiera dane, a aiofiles zapisuje je do pliku bez blokowania, co ułatwia równoczesne wykonywanie innych procesów.

  • Ten wzorzec nadaje się do wydajnego pobierania i zapisywania dużych plików, jednocześnie minimalizując zużycie pamięci.

Asynchroniczne uruchamianie procesów podrzędnych

Jeśli chcesz uruchomić polecenia zewnętrzne asynchronicznie i czytać ich wyjście w czasie rzeczywistym, przydatne jest asyncio.create_subprocess_exec.

Poniżej znajduje się przykład uruchamiania polecenia zewnętrznego i odczytu standardowego wyjścia w czasie rzeczywistym.

 1import asyncio
 2
 3async def run_cmd(cmd):
 4    # Run external command asynchronously and capture output line by line
 5    proc = await asyncio.create_subprocess_exec(
 6        *cmd,
 7        stdout=asyncio.subprocess.PIPE,
 8        stderr=asyncio.subprocess.PIPE
 9    )
10
11    async def read_stream(stream, name):
12        while True:
13            line = await stream.readline()
14            if not line:
15                break
16            print(f"[{name}] {line.decode().rstrip()}")
17
18    await asyncio.gather(
19        read_stream(proc.stdout, "stdout"),
20        read_stream(proc.stderr, "stderr"),
21    )
22    await proc.wait()
23    return proc.returncode
24
25if __name__ == "__main__":
26    asyncio.run(run_cmd(["python", "--version"]))
  • Dzięki asynchronicznemu kontrolowaniu procesów podrzędnych możesz obsługiwać logi z narzędzi zewnętrznych w czasie rzeczywistym lub uruchamiać wiele procesów równolegle.

Obsługa anulowania i limitów czasowych

Zadania asynchroniczne mogą zostać anulowane. Aby zaimplementować limit czasu, wystarczy użyć asyncio.wait_for.

Poniżej znajduje się przykład uruchamiania zadania z limitem czasu.

 1import asyncio
 2
 3async def slow_task():
 4    await asyncio.sleep(5)
 5    return "done"
 6
 7async def main():
 8    try:
 9        result = await asyncio.wait_for(slow_task(), timeout=2.0)
10        print("Result:", result)
11    except asyncio.TimeoutError:
12        print("Task timed out")
13
14if __name__ == "__main__":
15    asyncio.run(main())
  • wait_for rzuca wyjątek TimeoutError, jeśli limit czasu zostanie przekroczony, i anuluje zadanie w razie potrzeby. Należy zachować ostrożność przy propagacji anulowania zadań i sprzątaniu po nich.

Kontrola współbieżności (Semaphore)

Ponieważ wykonywanie wielu współbieżnych połączeń lub zapytań może wyczerpać zasoby, ograniczaj współbieżność za pomocą asyncio.Semaphore.

Poniżej znajduje się przykład ograniczania liczby równoczesnych pobrań za pomocą semafora.

 1import asyncio
 2import aiohttp
 3
 4semaphore = asyncio.Semaphore(3)  # allow up to 3 concurrent tasks
 5
 6async def limited_fetch(session, url):
 7    async with semaphore:
 8        async with session.get(url) as resp:
 9            return url, resp.status
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [limited_fetch(session, u) for u in urls]
14        results = await asyncio.gather(*tasks)
15        print(results)
16
17if __name__ == "__main__":
18    urls = ["https://codesparklab.com/json/example.json"] * 10
19    asyncio.run(main(urls))
  • Dzięki tej metodzie możesz delikatnie uzyskiwać dostęp do usług zewnętrznych i unikać przeciążenia własnego procesu.

Obsługa błędów i strategie ponawiania

Błędy nieuchronnie pojawiają się także w przetwarzaniu asynchronicznym. Odpowiednio przechwytuj wyjątki i wdrażaj strategie ponawiania, takie jak wykładnicze opóźnianie próby ponownej.

Poniżej znajduje się przykład implementacji prostego ponawiania do N razy.

 1import asyncio
 2import aiohttp
 3import random
 4
 5async def fetch_with_retry(session, url, retries=3):
 6    for attempt in range(1, retries + 1):
 7        try:
 8            async with session.get(url) as resp:
 9                resp.raise_for_status()
10                text = await resp.text()
11                return text
12        except Exception as e:
13            if attempt == retries:
14                raise
15            await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18    async with aiohttp.ClientSession() as session:
19        text = await fetch_with_retry(session, "https://codesparklab.com/")
20        print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • Właściwa logika ponawiania jest ważna dla równowagi między spójnością a kontrolą ruchu.

Wskazówki dotyczące debugowania i logowania

W przetwarzaniu asynchronicznym zadania przebiegają równolegle, co może utrudniać identyfikację przyczyny problemów. Aby skutecznie śledzić problemy, pamiętaj o poniższych kwestiach – usprawni to debugowanie.

  • Wyjątki pochodzące z asyncio.run() i Task łatwo przeoczyć, więc koniecznie loguj nieobsłużone wyjątki.
  • Podczas korzystania z logging, uwzględnienie nazwy korutyny lub, w Pythonie 3.8 i nowszych, task.get_name() w logach ułatwia śledzenie.
  • Możesz sprawdzić aktualny status zadań za pomocą asyncio.Task.all_tasks(). Jednakże to API jest przeznaczone do celów debugowania i powinno być używane ostrożnie w środowiskach produkcyjnych, aby uniknąć problemów z wydajnością lub nieoczekiwanych zakłóceń.

Uwagi dotyczące wydajności

Chociaż programowanie asynchroniczne znakomicie radzi sobie z oczekiwaniem na operacje I/O, niewłaściwe użycie może pogorszyć wydajność. Optymalizuj, pamiętając o następujących kwestiach:.

  • Przetwarzanie asynchroniczne sprawdza się przy zadaniach ograniczonych przez I/O, ale nie nadaje się do zadań ograniczonych przez CPU; w takich przypadkach użyj puli procesów.
  • Korzystając z puli wątków lub procesów, weź pod uwagę ich wielkość i charakter zadań.
  • Jeśli uruchamiasz jednocześnie wiele małych zadań, narzut na pętlę zdarzeń rośnie – stosuj więc grupowanie zadań lub semafory, aby dostosować obciążenie.

Podsumowanie

Asynchroniczne I/O w Pythonie to potężny mechanizm, który skutecznie wykorzystuje czas oczekiwania na I/O i efektywnie równolegle realizuje operacje sieciowe oraz plikowe. Łącząc techniki takie jak asyncio, aiohttp, aiofiles oraz run_in_executor, możesz elastycznie tworzyć praktyczne aplikacje asynchroniczne. Wykorzystując async with do automatycznego przejmowania i zwalniania zasobów, możesz bezpiecznie i niezawodnie zarządzać asynchronicznymi zasobami, takimi jak pliki, sesje HTTP i blokady. Wprowadzając odpowiednią obsługę błędów i zarządzanie współbieżnością, możesz bezpiecznie uruchamiać wysoko niezawodne programy asynchroniczne.

Możesz śledzić ten artykuł, korzystając z Visual Studio Code na naszym kanale YouTube. Proszę również sprawdzić nasz kanał YouTube.

YouTube Video