Moduł `threading` w Pythonie

Moduł `threading` w Pythonie

Ten artykuł wyjaśnia moduł threading w Pythonie.

YouTube Video

Moduł threading w Pythonie

Moduł threading w Pythonie to biblioteka standardowa wspierająca programowanie wielowątkowe. Używanie wątków pozwala na równoczesne wykonywanie wielu procesów, co może poprawić wydajność programów, zwłaszcza w przypadku operacji blokujących, takich jak oczekiwanie na I/O. Ze względu na Global Interpreter Lock (GIL) w Pythonie efektywność wielowątkowości jest ograniczona w operacjach wymagających intensywnego użycia CPU, ale działa wydajnie dla operacji zależnych od I/O.

W poniższych sekcjach omówiono podstawy korzystania z modułu threading oraz sposób kontroli wątków.

Podstawowe użycie wątków

Tworzenie i uruchamianie wątków

Aby stworzyć wątek i wykonać przetwarzanie równoległe, użyj klasy threading.Thread. Określ funkcję docelową, aby utworzyć wątek i go wykonać.

 1import threading
 2import time
 3
 4# Function to be executed in a thread
 5def worker():
 6    print("Worker thread started")
 7    time.sleep(2)
 8    print("Worker thread finished")
 9
10# Create and start the thread
11thread = threading.Thread(target=worker)
12thread.start()
13
14# Processing in the main thread
15print("Main thread continues to run")
16
17# Wait for the thread to finish
18thread.join()
19print("Main thread finished")
  • W tym przykładzie funkcja worker jest wykonywana w osobnym wątku, podczas gdy wątek główny kontynuuje działanie. Wywołując metodę join(), wątek główny czeka na zakończenie wątku podrzędnego.

Nazywanie wątków

Nadawanie wątkom znaczących nazw ułatwia logowanie i debugowanie. Możesz to określić za pomocą argumentu name.

 1import threading
 2import time
 3
 4# Function to be executed in a thread
 5def worker():
 6    print("Worker thread started")
 7    time.sleep(2)
 8    print("Worker thread finished")
 9
10t = threading.Thread(
11    target=worker,
12    args=("named-worker", 0.3),
13    name="MyWorkerThread"
14)
15
16t.start()
17
18print("Active threads:", threading.active_count())
19for th in threading.enumerate():
20    print(" -", th.name)
21
22t.join()
  • threading.enumerate() zwraca listę bieżących wątków, co jest przydatne podczas debugowania i monitorowania stanu.

Dziedziczenie klasy Thread

Jeśli chcesz dostosować klasę wykonującą wątek, możesz zdefiniować nową klasę, dziedzicząc po klasie threading.Thread.

 1import threading
 2import time
 3
 4# Inherit from the Thread class
 5class WorkerThread(threading.Thread):
 6    def __init__(self, name, delay, repeat=3):
 7        super().__init__(name=name)
 8        self.delay = delay
 9        self.repeat = repeat
10        self.results = []
11
12    def run(self):
13        for i in range(self.repeat):
14            msg = f"{self.name} step {i+1}"
15            print(msg)
16            self.results.append(msg)
17            time.sleep(self.delay)
18
19# Create and start the threads
20t1 = WorkerThread("Worker-A", delay=0.4, repeat=3)
21t2 = WorkerThread("Worker-B", delay=0.2, repeat=5)
22
23t1.start()
24t2.start()
25
26t1.join()
27t2.join()
28
29print("Results A:", t1.results)
30print("Results B:", t2.results)
  • W tym przykładzie metoda run() jest nadpisana w celu zdefiniowania zachowania wątku, co pozwala każdemu wątkowi przechowywać własne dane. To jest przydatne, gdy wątki wykonują złożone operacje lub gdy chcesz, aby każdy wątek miał własne, niezależne dane.

Synchronizacja pomiędzy wątkami

Gdy wiele wątków jednocześnie uzyskuje dostęp do współdzielonych zasobów, mogą wystąpić wyścigi danych. Aby temu zapobiec, moduł threading udostępnia kilka mechanizmów synchronizacji.

Blokada (Lock)

Obiekt Lock służy do implementacji wyłącznej kontroli zasobów pomiędzy wątkami. Podczas gdy jeden wątek blokuje zasób, inne wątki nie mogą uzyskać do niego dostępu.

 1import threading
 2
 3lock = threading.Lock()
 4shared_resource = 0
 5
 6def worker():
 7    global shared_resource
 8    with lock:  # Acquire the lock
 9        local_copy = shared_resource
10        local_copy += 1
11        shared_resource = local_copy
12
13threads = [threading.Thread(target=worker) for _ in range(5)]
14
15for t in threads:
16    t.start()
17
18for t in threads:
19    t.join()
20
21print(f"Final value of shared resource: {shared_resource}")
  • W tym przykładzie pięć wątków uzyskuje dostęp do współdzielonego zasobu, ale Lock jest używane, aby zapobiec jednoczesnemu modyfikowaniu danych przez wiele wątków.

Blokada rekurencyjna (RLock)

Jeśli ten sam wątek musi zdobyć blokadę wielokrotnie, użyj RLock (blokady rekurencyjnej). To przydaje się przy wywołaniach rekurencyjnych albo w wywołaniach bibliotecznych, które mogą zakładać blokady podczas różnych wywołań.

 1import threading
 2
 3rlock = threading.RLock()
 4shared = []
 5
 6def outer():
 7    with rlock:
 8        shared.append("outer")
 9        inner()
10
11def inner():
12    with rlock:
13        shared.append("inner")
14
15t = threading.Thread(target=outer)
16t.start()
17t.join()
18print(shared)
  • Dzięki RLock ten sam wątek może ponownie zdobyć blokadę, którą już posiada, co pomaga zapobiec zakleszczeniom podczas zagnieżdżonego blokowania.

