Module `threading` trong Python

Module `threading` trong Python

Bài viết này giải thích module threading trong Python.

YouTube Video

Module threading trong Python

Module threading trong Python là một thư viện chuẩn hỗ trợ lập trình đa luồng. Việc sử dụng thread cho phép nhiều tiến trình chạy cùng lúc, giúp cải thiện hiệu năng của chương trình, đặc biệt trong các tình huống liên quan đến các thao tác chờ như I/O. Do Global Interpreter Lock (GIL) của Python, hiệu suất của lập trình đa luồng bị giới hạn đối với các thao tác phụ thuộc CPU, nhưng hoạt động hiệu quả đối với các thao tác phụ thuộc I/O.

Các phần sau đây giải thích về các kiến thức cơ bản khi sử dụng module threading và cách điều khiển luồng.

Cách sử dụng cơ bản của Threads

Tạo và chạy Threads

Để tạo một thread và thực hiện xử lý đồng thời, sử dụng lớp threading.Thread. Chỉ định hàm mục tiêu để tạo một thread và thực thi thread đó.

 1import threading
 2import time
 3
 4# Function to be executed in a thread
 5def worker():
 6    print("Worker thread started")
 7    time.sleep(2)
 8    print("Worker thread finished")
 9
10# Create and start the thread
11thread = threading.Thread(target=worker)
12thread.start()
13
14# Processing in the main thread
15print("Main thread continues to run")
16
17# Wait for the thread to finish
18thread.join()
19print("Main thread finished")
  • Trong ví dụ này, hàm worker được thực thi trong một thread riêng biệt, trong khi thread chính tiếp tục hoạt động. Bằng cách gọi phương thức join(), thread chính sẽ chờ thread phụ hoàn thành.

Đặt tên cho các luồng

Việc đặt tên có ý nghĩa cho các luồng giúp ghi log và gỡ lỗi dễ dàng hơn. Bạn có thể chỉ định nó bằng đối số name.

 1import threading
 2import time
 3
 4# Function to be executed in a thread
 5def worker():
 6    print("Worker thread started")
 7    time.sleep(2)
 8    print("Worker thread finished")
 9
10t = threading.Thread(
11    target=worker,
12    args=("named-worker", 0.3),
13    name="MyWorkerThread"
14)
15
16t.start()
17
18print("Active threads:", threading.active_count())
19for th in threading.enumerate():
20    print(" -", th.name)
21
22t.join()
  • threading.enumerate() trả về danh sách các luồng hiện tại, hữu ích cho việc gỡ lỗi và theo dõi trạng thái.

Kế thừa lớp Thread

Nếu bạn muốn tùy chỉnh lớp thực thi thread, bạn có thể định nghĩa một lớp mới bằng cách kế thừa lớp threading.Thread.

 1import threading
 2import time
 3
 4# Inherit from the Thread class
 5class WorkerThread(threading.Thread):
 6    def __init__(self, name, delay, repeat=3):
 7        super().__init__(name=name)
 8        self.delay = delay
 9        self.repeat = repeat
10        self.results = []
11
12    def run(self):
13        for i in range(self.repeat):
14            msg = f"{self.name} step {i+1}"
15            print(msg)
16            self.results.append(msg)
17            time.sleep(self.delay)
18
19# Create and start the threads
20t1 = WorkerThread("Worker-A", delay=0.4, repeat=3)
21t2 = WorkerThread("Worker-B", delay=0.2, repeat=5)
22
23t1.start()
24t2.start()
25
26t1.join()
27t2.join()
28
29print("Results A:", t1.results)
30print("Results B:", t2.results)
  • Trong ví dụ này, phương thức run() được ghi đè để xác định hành vi của luồng, cho phép mỗi luồng giữ dữ liệu riêng của mình. Điều này hữu ích khi các luồng thực hiện xử lý phức tạp hoặc khi bạn muốn mỗi luồng có dữ liệu độc lập riêng.

Đồng bộ hóa giữa các Threads

