Асинхронный ввод/вывод

Асинхронный ввод/вывод

В этой статье объясняется асинхронный ввод/вывод.

В этом руководстве постепенно, шаг за шагом, объясняются концепции и шаблоны асинхронного ввода/вывода, которые практически полезны в Python.

YouTube Video

Асинхронный ввод/вывод (I/O)

Понятие асинхронного ввода/вывода

Асинхронный ввод/вывод — это механизм, позволяющий выполнять другие операции параллельно во время ожидания длительных операций ввода/вывода, таких как работа с файлами или сетевое взаимодействие. В Python стандартным асинхронным фреймворком является asyncio, и множество библиотек разработаны для работы с этим механизмом.

Основы: async / await и событийный цикл

Сначала рассмотрим, как писать простые корутины, и пример одновременного запуска нескольких корутин с помощью asyncio.gather.

Код ниже — минимальный пример определения и одновременного запуска асинхронных функций. Функция sleep используется для демонстрации параллельного выполнения.

 1import asyncio
 2
 3async def worker(name: str, delay: float):
 4    # Simulate I/O-bound work by sleeping
 5    print(f"{name} started")
 6    await asyncio.sleep(delay)
 7    print(f"{name} finished after {delay}s")
 8    return name, delay
 9
10async def main():
11    # Run multiple coroutines concurrently
12    tasks = [
13        worker("A", 1.5),
14        worker("B", 1.0),
15        worker("C", 0.5),
16    ]
17    results = await asyncio.gather(*tasks)
18    print("Results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Этот код запускает событийный цикл с помощью asyncio.run() и одновременно выполняет три корутины.

async with и асинхронные контекстные менеджеры

В асинхронной обработке управление ресурсами, такими как открытие соединений и закрытие файлов, может легко усложниться. В таких случаях становятся полезными асинхронные контекстные менеджеры с использованием async with. Этот синтаксис используется так же, как и синхронный оператор with, но внутренняя обработка выполняется асинхронно, что органично вписывается в поток async/await.

Есть две основные причины использовать async with:.

  • Для надежной очистки ресурсов, таких как соединения, файловые дескрипторы или сессии. Вы можете быть уверены, что ресурсы будут корректно освобождены даже при аварийном завершении.
  • Для автоматизации задач инициализации и очистки, таких как установка или закрытие соединений и сброс данных, в асинхронном режиме. Это избавляет от необходимости ручного управления ресурсами и делает код более понятным.

Ниже приведён пример создания простого асинхронного контекстного менеджера с нуля.

 1import asyncio
 2
 3class AsyncResource:
 4    async def __aenter__(self):
 5        print("Opening resource...")
 6        await asyncio.sleep(0.5)
 7        print("Resource opened")
 8        return self
 9
10    async def __aexit__(self, exc_type, exc, tb):
11        print("Closing resource...")
12        await asyncio.sleep(0.5)
13        print("Resource closed")
14
15async def main():
16    async with AsyncResource() as r:
17        print("Using resource...")
18
19if __name__ == "__main__":
20    asyncio.run(main())
  • Определив методы __aenter__ и __aexit__, вы можете использовать async with.
  • Обработка при входе и выходе из блока async with выполняется асинхронно и безопасно.

Асинхронный ввод/вывод файлов (aiofiles)

Операции с файлами — классический пример блокирующих операций. С помощью aiofiles вы можете безопасно выполнять файловые операции асинхронно. Внутри используется пул потоков, и файлы корректно закрываются с помощью async with.

Следующий пример демонстрирует параллельное асинхронное чтение нескольких файлов. Перед запуском этого кода необходимо установить aiofiles с помощью pip install aiofiles.

 1# pip install aiofiles
 2import asyncio
 3import aiofiles
 4from pathlib import Path
 5
 6async def read_file(path: Path):
 7    # Read file content asynchronously
 8    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
 9        contents = await f.read()
10    return path.name, len(contents)
11
12async def main():
13    files = [
14        Path("example1.txt"),
15        Path("example2.txt"),
16        Path("example3.txt")
17    ]
18    tasks = [read_file(p) for p in files]
19    results = await asyncio.gather(*tasks)
20    for name, size in results:
21        print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24    asyncio.run(main())
  • Этот код параллелит чтение каждого файла. aiofiles часто использует пул потоков внутри, позволяя обрабатывать блокирующий файловый ввод/вывод через асинхронный интерфейс.

Асинхронный HTTP-клиент (aiohttp)

Как классический пример сетевого ввода/вывода, рассмотрим выполнение HTTP-запросов асинхронно. Особенно мощно это работает, когда необходимо выполнить большое количество параллельных HTTP-запросов.

Ниже приведён пример одновременного получения данных с нескольких URL с помощью aiohttp. Для этого потребуется установить aiohttp с помощью pip install aiohttp.

 1# pip install aiohttp
 2import asyncio
 3import aiohttp
 4
 5async def fetch(session: aiohttp.ClientSession, url: str):
 6    # Fetch a URL asynchronously and return status and size
 7    async with session.get(url) as resp:
 8        text = await resp.text()
 9        return url, resp.status, len(text)
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [fetch(session, u) for u in urls]
14        for coro in asyncio.as_completed(tasks):
15            url, status, size = await coro
16            print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19    urls = [
20        "https://codesparklab.com/json/example1.json",
21        "https://codesparklab.com/json/example2.json",
22        "https://codesparklab.com/json/example3.json",
23    ]
24    asyncio.run(main(urls))
  • Используя asyncio.as_completed, можно обрабатывать результаты по мере завершения задач. Это полезно для эффективной обработки большого количества запросов.

Сосуществование с блокирующим вводом-выводом: run_in_executor

При работе с ресурсоемкими задачами или существующими блокирующими API в асинхронном коде используйте ThreadPoolExecutor или ProcessPoolExecutor через loop.run_in_executor.

Следующий код — пример параллельного выполнения задач, предполагающих блокирующий ввод/вывод, с использованием пула потоков.

 1import asyncio
 2from concurrent.futures import ThreadPoolExecutor
 3import time
 4
 5def blocking_io(n):
 6    # Simulate a blocking I/O or CPU-bound function
 7    time.sleep(n)
 8    return f"slept {n}s"
 9
10async def main():
11    loop = asyncio.get_running_loop()
12    with ThreadPoolExecutor() as pool:
13        tasks = [
14            loop.run_in_executor(pool, blocking_io, 1),
15            loop.run_in_executor(pool, blocking_io, 2),
16        ]
17        results = await asyncio.gather(*tasks)
18        print("Blocking results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Используя run_in_executor, вы можете включать существующий синхронный код в асинхронные процессы без значительных изменений. Однако следует обращать внимание на количество потоков и загрузку процессора.
  • ProcessPoolExecutor подходит для задач, требующих значительных вычислений процессора.

Асинхронный сервер: TCP echo-сервер на базе asyncio

Если вы хотите напрямую работать с сокетами, асинхронный сервер можно легко создать с помощью asyncio.start_server.

Следующий пример — простой echo-сервер, который возвращает данные ровно так, как их отправил клиент.

 1import asyncio
 2
 3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
 4    # Handle a single client: read data and echo it back
 5    addr = writer.get_extra_info('peername')
 6    print(f"Connection from {addr}")
 7    while True:
 8        data = await reader.read(1024)
 9        if not data:
10            break
11        writer.write(data)  # echo back
12        await writer.drain()
13    writer.close()
14    await writer.wait_closed()
15    print(f"Connection closed {addr}")
16
17async def main():
18    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19    addr = server.sockets[0].getsockname()
20    print(f"Serving on {addr}")
21    async with server:
22        await server.serve_forever()
23
24if __name__ == "__main__":
25    asyncio.run(main())
  • В TCP-коммуникациях с использованием asyncio объекты StreamReader и StreamWriter играют центральную роль в асинхронном вводе-выводе. StreamReader асинхронно читает данные, отправленные клиентом, а StreamWriter используется для отправки ответов от сервера обратно клиенту.

  • Даже не управляя низкоуровневыми операциями с сокетами, вы можете просто и эффективно запустить асинхронный сервер с помощью asyncio.start_server.

  • Когда вы передаёте функцию-обработчик в asyncio.start_server, эта функция получает reader и writer в качестве аргументов. Используя их, вы можете реализовать процессы коммуникации более безопасно и понятно, чем при непосредственной работе с низкоуровневыми API сокетов. Например, получая данные с помощью reader.read() и комбинируя writer.write() с writer.drain(), вы можете реализовать асинхронную отправку, чтобы убедиться, что передача завершена.

  • Такая архитектура подходит для обработки большого количества одновременных соединений и идеальна для простых протоколов или небольших TCP-сервисов.

Обработка больших потоковых данных

При последовательной обработке больших файлов или ответов читайте и записывайте данные частями, чтобы снизить использование памяти. Ниже приведён пример потокового чтения с помощью aiohttp.

Данный код обрабатывает HTTP-ответы по частям и записывает их на диск по мере поступления данных.

 1import aiohttp
 2import asyncio
 3import aiofiles
 4
 5async def stream_download(url: str, dest: str):
 6    # Stream download and write to file in chunks
 7    async with aiohttp.ClientSession() as session:
 8        async with session.get(url) as resp:
 9            async with aiofiles.open(dest, 'wb') as f:
10                async for chunk in resp.content.iter_chunked(1024 * 64):
11                    await f.write(chunk)
12
13if __name__ == "__main__":
14    asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))
  • Этот код не загружает большой файл целиком; вместо этого он получает данные частями (небольшими кусками) и асинхронно записывает их в файл. В результате загрузка выполняется быстро и эффективно при низком потреблении памяти. aiohttp асинхронно получает данные, а aiofiles записывает их в файл без блокировки, что позволяет легко запускать этот процесс вместе с другими.

  • Этот подход подходит для эффективного скачивания и сохранения больших файлов с минимальным использованием памяти.

