Асинхронный ввод/вывод
В этой статье объясняется асинхронный ввод/вывод.
В этом руководстве постепенно, шаг за шагом, объясняются концепции и шаблоны асинхронного ввода/вывода, которые практически полезны в Python.
YouTube Video
Асинхронный ввод/вывод (I/O)
Понятие асинхронного ввода/вывода
Асинхронный ввод/вывод — это механизм, позволяющий выполнять другие операции параллельно во время ожидания длительных операций ввода/вывода, таких как работа с файлами или сетевое взаимодействие. В Python стандартным асинхронным фреймворком является asyncio, и множество библиотек разработаны для работы с этим механизмом.
Основы: async / await и событийный цикл
Сначала рассмотрим, как писать простые корутины, и пример одновременного запуска нескольких корутин с помощью asyncio.gather.
Код ниже — минимальный пример определения и одновременного запуска асинхронных функций. Функция sleep используется для демонстрации параллельного выполнения.
1import asyncio
2
3async def worker(name: str, delay: float):
4 # Simulate I/O-bound work by sleeping
5 print(f"{name} started")
6 await asyncio.sleep(delay)
7 print(f"{name} finished after {delay}s")
8 return name, delay
9
10async def main():
11 # Run multiple coroutines concurrently
12 tasks = [
13 worker("A", 1.5),
14 worker("B", 1.0),
15 worker("C", 0.5),
16 ]
17 results = await asyncio.gather(*tasks)
18 print("Results:", results)
19
20if __name__ == "__main__":
21 asyncio.run(main())- Этот код запускает событийный цикл с помощью
asyncio.run()и одновременно выполняет три корутины.
async with и асинхронные контекстные менеджеры
В асинхронной обработке управление ресурсами, такими как открытие соединений и закрытие файлов, может легко усложниться. В таких случаях становятся полезными асинхронные контекстные менеджеры с использованием async with. Этот синтаксис используется так же, как и синхронный оператор with, но внутренняя обработка выполняется асинхронно, что органично вписывается в поток async/await.
Есть две основные причины использовать async with:.
- Для надежной очистки ресурсов, таких как соединения, файловые дескрипторы или сессии. Вы можете быть уверены, что ресурсы будут корректно освобождены даже при аварийном завершении.
- Для автоматизации задач инициализации и очистки, таких как установка или закрытие соединений и сброс данных, в асинхронном режиме. Это избавляет от необходимости ручного управления ресурсами и делает код более понятным.
Ниже приведён пример создания простого асинхронного контекстного менеджера с нуля.
1import asyncio
2
3class AsyncResource:
4 async def __aenter__(self):
5 print("Opening resource...")
6 await asyncio.sleep(0.5)
7 print("Resource opened")
8 return self
9
10 async def __aexit__(self, exc_type, exc, tb):
11 print("Closing resource...")
12 await asyncio.sleep(0.5)
13 print("Resource closed")
14
15async def main():
16 async with AsyncResource() as r:
17 print("Using resource...")
18
19if __name__ == "__main__":
20 asyncio.run(main())- Определив методы
__aenter__и__aexit__, вы можете использоватьasync with. - Обработка при входе и выходе из блока
async withвыполняется асинхронно и безопасно.
Асинхронный ввод/вывод файлов (aiofiles)
Операции с файлами — классический пример блокирующих операций. С помощью aiofiles вы можете безопасно выполнять файловые операции асинхронно. Внутри используется пул потоков, и файлы корректно закрываются с помощью async with.
Следующий пример демонстрирует параллельное асинхронное чтение нескольких файлов. Перед запуском этого кода необходимо установить aiofiles с помощью pip install aiofiles.
1# pip install aiofiles
2import asyncio
3import aiofiles
4from pathlib import Path
5
6async def read_file(path: Path):
7 # Read file content asynchronously
8 async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
9 contents = await f.read()
10 return path.name, len(contents)
11
12async def main():
13 files = [
14 Path("example1.txt"),
15 Path("example2.txt"),
16 Path("example3.txt")
17 ]
18 tasks = [read_file(p) for p in files]
19 results = await asyncio.gather(*tasks)
20 for name, size in results:
21 print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24 asyncio.run(main())- Этот код параллелит чтение каждого файла.
aiofilesчасто использует пул потоков внутри, позволяя обрабатывать блокирующий файловый ввод/вывод через асинхронный интерфейс.
Асинхронный HTTP-клиент (aiohttp)
Как классический пример сетевого ввода/вывода, рассмотрим выполнение HTTP-запросов асинхронно. Особенно мощно это работает, когда необходимо выполнить большое количество параллельных HTTP-запросов.
Ниже приведён пример одновременного получения данных с нескольких URL с помощью aiohttp. Для этого потребуется установить aiohttp с помощью pip install aiohttp.
1# pip install aiohttp
2import asyncio
3import aiohttp
4
5async def fetch(session: aiohttp.ClientSession, url: str):
6 # Fetch a URL asynchronously and return status and size
7 async with session.get(url) as resp:
8 text = await resp.text()
9 return url, resp.status, len(text)
10
11async def main(urls):
12 async with aiohttp.ClientSession() as session:
13 tasks = [fetch(session, u) for u in urls]
14 for coro in asyncio.as_completed(tasks):
15 url, status, size = await coro
16 print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19 urls = [
20 "https://codesparklab.com/json/example1.json",
21 "https://codesparklab.com/json/example2.json",
22 "https://codesparklab.com/json/example3.json",
23 ]
24 asyncio.run(main(urls))- Используя
asyncio.as_completed, можно обрабатывать результаты по мере завершения задач. Это полезно для эффективной обработки большого количества запросов.
Сосуществование с блокирующим вводом-выводом: run_in_executor
При работе с ресурсоемкими задачами или существующими блокирующими API в асинхронном коде используйте ThreadPoolExecutor или ProcessPoolExecutor через loop.run_in_executor.
Следующий код — пример параллельного выполнения задач, предполагающих блокирующий ввод/вывод, с использованием пула потоков.
1import asyncio
2from concurrent.futures import ThreadPoolExecutor
3import time
4
5def blocking_io(n):
6 # Simulate a blocking I/O or CPU-bound function
7 time.sleep(n)
8 return f"slept {n}s"
9
10async def main():
11 loop = asyncio.get_running_loop()
12 with ThreadPoolExecutor() as pool:
13 tasks = [
14 loop.run_in_executor(pool, blocking_io, 1),
15 loop.run_in_executor(pool, blocking_io, 2),
16 ]
17 results = await asyncio.gather(*tasks)
18 print("Blocking results:", results)
19
20if __name__ == "__main__":
21 asyncio.run(main())- Используя
run_in_executor, вы можете включать существующий синхронный код в асинхронные процессы без значительных изменений. Однако следует обращать внимание на количество потоков и загрузку процессора. ProcessPoolExecutorподходит для задач, требующих значительных вычислений процессора.
Асинхронный сервер: TCP echo-сервер на базе asyncio
Если вы хотите напрямую работать с сокетами, асинхронный сервер можно легко создать с помощью asyncio.start_server.
Следующий пример — простой echo-сервер, который возвращает данные ровно так, как их отправил клиент.
1import asyncio
2
3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
4 # Handle a single client: read data and echo it back
5 addr = writer.get_extra_info('peername')
6 print(f"Connection from {addr}")
7 while True:
8 data = await reader.read(1024)
9 if not data:
10 break
11 writer.write(data) # echo back
12 await writer.drain()
13 writer.close()
14 await writer.wait_closed()
15 print(f"Connection closed {addr}")
16
17async def main():
18 server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19 addr = server.sockets[0].getsockname()
20 print(f"Serving on {addr}")
21 async with server:
22 await server.serve_forever()
23
24if __name__ == "__main__":
25 asyncio.run(main())-
В TCP-коммуникациях с использованием
asyncioобъектыStreamReaderиStreamWriterиграют центральную роль в асинхронном вводе-выводе.StreamReaderасинхронно читает данные, отправленные клиентом, аStreamWriterиспользуется для отправки ответов от сервера обратно клиенту. -
Даже не управляя низкоуровневыми операциями с сокетами, вы можете просто и эффективно запустить асинхронный сервер с помощью
asyncio.start_server. -
Когда вы передаёте функцию-обработчик в
asyncio.start_server, эта функция получаетreaderиwriterв качестве аргументов. Используя их, вы можете реализовать процессы коммуникации более безопасно и понятно, чем при непосредственной работе с низкоуровневыми API сокетов. Например, получая данные с помощьюreader.read()и комбинируяwriter.write()сwriter.drain(), вы можете реализовать асинхронную отправку, чтобы убедиться, что передача завершена. -
Такая архитектура подходит для обработки большого количества одновременных соединений и идеальна для простых протоколов или небольших TCP-сервисов.
Обработка больших потоковых данных
При последовательной обработке больших файлов или ответов читайте и записывайте данные частями, чтобы снизить использование памяти. Ниже приведён пример потокового чтения с помощью aiohttp.
Данный код обрабатывает HTTP-ответы по частям и записывает их на диск по мере поступления данных.
1import aiohttp
2import asyncio
3import aiofiles
4
5async def stream_download(url: str, dest: str):
6 # Stream download and write to file in chunks
7 async with aiohttp.ClientSession() as session:
8 async with session.get(url) as resp:
9 async with aiofiles.open(dest, 'wb') as f:
10 async for chunk in resp.content.iter_chunked(1024 * 64):
11 await f.write(chunk)
12
13if __name__ == "__main__":
14 asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))-
Этот код не загружает большой файл целиком; вместо этого он получает данные частями (небольшими кусками) и асинхронно записывает их в файл. В результате загрузка выполняется быстро и эффективно при низком потреблении памяти.
aiohttpасинхронно получает данные, аaiofilesзаписывает их в файл без блокировки, что позволяет легко запускать этот процесс вместе с другими. -
Этот подход подходит для эффективного скачивания и сохранения больших файлов с минимальным использованием памяти.
Асинхронное выполнение подпроцессов
Если вы хотите запускать внешние команды асинхронно и считывать их вывод в реальном времени, подойдет asyncio.create_subprocess_exec.
Ниже приведён пример запуска внешней команды и чтения её стандартного вывода в реальном времени.
1import asyncio
2
3async def run_cmd(cmd):
4 # Run external command asynchronously and capture output line by line
5 proc = await asyncio.create_subprocess_exec(
6 *cmd,
7 stdout=asyncio.subprocess.PIPE,
8 stderr=asyncio.subprocess.PIPE
9 )
10
11 async def read_stream(stream, name):
12 while True:
13 line = await stream.readline()
14 if not line:
15 break
16 print(f"[{name}] {line.decode().rstrip()}")
17
18 await asyncio.gather(
19 read_stream(proc.stdout, "stdout"),
20 read_stream(proc.stderr, "stderr"),
21 )
22 await proc.wait()
23 return proc.returncode
24
25if __name__ == "__main__":
26 asyncio.run(run_cmd(["python", "--version"]))- Управляя подпроцессами асинхронно, вы можете обрабатывать логи внешних инструментов в реальном времени или запускать несколько процессов параллельно.
Обработка отмены и тайм-аутов
Асинхронные задачи могут быть отменены. Для реализации тайм-аута удобно использовать asyncio.wait_for.
Ниже приведён пример запуска задачи с тайм-аутом.
1import asyncio
2
3async def slow_task():
4 await asyncio.sleep(5)
5 return "done"
6
7async def main():
8 try:
9 result = await asyncio.wait_for(slow_task(), timeout=2.0)
10 print("Result:", result)
11 except asyncio.TimeoutError:
12 print("Task timed out")
13
14if __name__ == "__main__":
15 asyncio.run(main())wait_forвыбрасывает исключениеTimeoutErrorпри превышении времени ожидания и при необходимости отменяет задачу. Будьте внимательны при распространении отмены задачи и при её очистке.
Ограничение параллелизма (Semaphore)
Поскольку множество параллельных соединений или запросов может исчерпать ресурсы, ограничьте параллелизм с помощью asyncio.Semaphore.
Следующий пример демонстрирует ограничение одновременных загрузок с помощью семафора.
1import asyncio
2import aiohttp
3
4semaphore = asyncio.Semaphore(3) # allow up to 3 concurrent tasks
5
6async def limited_fetch(session, url):
7 async with semaphore:
8 async with session.get(url) as resp:
9 return url, resp.status
10
11async def main(urls):
12 async with aiohttp.ClientSession() as session:
13 tasks = [limited_fetch(session, u) for u in urls]
14 results = await asyncio.gather(*tasks)
15 print(results)
16
17if __name__ == "__main__":
18 urls = ["https://codesparklab.com/json/example.json"] * 10
19 asyncio.run(main(urls))- С помощью такого подхода вы сможете обращаться к внешним сервисам аккуратно и не перегружать собственный процесс.
Обработка ошибок и стратегии повторных попыток
Ошибки неизбежны даже в асинхронной обработке. Корректно обрабатывайте исключения и реализуйте стратегии повторных попыток, такие как экспоненциальная задержка (exponential backoff).
Ниже приведена примерная реализация простых повторных попыток до N раз.
1import asyncio
2import aiohttp
3import random
4
5async def fetch_with_retry(session, url, retries=3):
6 for attempt in range(1, retries + 1):
7 try:
8 async with session.get(url) as resp:
9 resp.raise_for_status()
10 text = await resp.text()
11 return text
12 except Exception as e:
13 if attempt == retries:
14 raise
15 await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18 async with aiohttp.ClientSession() as session:
19 text = await fetch_with_retry(session, "https://codesparklab.com/")
20 print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23 asyncio.run(main())- Правильная логика повторных попыток важна для баланса между надёжностью и управлением трафиком.
Советы по отладке и регистрации (логированию)
В асинхронной обработке задачи выполняются параллельно, что затрудняет определение причин проблем. Чтобы эффективно отслеживать проблемы, учитывайте следующие моменты — это упростит отладку.
- Исключения из
asyncio.run()и задач (Task) легко пропустить, поэтому обязательно логируйте необработанные ошибки. - При использовании
loggingдобавление имени корутины или, в Python 3.8 и выше,task.get_name()в логи облегчает отслеживание. - Вы можете проверить текущее состояние задач с помощью
asyncio.Task.all_tasks(). Однако этот API предназначен для целей отладки и должен использоваться с осторожностью в продуктивных средах во избежание проблем с производительностью или непредвиденных вмешательств.
Моменты, связанные с производительностью
Хотя асинхронное программирование превосходно справляется с ожиданием I/O, неправильное использование может привести к снижению производительности. Оптимизируйте, принимая во внимание следующие моменты:.
- Асинхронная обработка отлично подходит для задач, связанных с ожиданием I/O, но не для задач, нагружающих процессор; в таких случаях используйте пул процессов.
- При использовании пулов потоков или процессов учитывайте их размер и природу выполняемых задач.
- Если запускать много мелких задач одновременно, нагрузка на событийный цикл возрастает — используйте группировку (batching) или семафоры для регулировки.
Резюме
Асинхронный ввод/вывод в Python — мощный механизм, который эффективно использует время ожидания I/O и позволяет одновременно выполнять сетевые и файловые операции. Комбинируя такие техники, как asyncio, aiohttp, aiofiles и run_in_executor, вы можете гибко строить практичные асинхронные приложения. Используя async with для автоматизации получения и освобождения ресурсов, вы сможете безопасно и надёжно управлять асинхронными ресурсами, такими как файлы, HTTP-сессии и блокировки. Правильная обработка ошибок и управление параллелизмом позволяют безопасно запускать высоконадёжные асинхронные программы.
Вы можете следовать этой статье, используя Visual Studio Code на нашем YouTube-канале. Пожалуйста, также посмотрите наш YouTube-канал.