Asynchroniczne wejście/wyjście
Ten artykuł wyjaśnia asynchroniczne wejście/wyjście.
Ten przewodnik łagodnie, krok po kroku, wyjaśnia koncepcje i wzorce asynchronicznego wejścia/wyjścia, które są praktycznie przydatne w Pythonie.
YouTube Video
Asynchroniczny wejście/wyjście (I/O)
Koncepcja asynchronicznego wejścia/wyjścia
Asynchroniczne I/O to mechanizm pozwalający na wykonywanie innych operacji równolegle w czasie oczekiwania na czasochłonne operacje wejścia/wyjścia, takie jak operacje na plikach czy komunikacja sieciowa. W Pythonie standardowym asynchronicznym frameworkiem jest asyncio, a wiele bibliotek jest zaprojektowanych w oparciu o ten mechanizm.
Podstawy: async / await i pętla zdarzeń
Najpierw przedstawimy, jak napisać podstawowe korutyny oraz przykład równoczesnego uruchamiania kilku korutyn za pomocą asyncio.gather.
Poniższy kod to minimalny przykład definiowania i równoczesnego uruchamiania funkcji asynchronicznych. Funkcja sleep służy do zilustrowania równoległego wykonywania.
1import asyncio
2
3async def worker(name: str, delay: float):
4 # Simulate I/O-bound work by sleeping
5 print(f"{name} started")
6 await asyncio.sleep(delay)
7 print(f"{name} finished after {delay}s")
8 return name, delay
9
10async def main():
11 # Run multiple coroutines concurrently
12 tasks = [
13 worker("A", 1.5),
14 worker("B", 1.0),
15 worker("C", 0.5),
16 ]
17 results = await asyncio.gather(*tasks)
18 print("Results:", results)
19
20if __name__ == "__main__":
21 asyncio.run(main())- Kod uruchamia pętlę zdarzeń za pomocą
asyncio.run()i jednocześnie uruchamia trzy korutyny.
async with i asynchroniczne menedżery kontekstu
W przetwarzaniu asynchronicznym zarządzanie zasobami, takimi jak otwieranie połączeń czy zamykanie plików, może stać się skomplikowane. Właśnie tu przydają się asynchroniczne menedżery kontekstu z wykorzystaniem async with. Ta składnia jest używana podobnie jak synchroniczne with, ale przetwarzanie wewnątrz jest asynchroniczne, dzięki czemu naturalnie pasuje do przepływu async/await.
Istnieją dwa główne powody, aby używać async with:.
- Aby niezawodnie zwalniać zasoby, takie jak połączenia, uchwyty plików czy sesje. Możesz mieć pewność, że zasoby zostaną poprawnie zwolnione nawet w przypadku nieoczekiwanego zakończenia.
- Aby zautomatyzować inicjalizację i czyszczenie zasobów, np. ustanawianie lub zamykanie połączeń i opróżnianie buforów, w sposób asynchroniczny. Oszczędza to konieczność ręcznego kodowania i sprawia, że Twój kod jest bardziej czytelny.
Poniżej znajduje się przykład stworzenia prostego asynchronicznego menedżera kontekstu od podstaw.
1import asyncio
2
3class AsyncResource:
4 async def __aenter__(self):
5 print("Opening resource...")
6 await asyncio.sleep(0.5)
7 print("Resource opened")
8 return self
9
10 async def __aexit__(self, exc_type, exc, tb):
11 print("Closing resource...")
12 await asyncio.sleep(0.5)
13 print("Resource closed")
14
15async def main():
16 async with AsyncResource() as r:
17 print("Using resource...")
18
19if __name__ == "__main__":
20 asyncio.run(main())- Definiując metody
__aenter__i__aexit__, możesz używaćasync with. - Operacje podczas wchodzenia i wychodzenia z bloku
async withsą wykonywane asynchronicznie i bezpiecznie.
Asynchroniczne operacje na plikach (aiofiles)
Operacje na plikach to klasyczny przykład blokowania. Dzięki aiofiles możesz bezpiecznie obsługiwać operacje na plikach w sposób asynchroniczny. Wewnątrz wykorzystywana jest pula wątków, a zamykanie plików jest zapewnione przez async with.
Poniższy przykład demonstruje równoległe, asynchroniczne odczytywanie wielu plików. Przed uruchomieniem tego kodu należy zainstalować aiofiles za pomocą pip install aiofiles.
1# pip install aiofiles
2import asyncio
3import aiofiles
4from pathlib import Path
5
6async def read_file(path: Path):
7 # Read file content asynchronously
8 async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
9 contents = await f.read()
10 return path.name, len(contents)
11
12async def main():
13 files = [
14 Path("example1.txt"),
15 Path("example2.txt"),
16 Path("example3.txt")
17 ]
18 tasks = [read_file(p) for p in files]
19 results = await asyncio.gather(*tasks)
20 for name, size in results:
21 print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24 asyncio.run(main())- Ten kod równolegle odczytuje każdy plik.
aiofilesczęsto korzysta wewnętrznie z puli wątków, umożliwiając obsługę blokujących operacji plikowych poprzez interfejs asynchroniczny.
Asynchroniczny klient HTTP (aiohttp)
Klasycznym przykładem operacji sieciowych jest asynchroniczne wykonywanie zapytań HTTP. Jest to szczególnie skuteczne, gdy musisz wykonać wiele zapytań HTTP równocześnie.
Poniżej przedstawiono przykład równoległego pobierania wielu adresów URL przy użyciu aiohttp. Musisz zainstalować aiohttp za pomocą pip install aiohttp.
1# pip install aiohttp
2import asyncio
3import aiohttp
4
5async def fetch(session: aiohttp.ClientSession, url: str):
6 # Fetch a URL asynchronously and return status and size
7 async with session.get(url) as resp:
8 text = await resp.text()
9 return url, resp.status, len(text)
10
11async def main(urls):
12 async with aiohttp.ClientSession() as session:
13 tasks = [fetch(session, u) for u in urls]
14 for coro in asyncio.as_completed(tasks):
15 url, status, size = await coro
16 print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19 urls = [
20 "https://codesparklab.com/json/example1.json",
21 "https://codesparklab.com/json/example2.json",
22 "https://codesparklab.com/json/example3.json",
23 ]
24 asyncio.run(main(urls))- Dzięki
asyncio.as_completedmożesz przetwarzać wyniki w kolejności ukończenia zadań. Jest to przydatne do wydajnej obsługi wielu żądań.
Współistnienie z blokującymi operacjami I/O: run_in_executor
Podczas pracy z zadaniami wymagającymi dużych zasobów CPU lub istniejącymi blokującymi API w kodzie asynchronicznym, używaj ThreadPoolExecutor lub ProcessPoolExecutor za pomocą loop.run_in_executor.
Poniższy kod jest przykładem jednoczesnego uruchamiania zadań zakładających blokujące operacje wejścia/wyjścia z użyciem puli wątków.
1import asyncio
2from concurrent.futures import ThreadPoolExecutor
3import time
4
5def blocking_io(n):
6 # Simulate a blocking I/O or CPU-bound function
7 time.sleep(n)
8 return f"slept {n}s"
9
10async def main():
11 loop = asyncio.get_running_loop()
12 with ThreadPoolExecutor() as pool:
13 tasks = [
14 loop.run_in_executor(pool, blocking_io, 1),
15 loop.run_in_executor(pool, blocking_io, 2),
16 ]
17 results = await asyncio.gather(*tasks)
18 print("Blocking results:", results)
19
20if __name__ == "__main__":
21 asyncio.run(main())- Wykorzystując
run_in_executor, możesz włączyć istniejący kod synchroniczny do asynchronicznych przepływów bez znacznych przeróbek. Należy jednak zwrócić uwagę na liczbę wątków i obciążenie CPU. ProcessPoolExecutornadaje się do zadań wymagających dużej mocy obliczeniowej CPU.
Asynchroniczny serwer: serwer TCP Echo oparty na asyncio
Jeśli chcesz obsługiwać gniazda bezpośrednio, możesz łatwo zbudować asynchroniczny serwer za pomocą asyncio.start_server.
Poniższy przykład to prosty serwer echo, który zwraca dane dokładnie w takiej formie, w jakiej otrzymał je od klienta.
1import asyncio
2
3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
4 # Handle a single client: read data and echo it back
5 addr = writer.get_extra_info('peername')
6 print(f"Connection from {addr}")
7 while True:
8 data = await reader.read(1024)
9 if not data:
10 break
11 writer.write(data) # echo back
12 await writer.drain()
13 writer.close()
14 await writer.wait_closed()
15 print(f"Connection closed {addr}")
16
17async def main():
18 server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19 addr = server.sockets[0].getsockname()
20 print(f"Serving on {addr}")
21 async with server:
22 await server.serve_forever()
23
24if __name__ == "__main__":
25 asyncio.run(main())-
W komunikacji TCP z użyciem
asyncio,StreamReaderiStreamWriterodgrywają kluczową rolę w asynchronicznym wejściu i wyjściu.StreamReaderasynchronicznie odczytuje dane wysyłane przez klienta, podczas gdyStreamWritersłuży do wysyłania odpowiedzi z serwera do klienta. -
Nawet bez samodzielnej obsługi szczegółowych operacji na gniazdkach, możesz uruchomić asynchroniczny serwer w prosty i wydajny sposób za pomocą
asyncio.start_server. -
Gdy przekażesz funkcję obsługi do
asyncio.start_server, ta funkcja otrzymujereaderiwriterjako swoje argumenty. Korzystając z tych narzędzi, możesz zaimplementować procesy komunikacji w bezpieczniejszy i bardziej przejrzysty sposób niż poprzez bezpośrednią obsługę niskopoziomowych API gniazdek. Na przykład, odbierając dane za pomocąreader.read()i łączącwriter.write()zwriter.drain(), możesz zaimplementować asynchroniczne wysyłanie, które gwarantuje zakończenie transmisji. -
To rozwiązanie nadaje się do obsługi wielu jednoczesnych połączeń i jest idealne dla prostych protokołów lub małych usług TCP.
Obsługa dużych strumieni danych
Podczas przetwarzania dużych plików lub odpowiedzi sekwencyjnie, odczytuj i zapisuj dane w kawałkach, aby zużycie pamięci było niskie. Poniżej znajduje się przykład czytania strumieniowego przy użyciu aiohttp.
Poniższy kod przetwarza odpowiedzi HTTP w kawałkach i zapisuje na dysk w miarę otrzymywania danych.
1import aiohttp
2import asyncio
3import aiofiles
4
5async def stream_download(url: str, dest: str):
6 # Stream download and write to file in chunks
7 async with aiohttp.ClientSession() as session:
8 async with session.get(url) as resp:
9 async with aiofiles.open(dest, 'wb') as f:
10 async for chunk in resp.content.iter_chunked(1024 * 64):
11 await f.write(chunk)
12
13if __name__ == "__main__":
14 asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))-
Ten kod nie ładuje dużego pliku naraz; zamiast tego odbiera dane w kawałkach (małych częściach) i zapisuje je do pliku asynchronicznie. W rezultacie może pobierać dane szybko i wydajnie, utrzymując niskie zużycie pamięci.
aiohttpasynchronicznie pobiera dane, aaiofileszapisuje je do pliku bez blokowania, co ułatwia równoczesne wykonywanie innych procesów. -
Ten wzorzec nadaje się do wydajnego pobierania i zapisywania dużych plików, jednocześnie minimalizując zużycie pamięci.
Asynchroniczne uruchamianie procesów podrzędnych
Jeśli chcesz uruchomić polecenia zewnętrzne asynchronicznie i czytać ich wyjście w czasie rzeczywistym, przydatne jest asyncio.create_subprocess_exec.
Poniżej znajduje się przykład uruchamiania polecenia zewnętrznego i odczytu standardowego wyjścia w czasie rzeczywistym.
1import asyncio
2
3async def run_cmd(cmd):
4 # Run external command asynchronously and capture output line by line
5 proc = await asyncio.create_subprocess_exec(
6 *cmd,
7 stdout=asyncio.subprocess.PIPE,
8 stderr=asyncio.subprocess.PIPE
9 )
10
11 async def read_stream(stream, name):
12 while True:
13 line = await stream.readline()
14 if not line:
15 break
16 print(f"[{name}] {line.decode().rstrip()}")
17
18 await asyncio.gather(
19 read_stream(proc.stdout, "stdout"),
20 read_stream(proc.stderr, "stderr"),
21 )
22 await proc.wait()
23 return proc.returncode
24
25if __name__ == "__main__":
26 asyncio.run(run_cmd(["python", "--version"]))- Dzięki asynchronicznemu kontrolowaniu procesów podrzędnych możesz obsługiwać logi z narzędzi zewnętrznych w czasie rzeczywistym lub uruchamiać wiele procesów równolegle.
Obsługa anulowania i limitów czasowych
Zadania asynchroniczne mogą zostać anulowane. Aby zaimplementować limit czasu, wystarczy użyć asyncio.wait_for.
Poniżej znajduje się przykład uruchamiania zadania z limitem czasu.
1import asyncio
2
3async def slow_task():
4 await asyncio.sleep(5)
5 return "done"
6
7async def main():
8 try:
9 result = await asyncio.wait_for(slow_task(), timeout=2.0)
10 print("Result:", result)
11 except asyncio.TimeoutError:
12 print("Task timed out")
13
14if __name__ == "__main__":
15 asyncio.run(main())wait_forrzuca wyjątekTimeoutError, jeśli limit czasu zostanie przekroczony, i anuluje zadanie w razie potrzeby. Należy zachować ostrożność przy propagacji anulowania zadań i sprzątaniu po nich.
Kontrola współbieżności (Semaphore)
Ponieważ wykonywanie wielu współbieżnych połączeń lub zapytań może wyczerpać zasoby, ograniczaj współbieżność za pomocą asyncio.Semaphore.
Poniżej znajduje się przykład ograniczania liczby równoczesnych pobrań za pomocą semafora.
1import asyncio
2import aiohttp
3
4semaphore = asyncio.Semaphore(3) # allow up to 3 concurrent tasks
5
6async def limited_fetch(session, url):
7 async with semaphore:
8 async with session.get(url) as resp:
9 return url, resp.status
10
11async def main(urls):
12 async with aiohttp.ClientSession() as session:
13 tasks = [limited_fetch(session, u) for u in urls]
14 results = await asyncio.gather(*tasks)
15 print(results)
16
17if __name__ == "__main__":
18 urls = ["https://codesparklab.com/json/example.json"] * 10
19 asyncio.run(main(urls))- Dzięki tej metodzie możesz delikatnie uzyskiwać dostęp do usług zewnętrznych i unikać przeciążenia własnego procesu.
Obsługa błędów i strategie ponawiania
Błędy nieuchronnie pojawiają się także w przetwarzaniu asynchronicznym. Odpowiednio przechwytuj wyjątki i wdrażaj strategie ponawiania, takie jak wykładnicze opóźnianie próby ponownej.
Poniżej znajduje się przykład implementacji prostego ponawiania do N razy.
1import asyncio
2import aiohttp
3import random
4
5async def fetch_with_retry(session, url, retries=3):
6 for attempt in range(1, retries + 1):
7 try:
8 async with session.get(url) as resp:
9 resp.raise_for_status()
10 text = await resp.text()
11 return text
12 except Exception as e:
13 if attempt == retries:
14 raise
15 await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18 async with aiohttp.ClientSession() as session:
19 text = await fetch_with_retry(session, "https://codesparklab.com/")
20 print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23 asyncio.run(main())- Właściwa logika ponawiania jest ważna dla równowagi między spójnością a kontrolą ruchu.
Wskazówki dotyczące debugowania i logowania
W przetwarzaniu asynchronicznym zadania przebiegają równolegle, co może utrudniać identyfikację przyczyny problemów. Aby skutecznie śledzić problemy, pamiętaj o poniższych kwestiach – usprawni to debugowanie.
- Wyjątki pochodzące z
asyncio.run()iTaskłatwo przeoczyć, więc koniecznie loguj nieobsłużone wyjątki. - Podczas korzystania z
logging, uwzględnienie nazwy korutyny lub, w Pythonie 3.8 i nowszych,task.get_name()w logach ułatwia śledzenie. - Możesz sprawdzić aktualny status zadań za pomocą
asyncio.Task.all_tasks(). Jednakże to API jest przeznaczone do celów debugowania i powinno być używane ostrożnie w środowiskach produkcyjnych, aby uniknąć problemów z wydajnością lub nieoczekiwanych zakłóceń.
Uwagi dotyczące wydajności
Chociaż programowanie asynchroniczne znakomicie radzi sobie z oczekiwaniem na operacje I/O, niewłaściwe użycie może pogorszyć wydajność. Optymalizuj, pamiętając o następujących kwestiach:.
- Przetwarzanie asynchroniczne sprawdza się przy zadaniach ograniczonych przez I/O, ale nie nadaje się do zadań ograniczonych przez CPU; w takich przypadkach użyj puli procesów.
- Korzystając z puli wątków lub procesów, weź pod uwagę ich wielkość i charakter zadań.
- Jeśli uruchamiasz jednocześnie wiele małych zadań, narzut na pętlę zdarzeń rośnie – stosuj więc grupowanie zadań lub semafory, aby dostosować obciążenie.
Podsumowanie
Asynchroniczne I/O w Pythonie to potężny mechanizm, który skutecznie wykorzystuje czas oczekiwania na I/O i efektywnie równolegle realizuje operacje sieciowe oraz plikowe. Łącząc techniki takie jak asyncio, aiohttp, aiofiles oraz run_in_executor, możesz elastycznie tworzyć praktyczne aplikacje asynchroniczne. Wykorzystując async with do automatycznego przejmowania i zwalniania zasobów, możesz bezpiecznie i niezawodnie zarządzać asynchronicznymi zasobami, takimi jak pliki, sesje HTTP i blokady. Wprowadzając odpowiednią obsługę błędów i zarządzanie współbieżnością, możesz bezpiecznie uruchamiać wysoko niezawodne programy asynchroniczne.
Możesz śledzić ten artykuł, korzystając z Visual Studio Code na naszym kanale YouTube. Proszę również sprawdzić nasz kanał YouTube.