โมดูล `threading` ใน Python

โมดูล `threading` ใน Python

บทความนี้อธิบายเกี่ยวกับโมดูล threading ใน Python

YouTube Video

โมดูล threading ใน Python

โมดูล threading ใน Python เป็นไลบรารีมาตรฐานที่สนับสนุนการเขียนโปรแกรมหลายเธรด การใช้เธรดช่วยให้กระบวนการหลายตัวทำงานพร้อมกันได้ ซึ่งสามารถเพิ่มประสิทธิภาพของโปรแกรม โดยเฉพาะในกรณีที่เกี่ยวข้องกับการรอ I/O เนื่องจาก Global Interpreter Lock (GIL) ของไพธอน ประสิทธิภาพของมัลติเธรดจึงมีข้อจำกัดเมื่อใช้กับงานที่ใช้ CPU เป็นหลัก แต่เหมาะสมกับงานที่เน้น I/O

ส่วนต่อไปนี้อธิบายพื้นฐานการใช้งานโมดูล threading และการควบคุมเธรด

การใช้งานพื้นฐานของเธรด

การสร้างและการเรียกใช้งานเธรด

ในการสร้างเธรดและดำเนินการแบบขนาน ให้ใช้คลาส threading.Thread กำหนดฟังก์ชั่นเป้าหมายเพื่อสร้างเธรดและเรียกใช้งานเธรดนั้น

 1import threading
 2import time
 3
 4# Function to be executed in a thread
 5def worker():
 6    print("Worker thread started")
 7    time.sleep(2)
 8    print("Worker thread finished")
 9
10# Create and start the thread
11thread = threading.Thread(target=worker)
12thread.start()
13
14# Processing in the main thread
15print("Main thread continues to run")
16
17# Wait for the thread to finish
18thread.join()
19print("Main thread finished")
  • ในตัวอย่างนี้ ฟังก์ชัน worker จะถูกเรียกใช้งานในเธรดแยก ขณะที่เธรดหลักยังคงทำงานต่อไป โดยการเรียกใช้เมธอด join() เธรดหลักจะรอให้เธรดย่อยทำงานเสร็จสมบูรณ์

การตั้งชื่อเธรด

การตั้งชื่อที่มีความหมายให้เธรดจะช่วยให้การบันทึกข้อมูลและการดีบักง่ายขึ้น คุณสามารถระบุชื่อได้โดยใช้อาร์กิวเมนต์ name

 1import threading
 2import time
 3
 4# Function to be executed in a thread
 5def worker():
 6    print("Worker thread started")
 7    time.sleep(2)
 8    print("Worker thread finished")
 9
10t = threading.Thread(
11    target=worker,
12    args=("named-worker", 0.3),
13    name="MyWorkerThread"
14)
15
16t.start()
17
18print("Active threads:", threading.active_count())
19for th in threading.enumerate():
20    print(" -", th.name)
21
22t.join()
  • threading.enumerate() จะส่งคืนรายชื่อเธรดที่กำลังทำงานอยู่ ซึ่งมีประโยชน์ในการดีบักและติดตามสถานะ

การสืบทอดคลาสเธรด

หากคุณต้องการปรับแต่งคลาสที่ใช้ในการเรียกใช้งานเธรด คุณสามารถกำหนดคลาสใหม่โดยสืบทอดจากคลาส threading.Thread

 1import threading
 2import time
 3
 4# Inherit from the Thread class
 5class WorkerThread(threading.Thread):
 6    def __init__(self, name, delay, repeat=3):
 7        super().__init__(name=name)
 8        self.delay = delay
 9        self.repeat = repeat
10        self.results = []
11
12    def run(self):
13        for i in range(self.repeat):
14            msg = f"{self.name} step {i+1}"
15            print(msg)
16            self.results.append(msg)
17            time.sleep(self.delay)
18
19# Create and start the threads
20t1 = WorkerThread("Worker-A", delay=0.4, repeat=3)
21t2 = WorkerThread("Worker-B", delay=0.2, repeat=5)
22
23t1.start()
24t2.start()
25
26t1.join()
27t2.join()
28
29print("Results A:", t1.results)
30print("Results B:", t2.results)
  • ในตัวอย่างนี้ เมทอด run() ถูกเขียนทับเพื่อกำหนดพฤติกรรมของเธรด ทำให้แต่ละเธรดสามารถจัดการข้อมูลของตัวเองได้ สิ่งนี้มีประโยชน์เมื่อเธรดต้องประมวลผลที่ซับซ้อนหรือเมื่อคุณต้องการให้แต่ละเธรดมีข้อมูลอิสระของตัวเอง

การซิงโครไนซ์ระหว่างเธรด

