파이썬 비동기 처리에서의 동기화 제어

파이썬 비동기 처리에서의 동기화 제어

이 글에서는 파이썬 비동기 처리에서의 동기화 제어에 대해 설명합니다.

여러분은 asyncio의 기본부터 동기화 제어에 일반적으로 사용되는 실용적인 패턴까지 단계별로 배울 수 있습니다.

YouTube Video

파이썬 비동기 처리에서의 동기화 제어

비동기 처리에서는 여러 작업을 동시에 쉽게 실행할 수 있습니다. 하지만 실제로는 동시성 제어, 작업 조정, 공유 자원의 배타적 제어, 무거운 동기식 프로세스 처리, 취소 후 정리 등 더 고급의 조정이 필요합니다.

여기에서는 asyncio의 기초부터 동기화에 자주 사용되는 실용적인 패턴까지 단계적으로 배워봅니다.

소개: 기본 (async / awaitcreate_task)

먼저 최소한의 비동기 코드를 살펴보겠습니다. await는 해당 시점에서 호출된 코루틴이 완료될 때까지 기다리고, asyncio.create_task는 작업을 동시 실행하도록 예약합니다.

 1import asyncio
 2
 3async def worker(name, delay):
 4    # Simulate async work
 5    await asyncio.sleep(delay)
 6    return f"{name} done after {delay}s"
 7
 8async def main():
 9    # Create two tasks that run concurrently.
10    t1 = asyncio.create_task(worker("A", 1))
11    t2 = asyncio.create_task(worker("B", 2))
12
13    # Await both results (this suspends until both are done).
14    result1 = await t1
15    result2 = await t2
16    print(result1, result2)
17
18if __name__ == "__main__":
19    asyncio.run(main())
  • 이 코드는 명시적으로 작업을 생성하고 병렬로 실행한 뒤, 마지막에 await로 결과를 받는 전형적인 패턴입니다. create_task는 동시 실행을 가능하게 합니다.

asyncio.gather, asyncio.wait, asyncio.as_completed의 차이점

여러 코루틴을 동시에 실행할 때, 결과를 어떻게 받고 싶은지에 따라 사용할 방법을 선택합니다. gather는 모두 끝날 때까지 기다려 입력 순서대로 결과를 반환하고, as_completed는 완료되는 즉시 순서에 상관없이 처리할 수 있습니다.

 1import asyncio
 2import random
 3
 4async def job(i):
 5    delay = random.random() * 2
 6    await asyncio.sleep(delay)
 7    return (i, delay)
 8
 9async def gather_example():
10    # gather waits for all tasks and returns results in the same order as input
11    results = await asyncio.gather(*(job(i) for i in range(5)))
12    print("gather order:", results)
13
14async def as_completed_example():
15    # as_completed yields results as they finish (useful to process early results)
16    tasks = [asyncio.create_task(job(i)) for i in range(5)]
17    for coro in asyncio.as_completed(tasks):
18        res = await coro
19        print("completed:", res)
20
21async def main():
22    await gather_example()
23    await as_completed_example()
24
25if __name__ == "__main__":
26    asyncio.run(main())
  • 이 코드에서 보듯이, gather는 입력 순서대로 결과를 반환하여 순서를 보존하고 싶을 때 유용합니다. as_completed는 결과가 완료되는 즉시 처리하고 싶을 때 사용합니다.

동시성 제어: asyncio.Semaphore로 동시 실행 제한하기

외부 API 호출 제한이나 DB 연결 제한이 있을 때, Semaphore로 동시 실행 수를 제어할 수 있습니다.

 1import asyncio
 2import random
 3
 4sem = asyncio.Semaphore(3)  # allow up to 3 concurrent workers
 5
 6async def limited_worker(i):
 7    async with sem:
 8        # Only 3 tasks can be inside this block concurrently
 9        delay = random.random() * 2
10        await asyncio.sleep(delay)
11        print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14    tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15    await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18    asyncio.run(main())
  • Semaphoreasync with와 함께 사용하면, 동시 실행 수를 쉽게 제한할 수 있습니다. 이 방법은 외부 제약이 있는 상황에서 효과적입니다.

공유 자원의 배타적 제어: asyncio.Lock

