Python의 `multiprocessing` 모듈
이 글은 Python의 multiprocessing 모듈을 설명합니다.
multiprocessing 모듈을 사용하여 안전하고 효율적인 병렬 처리 코드를 작성하는 실용적인 팁을 소개합니다.
YouTube Video
Python의 multiprocessing 모듈
기초: 왜 multiprocessing을 사용할까요?
multiprocessing은 프로세스 단위로 병렬 처리를 가능하게 하여 파이썬의 GIL(Global Interpreter Lock)의 제약 없이 CPU 중심 작업을 병렬화할 수 있습니다. I/O 중심 작업에는 threading이나 asyncio가 더 간단하고 적합할 수 있습니다.
Process의 간단한 사용법
먼저, Process를 사용하여 별도의 프로세스에서 함수를 실행하는 기본 예시입니다. 이 예제는 프로세스를 시작하고, 완료될 때까지 기다리며, 인자를 전달하는 방법을 보여줍니다.
1# Explanation:
2# This example starts a separate process to run `worker` which prints messages.
3# It demonstrates starting, joining, and passing arguments.
4
5from multiprocessing import Process
6import time
7
8def worker(name, delay):
9 # English comment in code per user's preference
10 for i in range(3):
11 print(f"Worker {name}: iteration {i}")
12 time.sleep(delay)
13
14if __name__ == "__main__":
15 p = Process(target=worker, args=("A", 0.5))
16 p.start()
17 print("Main: waiting for worker to finish")
18 p.join()
19 print("Main: worker finished")- 이 코드는 메인 프로세스가 자식 프로세스
worker를 실행하고,join()을 사용하여 완료를 기다리는 흐름을 보여줍니다.args를 사용하면 인자를 전달할 수 있습니다.
Pool로 간단한 병렬 처리 (고수준 API)
여러 독립적인 작업에 동일한 함수를 적용하고자 할 때는 Pool.map이 유용합니다. 워커 프로세스는 내부적으로 자동 관리됩니다.
1# Explanation:
2# Use Pool.map to parallelize a CPU-bound function across available processes.
3# Good for "embarrassingly parallel" workloads.
4
5from multiprocessing import Pool, cpu_count
6import math
7import time
8
9def is_prime(n):
10 # Check primality (inefficient but CPU-heavy for demo)
11 if n < 2:
12 return False
13 for i in range(2, int(math.sqrt(n)) + 1):
14 if n % i == 0:
15 return False
16 return True
17
18if __name__ == "__main__":
19 nums = [10_000_000 + i for i in range(50)]
20 start = time.time()
21 with Pool(processes=cpu_count()) as pool:
22 results = pool.map(is_prime, nums)
23 end = time.time()
24 print(f"Found primes: {sum(results)} / {len(nums)} in {end-start:.2f}s")Pool은 워커 수를 자동으로 제어하며,map은 결과를 원래 순서대로 반환합니다.
프로세스 간 통신: Queue를 이용한 생산자/소비자 패턴
Queue는 프로세스 간에 객체를 안전하게 전송하는 선입선출(FIFO) 큐입니다. 아래는 몇 가지 일반적인 패턴입니다.
1# Explanation:
2# Demonstrates a producer putting items into a Queue
3# and consumer reading them.
4# This is useful for task pipelines between processes.
5
6from multiprocessing import Process, Queue
7import time
8import random
9
10def producer(q, n):
11 for i in range(n):
12 item = f"item-{i}"
13 print("Producer: putting", item)
14 q.put(item)
15 time.sleep(random.random() * 0.5)
16 q.put(None) # sentinel to signal consumer to stop
17
18def consumer(q):
19 while True:
20 item = q.get()
21 if item is None:
22 break
23 print("Consumer: got", item)
24 time.sleep(0.2)
25
26if __name__ == "__main__":
27 q = Queue()
28 p = Process(target=producer, args=(q, 5))
29 c = Process(target=consumer, args=(q,))
30 p.start()
31 c.start()
32 p.join()
33 c.join()
34 print("Main: done")Queue를 사용하면 데이터를 프로세스 간에 안전하게 전달할 수 있습니다. 종료를 알리기 위해None과 같은 특수 값을 사용하는 것이 일반적입니다.
공유 메모리: Value와 Array
작은 숫자나 배열을 프로세스 간에 공유하려면 Value와 Array를 사용할 수 있습니다. 충돌을 피하려면 락을 사용해야 합니다.
1# Explanation:
2# Use Value to share a single integer counter
3# and Array for a small numeric array.
4# Show how to use a Lock to avoid race conditions.
5
6from multiprocessing import Process, Value, Array, Lock
7import time
8
9def increment(counter, lock, times):
10 for _ in range(times):
11 with lock:
12 counter.value += 1
13
14def update_array(arr):
15 for i in range(len(arr)):
16 arr[i] = arr[i] + 1
17
18if __name__ == "__main__":
19 lock = Lock()
20 counter = Value('i', 0) # 'i' = signed int
21 shared_arr = Array('i', [0, 0, 0])
22
23 p1 = Process(target=increment, args=(counter, lock, 1000))
24 p2 = Process(target=increment, args=(counter, lock, 1000))
25 a = Process(target=update_array, args=(shared_arr,))
26
27 p1.start(); p2.start(); a.start()
28 p1.join(); p2.join(); a.join()
29
30 print("Counter:", counter.value)
31 print("Array:", list(shared_arr))Value와Array는 **저수준 메커니즘(C 언어 수준의 공유 메모리)**을 사용하여 데이터를 프로세스 간에 공유하며, 파이썬 자체가 아닌 방식을 사용합니다. 따라서 적은 양의 데이터를 빠르게 읽고 쓰는 데는 적합하지만, 많은 양의 데이터를 처리하는 데는 적합하지 않습니다..
고급 공유: Manager를 활용한 공유 객체 (dict, list 등)
리스트나 딕셔너리와 같이 더 유연한 공유 객체를 사용하려면 Manager()를 이용하세요.
1# Explanation:
2# Manager provides proxy objects like dict/list
3# that can be shared across processes.
4# Good for moderate-size shared state and easier programming model.
5
6from multiprocessing import Process, Manager
7import time
8
9def worker(shared_dict, key, value):
10 shared_dict[key] = value
11
12if __name__ == "__main__":
13 with Manager() as manager:
14 d = manager.dict()
15 processes = []
16 for i in range(5):
17 p = Process(target=worker, args=(d, f"k{i}", i*i))
18 p.start()
19 processes.append(p)
20 for p in processes:
21 p.join()
22 print("Shared dict:", dict(d))Manager는 딕셔너리와 리스트를 공유할 때 편리하지만, 접근할 때마다 프로세스 간에 데이터를 전송하고pickle변환이 필요합니다. 따라서 대량의 데이터를 자주 업데이트하면 처리 속도가 느려집니다.
동기화 메커니즘: Lock 및 Semaphore 사용 방법
Lock이나 Semaphore를 사용하여 공유 리소스에 대한 동시 접근을 제어하세요. with 문을 사용하여 간결하게 관리할 수 있습니다.
1# Explanation:
2# Demonstrates using Lock to prevent simultaneous access to a critical section.
3# Locks are necessary when shared resources are not atomic.
4
5from multiprocessing import Process, Lock, Value
6
7def safe_add(counter, lock):
8 for _ in range(10000):
9 with lock:
10 counter.value += 1
11
12if __name__ == "__main__":
13 lock = Lock()
14 counter = Value('i', 0)
15 p1 = Process(target=safe_add, args=(counter, lock))
16 p2 = Process(target=safe_add, args=(counter, lock))
17 p1.start(); p2.start()
18 p1.join(); p2.join()
19 print("Counter:", counter.value)- 락은 데이터 경합을 방지하지만, 락이 걸리는 영역이 너무 크면 병렬 처리 성능이 저하됩니다. 필요한 부분만 임계 구역으로 보호해야 합니다.
UNIX의 fork와 Windows의 동작 차이
UNIX 시스템에서는 기본적으로 fork를 사용하여 프로세스를 복제하므로, 메모리의 copy-on-write가 효율적으로 작동합니다. Windows에서는 프로세스를 spawn 방식으로 시작(모듈을 재임포트함)하므로, 엔트리 포인트 보호와 전역 초기화에 유의해야 합니다.
1# Explanation: Check start method (fork/spawn) and set it if needed.
2# Useful for debugging platform-dependent behavior.
3
4from multiprocessing import get_start_method, set_start_method
5
6if __name__ == "__main__":
7 print("Start method:", get_start_method())
8
9 # uncomment to force spawn on Unix for testing
10 # set_start_method('spawn')set_start_method는 프로그램 시작 시 단 한 번만 호출할 수 있습니다. 라이브러리 내부에서 임의로 변경하지 않는 것이 안전합니다.
실전 예시: CPU 중심 작업의 벤치마킹 (비교)
아래는 multiprocessing을 사용하여 병렬화했을 때 처리 속도가 얼마나 빨라지는지 간단히 비교하는 스크립트입니다. 여기서는 Pool을 사용합니다.
1# Explanation:
2# Compare sequential vs parallel execution times for CPU-bound task.
3# Helps understand speedup and overhead.
4
5import time
6from multiprocessing import Pool, cpu_count
7import math
8
9def heavy_task(n):
10 s = 0
11 for i in range(1, n):
12 s += math.sqrt(i)
13 return s
14
15def run_sequential(nums):
16 return [heavy_task(n) for n in nums]
17
18def run_parallel(nums):
19 with Pool(processes=cpu_count()) as p:
20 return p.map(heavy_task, nums)
21
22if __name__ == "__main__":
23 nums = [2000000] * 8 # heavy tasks
24 t0 = time.time()
25 run_sequential(nums)
26 seq = time.time() - t0
27 t1 = time.time()
28 run_parallel(nums)
29 par = time.time() - t1
30 print(f"Sequential: {seq:.2f}s, Parallel: {par:.2f}s")- 이 예제는 작업 부하 및 프로세스 수에 따라 병렬 처리가 오버헤드로 인해 비효율적일 수 있음을 보여줍니다. 작업이 '무겁고' 독립적일수록 병렬 처리의 이점이 커집니다.
중요한 기본 규칙
아래는 multiprocessing을 안전하고 효율적으로 사용하기 위한 기본 요점입니다.
- Windows에서는 자식 프로세스가 시작될 때 모듈이 다시 임포트되므로, 반드시
if __name__ == "__main__":로 스크립트의 엔트리 포인트를 보호해야 합니다. - 프로세스 간 통신은 직렬화(
pickle변환 포함)되므로, 큰 객체를 전송하는 데 비용이 많이 듭니다. multiprocessing은 프로세스를 여러 개 생성하므로, 일반적으로multiprocessing.cpu_count()를 기준으로 프로세스 수를 결정합니다.- 워커 내부에서 다른
Pool을 생성하면 복잡해지므로, 가능한 한Pool인스턴스의 중첩을 피해야 합니다. - 자식 프로세스에서 발생하는 예외는 메인 프로세스에서 감지하기 어려우므로, 명시적으로 로깅과 오류 처리를 구현해야 합니다.
- 프로세스 수는 CPU에 맞게 정하고, I/O 중심 작업에는 스레드 사용도 고려하세요.
실전 설계 팁
아래는 병렬 처리를 설계할 때 유용한 개념과 패턴입니다.
- 파이프라이닝을 통해 입력(I/O), 전처리(다중 CPU), 통합(직렬) 등 역할별로 프로세스를 분리하면 효율적입니다.
- 디버깅을 쉽게 하려면 우선 단일 프로세스에서 동작을 확인한 뒤 병렬화하세요.
- 로깅은 프로세스별로 구분해서 로그를 출력(예: 파일명에 PID 포함)하여 문제의 원인 분리를 쉽게 하세요.
- 프로세스가 멈추더라도 안전하게 복구할 수 있도록 재시도 및 타임아웃 기법을 준비하세요.
요약 (즉시 활용 가능한 핵심 포인트)
병렬 처리는 강력하지만, 작업 특성, 데이터 크기, 프로세스 간 통신 비용을 올바르게 파악하는 것이 중요합니다. multiprocessing은 CPU 중심 처리에 효과적이지만, 설계 미흡이나 동기화 오류가 있으면 성능이 저하될 수 있습니다. 기본 규칙과 패턴을 따르면 안전하고 효율적인 병렬 프로그램을 만들 수 있습니다.
위의 기사를 보면서 Visual Studio Code를 사용해 우리 유튜브 채널에서 함께 따라할 수 있습니다. 유튜브 채널도 확인해 주세요.