เมื่อหลายเธรดเข้าถึงทรัพยากรร่วมกันพร้อมกัน อาจเกิดปัญหา data race ได้ เพื่อป้องกันปัญหานี้ โมดูล threading มีการซิงโครไนซ์หลายรูปแบบให้ใช้งาน

ล็อค (Lock)

ออบเจ็กต์ Lock ใช้เพื่อควบคุมการเข้าถึงทรัพยากรร่วมกันระหว่างเธรดแบบเฉพาะตัว เมื่อเธรดหนึ่งล็อคทรัพยากร เธรดอื่นจะไม่สามารถเข้าถึงทรัพยากรนั้นได้

 1import threading
 2
 3lock = threading.Lock()
 4shared_resource = 0
 5
 6def worker():
 7    global shared_resource
 8    with lock:  # Acquire the lock
 9        local_copy = shared_resource
10        local_copy += 1
11        shared_resource = local_copy
12
13threads = [threading.Thread(target=worker) for _ in range(5)]
14
15for t in threads:
16    t.start()
17
18for t in threads:
19    t.join()
20
21print(f"Final value of shared resource: {shared_resource}")
  • ในตัวอย่างนี้ มีเธรด 5 ตัวที่เข้าถึงทรัพยากรร่วมกัน แต่ใช้ Lock เพื่อป้องกันไม่ให้เธรดหลายตัวแก้ไขข้อมูลในเวลาเดียวกัน

ล็อกซ้ำได้ (RLock)

หากเธรดเดียวกันต้องเข้าถึงล็อกหลายครั้ง ให้ใช้ RLock (รีเอนแทรนต์ล็อก) สิ่งนี้มีประโยชน์สำหรับการเรียกแบบเวียนซ้ำ (recursive) หรือเรียกไลบรารีที่อาจใช้งานล็อกในการเรียกหลายครั้ง

 1import threading
 2
 3rlock = threading.RLock()
 4shared = []
 5
 6def outer():
 7    with rlock:
 8        shared.append("outer")
 9        inner()
10
11def inner():
12    with rlock:
13        shared.append("inner")
14
15t = threading.Thread(target=outer)
16t.start()
17t.join()
18print(shared)
  • ด้วย RLock เธรดเดียวกันสามารถขอล็อกที่มันถืออยู่ซ้ำได้ ซึ่งช่วยป้องกันปัญหาเดดล็อกจากการล็อกซ้อนกัน

เงื่อนไข (Condition)

Condition ถูกใช้เพื่อให้เธรดรอจนกว่าเงื่อนไขเฉพาะจะถูกตอบสนอง เมื่อเธรดหนึ่งตรงตามเงื่อนไข คุณสามารถเรียก notify() เพื่อแจ้งเตือนเธรดอื่น หรือ notify_all() เพื่อแจ้งเตือนเธรดทั้งหมดที่กำลังรออยู่

ด้านล่างเป็นตัวอย่างของโปรดิวเซอร์และคอนซูเมอร์ที่ใช้ Condition

 1import threading
 2
 3condition = threading.Condition()
 4shared_data = []
 5
 6def producer():
 7    with condition:
 8        shared_data.append(1)
 9        print("Produced an item")
10        condition.notify()  # Notify the consumer
11
12def consumer():
13    with condition:
14        condition.wait()  # Wait until the condition is met
15
16        item = shared_data.pop(0)
17        print(f"Consumed an item: {item}")
18
19# Create the threads
20producer_thread = threading.Thread(target=producer)
21consumer_thread = threading.Thread(target=consumer)
22
23consumer_thread.start()
24producer_thread.start()
25
26producer_thread.join()
27consumer_thread.join()
  • โค้ดนี้ใช้ Condition เพื่อให้โปรดิวเซอร์แจ้งเตือนเมื่อมีการเพิ่มข้อมูล และคอนซูเมอร์จะรอการแจ้งเตือนนั้นก่อนที่จะดึงข้อมูล เพื่อให้เกิดการประสานกัน

การทำให้เธรดเป็น Daemon

เธรด Daemon จะถูกหยุดโดยบังคับเมื่อเธรดหลักสิ้นสุดลง ในขณะที่เธรดปกติต้องรอเพื่อสิ้นสุด เธรด Daemon จะหยุดทำงานโดยอัตโนมัติ

 1import threading
 2import time
 3
 4def worker():
 5    while True:
 6        print("Working...")
 7        time.sleep(1)
 8
 9# Create a daemon thread
10thread = threading.Thread(target=worker)
11thread.daemon = True  # Set as a daemon thread
12
13thread.start()
14
15# Processing in the main thread
16time.sleep(3)
17print("Main thread finished")
  • ในตัวอย่างนี้ เธรด worker ถูกตั้งให้เป็น Daemon ดังนั้นมันจะหยุดทำงานโดยบังคับเมื่อเธรดหลักสิ้นสุดลง