Stan (Condition)

Condition jest używany, aby wątki czekały, aż zostanie spełniony określony warunek. Gdy wątek spełni warunek, możesz wywołać notify(), aby powiadomić inny wątek lub notify_all(), aby powiadomić wszystkie oczekujące wątki.

Poniżej znajduje się przykład producenta i konsumenta korzystających z Condition.

 1import threading
 2
 3condition = threading.Condition()
 4shared_data = []
 5
 6def producer():
 7    with condition:
 8        shared_data.append(1)
 9        print("Produced an item")
10        condition.notify()  # Notify the consumer
11
12def consumer():
13    with condition:
14        condition.wait()  # Wait until the condition is met
15
16        item = shared_data.pop(0)
17        print(f"Consumed an item: {item}")
18
19# Create the threads
20producer_thread = threading.Thread(target=producer)
21consumer_thread = threading.Thread(target=consumer)
22
23consumer_thread.start()
24producer_thread.start()
25
26producer_thread.join()
27consumer_thread.join()
  • Ten kod używa Condition, dzięki czemu producent powiadamia o dodaniu danych, a konsument czeka na to powiadomienie przed pobraniem danych, osiągając synchronizację.

Demonizowanie wątków

Wątki demonów są przerywane siłą, gdy kończy się główny wątek. Podczas gdy normalne wątki muszą czekać na zakończenie, wątki demony kończą się automatycznie.

 1import threading
 2import time
 3
 4def worker():
 5    while True:
 6        print("Working...")
 7        time.sleep(1)
 8
 9# Create a daemon thread
10thread = threading.Thread(target=worker)
11thread.daemon = True  # Set as a daemon thread
12
13thread.start()
14
15# Processing in the main thread
16time.sleep(3)
17print("Main thread finished")
  • W tym przykładzie wątek worker jest demonizowany, więc zostanie siłą zakończony, gdy główny wątek się skończy.

Zarządzanie wątkami za pomocą ThreadPoolExecutor

Oprócz modułu threading można używać ThreadPoolExecutor z modułu concurrent.futures do zarządzania pulą wątków i równoległego wykonywania zadań.

 1from concurrent.futures import ThreadPoolExecutor
 2import time
 3
 4def worker(seconds):
 5    print(f"Sleeping for {seconds} second(s)")
 6    time.sleep(seconds)
 7    return f"Finished sleeping for {seconds} second(s)"
 8
 9with ThreadPoolExecutor(max_workers=3) as executor:
10    futures = [executor.submit(worker, i) for i in range(1, 4)]
11    for future in futures:
12        print(future.result())
  • ThreadPoolExecutor tworzy pulę wątków i efektywnie przetwarza zadania. Określ liczbę wątków do uruchomienia równocześnie za pomocą max_workers.

Komunikacja zdarzeń między wątkami

Korzystając z threading.Event, możesz ustawić flagi między wątkami, aby powiadomić inne wątki o wystąpieniu zdarzenia.

 1import threading
 2import time
 3
 4event = threading.Event()
 5
 6def worker():
 7    print("Waiting for event to be set")
 8    event.wait()  # Wait until the event is set
 9
10    print("Event received, continuing work")
11
12thread = threading.Thread(target=worker)
13thread.start()
14
15time.sleep(2)
16print("Setting the event")
17event.set()  # Set the event and notify the thread
  • Ten kod pokazuje mechanizm, w którym wątek roboczy czeka na sygnał Event i wznawia przetwarzanie, gdy wątek główny wywoła event.set().

Obsługa wyjątków i kończenie wątków

Gdy wyjątki występują w wątkach, nie są one bezpośrednio przekazywane do wątku głównego, dlatego potrzebny jest wzorzec, aby je przechwytywać i udostępniać.

 1import threading
 2import queue
 3
 4def worker(err_q):
 5    try:
 6        raise ValueError("Something bad")
 7    except Exception as e:
 8        err_q.put(e)
 9
10q = queue.Queue()
11t = threading.Thread(target=worker, args=(q,))
12t.start()
13t.join()
14if not q.empty():
15    exc = q.get()
16    print("Worker raised:", exc)
  • Poprzez umieszczenie wyjątków w kolejce (Queue) i pobranie ich w głównym wątku, możesz niezawodnie wykrywać błędy. Jeśli używasz concurrent.futures.ThreadPoolExecutor, wyjątki są ponownie zgłaszane za pomocą future.result(), co ułatwia ich obsługę.

GIL (Global Interpreter Lock) i jego skutki

Ze względu na mechanizm GIL (Global Interpreter Lock) w CPython, wiele bajtkodów Pythona nie działa faktycznie równocześnie w tym samym procesie. Dla zadań intensywnych obliczeniowo, takich jak skomplikowane obliczenia, zaleca się użycie multiprocessing. Z kolei dla zadań związanych z operacjami wejścia-wyjścia, jak czytanie plików czy komunikacja sieciowa, threading działa skutecznie.

Podsumowanie

Korzystając z modułu threading w Pythonie, możesz implementować programy wielowątkowe i wykonywać wiele procesów równocześnie. Dzięki mechanizmom synchronizacji, takim jak Lock i Condition, możesz bezpiecznie uzyskiwać dostęp do współdzielonych zasobów i przeprowadzać złożoną synchronizację. Ponadto dzięki stosowaniu wątków demonicznych lub ThreadPoolExecutor zarządzanie wątkami i wydajne przetwarzanie równoległe staje się łatwiejsze.

Możesz śledzić ten artykuł, korzystając z Visual Studio Code na naszym kanale YouTube. Proszę również sprawdzić nasz kanał YouTube.

YouTube Video