Асинхронное выполнение подпроцессов

Если вы хотите запускать внешние команды асинхронно и считывать их вывод в реальном времени, подойдет asyncio.create_subprocess_exec.

Ниже приведён пример запуска внешней команды и чтения её стандартного вывода в реальном времени.

 1import asyncio
 2
 3async def run_cmd(cmd):
 4    # Run external command asynchronously and capture output line by line
 5    proc = await asyncio.create_subprocess_exec(
 6        *cmd,
 7        stdout=asyncio.subprocess.PIPE,
 8        stderr=asyncio.subprocess.PIPE
 9    )
10
11    async def read_stream(stream, name):
12        while True:
13            line = await stream.readline()
14            if not line:
15                break
16            print(f"[{name}] {line.decode().rstrip()}")
17
18    await asyncio.gather(
19        read_stream(proc.stdout, "stdout"),
20        read_stream(proc.stderr, "stderr"),
21    )
22    await proc.wait()
23    return proc.returncode
24
25if __name__ == "__main__":
26    asyncio.run(run_cmd(["python", "--version"]))
  • Управляя подпроцессами асинхронно, вы можете обрабатывать логи внешних инструментов в реальном времени или запускать несколько процессов параллельно.

Обработка отмены и тайм-аутов

Асинхронные задачи могут быть отменены. Для реализации тайм-аута удобно использовать asyncio.wait_for.