Lock은 공유 데이터의 동시 업데이트를 방지하는 데 사용됩니다. asyncio.Lock은 비동기 환경에서 사용하는 배타적 동기화 도구입니다.

 1import asyncio
 2
 3counter = 0
 4lock = asyncio.Lock()
 5
 6async def incrementer(n_times):
 7    global counter
 8    for _ in range(n_times):
 9        # Acquire lock to update shared counter safely
10        async with lock:
11            tmp = counter
12            await asyncio.sleep(0)  # yield control to increase race likelihood
13            counter = tmp + 1
14
15async def main():
16    tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17    await asyncio.gather(*tasks)
18    print("final counter:", counter)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • 여러 작업이 전역 counter와 같은 공유 변수를 동시에 업데이트하면 충돌이 발생할 수 있습니다. Lock으로 작업을 감싸면 일관성을 유지할 수 있습니다.

작업 조정: asyncio.Event

Event는 한 작업이 준비 신호를 보내고, 다른 작업들이 이 신호를 대기할 때 사용합니다. 작업 간에 신호를 공유하고 동기화할 수 있는 간단한 방법입니다.

 1import asyncio
 2
 3event = asyncio.Event()
 4
 5async def waiter(name):
 6    print(f"{name} is waiting for event")
 7    await event.wait()
 8    print(f"{name} resumed after event set")
 9
10async def setter():
11    print("setter will sleep and then set the event")
12    await asyncio.sleep(1)
13    event.set()
14    print("event set by setter")
15
16async def main():
17    tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18    await asyncio.create_task(setter())
19    await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22    asyncio.run(main())
  • Event는 불린 플래그를 가지며, set()을 호출하면 대기 중인 모든 작업이 재개됩니다. 간단한 동기화에 유용합니다.

생산자-소비자 패턴: asyncio.Queue

Queue를 사용하면 데이터를 생성하는 생산자와, 처리하는 소비자가 비동기적으로 원활하게 협력할 수 있습니다. 또한 큐가 가득 차면 생산자가 자동으로 대기하여 과잉 생산을 방지하는 백프레셔(역압력)가 자연스럽게 구현됩니다.

 1import asyncio
 2import random
 3
 4async def producer(q, n_items):
 5    for i in range(n_items):
 6        await asyncio.sleep(random.random() * 0.5)
 7        item = f"item-{i}"
 8        await q.put(item)
 9        print("produced", item)
10    # signal consumers to stop
11    await q.put(None)
12
13async def consumer(q, name):
14    while True:
15        item = await q.get()
16        if item is None:
17            # put sentinel back for other consumers and break
18            await q.put(None)
19            break
20        await asyncio.sleep(random.random() * 1)
21        print(name, "consumed", item)
22        q.task_done()
23
24async def main():
25    q = asyncio.Queue(maxsize=5)  # bounded queue to apply backpressure
26    prod = asyncio.create_task(producer(q, 10))
27    cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28    await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31    asyncio.run(main())
  • Queue는 생산자와 소비자가 비동기적으로 협업하는 데 도움을 줍니다. maxsize를 지정하면 큐가 가득 찼을 때 생산자가 put에서 대기하게 되어 과잉 생산을 방지할 수 있습니다.

동기 블로킹 작업 다루기: run_in_executor

CPU 집약적인 처리나 비동기를 지원하지 않는 라이브러리를 사용할 때는 run_in_executor로 별도의 스레드나 프로세스에 처리를 위임합니다. 이렇게 하면 메인 이벤트 루프가 멈추지 않아 다른 비동기 작업도 원활하게 실행됩니다.

 1import asyncio
 2import time
 3
 4def blocking_io(x):
 5    # simulate blocking I/O or CPU-bound work
 6    time.sleep(2)
 7    return x * x
 8
 9async def main():
10    loop = asyncio.get_running_loop()
11    # run blocking_io in default thread pool
12    result = await loop.run_in_executor(None, blocking_io, 3)
13    print("blocking result:", result)
14
15if __name__ == "__main__":
16    asyncio.run(main())
  • 동기 함수를 직접 호출하면 이벤트 루프가 블록됩니다. run_in_executor를 사용하면 코드가 별도의 스레드에서 실행되어 비동기 작업이 동시에 계속 진행될 수 있습니다.

예시: API 호출 제한 상황 (Semaphore + run_in_executor 조합)

