Python의 `threading` 모듈
이 글은 Python의 threading 모듈에 대해 설명합니다.
YouTube Video
Python의 threading 모듈
Python의 threading 모듈은 멀티스레드 프로그래밍을 지원하는 표준 라이브러리입니다. 스레드를 사용하면 여러 프로세스를 동시에 실행할 수 있어, 특히 I/O 대기와 같은 블로킹 작업의 경우 프로그램 성능을 향상시킬 수 있습니다. 파이썬의 **글로벌 인터프리터 락(GIL)**로 인해 CPU 중심 작업에서는 멀티스레드의 효과가 제한적이지만, I/O 중심 작업에서는 효율적으로 작동합니다.
다음 섹션에서는 threading 모듈의 기본 사용법과 스레드 제어 방법을 설명합니다.
스레드의 기본 사용법
스레드 생성 및 실행
스레드를 생성하고 동시에 처리를 수행하려면 threading.Thread 클래스를 사용하세요. 대상 함수를 지정하여 스레드를 생성하고 해당 스레드를 실행합니다.
1import threading
2import time
3
4# Function to be executed in a thread
5def worker():
6 print("Worker thread started")
7 time.sleep(2)
8 print("Worker thread finished")
9
10# Create and start the thread
11thread = threading.Thread(target=worker)
12thread.start()
13
14# Processing in the main thread
15print("Main thread continues to run")
16
17# Wait for the thread to finish
18thread.join()
19print("Main thread finished")- 이 예제에서
worker함수는 별도의 스레드에서 실행되고, 메인 스레드는 계속 작업을 수행합니다.join()메서드를 호출하면 메인 스레드는 하위 스레드가 완료될 때까지 기다립니다.
스레드 이름 지정
스레드에 의미 있는 이름을 지정하면 로깅과 디버깅이 쉬워집니다. name 인자로 지정할 수 있습니다.
1import threading
2import time
3
4# Function to be executed in a thread
5def worker():
6 print("Worker thread started")
7 time.sleep(2)
8 print("Worker thread finished")
9
10t = threading.Thread(
11 target=worker,
12 args=("named-worker", 0.3),
13 name="MyWorkerThread"
14)
15
16t.start()
17
18print("Active threads:", threading.active_count())
19for th in threading.enumerate():
20 print(" -", th.name)
21
22t.join()threading.enumerate()는 현재의 스레드 목록을 반환하므로, 디버깅이나 상태 모니터링에 유용합니다.
Thread 클래스 상속
스레드 실행 클래스를 커스터마이즈하려면 threading.Thread 클래스를 상속하여 새 클래스를 정의할 수 있습니다.
1import threading
2import time
3
4# Inherit from the Thread class
5class WorkerThread(threading.Thread):
6 def __init__(self, name, delay, repeat=3):
7 super().__init__(name=name)
8 self.delay = delay
9 self.repeat = repeat
10 self.results = []
11
12 def run(self):
13 for i in range(self.repeat):
14 msg = f"{self.name} step {i+1}"
15 print(msg)
16 self.results.append(msg)
17 time.sleep(self.delay)
18
19# Create and start the threads
20t1 = WorkerThread("Worker-A", delay=0.4, repeat=3)
21t2 = WorkerThread("Worker-B", delay=0.2, repeat=5)
22
23t1.start()
24t2.start()
25
26t1.join()
27t2.join()
28
29print("Results A:", t1.results)
30print("Results B:", t2.results)- 이 예제에서는
run()메서드를 오버라이드하여 스레드의 동작을 정의하며, 각 스레드가 자신만의 데이터를 유지할 수 있도록 합니다. 이는 스레드가 복잡한 처리를 하거나 각 스레드가 독립적인 데이터를 가져야 할 때 유용합니다.
스레드 간 동기화
여러 스레드가 공유 자원에 동시에 접근할 경우 데이터 레이스가 발생할 수 있습니다. 이 문제를 방지하기 위해 threading 모듈은 여러 가지 동기화 메커니즘을 제공합니다.
락(Lock)
Lock 객체는 스레드 간 자원 독점 제어를 구현하는 데 사용됩니다. 한 스레드가 자원을 잠그고 있는 동안 다른 스레드는 해당 자원에 접근할 수 없습니다.
1import threading
2
3lock = threading.Lock()
4shared_resource = 0
5
6def worker():
7 global shared_resource
8 with lock: # Acquire the lock
9 local_copy = shared_resource
10 local_copy += 1
11 shared_resource = local_copy
12
13threads = [threading.Thread(target=worker) for _ in range(5)]
14
15for t in threads:
16 t.start()
17
18for t in threads:
19 t.join()
20
21print(f"Final value of shared resource: {shared_resource}")- 이 예제에서는 5개의 스레드가 공유 자원에 접근하지만,
Lock을 사용하여 여러 스레드가 동시에 데이터를 수정하지 못하도록 합니다.
재진입 가능 락(RLock)
같은 스레드가 여러 번 락을 획득해야 하는 경우, RLock(재진입 가능 락)을 사용하세요. 이는 재귀 호출이나 여러 곳에서 락을 획득할 수 있는 라이브러리 호출에 유용합니다.
1import threading
2
3rlock = threading.RLock()
4shared = []
5
6def outer():
7 with rlock:
8 shared.append("outer")
9 inner()
10
11def inner():
12 with rlock:
13 shared.append("inner")
14
15t = threading.Thread(target=outer)
16t.start()
17t.join()
18print(shared)RLock을 사용하면 동일한 스레드가 이미 획득한 락을 다시 획득할 수 있어, 중첩된 락 획득 시 데드락을 방지할 수 있습니다.
조건(Condition)
Condition은 스레드가 특정 조건을 충족할 때까지 대기하도록 사용하는 데 사용됩니다. 스레드가 조건을 만족하면 notify()를 호출하여 다른 스레드에게 알리거나, notify_all()을 호출하여 대기 중인 모든 스레드에게 알릴 수 있습니다.
아래는 Condition을 사용하는 생산자와 소비자 예제입니다.
1import threading
2
3condition = threading.Condition()
4shared_data = []
5
6def producer():
7 with condition:
8 shared_data.append(1)
9 print("Produced an item")
10 condition.notify() # Notify the consumer
11
12def consumer():
13 with condition:
14 condition.wait() # Wait until the condition is met
15
16 item = shared_data.pop(0)
17 print(f"Consumed an item: {item}")
18
19# Create the threads
20producer_thread = threading.Thread(target=producer)
21consumer_thread = threading.Thread(target=consumer)
22
23consumer_thread.start()
24producer_thread.start()
25
26producer_thread.join()
27consumer_thread.join()- 이 코드는 생산자가 데이터를 추가할 때
Condition을 통해 알리고, 소비자는 해당 알림을 기다렸다가 데이터를 가져가므로 동기화를 이룹니다.
스레드 데몬화
데몬 스레드는 메인 스레드가 종료되면 강제로 종료됩니다. 일반 스레드는 종료를 위해 대기해야 하지만, 데몬 스레드는 자동으로 종료됩니다.
1import threading
2import time
3
4def worker():
5 while True:
6 print("Working...")
7 time.sleep(1)
8
9# Create a daemon thread
10thread = threading.Thread(target=worker)
11thread.daemon = True # Set as a daemon thread
12
13thread.start()
14
15# Processing in the main thread
16time.sleep(3)
17print("Main thread finished")- 이 예에서는
worker스레드가 데몬화 되어 메인 스레드 종료 시 강제로 종료됩니다.
ThreadPoolExecutor를 사용한 스레드 관리
threading 모듈 이외에도, concurrent.futures 모듈의 ThreadPoolExecutor를 사용하여 스레드 풀을 관리하고 병렬로 작업을 실행할 수 있습니다.
1from concurrent.futures import ThreadPoolExecutor
2import time
3
4def worker(seconds):
5 print(f"Sleeping for {seconds} second(s)")
6 time.sleep(seconds)
7 return f"Finished sleeping for {seconds} second(s)"
8
9with ThreadPoolExecutor(max_workers=3) as executor:
10 futures = [executor.submit(worker, i) for i in range(1, 4)]
11 for future in futures:
12 print(future.result())ThreadPoolExecutor는 스레드 풀을 생성하고 작업을 효율적으로 처리합니다.max_workers를 사용하여 동시에 실행할 스레드 수를 지정합니다.
스레드 간 이벤트 통신
threading.Event를 사용하여 스레드 간의 플래그를 설정하고 이벤트 발생을 다른 스레드에 알릴 수 있습니다.
1import threading
2import time
3
4event = threading.Event()
5
6def worker():
7 print("Waiting for event to be set")
8 event.wait() # Wait until the event is set
9
10 print("Event received, continuing work")
11
12thread = threading.Thread(target=worker)
13thread.start()
14
15time.sleep(2)
16print("Setting the event")
17event.set() # Set the event and notify the thread- 이 코드는 워커 스레드가
Event신호를 기다렸다가, 메인 스레드가event.set()을 호출하면 처리를 계속하는 메커니즘을 보여줍니다.
스레드에서의 예외 처리와 스레드 종료
스레드에서 예외가 발생하면 메인 스레드로 직접 전달되지 않기 때문에, 예외를 포착하여 공유하는 패턴이 필요합니다.
1import threading
2import queue
3
4def worker(err_q):
5 try:
6 raise ValueError("Something bad")
7 except Exception as e:
8 err_q.put(e)
9
10q = queue.Queue()
11t = threading.Thread(target=worker, args=(q,))
12t.start()
13t.join()
14if not q.empty():
15 exc = q.get()
16 print("Worker raised:", exc)Queue에 예외를 넣고 메인 스레드에서 이를 가져오면 실패를 신뢰성 있게 감지할 수 있습니다.concurrent.futures.ThreadPoolExecutor를 사용하면, 예외가future.result()를 통해 다시 발생하므로 더 쉽게 처리할 수 있습니다.
GIL(글로벌 인터프리터 락)과 그 영향
CPython의 GIL(글로벌 인터프리터 락) 메커니즘 때문에, 하나의 프로세스 내에서 여러 파이썬 바이트코드가 실제로 동시에 실행되지는 않습니다. 복잡한 계산 등 CPU 집약적인 작업에는 multiprocessing 사용을 권장합니다. 반면 파일 읽기나 네트워크 통신 등 I/O 중심의 작업에는 threading이 효과적입니다.
요약
Python의 threading 모듈을 사용하여 멀티스레드 프로그램을 구현하고 여러 프로세스를 동시에 실행할 수 있습니다. Lock과 Condition 같은 동기화 메커니즘을 사용하여 공유 자원에 안전하게 접근하고 복잡한 동기화를 수행할 수 있습니다. 또한, 데몬 스레드나 ThreadPoolExecutor를 사용하면 스레드 관리 및 효율적인 병렬 처리가 더욱 쉬워집니다.
위의 기사를 보면서 Visual Studio Code를 사용해 우리 유튜브 채널에서 함께 따라할 수 있습니다. 유튜브 채널도 확인해 주세요.