Pythonにおける`threading`モジュール

Pythonにおける`threading`モジュール

この記事ではPythonにおけるthreadingモジュールについて説明します。

YouTube Video

Pythonにおけるthreadingモジュール

Pythonのthreadingモジュールは、マルチスレッドプログラミングをサポートするための標準ライブラリです。スレッドを使用することで、複数の処理を並行して実行でき、特にI/O待ちのようなブロッキング操作が発生する場合に、プログラムのパフォーマンスを向上させることが可能です。Pythonは**Global Interpreter Lock (GIL)**の影響で、CPUバウンドの処理ではマルチスレッドの効果が限定的ですが、I/Oバウンドの処理では有効に機能します。

以下では、threadingモジュールの基本的な使い方やスレッドの制御方法について説明します。

スレッドの基本的な使い方

スレッドの作成と実行

スレッドを作成して並行処理を行うには、threading.Threadクラスを使用します。ターゲット関数を指定してスレッドを作成し、そのスレッドを実行します。

 1import threading
 2import time
 3
 4# Function to be executed in a thread
 5def worker():
 6    print("Worker thread started")
 7    time.sleep(2)
 8    print("Worker thread finished")
 9
10# Create and start the thread
11thread = threading.Thread(target=worker)
12thread.start()
13
14# Processing in the main thread
15print("Main thread continues to run")
16
17# Wait for the thread to finish
18thread.join()
19print("Main thread finished")
  • この例では、worker関数が別スレッドで実行され、メインスレッドはその間も動作し続けます。join()メソッドを呼ぶことで、メインスレッドはサブスレッドの終了を待機します。

スレッドに名前をつける

スレッドに意味のある名前を付けるとログやデバッグが楽になります。name 引数で指定可能です。

 1import threading
 2import time
 3
 4# Function to be executed in a thread
 5def worker():
 6    print("Worker thread started")
 7    time.sleep(2)
 8    print("Worker thread finished")
 9
10t = threading.Thread(
11    target=worker,
12    args=("named-worker", 0.3),
13    name="MyWorkerThread"
14)
15
16t.start()
17
18print("Active threads:", threading.active_count())
19for th in threading.enumerate():
20    print(" -", th.name)
21
22t.join()
  • threading.enumerate() は現在のスレッド一覧を返すので、デバッグや状態監視に便利です。

スレッドクラスの継承

スレッドを実行するクラスをカスタマイズしたい場合、threading.Threadクラスを継承して新しいクラスを定義できます。

 1import threading
 2import time
 3
 4# Inherit from the Thread class
 5class WorkerThread(threading.Thread):
 6    def __init__(self, name, delay, repeat=3):
 7        super().__init__(name=name)
 8        self.delay = delay
 9        self.repeat = repeat
10        self.results = []
11
12    def run(self):
13        for i in range(self.repeat):
14            msg = f"{self.name} step {i+1}"
15            print(msg)
16            self.results.append(msg)
17            time.sleep(self.delay)
18
19# Create and start the threads
20t1 = WorkerThread("Worker-A", delay=0.4, repeat=3)
21t2 = WorkerThread("Worker-B", delay=0.2, repeat=5)
22
23t1.start()
24t2.start()
25
26t1.join()
27t2.join()
28
29print("Results A:", t1.results)
30print("Results B:", t2.results)
  • この例では run() をオーバーライドして処理内容を定義し、各スレッドが独自のデータを保持できるようになっています。複雑な処理を行うスレッドや、スレッドごとに独立したデータを持たせたいときに便利です。

スレッド間の同期

複数のスレッドが同時に共有リソースにアクセスすると、データ競合が発生する可能性があります。これを防ぐため、threadingモジュールにはいくつかの同期メカニズムが用意されています。

ロック (Lock)

Lockオブジェクトを使って、スレッド間でのリソースの排他制御を行います。1つのスレッドがリソースをロックしている間、他のスレッドはそのリソースにアクセスできません。

 1import threading
 2
 3lock = threading.Lock()
 4shared_resource = 0
 5
 6def worker():
 7    global shared_resource
 8    with lock:  # Acquire the lock
 9        local_copy = shared_resource
10        local_copy += 1
11        shared_resource = local_copy
12
13threads = [threading.Thread(target=worker) for _ in range(5)]
14
15for t in threads:
16    t.start()
17
18for t in threads:
19    t.join()
20
21print(f"Final value of shared resource: {shared_resource}")
  • この例では、5つのスレッドが共有リソースにアクセスしますが、Lockを使って同時に複数のスレッドがデータを変更することを防ぎます。

