Модуль `threading` в Python

Модуль `threading` в Python

Эта статья объясняет модуль threading в Python.

YouTube Video

Модуль threading в Python

Модуль threading в Python — это стандартная библиотека, которая поддерживает многопоточное программирование. Использование потоков позволяет нескольким процессам выполняться одновременно, что может улучшить производительность программ, особенно в случаях с блокирующими операциями, такими как ожидание ввода-вывода. Из-за глобальной блокировки интерпретатора (GIL) в Python эффективность многопоточности ограничена для операций, нагружающих процессор, но она хорошо работает для операций ввода-вывода.

Следующие разделы объясняют основы использования модуля threading и управление потоками.

Основное использование потоков

Создание и запуск потоков

Чтобы создать поток и выполнять параллельную обработку, используйте класс threading.Thread. Укажите целевую функцию для создания потока и выполнения этой функции.

 1import threading
 2import time
 3
 4# Function to be executed in a thread
 5def worker():
 6    print("Worker thread started")
 7    time.sleep(2)
 8    print("Worker thread finished")
 9
10# Create and start the thread
11thread = threading.Thread(target=worker)
12thread.start()
13
14# Processing in the main thread
15print("Main thread continues to run")
16
17# Wait for the thread to finish
18thread.join()
19print("Main thread finished")
  • В этом примере функция worker выполняется в отдельном потоке, в то время как главный поток продолжает свою работу. Вызвав метод join(), главный поток ожидает завершения дочернего потока.

Именование потоков

Присвоение потокам осмысленных имён облегчает ведение журнала и отладку. Вы можете указать имя с помощью аргумента name.

 1import threading
 2import time
 3
 4# Function to be executed in a thread
 5def worker():
 6    print("Worker thread started")
 7    time.sleep(2)
 8    print("Worker thread finished")
 9
10t = threading.Thread(
11    target=worker,
12    args=("named-worker", 0.3),
13    name="MyWorkerThread"
14)
15
16t.start()
17
18print("Active threads:", threading.active_count())
19for th in threading.enumerate():
20    print(" -", th.name)
21
22t.join()
  • threading.enumerate() возвращает список текущих потоков, что полезно для отладки и мониторинга состояния.

Наследование класса Thread

Если вы хотите настроить класс выполнения потока, вы можете определить новый класс, унаследовав его от threading.Thread.

 1import threading
 2import time
 3
 4# Inherit from the Thread class
 5class WorkerThread(threading.Thread):
 6    def __init__(self, name, delay, repeat=3):
 7        super().__init__(name=name)
 8        self.delay = delay
 9        self.repeat = repeat
10        self.results = []
11
12    def run(self):
13        for i in range(self.repeat):
14            msg = f"{self.name} step {i+1}"
15            print(msg)
16            self.results.append(msg)
17            time.sleep(self.delay)
18
19# Create and start the threads
20t1 = WorkerThread("Worker-A", delay=0.4, repeat=3)
21t2 = WorkerThread("Worker-B", delay=0.2, repeat=5)
22
23t1.start()
24t2.start()
25
26t1.join()
27t2.join()
28
29print("Results A:", t1.results)
30print("Results B:", t2.results)
  • В этом примере метод run() переопределяется для определения поведения потока, позволяя каждому потоку иметь свои собственные данные. Это полезно, когда потоки выполняют сложную обработку или когда требуется, чтобы у каждого потока были собственные независимые данные.

Синхронизация между потоками

Когда несколько потоков одновременно получают доступ к общим ресурсам, могут возникать состояния гонки данных. Для предотвращения этого модуль threading предоставляет несколько механизмов синхронизации.

Блокировка (Lock)

Объект Lock используется для реализации эксклюзивного управления ресурсами между потоками. Пока один поток блокирует ресурс, другие потоки не имеют к нему доступа.

 1import threading
 2
 3lock = threading.Lock()
 4shared_resource = 0
 5
 6def worker():
 7    global shared_resource
 8    with lock:  # Acquire the lock
 9        local_copy = shared_resource
10        local_copy += 1
11        shared_resource = local_copy
12
13threads = [threading.Thread(target=worker) for _ in range(5)]
14
15for t in threads:
16    t.start()
17
18for t in threads:
19    t.join()
20
21print(f"Final value of shared resource: {shared_resource}")
  • В этом примере пять потоков получают доступ к общему ресурсу, но Lock используется для предотвращения одновременного изменения данных несколькими потоками.

Рекурсивная блокировка (RLock)

Если одному и тому же потоку необходимо несколько раз захватить блокировку, используйте RLock (рекурсивную блокировку). Это полезно для рекурсивных вызовов или при использовании библиотек, которые могут захватывать блокировки в разных вызовах.

 1import threading
 2
 3rlock = threading.RLock()
 4shared = []
 5
 6def outer():
 7    with rlock:
 8        shared.append("outer")
 9        inner()
10
11def inner():
12    with rlock:
13        shared.append("inner")
14
15t = threading.Thread(target=outer)
16t.start()
17t.join()
18print(shared)
  • С помощью RLock один и тот же поток может повторно захватывать уже удерживаемую им блокировку, что помогает избежать взаимных блокировок при вложенном захвате.

