โมดูล `threading` ใน Python
บทความนี้อธิบายเกี่ยวกับโมดูล threading ใน Python
YouTube Video
โมดูล threading ใน Python
โมดูล threading ใน Python เป็นไลบรารีมาตรฐานที่สนับสนุนการเขียนโปรแกรมหลายเธรด การใช้เธรดช่วยให้กระบวนการหลายตัวทำงานพร้อมกันได้ ซึ่งสามารถเพิ่มประสิทธิภาพของโปรแกรม โดยเฉพาะในกรณีที่เกี่ยวข้องกับการรอ I/O เนื่องจาก Global Interpreter Lock (GIL) ของไพธอน ประสิทธิภาพของมัลติเธรดจึงมีข้อจำกัดเมื่อใช้กับงานที่ใช้ CPU เป็นหลัก แต่เหมาะสมกับงานที่เน้น I/O
ส่วนต่อไปนี้อธิบายพื้นฐานการใช้งานโมดูล threading และการควบคุมเธรด
การใช้งานพื้นฐานของเธรด
การสร้างและการเรียกใช้งานเธรด
ในการสร้างเธรดและดำเนินการแบบขนาน ให้ใช้คลาส threading.Thread กำหนดฟังก์ชั่นเป้าหมายเพื่อสร้างเธรดและเรียกใช้งานเธรดนั้น
1import threading
2import time
3
4# Function to be executed in a thread
5def worker():
6 print("Worker thread started")
7 time.sleep(2)
8 print("Worker thread finished")
9
10# Create and start the thread
11thread = threading.Thread(target=worker)
12thread.start()
13
14# Processing in the main thread
15print("Main thread continues to run")
16
17# Wait for the thread to finish
18thread.join()
19print("Main thread finished")- ในตัวอย่างนี้ ฟังก์ชัน
workerจะถูกเรียกใช้งานในเธรดแยก ขณะที่เธรดหลักยังคงทำงานต่อไป โดยการเรียกใช้เมธอดjoin()เธรดหลักจะรอให้เธรดย่อยทำงานเสร็จสมบูรณ์
การตั้งชื่อเธรด
การตั้งชื่อที่มีความหมายให้เธรดจะช่วยให้การบันทึกข้อมูลและการดีบักง่ายขึ้น คุณสามารถระบุชื่อได้โดยใช้อาร์กิวเมนต์ name
1import threading
2import time
3
4# Function to be executed in a thread
5def worker():
6 print("Worker thread started")
7 time.sleep(2)
8 print("Worker thread finished")
9
10t = threading.Thread(
11 target=worker,
12 args=("named-worker", 0.3),
13 name="MyWorkerThread"
14)
15
16t.start()
17
18print("Active threads:", threading.active_count())
19for th in threading.enumerate():
20 print(" -", th.name)
21
22t.join()threading.enumerate()จะส่งคืนรายชื่อเธรดที่กำลังทำงานอยู่ ซึ่งมีประโยชน์ในการดีบักและติดตามสถานะ
การสืบทอดคลาสเธรด
หากคุณต้องการปรับแต่งคลาสที่ใช้ในการเรียกใช้งานเธรด คุณสามารถกำหนดคลาสใหม่โดยสืบทอดจากคลาส threading.Thread
1import threading
2import time
3
4# Inherit from the Thread class
5class WorkerThread(threading.Thread):
6 def __init__(self, name, delay, repeat=3):
7 super().__init__(name=name)
8 self.delay = delay
9 self.repeat = repeat
10 self.results = []
11
12 def run(self):
13 for i in range(self.repeat):
14 msg = f"{self.name} step {i+1}"
15 print(msg)
16 self.results.append(msg)
17 time.sleep(self.delay)
18
19# Create and start the threads
20t1 = WorkerThread("Worker-A", delay=0.4, repeat=3)
21t2 = WorkerThread("Worker-B", delay=0.2, repeat=5)
22
23t1.start()
24t2.start()
25
26t1.join()
27t2.join()
28
29print("Results A:", t1.results)
30print("Results B:", t2.results)- ในตัวอย่างนี้ เมทอด
run()ถูกเขียนทับเพื่อกำหนดพฤติกรรมของเธรด ทำให้แต่ละเธรดสามารถจัดการข้อมูลของตัวเองได้ สิ่งนี้มีประโยชน์เมื่อเธรดต้องประมวลผลที่ซับซ้อนหรือเมื่อคุณต้องการให้แต่ละเธรดมีข้อมูลอิสระของตัวเอง
การซิงโครไนซ์ระหว่างเธรด
เมื่อหลายเธรดเข้าถึงทรัพยากรร่วมกันพร้อมกัน อาจเกิดปัญหา data race ได้ เพื่อป้องกันปัญหานี้ โมดูล threading มีการซิงโครไนซ์หลายรูปแบบให้ใช้งาน
ล็อค (Lock)
ออบเจ็กต์ Lock ใช้เพื่อควบคุมการเข้าถึงทรัพยากรร่วมกันระหว่างเธรดแบบเฉพาะตัว เมื่อเธรดหนึ่งล็อคทรัพยากร เธรดอื่นจะไม่สามารถเข้าถึงทรัพยากรนั้นได้
1import threading
2
3lock = threading.Lock()
4shared_resource = 0
5
6def worker():
7 global shared_resource
8 with lock: # Acquire the lock
9 local_copy = shared_resource
10 local_copy += 1
11 shared_resource = local_copy
12
13threads = [threading.Thread(target=worker) for _ in range(5)]
14
15for t in threads:
16 t.start()
17
18for t in threads:
19 t.join()
20
21print(f"Final value of shared resource: {shared_resource}")- ในตัวอย่างนี้ มีเธรด 5 ตัวที่เข้าถึงทรัพยากรร่วมกัน แต่ใช้
Lockเพื่อป้องกันไม่ให้เธรดหลายตัวแก้ไขข้อมูลในเวลาเดียวกัน
ล็อกซ้ำได้ (RLock)
หากเธรดเดียวกันต้องเข้าถึงล็อกหลายครั้ง ให้ใช้ RLock (รีเอนแทรนต์ล็อก) สิ่งนี้มีประโยชน์สำหรับการเรียกแบบเวียนซ้ำ (recursive) หรือเรียกไลบรารีที่อาจใช้งานล็อกในการเรียกหลายครั้ง
1import threading
2
3rlock = threading.RLock()
4shared = []
5
6def outer():
7 with rlock:
8 shared.append("outer")
9 inner()
10
11def inner():
12 with rlock:
13 shared.append("inner")
14
15t = threading.Thread(target=outer)
16t.start()
17t.join()
18print(shared)- ด้วย
RLockเธรดเดียวกันสามารถขอล็อกที่มันถืออยู่ซ้ำได้ ซึ่งช่วยป้องกันปัญหาเดดล็อกจากการล็อกซ้อนกัน
เงื่อนไข (Condition)
Condition ถูกใช้เพื่อให้เธรดรอจนกว่าเงื่อนไขเฉพาะจะถูกตอบสนอง เมื่อเธรดหนึ่งตรงตามเงื่อนไข คุณสามารถเรียก notify() เพื่อแจ้งเตือนเธรดอื่น หรือ notify_all() เพื่อแจ้งเตือนเธรดทั้งหมดที่กำลังรออยู่
ด้านล่างเป็นตัวอย่างของโปรดิวเซอร์และคอนซูเมอร์ที่ใช้ Condition
1import threading
2
3condition = threading.Condition()
4shared_data = []
5
6def producer():
7 with condition:
8 shared_data.append(1)
9 print("Produced an item")
10 condition.notify() # Notify the consumer
11
12def consumer():
13 with condition:
14 condition.wait() # Wait until the condition is met
15
16 item = shared_data.pop(0)
17 print(f"Consumed an item: {item}")
18
19# Create the threads
20producer_thread = threading.Thread(target=producer)
21consumer_thread = threading.Thread(target=consumer)
22
23consumer_thread.start()
24producer_thread.start()
25
26producer_thread.join()
27consumer_thread.join()- โค้ดนี้ใช้
Conditionเพื่อให้โปรดิวเซอร์แจ้งเตือนเมื่อมีการเพิ่มข้อมูล และคอนซูเมอร์จะรอการแจ้งเตือนนั้นก่อนที่จะดึงข้อมูล เพื่อให้เกิดการประสานกัน
การทำให้เธรดเป็น Daemon
เธรด Daemon จะถูกหยุดโดยบังคับเมื่อเธรดหลักสิ้นสุดลง ในขณะที่เธรดปกติต้องรอเพื่อสิ้นสุด เธรด Daemon จะหยุดทำงานโดยอัตโนมัติ
1import threading
2import time
3
4def worker():
5 while True:
6 print("Working...")
7 time.sleep(1)
8
9# Create a daemon thread
10thread = threading.Thread(target=worker)
11thread.daemon = True # Set as a daemon thread
12
13thread.start()
14
15# Processing in the main thread
16time.sleep(3)
17print("Main thread finished")- ในตัวอย่างนี้ เธรด
workerถูกตั้งให้เป็น Daemon ดังนั้นมันจะหยุดทำงานโดยบังคับเมื่อเธรดหลักสิ้นสุดลง
การจัดการเธรดด้วย ThreadPoolExecutor
นอกจากโมดูล threading คุณยังสามารถใช้ ThreadPoolExecutor จากโมดูล concurrent.futures เพื่อจัดการพูลเธรดและดำเนินงานแบบขนาน
1from concurrent.futures import ThreadPoolExecutor
2import time
3
4def worker(seconds):
5 print(f"Sleeping for {seconds} second(s)")
6 time.sleep(seconds)
7 return f"Finished sleeping for {seconds} second(s)"
8
9with ThreadPoolExecutor(max_workers=3) as executor:
10 futures = [executor.submit(worker, i) for i in range(1, 4)]
11 for future in futures:
12 print(future.result())ThreadPoolExecutorสร้างพูลเธรดและประมวลผลงานอย่างมีประสิทธิภาพ ระบุจำนวนเธรดที่ต้องการให้ทำงานพร้อมกันด้วยmax_workers
การสื่อสารเหตุการณ์ระหว่างเธรด
การใช้ threading.Event คุณสามารถตั้งค่าสถานะระหว่างเธรดเพื่อแจ้งให้เธรดอื่น ๆ ทราบเกี่ยวกับการเกิดเหตุการณ์
1import threading
2import time
3
4event = threading.Event()
5
6def worker():
7 print("Waiting for event to be set")
8 event.wait() # Wait until the event is set
9
10 print("Event received, continuing work")
11
12thread = threading.Thread(target=worker)
13thread.start()
14
15time.sleep(2)
16print("Setting the event")
17event.set() # Set the event and notify the thread- โค้ดนี้แสดงกลไกที่เธรดงานจะรอสัญญาณ
Eventและจะดำเนินการต่อเมื่อเธรดหลักเรียกevent.set()
การจัดการข้อยกเว้นและการหยุดเธรด
เมื่อเกิดข้อยกเว้นในเธรด ข้อยกเว้นจะไม่ถูกส่งต่อไปยังเธรดหลักโดยตรง จึงจำเป็นต้องมีรูปแบบเพื่อจับและแบ่งปันข้อยกเว้น
1import threading
2import queue
3
4def worker(err_q):
5 try:
6 raise ValueError("Something bad")
7 except Exception as e:
8 err_q.put(e)
9
10q = queue.Queue()
11t = threading.Thread(target=worker, args=(q,))
12t.start()
13t.join()
14if not q.empty():
15 exc = q.get()
16 print("Worker raised:", exc)- โดยการใส่ข้อยกเว้นลงใน
Queueและดึงมันมาในเธรดหลัก คุณสามารถตรวจจับข้อผิดพลาดได้อย่างแม่นยำ หากคุณใช้concurrent.futures.ThreadPoolExecutorข้อยกเว้นจะถูกโยนซ้ำเมื่อเรียกfuture.result()จึงทำให้จัดการได้ง่ายขึ้น
GIL (Global Interpreter Lock) และผลกระทบ
เนื่องจากกลไกของ GIL (Global Interpreter Lock) ใน CPython ไบต์โค้ดของ Python หลายตัวจึงไม่สามารถทำงานพร้อมกันในโปรเซสเดียวกันได้จริง สำหรับงานที่ใช้ CPU หนัก เช่น การคำนวณจำนวนมาก แนะนำให้ใช้ multiprocessing ในทางกลับกัน สำหรับงานที่เน้น I/O เช่น การอ่านไฟล์หรือการสื่อสารเครือข่าย threading จะทำงานได้อย่างมีประสิทธิภาพ
สรุป
การใช้โมดูล threading ของ Python คุณสามารถสร้าง โปรแกรมหลายเธรด และรันกระบวนการหลายตัวพร้อมกัน ด้วยกลไกการซิงโครไนซ์เช่น Lock และ Condition คุณสามารถเข้าถึงทรัพยากรที่ใช้ร่วมกันได้อย่างปลอดภัยและทำการซิงโครไนซ์ที่ซับซ้อน นอกจากนี้ การใช้เธรด Daemon หรือ ThreadPoolExecutor ทำให้การจัดการเธรดและการประมวลผลที่รวดเร็วมีประสิทธิภาพมากขึ้น
คุณสามารถติดตามบทความข้างต้นโดยใช้ Visual Studio Code บนช่อง YouTube ของเรา กรุณาตรวจสอบช่อง YouTube ด้วย