โมดูล `concurrent` ในภาษา Python

โมดูล `concurrent` ในภาษา Python

ในบทความนี้ เราจะอธิบายเกี่ยวกับโมดูล concurrent ในภาษา Python

ขณะที่อธิบายความแตกต่างระหว่าง concurrency และ parallelism เราจะสอนวิธีการประมวลผลแบบอะซิงโครนัสด้วยโมดูล concurrent โดยยกตัวอย่างที่ใช้ได้จริง

YouTube Video

โมดูล concurrent ในภาษา Python

เมื่อเราต้องการเพิ่มความเร็วในการประมวลผลด้วย Python สิ่งสำคัญคือต้องเข้าใจความแตกต่างระหว่าง concurrency และ parallelism โมดูล concurrent เป็นวิธีสำคัญในการ จัดการประมวลผลแบบอะซิงโครนัสได้อย่างปลอดภัยและง่ายดายโดยคำนึงถึงความแตกต่างข้างต้น

ความแตกต่างระหว่าง Concurrency และ Parallelism

  • Concurrency หมายถึงการออกแบบให้หลายงานสามารถดำเนินการได้โดย สลับกันทำงานเป็นหน่วยย่อยๆ ถึงแม้งานเหล่านั้นจะไม่ได้ทำงานพร้อมกันจริงๆ แต่การใช้ "เวลารอคอย" ให้เกิดประโยชน์จะช่วยให้การประมวลผลโดยรวมมีประสิทธิภาพมากขึ้น

  • Parallelism คือกลไกที่สามารถ ประมวลผลงานหลายอย่างพร้อมกันจริงในเวลาเดียวกัน โดยการใช้ CPU หลายคอร์ งานต่างๆ จะถูกประมวลผลได้พร้อมกัน

ทั้งสองวิธีเป็นเทคนิคช่วยเพิ่มความเร็วการประมวลผล แต่ concurrency เป็นเรื่องของการออกแบบว่าควรดำเนินการอย่างไร ขณะที่ parallelism เป็นเรื่องของการประมวลผลจริงว่าทำงานพร้อมกันหรือไม่ ซึ่งทั้งสองแนวคิดต่างกันโดยพื้นฐาน

โมดูล concurrent คืออะไร?

concurrent เป็นไลบรารีมาตรฐานของ Python ที่ให้ API ระดับสูงสำหรับจัดการ concurrency และ parallelism อย่างปลอดภัยและไม่ซับซ้อน ถูกออกแบบมาเพื่อให้คุณ โฟกัสที่การรันงาน โดยไม่ต้องกังวลกับรายละเอียดระดับต่ำ เช่น การสร้างและจัดการ thread หรือ process

หน้าที่ของ ThreadPoolExecutor และ ProcessPoolExecutor

โมดูล concurrent มีสองทางเลือกหลักขึ้นอยู่กับลักษณะของงาน

  • ThreadPoolExecutor เหมาะกับการทำ concurrent โดยเฉพาะงานที่มีการรอ I/O มาก เช่น การติดต่อเครือข่าย หรืออ่าน/เขียนไฟล์ โดยการสลับงานต่างๆ ทำให้ใช้เวลารออย่างคุ้มค่า

  • ProcessPoolExecutor เหมาะสำหรับประมวลผลแบบขนาน (parallel processing) โดยเฉพาะงานที่ใช้ CPU หนัก ใช้หลาย process เพื่อใช้ศักยภาพของ CPU ทุกคอร์ในเวลาเดียวกันให้เต็มที่

ดังนั้น จุดเด่นหลักของ concurrent คือคุณสามารถ เลือกใช้ concurrency หรือ parallelism ได้อย่างเหมาะสมตามแต่ละงาน

พื้นฐานของ ThreadPoolExecutor (สำหรับงาน I/O)

ThreadPoolExecutor เหมาะกับงานที่ ขึ้นกับ I/O เช่น การติดต่อเครือข่าย หรืออ่าน/เขียนไฟล์ กระจายงานไปยังหลาย thread เพื่อใช้เวลารอให้คุ้มค่า

 1from concurrent.futures import ThreadPoolExecutor
 2import time
 3
 4def fetch_data(n):
 5    # Simulate an I/O-bound task
 6    time.sleep(1)
 7    return f"data-{n}"
 8
 9with ThreadPoolExecutor(max_workers=3) as executor:
