Python의 `concurrent` 모듈

Python의 `concurrent` 모듈

이 기사에서는 Python의 concurrent 모듈에 대해 설명하겠습니다.

동시성과 병렬성의 개념을 명확히 하면서, concurrent 모듈을 사용하여 비동기 처리를 구현하는 방법을 실용적인 예시와 함께 설명하겠습니다.

YouTube Video

Python의 concurrent 모듈

Python에서 처리 속도를 높일 때는 동시성과 병렬성의 차이를 염두에 두는 것이 중요합니다. concurrent 모듈은 이러한 차이를 고려하여 비동기 처리를 안전하고 간단하게 다루는 중요한 수단입니다.

동시성과 병렬성의 차이

  • 동시성이란 여러 작업을 작은 단위로 번갈아가며 진행하도록 프로세스를 설계하는 것을 의미합니다. 작업이 실제로 동시에 실행되지 않더라도, "대기 시간"을 활용하여 전체 프로세스를 더욱 효율적으로 만들 수 있습니다.

  • 병렬성이란 여러 작업을 물리적으로 동시에 실행하는 메커니즘입니다. 여러 CPU 코어를 사용함으로써 처리가 동시에 진행됩니다.

둘 다 처리 속도를 높이기 위한 기술이지만, 동시성은 '진행 방법'이라는 설계 문제이고 병렬성은 '실행 방식'이라는 실행 문제로, 근본적으로 다릅니다.

concurrent 모듈이란?

concurrent는 Python의 표준 라이브러리로, 동시성과 병렬성을 안전하고 간단하게 다루기 위한 고수준 API를 제공합니다. 쓰레드나 프로세스의 생성·관리와 같은 저수준 동작에 신경 쓰지 않고, '작업 실행'에 집중할 수 있도록 설계되어 있습니다.

ThreadPoolExecutorProcessPoolExecutor의 역할

concurrent 모듈은 작업의 특성에 따라 두 가지 주요 옵션을 제공합니다.

  • ThreadPoolExecutor 이것은 특히 네트워크나 파일 작업 등 I/O 대기가 많은 작업의 동시성 구현에 적합합니다. 작업 간 전환을 통해 대기 시간을 효율적으로 활용할 수 있습니다.

  • ProcessPoolExecutor 이 구현은 병렬 처리를 목표로 하며, CPU 부하가 큰 작업에 적합합니다. 여러 프로세스를 사용하여 사용할 수 있는 모든 CPU 코어를 병렬로 최대한 활용합니다.

따라서 concurrent 모듈의 주요 특징은 필요에 따라 동시성과 병렬성 중 적절히 선택할 수 있는 구조를 제공한다는 점입니다.

ThreadPoolExecutor 기본 사용법 (I/O 작업용)

ThreadPoolExecutor는 네트워크 통신, 파일 작업 등 I/O 중심 작업에 적합합니다. 여러 쓰레드에 작업을 분산시켜 대기 시간을 효율적으로 활용합니다.

 1from concurrent.futures import ThreadPoolExecutor
 2import time
 3
 4def fetch_data(n):
 5    # Simulate an I/O-bound task
 6    time.sleep(1)
 7    return f"data-{n}"
 8
 9with ThreadPoolExecutor(max_workers=3) as executor:
10    futures = [executor.submit(fetch_data, i) for i in range(5)]
11
12    for future in futures:
13        print(future.result())
  • 이 예제에서는 1초 동안 대기하는 여러 I/O 작업이 동시에 실행됩니다. submit을 사용하면 함수 호출을 비동기 작업으로 등록하고, result()로 완료를 대기하며 결과를 얻을 수 있어, 대기 시간을 효율적으로 활용하는 동시성 처리를 간결하게 구현할 수 있습니다.

map을 사용한 간단한 동시 처리

복잡한 제어가 필요 없다면 map을 사용해 코드를 더 간단하게 만들 수 있습니다.

 1from concurrent.futures import ThreadPoolExecutor
 2import time
 3
 4def fetch_data(n):
 5    # Simulate an I/O-bound task
 6    time.sleep(1)
 7    return f"data-{n}"
 8
 9with ThreadPoolExecutor(max_workers=3) as executor:
10    results = executor.map(fetch_data, range(5))
11
12    for result in results:
13        print(result)
  • 이 예제에서는 ThreadPoolExecutor.map을 이용해 여러 I/O 작업을 동시에 실행합니다. map은 입력 순서대로 결과를 반환하므로, 순차 처리와 유사한 코드를 작성할 수 있고 비동기 처리임을 인식하지 않아도 동시 처리가 가능하다는 점이 큰 장점입니다.

ProcessPoolExecutor 기본 사용법 (CPU 중심 작업용)