Ниже приведён пример запуска задачи с тайм-аутом.

 1import asyncio
 2
 3async def slow_task():
 4    await asyncio.sleep(5)
 5    return "done"
 6
 7async def main():
 8    try:
 9        result = await asyncio.wait_for(slow_task(), timeout=2.0)
10        print("Result:", result)
11    except asyncio.TimeoutError:
12        print("Task timed out")
13
14if __name__ == "__main__":
15    asyncio.run(main())
  • wait_for выбрасывает исключение TimeoutError при превышении времени ожидания и при необходимости отменяет задачу. Будьте внимательны при распространении отмены задачи и при её очистке.

Ограничение параллелизма (Semaphore)

Поскольку множество параллельных соединений или запросов может исчерпать ресурсы, ограничьте параллелизм с помощью asyncio.Semaphore.

Следующий пример демонстрирует ограничение одновременных загрузок с помощью семафора.

 1import asyncio
 2import aiohttp
 3
 4semaphore = asyncio.Semaphore(3)  # allow up to 3 concurrent tasks
 5
 6async def limited_fetch(session, url):
 7    async with semaphore:
 8        async with session.get(url) as resp:
 9            return url, resp.status
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [limited_fetch(session, u) for u in urls]
14        results = await asyncio.gather(*tasks)
15        print(results)
16
17if __name__ == "__main__":
18    urls = ["https://codesparklab.com/json/example.json"] * 10
19    asyncio.run(main(urls))
  • С помощью такого подхода вы сможете обращаться к внешним сервисам аккуратно и не перегружать собственный процесс.

