비동기 입력/출력

비동기 입력/출력

이 글은 비동기 입력/출력에 대해 설명합니다.

이 가이드는 파이썬에서 실제로 유용한 비동기 입력/출력의 개념과 패턴을 단계별로 쉽게 설명합니다.

YouTube Video

비동기 입력/출력(I/O)

비동기 I/O의 개념

비동기 I/O는 파일 조작이나 네트워크 통신 등 시간이 많이 걸리는 입출력을 기다리는 동안 다른 작업을 병렬로 실행할 수 있게 해 주는 메커니즘입니다. 파이썬에서는 표준 비동기 프레임워크로 asyncio가 제공되며, 많은 라이브러리들이 이 메커니즘을 따르도록 설계되어 있습니다.

기본: async / await 및 이벤트 루프

먼저, 기본 코루틴 작성 방법과 asyncio.gather를 사용해 여러 코루틴을 동시에 실행하는 예시를 소개합니다.

아래 코드는 비동기 함수를 정의하고 동시에 실행하는 최소 예제입니다. sleep 함수를 활용하여 병렬 처리를 보여줍니다.

 1import asyncio
 2
 3async def worker(name: str, delay: float):
 4    # Simulate I/O-bound work by sleeping
 5    print(f"{name} started")
 6    await asyncio.sleep(delay)
 7    print(f"{name} finished after {delay}s")
 8    return name, delay
 9