Khi nhiều thread cùng lúc truy cập vào tài nguyên chung, có thể xảy ra xung đột dữ liệu (data races). Để ngăn chặn điều này, module threading cung cấp nhiều cơ chế đồng bộ hóa.

Khóa (Lock)

Đối tượng Lock được sử dụng để thực hiện kiểm soát độc quyền tài nguyên giữa các thread. Khi một thread khóa một tài nguyên, các thread khác không thể truy cập tài nguyên đó.

 1import threading
 2
 3lock = threading.Lock()
 4shared_resource = 0
 5
 6def worker():
 7    global shared_resource
 8    with lock:  # Acquire the lock
 9        local_copy = shared_resource
10        local_copy += 1
11        shared_resource = local_copy
12
13threads = [threading.Thread(target=worker) for _ in range(5)]
14
15for t in threads:
16    t.start()
17
18for t in threads:
19    t.join()
20
21print(f"Final value of shared resource: {shared_resource}")
  • Trong ví dụ này, năm thread truy cập vào một tài nguyên chung, nhưng Lock được sử dụng để ngăn không cho nhiều thread cùng lúc sửa đổi dữ liệu.

Khóa tái nhập (RLock)

Nếu cùng một luồng cần chiếm giữ khóa nhiều lần, hãy sử dụng RLock (khóa tái nhập). Điều này hữu ích cho các cuộc gọi đệ quy hoặc khi thư viện có thể chiếm giữ khóa giữa các lần gọi khác nhau.

 1import threading
 2
 3rlock = threading.RLock()
 4shared = []
 5
 6def outer():
 7    with rlock:
 8        shared.append("outer")
 9        inner()
10
11def inner():
12    with rlock:
13        shared.append("inner")
14
15t = threading.Thread(target=outer)
16t.start()
17t.join()
18print(shared)
  • Với RLock, cùng một luồng có thể chiếm giữ lại khóa mà nó đã giữ, giúp ngăn ngừa tình trạng deadlock khi chiếm giữ khóa lồng nhau.

Điều kiện (Condition)

Condition được sử dụng để các luồng chờ cho đến khi một điều kiện cụ thể được đáp ứng. Khi một luồng thỏa mãn một điều kiện, bạn có thể gọi notify() để thông báo cho một luồng khác, hoặc notify_all() để thông báo cho tất cả các luồng đang chờ đợi.

Dưới đây là ví dụ về nhà sản xuất và người tiêu dùng sử dụng Condition.

 1import threading
 2
 3condition = threading.Condition()
 4shared_data = []
 5
 6def producer():
 7    with condition:
 8        shared_data.append(1)
 9        print("Produced an item")
10        condition.notify()  # Notify the consumer
11
12def consumer():
13    with condition:
14        condition.wait()  # Wait until the condition is met
15
16        item = shared_data.pop(0)
17        print(f"Consumed an item: {item}")
18
19# Create the threads
20producer_thread = threading.Thread(target=producer)
21consumer_thread = threading.Thread(target=consumer)
22
23consumer_thread.start()
24producer_thread.start()
25
26producer_thread.join()
27consumer_thread.join()
  • Mã này sử dụng Condition để nhà sản xuất thông báo khi có dữ liệu mới, và người tiêu dùng sẽ chờ thông báo đó trước khi lấy dữ liệu, giúp đồng bộ hóa.

Tạo daemon cho luồng

Các luồng daemon bị kết thúc cưỡng bức khi luồng chính dừng lại. Trong khi các luồng bình thường phải chờ để kết thúc, luồng daemon tự động kết thúc.

 1import threading
 2import time
 3
 4def worker():
 5    while True:
 6        print("Working...")
 7        time.sleep(1)
 8
 9# Create a daemon thread
10thread = threading.Thread(target=worker)
11thread.daemon = True  # Set as a daemon thread
12
13thread.start()
14
15# Processing in the main thread
16time.sleep(3)
17print("Main thread finished")
  • Trong ví dụ này, luồng worker được tạo daemon, vì vậy nó sẽ bị kết thúc cưỡng bức khi luồng chính dừng lại.