10    futures = [executor.submit(fetch_data, i) for i in range(5)]
11
12    for future in futures:
13        print(future.result())
  • ในตัวอย่างนี้ งาน I/O หลายรายการที่ต้องรอ 1 วินาทีถูกดำเนินการแบบ concurrent โดยใช้ submit ฟังก์ชันจะถูกลงทะเบียนเป็นงานอะซิงโครนัส และเมื่อเรียก result() จะรอจนจบงานและรับผลลัพธ์ ทำให้สามารถประมวลผล concurrent ได้อย่างกระชับและใช้เวลารออย่างมีประสิทธิภาพ

การประมวลผล concurrent อย่างง่ายด้วย map

ถ้าไม่ต้องควบคุมการทำงานที่ซับซ้อน การใช้ map จะช่วยให้โค้ดกระชับขึ้น

 1from concurrent.futures import ThreadPoolExecutor
 2import time
 3
 4def fetch_data(n):
 5    # Simulate an I/O-bound task
 6    time.sleep(1)
 7    return f"data-{n}"
 8
 9with ThreadPoolExecutor(max_workers=3) as executor:
10    results = executor.map(fetch_data, range(5))
11
12    for result in results:
13        print(result)
  • ตัวอย่างนี้เป็นการประมวลผลงาน I/O หลายงานแบบ concurrent ด้วย ThreadPoolExecutor.map เนื่องจาก map คืนผลลัพธ์ตามลำดับเดียวกับอินพุต คุณจึงเขียนโค้ดให้ดูลำดับเหมือนแบบ sequential ได้ และยังได้ concurrent ในตัวโดยไม่ต้องจัดการอะซิงโครนัสเอง ซึ่งเป็นข้อดีสำคัญ

พื้นฐานของ ProcessPoolExecutor (สำหรับงานที่ใช้ CPU มาก)

ถ้างานคำนวณหนักและใช้ CPU เต็มกำลัง ควรใช้ process แทน thread การใช้วิธีนี้ช่วยหลีกเลี่ยงข้อจำกัด Global Interpreter Lock (GIL)

 1from concurrent.futures import ProcessPoolExecutor
 2
 3def heavy_calculation(n):
 4    # Simulate a CPU-bound task
 5    total = 0
 6    for i in range(10_000_000):
 7        total += i * n
 8    return total
 9
10if __name__ == "__main__":
11    with ProcessPoolExecutor(max_workers=4) as executor:
12        results = executor.map(heavy_calculation, range(4))
13
14        for result in results:
15            print(result)

ในตัวอย่างนี้จะประมวลผลงานที่ใช้ CPU หนักแบบขนานด้วย ProcessPoolExecutor เนื่องจากต้องสร้าง process จึงจำเป็นต้องใช้ __main__ guard ซึ่ง ช่วยให้ประมวลผลแบบขนาน (parallel) ได้อย่างปลอดภัยด้วยหลายคอร์

ประมวลผลตามลำดับเสร็จงานด้วย as_completed

as_completed มีประโยชน์เมื่อต้องการจัดการผลลัพธ์ตามลำดับที่งานเสร็จจริง

 1from concurrent.futures import ThreadPoolExecutor, as_completed
 2import time
 3
 4def fetch_data(n):
 5    # Simulate an I/O-bound task
 6    time.sleep(1)
 7    return f"data-{n}"
 8
 9with ThreadPoolExecutor(max_workers=3) as executor:
10    futures = [executor.submit(fetch_data, i) for i in range(5)]
11
12    for future in as_completed(futures):
13        print(future.result())
  • ในตัวอย่างนี้ งานอะซิงโครนัสหลายๆ งานถูกรันพร้อมกัน และ ดึงผลลัพธ์เรียงตามลำดับที่แต่ละงานเสร็จสิ้น การใช้ as_completed ช่วยให้จัดการผลลัพธ์ได้รวดเร็วโดยไม่ต้องรอลำดับ รวดเร็ว เหมาะกับการแสดงสถานะความคืบหน้าหรืองานที่ต้องการจัดการทีละขั้น