10async def main():
11    # Run multiple coroutines concurrently
12    tasks = [
13        worker("A", 1.5),
14        worker("B", 1.0),
15        worker("C", 0.5),
16    ]
17    results = await asyncio.gather(*tasks)
18    print("Results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • 이 코드는 asyncio.run()으로 이벤트 루프를 시작하고 세 개의 코루틴을 동시에 실행합니다.

async with 및 비동기 컨텍스트 매니저

비동기 처리에서는 연결을 열거나 파일을 닫는 등 리소스 관리가 복잡해지기 쉽습니다. 이럴 때 async with를 사용하는 비동기 컨텍스트 매니저가 유용하게 쓰입니다. 이 문법은 동기 with 문과 동일하게 사용하지만, 내부 처리가 비동기로 동작하므로 async/await 흐름에 자연스럽게 맞춰집니다.

async with를 사용하는 주된 이유는 두 가지입니다:.

  • 연결, 파일 핸들, 세션 등 리소스를 확실하게 정리하기 위해서입니다. 비정상적인 종료가 발생해도 리소스가 확실하게 해제되어 안심할 수 있습니다.
  • 비동기적으로 연결 설정 및 해제, 플러시 등 초기화와 정리 작업을 자동화하기 위해서입니다. 수작업 코딩의 번거로움을 줄이고, 코드를 명확하게 할 수 있습니다.

아래는 간단한 비동기 컨텍스트 매니저를 직접 작성하는 예시입니다.

 1import asyncio
 2
 3class AsyncResource:
 4    async def __aenter__(self):
 5        print("Opening resource...")
 6        await asyncio.sleep(0.5)
 7        print("Resource opened")
 8        return self
 9
10    async def __aexit__(self, exc_type, exc, tb):
11        print("Closing resource...")
12        await asyncio.sleep(0.5)
13        print("Resource closed")
14
15async def main():
16    async with AsyncResource() as r:
17        print("Using resource...")
18
19if __name__ == "__main__":
20    asyncio.run(main())
  • __aenter____aexit__를 정의하면 async with를 사용할 수 있습니다.
  • async with 블록 진입 및 종료 시 처리가 비동기적으로 안전하게 실행됩니다.

비동기 파일 I/O (aiofiles)

파일 조작은 대표적인 블로킹 작업입니다. aiofiles를 사용하면 파일 처리를 안전하게 비동기로 처리할 수 있습니다. 내부적으로 스레드 풀을 사용하며, async with로 파일을 확실히 닫아줍니다.

아래 예시에서는 여러 파일을 비동기로 병렬 읽기하는 방법을 보여줍니다. 이 코드를 실행하려면 pip install aiofilesaiofiles를 설치해야 합니다.

 1# pip install aiofiles
 2import asyncio
 3import aiofiles
 4from pathlib import Path
 5
 6async def read_file(path: Path):
 7    # Read file content asynchronously
 8    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
 9        contents = await f.read()
10    return path.name, len(contents)
11
12async def main():
13    files = [
14        Path("example1.txt"),
15        Path("example2.txt"),
16        Path("example3.txt")
17    ]
18    tasks = [read_file(p) for p in files]
19    results = await asyncio.gather(*tasks)
20    for name, size in results:
21        print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24    asyncio.run(main())
  • 이 코드는 각 파일의 읽기를 병렬로 처리합니다. aiofiles는 내부적으로 스레드 풀을 자주 사용하며, 이를 통해 블로킹 파일 I/O를 비동기 인터페이스로 다룰 수 있습니다.

비동기 HTTP 클라이언트 (aiohttp)

네트워크 I/O의 대표적인 예로, HTTP 요청을 비동기로 처리하는 방법을 설명합니다. 특히 많은 HTTP 요청을 병렬로 처리할 때 강력합니다.

아래는 aiohttp를 사용해 여러 URL을 병렬로 받아오는 예시입니다. pip install aiohttpaiohttp를 설치해야 합니다.

 1# pip install aiohttp
 2import asyncio
 3import aiohttp
 4
 5async def fetch(session: aiohttp.ClientSession, url: str):
 6    # Fetch a URL asynchronously and return status and size
 7    async with session.get(url) as resp:
 8        text = await resp.text()
 9        return url, resp.status, len(text)
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [fetch(session, u) for u in urls]
14        for coro in asyncio.as_completed(tasks):
15            url, status, size = await coro
16            print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19    urls = [
20        "https://codesparklab.com/json/example1.json",
21        "https://codesparklab.com/json/example2.json",
22        "https://codesparklab.com/json/example3.json",
23    ]
24    asyncio.run(main(urls))
  • asyncio.as_completed를 사용하면 작업이 완료되는 순서대로 결과를 처리할 수 있습니다. 이 방법은 많은 요청을 효율적으로 처리하는 데 유용합니다.

블로킹 I/O와의 공존: run_in_executor

비동기 코드에서 CPU 집약적 작업이나 기존 블로킹 API를 다룰 때는 loop.run_in_executor를 통해 ThreadPoolExecutor 또는 ProcessPoolExecutor를 사용하세요.

아래 코드는 블로킹 I/O가 발생하는 작업을 스레드 풀로 동시에 실행하는 예제입니다.

 1import asyncio
 2from concurrent.futures import ThreadPoolExecutor
 3import time
 4
 5def blocking_io(n):
 6    # Simulate a blocking I/O or CPU-bound function
 7    time.sleep(n)
 8    return f"slept {n}s"
 9
10async def main():
11    loop = asyncio.get_running_loop()
12    with ThreadPoolExecutor() as pool:
13        tasks = [
14            loop.run_in_executor(pool, blocking_io, 1),
15            loop.run_in_executor(pool, blocking_io, 2),
16        ]
17        results = await asyncio.gather(*tasks)
18        print("Blocking results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • run_in_executor를 활용하면 기존 동기 코드를 크게 수정하지 않고도 비동기 흐름에 통합할 수 있습니다. 하지만 스레드 수와 CPU 부하에는 주의해야 합니다.
  • ProcessPoolExecutor는 CPU 집약적 작업에 적합합니다.

비동기 서버: asyncio 기반 TCP 에코 서버

소켓을 직접 다루고 싶다면 asyncio.start_server로 손쉽게 비동기 서버를 구축할 수 있습니다.

아래 예시는 클라이언트에서 받은 데이터를 그대로 반환하는 간단한 에코 서버입니다.

 1import asyncio
 2
 3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
 4    # Handle a single client: read data and echo it back
 5    addr = writer.get_extra_info('peername')
 6    print(f"Connection from {addr}")
 7    while True:
 8        data = await reader.read(1024)
 9        if not data:
10            break
11        writer.write(data)  # echo back
12        await writer.drain()
13    writer.close()
14    await writer.wait_closed()
15    print(f"Connection closed {addr}")
16
17async def main():
18    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19    addr = server.sockets[0].getsockname()
20    print(f"Serving on {addr}")
21    async with server:
22        await server.serve_forever()
23
24if __name__ == "__main__":
25    asyncio.run(main())
  • asyncio를 이용한 TCP 통신에서 StreamReaderStreamWriter는 비동기 입출력의 중심적인 역할을 합니다. StreamReader는 클라이언트로부터 전송된 데이터를 비동기적으로 읽고, StreamWriter는 서버에서 클라이언트로 응답을 보내는 데 사용됩니다.

  • 소켓의 세부적인 작업을 직접 다루지 않아도, asyncio.start_server를 사용하면 간단하고 효율적으로 비동기 서버를 실행할 수 있습니다.

  • asyncio.start_server에 핸들러 함수를 전달하면, 그 함수는 인수로 readerwriter를 받게 됩니다. 이들을 사용함으로써 저수준 소켓 API를 직접 다루는 것보다 더 안전하고 명확하게 통신 처리를 구현할 수 있습니다. 예를 들어, reader.read()로 데이터를 받고, writer.write()writer.drain()을 조합하면 전송이 완료되는 것을 보장하는 비동기 전송을 구현할 수 있습니다.

  • 이 방식은 많은 동시 접속을 처리하기에 적합하며, 간단한 프로토콜 또는 소규모 TCP 서비스에 이상적입니다.

대용량 스트리밍 데이터 처리

대용량 파일이나 응답을 순차적으로 처리할 때는 데이터를 청크 단위로 읽고 써서 메모리 사용을 최소화해야 합니다. 아래는 aiohttp를 사용한 스트리밍 읽기 예시입니다.

아래 코드는 HTTP 응답을 청크 단위로 받아올 때마다 바로 디스크에 기록합니다.

 1import aiohttp
 2import asyncio
 3import aiofiles
 4
 5async def stream_download(url: str, dest: str):
 6    # Stream download and write to file in chunks
 7    async with aiohttp.ClientSession() as session:
 8        async with session.get(url) as resp:
 9            async with aiofiles.open(dest, 'wb') as f:
10                async for chunk in resp.content.iter_chunked(1024 * 64):
11                    await f.write(chunk)
12
13if __name__ == "__main__":
14    asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))
  • 이 코드는 대용량 파일을 한 번에 로드하지 않고, 데이터를 청크(작은 조각) 단위로 받아서 파일에 비동기적으로 기록합니다. 그 결과, 메모리 사용량을 낮게 유지하면서 빠르고 효율적으로 다운로드를 수행할 수 있습니다. aiohttp는 데이터를 비동기적으로 받아오고, aiofiles는 블로킹 없이 파일에 기록하여 다른 프로세스와 쉽게 병행 실행할 수 있습니다.

  • 이 방법은 대용량 파일을 다운로드하고 저장할 때 메모리 사용을 최소화하면서 효율적으로 처리할 수 있습니다.