Обработка ошибок и стратегии повторных попыток

Ошибки неизбежны даже в асинхронной обработке. Корректно обрабатывайте исключения и реализуйте стратегии повторных попыток, такие как экспоненциальная задержка (exponential backoff).

Ниже приведена примерная реализация простых повторных попыток до N раз.

 1import asyncio
 2import aiohttp
 3import random
 4
 5async def fetch_with_retry(session, url, retries=3):
 6    for attempt in range(1, retries + 1):
 7        try:
 8            async with session.get(url) as resp:
 9                resp.raise_for_status()
10                text = await resp.text()
11                return text
12        except Exception as e:
13            if attempt == retries:
14                raise
15            await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18    async with aiohttp.ClientSession() as session:
19        text = await fetch_with_retry(session, "https://codesparklab.com/")
20        print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • Правильная логика повторных попыток важна для баланса между надёжностью и управлением трафиком.

Советы по отладке и регистрации (логированию)

В асинхронной обработке задачи выполняются параллельно, что затрудняет определение причин проблем. Чтобы эффективно отслеживать проблемы, учитывайте следующие моменты — это упростит отладку.

  • Исключения из asyncio.run() и задач (Task) легко пропустить, поэтому обязательно логируйте необработанные ошибки.
  • При использовании logging добавление имени корутины или, в Python 3.8 и выше, task.get_name() в логи облегчает отслеживание.
  • Вы можете проверить текущее состояние задач с помощью asyncio.Task.all_tasks(). Однако этот API предназначен для целей отладки и должен использоваться с осторожностью в продуктивных средах во избежание проблем с производительностью или непредвиденных вмешательств.

Моменты, связанные с производительностью

Хотя асинхронное программирование превосходно справляется с ожиданием I/O, неправильное использование может привести к снижению производительности. Оптимизируйте, принимая во внимание следующие моменты:.

  • Асинхронная обработка отлично подходит для задач, связанных с ожиданием I/O, но не для задач, нагружающих процессор; в таких случаях используйте пул процессов.
  • При использовании пулов потоков или процессов учитывайте их размер и природу выполняемых задач.
  • Если запускать много мелких задач одновременно, нагрузка на событийный цикл возрастает — используйте группировку (batching) или семафоры для регулировки.

Резюме

Асинхронный ввод/вывод в Python — мощный механизм, который эффективно использует время ожидания I/O и позволяет одновременно выполнять сетевые и файловые операции. Комбинируя такие техники, как asyncio, aiohttp, aiofiles и run_in_executor, вы можете гибко строить практичные асинхронные приложения. Используя async with для автоматизации получения и освобождения ресурсов, вы сможете безопасно и надёжно управлять асинхронными ресурсами, такими как файлы, HTTP-сессии и блокировки. Правильная обработка ошибок и управление параллелизмом позволяют безопасно запускать высоконадёжные асинхронные программы.

Вы можете следовать этой статье, используя Visual Studio Code на нашем YouTube-канале. Пожалуйста, также посмотрите наш YouTube-канал.

YouTube Video