Условие (Condition)

Condition используется для приостановки потоков до выполнения определенного условия. Когда поток удовлетворяет условию, вы можете вызвать notify(), чтобы уведомить другой поток, или notify_all(), чтобы уведомить все ожидающие потоки.

Ниже приведён пример производителя и потребителя с использованием Condition.

 1import threading
 2
 3condition = threading.Condition()
 4shared_data = []
 5
 6def producer():
 7    with condition:
 8        shared_data.append(1)
 9        print("Produced an item")
10        condition.notify()  # Notify the consumer
11
12def consumer():
13    with condition:
14        condition.wait()  # Wait until the condition is met
15
16        item = shared_data.pop(0)
17        print(f"Consumed an item: {item}")
18
19# Create the threads
20producer_thread = threading.Thread(target=producer)
21consumer_thread = threading.Thread(target=consumer)
22
23consumer_thread.start()
24producer_thread.start()
25
26producer_thread.join()
27consumer_thread.join()
  • Этот код использует Condition, чтобы производитель уведомлял о добавлении данных, а потребитель ждал этого уведомления перед извлечением данных, обеспечивая синхронизацию.

Демонизация потоков

Демон-потоки принудительно завершаются при завершении основного потока. Обычные потоки должны дождаться завершения, в то время как демон-потоки завершаются автоматически.

 1import threading
 2import time
 3
 4def worker():
 5    while True:
 6        print("Working...")
 7        time.sleep(1)
 8
 9# Create a daemon thread
10thread = threading.Thread(target=worker)
11thread.daemon = True  # Set as a daemon thread
12
13thread.start()
14
15# Processing in the main thread
16time.sleep(3)
17print("Main thread finished")
  • В этом примере поток worker является демонизированным, поэтому он принудительно завершается при завершении основного потока.

Управление потоками с помощью ThreadPoolExecutor

Помимо модуля threading, вы можете использовать ThreadPoolExecutor из модуля concurrent.futures для управления пулом потоков и выполнения задач параллельно.

 1from concurrent.futures import ThreadPoolExecutor
 2import time
 3
 4def worker(seconds):
 5    print(f"Sleeping for {seconds} second(s)")
 6    time.sleep(seconds)
 7    return f"Finished sleeping for {seconds} second(s)"
 8
 9with ThreadPoolExecutor(max_workers=3) as executor:
10    futures = [executor.submit(worker, i) for i in range(1, 4)]
11    for future in futures:
12        print(future.result())
  • ThreadPoolExecutor создает пул потоков и эффективно обрабатывает задачи. Укажите количество потоков для одновременного выполнения с помощью max_workers.

Обмен событиями между потоками

Используя threading.Event, вы можете устанавливать флаги между потоками, чтобы уведомлять другие потоки о наступлении события.

 1import threading
 2import time
 3
 4event = threading.Event()
 5
 6def worker():
 7    print("Waiting for event to be set")
 8    event.wait()  # Wait until the event is set
 9
10    print("Event received, continuing work")
11
12thread = threading.Thread(target=worker)
13thread.start()
14
15time.sleep(2)
16print("Setting the event")
17event.set()  # Set the event and notify the thread
  • Этот код демонстрирует механизм, при котором рабочий поток ожидает сигнала Event и продолжает выполнение, когда главный поток вызывает event.set().

Обработка исключений и завершение работы потоков

Когда в потоках возникают исключения, они не передаются напрямую в главный поток, поэтому требуется шаблон для перехвата и передачи исключений.

 1import threading
 2import queue
 3
 4def worker(err_q):
 5    try:
 6        raise ValueError("Something bad")
 7    except Exception as e:
 8        err_q.put(e)
 9
10q = queue.Queue()
11t = threading.Thread(target=worker, args=(q,))
12t.start()
13t.join()
14if not q.empty():
15    exc = q.get()
16    print("Worker raised:", exc)
  • Помещая исключения в Queue и извлекая их в основном потоке, вы можете надёжно обнаруживать сбои. Если вы используете concurrent.futures.ThreadPoolExecutor, исключения будут повторно возбуждены с помощью future.result(), что упрощает их обработку.

GIL (Глобальная блокировка интерпретатора) и его влияние

Из-за механизма GIL (Глобальная блокировка интерпретатора) в CPython несколько байт-кодов Python фактически не выполняются одновременно в одном процессе. Для задач, требующих интенсивной загрузки процессора, таких как сложные вычисления, рекомендуется использовать multiprocessing. С другой стороны, для задач, связанных с вводом-выводом, таких как чтение файлов или сетевая коммуникация, эффективно работает threading.

Резюме

Используя модуль threading в Python, вы можете реализовать многопоточные программы и выполнять несколько процессов одновременно. С помощью механизмов синхронизации, таких как Lock и Condition, вы можете безопасно получать доступ к общим ресурсам и выполнять сложную синхронизацию. Кроме того, использование демон-потоков или ThreadPoolExecutor упрощает управление потоками и эффективную параллельную обработку.

Вы можете следовать этой статье, используя Visual Studio Code на нашем YouTube-канале. Пожалуйста, также посмотрите наш YouTube-канал.

YouTube Video