비동기 서브프로세스 실행

외부 명령을 비동기로 실행하고 출력을 실시간으로 읽고 싶다면 asyncio.create_subprocess_exec가 유용합니다.

아래는 외부 명령을 실행하고 표준 출력을 실시간으로 읽는 예시입니다.

 1import asyncio
 2
 3async def run_cmd(cmd):
 4    # Run external command asynchronously and capture output line by line
 5    proc = await asyncio.create_subprocess_exec(
 6        *cmd,
 7        stdout=asyncio.subprocess.PIPE,
 8        stderr=asyncio.subprocess.PIPE
 9    )
10
11    async def read_stream(stream, name):
12        while True:
13            line = await stream.readline()
14            if not line:
15                break
16            print(f"[{name}] {line.decode().rstrip()}")
17
18    await asyncio.gather(
19        read_stream(proc.stdout, "stdout"),
20        read_stream(proc.stderr, "stderr"),
21    )
22    await proc.wait()
23    return proc.returncode
24
25if __name__ == "__main__":
26    asyncio.run(run_cmd(["python", "--version"]))
  • 서브프로세스를 비동기로 제어하면 외부 도구의 로그를 실시간으로 처리하거나 여러 프로세스를 병렬로 실행할 수 있습니다.

취소 및 타임아웃 처리

비동기 작업은 취소할 수 있습니다. 타임아웃 처리는 asyncio.wait_for로 간단히 구현할 수 있습니다.

아래는 타임아웃과 함께 작업을 실행하는 예시입니다.

 1import asyncio
 2
 3async def slow_task():
 4    await asyncio.sleep(5)
 5    return "done"
 6
 7async def main():
 8    try:
 9        result = await asyncio.wait_for(slow_task(), timeout=2.0)
10        print("Result:", result)
11    except asyncio.TimeoutError:
12        print("Task timed out")
13
14if __name__ == "__main__":
15    asyncio.run(main())
  • wait_for는 제한 시간이 지나면 TimeoutError를 발생시키고 필요시 작업을 취소합니다. 작업 취소 전파 및 정리에는 주의해야 합니다.