การจัดการข้อผิดพลาด (Exception)

ใน concurrent ข้อผิดพลาด (exception) จะถูกโยนออกมาขณะเรียก result()

 1from concurrent.futures import ThreadPoolExecutor
 2
 3def risky_task(n):
 4    # Simulate a task that may fail for a specific input
 5    if n == 3:
 6        raise ValueError("Something went wrong")
 7    return n * 2
 8
 9with ThreadPoolExecutor() as executor:
10    futures = [executor.submit(risky_task, i) for i in range(5)]
11
12    for future in futures:
13        try:
14            print(future.result())
15        except Exception as e:
16            print("Error:", e)
  • ตัวอย่างนี้แสดงให้เห็นว่า แม้จะมีบางงานเกิดข้อผิดพลาด แต่งานอื่นก็ยังคงทำงานต่อ และคุณสามารถจัดการ exception แยกเป็นรายงานได้ในขณะที่ดึงผลลัพธ์ ด้วยการใช้ Future ของ concurrent คุณสามารถจัดการทั้งกรณีสำเร็จและล้มเหลวของงานอะซิงโครนัสได้อย่างปลอดภัย

แนวทางการเลือกใช้งานระหว่าง Thread และ Process

เพื่อใช้ concurrency และ parallelism อย่างได้ผล ควรเลือกวิธีที่เหมาะสมกับลักษณะของงาน

ในการใช้งานจริง เกณฑ์ต่อไปนี้จะช่วยให้ตัดสินใจได้

  • สำหรับโปรเซสที่มีการรอ I/O จำนวนมาก เช่น การสื่อสารหรือการทำงานกับไฟล์ ให้ใช้ ThreadPoolExecutor
  • ถ้างานต้องใช้ CPU หนักมาก ให้ใช้ ProcessPoolExecutor
  • ถ้ามีงานย่อยจำนวนมากและไม่ซับซ้อน การใช้ map จะช่วยให้โค้ดกระชับขึ้น
  • หากต้องควบคุมลำดับหรือต้องจัดการ exception อย่างละเอียด ให้ใช้ submit ร่วมกับ as_completed

ข้อดีของการใช้ concurrent

โดยใช้โมดูล concurrent คุณสามารถจัดการงานอะซิงโครนัส ได้อย่างปลอดภัยและเข้าใจง่าย

ข้อดีหลักๆ มีดังนี้:

  • ไม่ต้องยุ่งยากกับการจัดการ thread หรือ process ระดับต่ำ
  • เนื่องจากเป็นไลบรารีมาตรฐานใน Python จึงใช้ได้อย่างมั่นใจ
  • โค้ดจะอ่านง่ายและบำรุงรักษาได้สะดวกขึ้น
  • เหมาะอย่างยิ่งสำหรับเป็นจุดเริ่มต้นศึกษาการประมวลผล concurrent และ parallel

แค่จำแนวทางเหล่านี้ ก็ลดโอกาสผิดพลาดเมื่อนำ concurrent ไปใช้ได้มาก

สรุป

โมดูล concurrent คือ ตัวเลือกมาตรฐานสำหรับการใช้งานจริงที่ต้องการ concurrency และ parallelism ใน Python ช่วยให้ปรับปรุงประสิทธิภาพโดยไม่ต้องเปลี่ยนแปลงโค้ดเดิมมาก ซึ่งเป็นข้อดีอย่างมากในการใช้งานจริง ด้วยการใช้ concurrent คุณสามารถ ประมวลผลอะซิงโครนัสได้อย่างกระชับ พร้อมจัดการข้อผิดพลาดและควบคุมการทำงานได้อย่างปลอดภัย

คุณสามารถติดตามบทความข้างต้นโดยใช้ Visual Studio Code บนช่อง YouTube ของเรา กรุณาตรวจสอบช่อง YouTube ด้วย

YouTube Video