CPU를 최대로 활용하는 무거운 계산 작업에는 쓰레드가 아닌 프로세스를 사용하는 것이 좋습니다. 이를 통해 Global Interpreter Lock(GIL) 제약을 피할 수 있습니다.

 1from concurrent.futures import ProcessPoolExecutor
 2
 3def heavy_calculation(n):
 4    # Simulate a CPU-bound task
 5    total = 0
 6    for i in range(10_000_000):
 7        total += i * n
 8    return total
 9
10if __name__ == "__main__":
11    with ProcessPoolExecutor(max_workers=4) as executor:
12        results = executor.map(heavy_calculation, range(4))
13
14        for result in results:
15            print(result)

이 예제에서는 ProcessPoolExecutor를 이용하여 CPU 사용량이 많은 계산을 병렬로 실행합니다. 프로세스 생성을 포함하기 때문에 __main__ 가드가 필요하며, 여러 CPU 코어를 활용한 안전한 병렬 처리가 가능합니다.

as_completed를 사용한 완료 순서 처리

as_completed는 결과가 완료되는 순서대로 처리하고 싶을 때 유용합니다.

 1from concurrent.futures import ThreadPoolExecutor, as_completed
 2import time
 3
 4def fetch_data(n):
 5    # Simulate an I/O-bound task
 6    time.sleep(1)
 7    return f"data-{n}"
 8
 9with ThreadPoolExecutor(max_workers=3) as executor:
10    futures = [executor.submit(fetch_data, i) for i in range(5)]
11
12    for future in as_completed(futures):
13        print(future.result())
  • 이 예제에서는 여러 비동기 작업이 동시에 실행되며, 결과를 완료 순서대로 수집합니다. as_completed를 사용하면 작업 순서와 상관없이 결과를 빠르게 처리할 수 있어, 진행 상황 표시나 순차 처리가 필요한 상황에 적합합니다.

예외 처리

concurrent에서는 result()를 호출할 때 예외가 발생합니다.

 1from concurrent.futures import ThreadPoolExecutor
 2
 3def risky_task(n):
 4    # Simulate a task that may fail for a specific input
 5    if n == 3:
 6        raise ValueError("Something went wrong")
 7    return n * 2
 8
 9with ThreadPoolExecutor() as executor:
10    futures = [executor.submit(risky_task, i) for i in range(5)]
11
12    for future in futures:
13        try:
14            print(future.result())
15        except Exception as e:
16            print("Error:", e)
  • 이 예제는 일부 작업에서 예외가 발생해도 나머지는 계속 실행되며, 결과를 가져올 때 개별적으로 예외를 처리할 수 있음을 보여줍니다. concurrentFuture를 사용하면 비동기 처리의 성공과 실패를 안전하게 다룰 수 있다는 것이 중요합니다.

쓰레드와 프로세스 선택 가이드라인

동시성과 병렬성을 효과적으로 활용하려면 작업의 특성에 따라 올바른 방식을 선택하는 것이 중요합니다.

실제로는 다음 기준이 선택에 도움이 됩니다.

  • 통신이나 파일 작업처럼 I/O 대기가 많은 프로세스에는 ThreadPoolExecutor를 사용하세요.
  • CPU 중심의 무거운 계산 작업에는 ProcessPoolExecutor를 사용하세요.
  • 단순한 작업이 많을 경우 map을 사용하면 코드를 더 간결하게 작성할 수 있습니다.
  • 실행 순서나 예외 처리의 정밀한 제어가 필요하다면 submitas_completed를 함께 사용하세요.

concurrent 사용의 이점

concurrent 모듈을 사용하면 비동기 처리를 안전하고 직관적으로 다룰 수 있습니다.

주요 이점은 다음과 같습니다:.

  • 저수준의 쓰레드나 프로세스 관리를 신경 쓸 필요가 없습니다.
  • Python 표준 라이브러리의 일부로 제공되기 때문에 안심하고 사용할 수 있습니다.
  • 코드의 가독성과 유지보수성이 향상됩니다.
  • 동시성과 병렬성을 배우는 첫걸음으로 이상적입니다.

이러한 가이드라인만 기억해도 concurrent를 사용하는 구현에서의 실패를 크게 줄일 수 있습니다.

요약

concurrent 모듈은 Python에서 실용적인 동시성과 병렬성을 위한 표준 옵션입니다. 처리 내용을 크게 변경하지 않고 성능을 향상시킬 수 있기 때문에 실제 상황에서 매우 큰 이점이 있습니다. concurrent를 사용하면 예외 처리와 실행 제어를 안전하게 관리하면서 비동기 처리를 간결하게 구현할 수 있습니다.

위의 기사를 보면서 Visual Studio Code를 사용해 우리 유튜브 채널에서 함께 따라할 수 있습니다. 유튜브 채널도 확인해 주세요.

YouTube Video