동시성 제어(Semaphore)

동시 연결이나 요청이 많아지면 리소스가 고갈될 수 있으므로 asyncio.Semaphore로 동시성을 제한해야 합니다.

아래는 세마포어를 사용해 동시 다운로드 수를 제한하는 예시입니다.

 1import asyncio
 2import aiohttp
 3
 4semaphore = asyncio.Semaphore(3)  # allow up to 3 concurrent tasks
 5
 6async def limited_fetch(session, url):
 7    async with semaphore:
 8        async with session.get(url) as resp:
 9            return url, resp.status
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [limited_fetch(session, u) for u in urls]
14        results = await asyncio.gather(*tasks)
15        print(results)
16
17if __name__ == "__main__":
18    urls = ["https://codesparklab.com/json/example.json"] * 10
19    asyncio.run(main(urls))
  • 이 방법으로 외부 서비스를 무리 없이 접근하고, 자체 프로세스의 과부하도 방지할 수 있습니다.

오류 처리 및 재시도 전략

비동기 처리에서도 오류는 불가피하게 발생합니다. 예외를 적절하게 포착하고, 지수 백오프 등 재시도 전략을 구현하세요.

아래는 최대 N회까지 간단히 재시도하는 구현 예시입니다.

 1import asyncio
 2import aiohttp
 3import random
 4
 5async def fetch_with_retry(session, url, retries=3):
 6    for attempt in range(1, retries + 1):
 7        try:
 8            async with session.get(url) as resp:
 9                resp.raise_for_status()
10                text = await resp.text()
11                return text
12        except Exception as e:
13            if attempt == retries:
14                raise
15            await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18    async with aiohttp.ClientSession() as session:
19        text = await fetch_with_retry(session, "https://codesparklab.com/")
20        print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • 적절한 재시도 로직은 일관성과 트래픽 제어의 균형을 맞추는 데 중요합니다.

디버깅 및 로깅 팁

비동기 처리에서는 여러 작업이 동시 진행되어 문제의 원인을 파악하기 어렵습니다. 아래 포인트에 유의하면 이슈 추적 및 디버깅을 보다 효율적으로 할 수 있습니다.

  • asyncio.run()이나 Task에서 발생하는 예외는 놓치기 쉬우므로, 처리되지 않은 예외는 반드시 로깅해야 합니다.
  • logging을 사용할 때 코루틴 이름이나 Python 3.8 이상에서는 task.get_name()을 로그에 포함시키면 추적이 더 쉬워집니다.
  • asyncio.Task.all_tasks()를 사용하여 현재 태스크의 상태를 확인할 수 있습니다. 하지만 이 API는 디버깅 용도로 설계되었으므로, 성능 문제나 예기치 않은 간섭을 피하기 위해 실제 운영 환경에서는 신중하게 사용해야 합니다.

성능 고려사항

비동기 프로그래밍은 I/O 대기 처리에 강점이 있지만 잘못 사용하면 오히려 성능이 저하될 수 있습니다. 아래 포인트를 유념하여 최적화하세요:.

  • 비동기 처리는 I/O 바운드 작업에 강하고, CPU 바운드 작업에는 적합하지 않으므로 이런 경우 프로세스 풀을 사용하세요.
  • 스레드 풀이나 프로세스 풀을 사용할 때는 풀 크기와 작업 특성을 고려하세요.
  • 작은 작업을 한 번에 많이 시작하면 이벤트 루프 오버헤드가 증가하므로 일괄 처리나 세마포어로 조정하세요.

요약

파이썬의 비동기 I/O는 I/O 대기 시간을 효과적으로 활용하며, 네트워크 및 파일 작업을 병렬로 효율적으로 실행할 수 있는 강력한 메커니즘입니다. asyncio, aiohttp, aiofiles, run_in_executor 등의 기술을 조합하면 실용적인 비동기 애플리케이션을 유연하게 구축할 수 있습니다. async with를 사용해 리소스 획득 및 반환을 자동화하면 파일, HTTP 세션, 락 등 비동기 리소스를 안전하고 확실하게 관리할 수 있습니다. 적절한 오류 처리와 동시성 제어를 도입하면 높은 신뢰성의 비동기 프로그램을 안전하게 운영할 수 있습니다.

위의 기사를 보면서 Visual Studio Code를 사용해 우리 유튜브 채널에서 함께 따라할 수 있습니다. 유튜브 채널도 확인해 주세요.

YouTube Video