再入可能ロック(RLock

同一スレッドが再びロックを取得する必要がある場合は RLock(再入可能ロック)を使います。再帰的呼び出しやロックを跨ぐライブラリ呼び出しで役立ちます。

 1import threading
 2
 3rlock = threading.RLock()
 4shared = []
 5
 6def outer():
 7    with rlock:
 8        shared.append("outer")
 9        inner()
10
11def inner():
12    with rlock:
13        shared.append("inner")
14
15t = threading.Thread(target=outer)
16t.start()
17t.join()
18print(shared)
  • RLock を使うと、同じスレッドが既に保持しているロックを再取得できるため、ネストしたロック取得でデッドロックを避けられます。

コンディション (Condition)

Conditionは、スレッドが特定の条件が満たされるのを待つために使用します。スレッドが条件を満たしたときに、別のスレッドへ通知するnotify()や、全ての待機スレッドに通知するnotify_all()を呼び出します。

以下は、Conditionを用いた、プロデューサーとコンシューマーの例です。

 1import threading
 2
 3condition = threading.Condition()
 4shared_data = []
 5
 6def producer():
 7    with condition:
 8        shared_data.append(1)
 9        print("Produced an item")
10        condition.notify()  # Notify the consumer
11
12def consumer():
13    with condition:
14        condition.wait()  # Wait until the condition is met
15
16        item = shared_data.pop(0)
17        print(f"Consumed an item: {item}")
18
19# Create the threads
20producer_thread = threading.Thread(target=producer)
21consumer_thread = threading.Thread(target=consumer)
22
23consumer_thread.start()
24producer_thread.start()
25
26producer_thread.join()
27consumer_thread.join()
  • このコードは、Condition を使ってプロデューサーがデータ追加を通知し、コンシューマーがその通知を待ってからデータを取り出す同期処理を行っています。

スレッドのデーモン化

デーモンスレッドは、メインスレッドが終了した時点で強制的に終了されます。通常のスレッドが終了するのを待たなければならないのに対し、デーモンスレッドは自動的に終了します。

 1import threading
 2import time
 3
 4def worker():
 5    while True:
 6        print("Working...")
 7        time.sleep(1)
 8
 9# Create a daemon thread
10thread = threading.Thread(target=worker)
11thread.daemon = True  # Set as a daemon thread
12
13thread.start()
14
15# Processing in the main thread
16time.sleep(3)
17print("Main thread finished")
  • この例では、workerスレッドはデーモン化されているため、メインスレッドが終了すると強制的に終了します。

ThreadPoolExecutorによるスレッド管理

threadingモジュールとは別に、concurrent.futuresモジュールのThreadPoolExecutorを使用すると、スレッドのプールを管理し、タスクを並列に実行できます。

 1from concurrent.futures import ThreadPoolExecutor
 2import time
 3
 4def worker(seconds):
 5    print(f"Sleeping for {seconds} second(s)")
 6    time.sleep(seconds)
 7    return f"Finished sleeping for {seconds} second(s)"
 8
 9with ThreadPoolExecutor(max_workers=3) as executor:
10    futures = [executor.submit(worker, i) for i in range(1, 4)]
11    for future in futures:
12        print(future.result())
  • ThreadPoolExecutorは、スレッドのプールを作成し、効率的にタスクを処理します。max_workersで同時に実行するスレッドの数を指定します。

スレッド間でのイベント通信

threading.Eventを使用すると、スレッド間でフラグを立てて、他のスレッドにイベントの発生を知らせることができます。

 1import threading
 2import time
 3
 4event = threading.Event()
 5
 6def worker():
 7    print("Waiting for event to be set")
 8    event.wait()  # Wait until the event is set
 9
10    print("Event received, continuing work")
11
12thread = threading.Thread(target=worker)
13thread.start()
14
15time.sleep(2)
16print("Setting the event")
17event.set()  # Set the event and notify the thread
  • このコードは、ワーカー・スレッドが Event のシグナルが送られるまで待機し、メインスレッドが event.set() を呼ぶと処理を再開する仕組みを示しています。

スレッドでの例外処理とスレッド終了の扱い

スレッド内で例外が発生してもメインスレッドには直接伝わらないため、例外を取得して共有するパターンが必要です。

 1import threading
 2import queue
 3
 4def worker(err_q):
 5    try:
 6        raise ValueError("Something bad")
 7    except Exception as e:
 8        err_q.put(e)
 9
10q = queue.Queue()
11t = threading.Thread(target=worker, args=(q,))
12t.start()
13t.join()
14if not q.empty():
15    exc = q.get()
16    print("Worker raised:", exc)
  • 例外を Queue に入れてメインスレッドで取り出すことで、失敗を確実に検出できます。concurrent.futures.ThreadPoolExecutor を使うと例外は future.result() で再スローされ、扱いやすくなります。

GIL(Global Interpreter Lock)とその影響

CPython の GIL(Global Interpreter Lock) の仕組みにより、基本的に同じプロセスの中では複数の Python バイトコードが同時に動くことはありません。大量の計算のように CPU を多く使う処理では multiprocessing を使う方法が向いています。一方で、ファイル読み込みやネットワーク通信のように I/O が中心の処理では threading が効果的に働きます。

まとめ

Pythonのthreadingモジュールを使うことで、マルチスレッドプログラムを実装し、複数の処理を並行して実行できます。LockConditionなどの同期メカニズムを使って、安全に共有リソースへアクセスし、複雑な同期処理を行うことが可能です。また、デーモンスレッドやThreadPoolExecutorなどを使うことで、スレッドの管理や効率的な並列処理が簡単に行えます。

YouTubeチャンネルでは、Visual Studio Codeを用いて上記の記事を見ながら確認できます。 ぜひYouTubeチャンネルもご覧ください。

YouTube Video