다음은 API 호출에 제한이 있고, 결과에 대해 무거운 처리를 하는 샘플 시나리오입니다. Semaphorerun_in_executor를 조합하면 안전하고 효율적으로 처리를 진행할 수 있습니다.

 1import asyncio
 2import time
 3import random
 4
 5sem = asyncio.Semaphore(5)
 6
 7def heavy_sync_processing(data):
 8    # simulate heavy CPU-bound work
 9    time.sleep(1)
10    return f"processed-{data}"
11
12async def api_call(i):
13    await asyncio.sleep(random.random() * 0.5)  # simulate network latency
14    return f"data-{i}"
15
16async def worker(i):
17    async with sem:
18        data = await api_call(i)
19        # offload CPU-bound work to threadpool
20        loop = asyncio.get_running_loop()
21        result = await loop.run_in_executor(None, heavy_sync_processing, data)
22        print(result)
23
24async def main():
25    tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26    await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29    asyncio.run(main())
  • 동시 API 호출 수는 Semaphore로 제한하고, 결과 데이터의 무거운 처리는 스레드 풀에 위임합니다. 네트워크 처리와 CPU 처리를 분리하면 효율성이 향상됩니다.

작업 취소와 정리

작업이 취소될 때 finallyasyncio.CancelledError를 적절히 처리하는 것이 매우 중요합니다. 이렇게 하면 파일과 연결을 해제하고, 중간 상태를 적절히 처리하여 애플리케이션의 일관성을 유지할 수 있습니다.

 1import asyncio
 2
 3async def long_running():
 4    try:
 5        print("started long_running")
 6        await asyncio.sleep(10)
 7        print("finished long_running")
 8    except asyncio.CancelledError:
 9        print("long_running was cancelled, cleaning up")
10        # perform cleanup here
11        raise
12
13async def main():
14    task = asyncio.create_task(long_running())
15    await asyncio.sleep(1)
16    task.cancel()
17    try:
18        await task
19    except asyncio.CancelledError:
20        print("task cancelled in main")
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • 취소는 예외(CancelledError)로 전달되므로, except 블록에서 필요한 정리를 수행하고, 필요하면 예외를 다시 발생시키십시오.

실용적인 설계의 핵심 포인트

다음은 비동기 처리를 설계할 때 유용한 실용적 포인트입니다.

  • 동시성 명확히 제어 API나 DB 등 리소스 제한이 있을 때는 Semaphore로 동시 실행 수를 제한할 수 있습니다.

  • 공유 자원을 안전하게 다루기 여러 작업에서 상태를 수정해야 할 때는 Lock을 사용하세요. 공유 상태를 줄이고 불변 데이터 중심으로 설계하면 더욱 안전합니다.

  • 결과 수신 방법 선택 작업 완료 즉시 처리하려면 asyncio.as_completed를, 입력 순서대로 처리하려면 gather를 사용하세요.

  • 무거운 동기 처리 분리 CPU 집약적 처리나 동기 라이브러리 호출은 이벤트 루프 블로킹 방지를 위해 run_in_executorProcessPoolExecutor를 사용하세요.

  • 취소 및 예외 처리 계획 작업이 중간에 취소되어도 안전하게 리소스를 정리할 수 있도록 예외 처리를 적절히 작성하세요.

  • 테스트 용이성 확보 I/O, 시간, 난수 등의 부작용을 추상화하여 테스트 코드를 쉽게 작성할 수 있습니다.

요약

asyncio는 강력하지만 “동시에 실행하기”에만 집중하면 공유 자원 충돌, 리소스 한계 초과, 이벤트 루프 블로킹 등의 문제가 발생할 수 있습니다. Semaphore, Lock, Event, Queue, run_in_executor 및 적절한 취소 처리를 조합하면 안전하고 효율적인 비동기 애플리케이션을 설계할 수 있습니다. 생산자-소비자 패턴, 동시성 제한, 비동기와 블로킹 처리 분리 같은 메커니즘을 활용하면 비동기 워크플로우를 더 안전하고 효율적으로 구성할 수 있습니다.

위의 기사를 보면서 Visual Studio Code를 사용해 우리 유튜브 채널에서 함께 따라할 수 있습니다. 유튜브 채널도 확인해 주세요.

YouTube Video