การจัดการเธรดด้วย ThreadPoolExecutor

นอกจากโมดูล threading คุณยังสามารถใช้ ThreadPoolExecutor จากโมดูล concurrent.futures เพื่อจัดการพูลเธรดและดำเนินงานแบบขนาน

 1from concurrent.futures import ThreadPoolExecutor
 2import time
 3
 4def worker(seconds):
 5    print(f"Sleeping for {seconds} second(s)")
 6    time.sleep(seconds)
 7    return f"Finished sleeping for {seconds} second(s)"
 8
 9with ThreadPoolExecutor(max_workers=3) as executor:
10    futures = [executor.submit(worker, i) for i in range(1, 4)]
11    for future in futures:
12        print(future.result())
  • ThreadPoolExecutor สร้างพูลเธรดและประมวลผลงานอย่างมีประสิทธิภาพ ระบุจำนวนเธรดที่ต้องการให้ทำงานพร้อมกันด้วย max_workers

การสื่อสารเหตุการณ์ระหว่างเธรด

การใช้ threading.Event คุณสามารถตั้งค่าสถานะระหว่างเธรดเพื่อแจ้งให้เธรดอื่น ๆ ทราบเกี่ยวกับการเกิดเหตุการณ์

 1import threading
 2import time
 3
 4event = threading.Event()
 5
 6def worker():
 7    print("Waiting for event to be set")
 8    event.wait()  # Wait until the event is set
 9
10    print("Event received, continuing work")
11
12thread = threading.Thread(target=worker)
13thread.start()
14
15time.sleep(2)
16print("Setting the event")
17event.set()  # Set the event and notify the thread
  • โค้ดนี้แสดงกลไกที่เธรดงานจะรอสัญญาณ Event และจะดำเนินการต่อเมื่อเธรดหลักเรียก event.set()

การจัดการข้อยกเว้นและการหยุดเธรด

เมื่อเกิดข้อยกเว้นในเธรด ข้อยกเว้นจะไม่ถูกส่งต่อไปยังเธรดหลักโดยตรง จึงจำเป็นต้องมีรูปแบบเพื่อจับและแบ่งปันข้อยกเว้น

 1import threading
 2import queue
 3
 4def worker(err_q):
 5    try:
 6        raise ValueError("Something bad")
 7    except Exception as e:
 8        err_q.put(e)
 9
10q = queue.Queue()
11t = threading.Thread(target=worker, args=(q,))
12t.start()
13t.join()
14if not q.empty():
15    exc = q.get()
16    print("Worker raised:", exc)
  • โดยการใส่ข้อยกเว้นลงใน Queue และดึงมันมาในเธรดหลัก คุณสามารถตรวจจับข้อผิดพลาดได้อย่างแม่นยำ หากคุณใช้ concurrent.futures.ThreadPoolExecutor ข้อยกเว้นจะถูกโยนซ้ำเมื่อเรียก future.result() จึงทำให้จัดการได้ง่ายขึ้น

GIL (Global Interpreter Lock) และผลกระทบ

เนื่องจากกลไกของ GIL (Global Interpreter Lock) ใน CPython ไบต์โค้ดของ Python หลายตัวจึงไม่สามารถทำงานพร้อมกันในโปรเซสเดียวกันได้จริง สำหรับงานที่ใช้ CPU หนัก เช่น การคำนวณจำนวนมาก แนะนำให้ใช้ multiprocessing ในทางกลับกัน สำหรับงานที่เน้น I/O เช่น การอ่านไฟล์หรือการสื่อสารเครือข่าย threading จะทำงานได้อย่างมีประสิทธิภาพ

สรุป

การใช้โมดูล threading ของ Python คุณสามารถสร้าง โปรแกรมหลายเธรด และรันกระบวนการหลายตัวพร้อมกัน ด้วยกลไกการซิงโครไนซ์เช่น Lock และ Condition คุณสามารถเข้าถึงทรัพยากรที่ใช้ร่วมกันได้อย่างปลอดภัยและทำการซิงโครไนซ์ที่ซับซ้อน นอกจากนี้ การใช้เธรด Daemon หรือ ThreadPoolExecutor ทำให้การจัดการเธรดและการประมวลผลที่รวดเร็วมีประสิทธิภาพมากขึ้น

คุณสามารถติดตามบทความข้างต้นโดยใช้ Visual Studio Code บนช่อง YouTube ของเรา กรุณาตรวจสอบช่อง YouTube ด้วย

YouTube Video