Quản lý luồng với ThreadPoolExecutor

Ngoài module threading, bạn có thể sử dụng ThreadPoolExecutor từ module concurrent.futures để quản lý nhóm luồng và thực hiện các tác vụ song song.

 1from concurrent.futures import ThreadPoolExecutor
 2import time
 3
 4def worker(seconds):
 5    print(f"Sleeping for {seconds} second(s)")
 6    time.sleep(seconds)
 7    return f"Finished sleeping for {seconds} second(s)"
 8
 9with ThreadPoolExecutor(max_workers=3) as executor:
10    futures = [executor.submit(worker, i) for i in range(1, 4)]
11    for future in futures:
12        print(future.result())
  • ThreadPoolExecutor tạo ra một nhóm luồng và xử lý các tác vụ một cách hiệu quả. Xác định số lượng luồng chạy đồng thời bằng max_workers.

Giao tiếp sự kiện giữa các luồng

Sử dụng threading.Event, bạn có thể đặt cờ giữa các luồng để thông báo cho các luồng khác về sự kiện đã xảy ra.

 1import threading
 2import time
 3
 4event = threading.Event()
 5
 6def worker():
 7    print("Waiting for event to be set")
 8    event.wait()  # Wait until the event is set
 9
10    print("Event received, continuing work")
11
12thread = threading.Thread(target=worker)
13thread.start()
14
15time.sleep(2)
16print("Setting the event")
17event.set()  # Set the event and notify the thread
  • Mã này trình bày cơ chế mà luồng công nhân chờ tín hiệu Event và tiếp tục xử lý khi luồng chính gọi event.set().

Xử lý ngoại lệ và kết thúc luồng trong các luồng

Khi xảy ra ngoại lệ trong các luồng, chúng không được truyền trực tiếp đến luồng chính, do đó cần một mẫu để thu thập và chia sẻ ngoại lệ.

 1import threading
 2import queue
 3
 4def worker(err_q):
 5    try:
 6        raise ValueError("Something bad")
 7    except Exception as e:
 8        err_q.put(e)
 9
10q = queue.Queue()
11t = threading.Thread(target=worker, args=(q,))
12t.start()
13t.join()
14if not q.empty():
15    exc = q.get()
16    print("Worker raised:", exc)
  • Bằng cách đưa các ngoại lệ vào một Queue và lấy chúng trong luồng chính, bạn có thể phát hiện lỗi một cách đáng tin cậy. Nếu bạn sử dụng concurrent.futures.ThreadPoolExecutor, ngoại lệ sẽ được ném lại với future.result(), giúp xử lý dễ dàng hơn.

GIL (Global Interpreter Lock) và tác động của nó

Do cơ chế của GIL (Global Interpreter Lock) trong CPython, nhiều mã byte Python không thực sự chạy đồng thời trong cùng một tiến trình. Đối với các tác vụ sử dụng nhiều CPU như tính toán nặng, nên sử dụng multiprocessing. Ngược lại, đối với các tác vụ giới hạn I/O như đọc file hoặc giao tiếp mạng, threading hoạt động hiệu quả.

Tóm tắt

Sử dụng module threading của Python, bạn có thể triển khai chương trình đa luồng và thực thi nhiều tiến trình đồng thời. Với các cơ chế đồng bộ như LockCondition, bạn có thể truy cập tài nguyên chung một cách an toàn và thực hiện các tác vụ đồng bộ phức tạp. Ngoài ra, việc sử dụng các luồng daemon hoặc ThreadPoolExecutor giúp quản lý luồng và xử lý song song hiệu quả trở nên dễ dàng hơn.

Bạn có thể làm theo bài viết trên bằng cách sử dụng Visual Studio Code trên kênh YouTube của chúng tôi. Vui lòng ghé thăm kênh YouTube.

YouTube Video