อินพุต/เอาต์พุตแบบอะซิงโครนัส

อินพุต/เอาต์พุตแบบอะซิงโครนัส

บทความนี้อธิบายเกี่ยวกับอินพุต/เอาต์พุตแบบอะซิงโครนัส

คู่มือนี้จะอธิบายแนวคิดและรูปแบบของอินพุต/เอาต์พุตแบบอะซิงโครนัส ซึ่งเป็นโปรแกรมที่มีประโยชน์ในการใช้งาน Python อย่างเป็นขั้นตอน

YouTube Video

การรับส่งข้อมูลแบบอะซิงโครนัส (I/O)

แนวคิดของการรับส่งข้อมูลแบบอะซิงโครนัส (I/O)

การรับส่งข้อมูลแบบอะซิงโครนัส (I/O) เป็นกลไกที่ช่วยให้งานอื่น ๆ สามารถทำงานไปพร้อมกันขณะที่รอกระบวนการ I/O ที่ใช้เวลานาน เช่น การอ่านเขียนไฟล์หรือสื่อสารผ่านเครือข่าย ใน Python มี asyncio เป็นเฟรมเวิร์คมาตรฐานสำหรับการเขียนโปรแกรมแบบอะซิงโครนัส และไลบรารีหลายรายการก็ออกแบบมาให้ใช้กลไกนี้

พื้นฐาน: async / await และ Event Loop

อันดับแรก นี่คือการเขียน coroutine เบื้องต้นและตัวอย่างการรัน coroutine หลายตัวพร้อมกันโดยใช้ asyncio.gather

โค้ดด้านล่างเป็นตัวอย่างสั้น ๆ สำหรับการนิยามและรันฟังก์ชันแบบอะซิงโครนัสพร้อมกัน ฟังก์ชัน sleep ถูกใช้เพื่อสาธิตการทำงานแบบขนาน

 1import asyncio
 2
 3async def worker(name: str, delay: float):
 4    # Simulate I/O-bound work by sleeping
 5    print(f"{name} started")
 6    await asyncio.sleep(delay)
 7    print(f"{name} finished after {delay}s")
 8    return name, delay
 9
10async def main():
11    # Run multiple coroutines concurrently
12    tasks = [
13        worker("A", 1.5),
14        worker("B", 1.0),
15        worker("C", 0.5),
16    ]
17    results = await asyncio.gather(*tasks)
18    print("Results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • โค้ดนี้เริ่ม event loop โดยใช้ asyncio.run() และรัน coroutine สามตัวพร้อมกัน

async with และ Asynchronous Context Manager

ในการประมวลผลแบบอะซิงโครนัส การจัดการทรัพยากรเช่นการเปิดปิดการเชื่อมต่อหรือไฟล์อาจซับซ้อนได้ง่าย ที่นี่เองที่ asynchronous context manager ซึ่งใช้งานผ่าน async with เข้ามาช่วยได้มาก ไวยากรณ์นี้คล้ายกับการใช้ with แบบซิงโครนัส แต่การประมวลผลภายในจะเป็นแบบอะซิงโครนัส จึงเข้ากับ flow ของ async/await ได้เป็นอย่างดี

มีเหตุผลหลักสองประการที่ควรใช้ async with:

  • เพื่อให้สามารถเคลียร์ทรัพยากรเช่น การเชื่อมต่อ, file handle หรือ session ได้อย่างแน่นอน คุณจะมั่นใจได้ว่าทรัพยากรจะถูกปล่อยคืนอย่างถูกต้องแม้เกิดข้อผิดพลาดระหว่างการทำงาน
  • เพื่อทำให้การเริ่มต้นและการปิด เช่น การเชื่อมต่อและการเขียนข้อมูลจบในลักษณะอัตโนมัติแบบอะซิงโครนัส สิ่งนี้ช่วยให้ไม่ต้องเขียนโค้ดการจัดการเหล่านี้เองและทำให้โค้ดดูเข้าใจง่ายขึ้น

ด้านล่างคือตัวอย่างการสร้าง asynchronous context manager แบบง่ายจากศูนย์

 1import asyncio
 2
 3class AsyncResource:
 4    async def __aenter__(self):
 5        print("Opening resource...")
 6        await asyncio.sleep(0.5)
 7        print("Resource opened")
 8        return self
 9
10    async def __aexit__(self, exc_type, exc, tb):
11        print("Closing resource...")
12        await asyncio.sleep(0.5)
13        print("Resource closed")
14
15async def main():
16    async with AsyncResource() as r:
17        print("Using resource...")
18
19if __name__ == "__main__":
20    asyncio.run(main())
  • โดยการกำหนด __aenter__ และ __aexit__ จะสามารถใช้ async with ได้
  • กระบวนการขณะเข้า-ออก block async with จะถูกจัดการแบบอะซิงโครนัสและปลอดภัย

การรับส่งข้อมูลไฟล์แบบอะซิงโครนัส (aiofiles)

การทำงานกับไฟล์เป็นตัวอย่างของงานที่บล็อกโปรแกรมหลักแบบชัดเจน ด้วย aiofiles คุณสามารถจัดการไฟล์แบบอะซิงโครนัสได้อย่างปลอดภัย ภายในไลบรารีจะใช้ thread pool และปิดไฟล์อย่างถูกต้องผ่าน async with

ตัวอย่างต่อไปนี้แสดงวิธีอ่านไฟล์หลายไฟล์แบบอะซิงโครนัสขนานกัน คุณต้องติดตั้ง aiofiles ด้วย pip install aiofiles ก่อนจึงจะรันโค้ดนี้ได้

 1# pip install aiofiles
 2import asyncio
 3import aiofiles
 4from pathlib import Path
 5
 6async def read_file(path: Path):
 7    # Read file content asynchronously
 8    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
 9        contents = await f.read()
10    return path.name, len(contents)
11
12async def main():
13    files = [
14        Path("example1.txt"),
15        Path("example2.txt"),
16        Path("example3.txt")
17    ]
18    tasks = [read_file(p) for p in files]
19    results = await asyncio.gather(*tasks)
20    for name, size in results:
21        print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24    asyncio.run(main())
  • โค้ดนี้ทำให้การอ่านไฟล์แต่ละไฟล์เป็นไปแบบขนาน aiofiles มักใช้ thread pool ภายในเพื่อให้คุณจัดการ I/O ของไฟล์แบบขวางกั้นผ่านอินเทอร์เฟซแบบอะซิงโครนัส

ไคลเอนต์ HTTP แบบอะซิงโครนัส (aiohttp)

ตัวอย่างคลาสสิกของ network I/O คือการทำคำขอ HTTP แบบอะซิงโครนัส วิธีนี้มีประสิทธิภาพมากโดยเฉพาะเมื่อต้องส่ง HTTP request จำนวนมากพร้อมกัน

ด้านล่างเป็นตัวอย่างการดึงข้อมูลจาก URL หลายอันพร้อมกันโดยใช้ aiohttp คุณต้องติดตั้ง aiohttp ด้วยคำสั่ง pip install aiohttp

 1# pip install aiohttp
 2import asyncio
 3import aiohttp
 4
 5async def fetch(session: aiohttp.ClientSession, url: str):
 6    # Fetch a URL asynchronously and return status and size
 7    async with session.get(url) as resp:
 8        text = await resp.text()
 9        return url, resp.status, len(text)
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [fetch(session, u) for u in urls]
14        for coro in asyncio.as_completed(tasks):
15            url, status, size = await coro
16            print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19    urls = [
20        "https://codesparklab.com/json/example1.json",
21        "https://codesparklab.com/json/example2.json",
22        "https://codesparklab.com/json/example3.json",
23    ]
24    asyncio.run(main(urls))
  • โดยการใช้ asyncio.as_completed คุณจะสามารถประมวลผลผลลัพธ์ตามลำดับที่แต่ละงานเสร็จสมบูรณ์ สิ่งนี้มีประโยชน์สำหรับการจัดการคำขอจำนวนมากอย่างมีประสิทธิภาพ

การทำงานร่วมกับ Blocking I/O: run_in_executor

เมื่อคุณต้องจัดการกับงานที่ใช้ CPU สูง หรือ API ที่บล็อกการทำงานในโค้ดอะซิงโครนัส ให้ใช้ ThreadPoolExecutor หรือ ProcessPoolExecutor ผ่าน loop.run_in_executor

โค้ดตัวอย่างต่อไปนี้เป็นการรันงานที่มี I/O แบบบล็อกแบบขนานกันโดยใช้ thread pool

 1import asyncio
 2from concurrent.futures import ThreadPoolExecutor
 3import time
 4
 5def blocking_io(n):
 6    # Simulate a blocking I/O or CPU-bound function
 7    time.sleep(n)
 8    return f"slept {n}s"
 9
10async def main():
11    loop = asyncio.get_running_loop()
12    with ThreadPoolExecutor() as pool:
13        tasks = [
14            loop.run_in_executor(pool, blocking_io, 1),
15            loop.run_in_executor(pool, blocking_io, 2),
16        ]
17        results = await asyncio.gather(*tasks)
18        print("Blocking results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • โดยการใช้ run_in_executor คุณสามารถนำโค้ด synchronous ที่มีอยู่มาใช้ร่วมกับโค้ดแบบอะซิงโครนัสได้โดยไม่ต้องแก้ไขมาก อย่างไรก็ตาม คุณควรระวังจำนวน thread และภาระของ CPU
  • ProcessPoolExecutor เหมาะสำหรับงานที่ต้องใช้ CPU สูง

เซิร์ฟเวอร์แบบอะซิงโครนัส: TCP Echo Server ที่ใช้ asyncio

หากคุณต้องการควบคุม socket โดยตรง สามารถสร้างเซิร์ฟเวอร์แบบอะซิงโครนัสได้ง่าย ๆ ด้วย asyncio.start_server

ตัวอย่างต่อไปนี้เป็น echo server แบบง่ายที่ส่งข้อมูลกลับไปหา client ตามที่ได้รับมา

 1import asyncio
 2
 3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
 4    # Handle a single client: read data and echo it back
 5    addr = writer.get_extra_info('peername')
 6    print(f"Connection from {addr}")
 7    while True:
 8        data = await reader.read(1024)
 9        if not data:
10            break
11        writer.write(data)  # echo back
12        await writer.drain()
13    writer.close()
14    await writer.wait_closed()
15    print(f"Connection closed {addr}")
16
17async def main():
18    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19    addr = server.sockets[0].getsockname()
20    print(f"Serving on {addr}")
21    async with server:
22        await server.serve_forever()
23
24if __name__ == "__main__":
25    asyncio.run(main())
  • ในการสื่อสาร TCP ด้วย asyncio นั้น StreamReader และ StreamWriter มีบทบาทสำคัญในการรับส่งข้อมูลแบบอะซิงโครนัส StreamReader ใช้อ่านข้อมูลที่ถูกส่งจากไคลเอนต์แบบอะซิงโครนัส ส่วน StreamWriter ใช้สำหรับส่งข้อมูลตอบกลับจากเซิร์ฟเวอร์ไปยังไคลเอนต์

  • แม้จะไม่ต้องจัดการรายละเอียดการทำงานของ socket ด้วยตนเอง ก็สามารถเปิดเซิร์ฟเวอร์แบบอะซิงโครนัสได้อย่างง่ายและมีประสิทธิภาพโดยใช้ asyncio.start_server

  • เมื่อคุณส่งฟังก์ชัน handler ให้กับ asyncio.start_server ฟังก์ชันนั้นจะได้รับ reader และ writer เป็นอาร์กิวเมนต์ โดยการใช้สิ่งเหล่านี้ คุณสามารถพัฒนากระบวนการสื่อสารได้อย่างปลอดภัยและเข้าใจกว่าการจัดการกับ API ของ socket ระดับต่ำโดยตรง ตัวอย่างเช่น คุณสามารถรับข้อมูลด้วย reader.read() และใช้ writer.write() ร่วมกับ writer.drain() เพื่อส่งข้อมูลแบบอะซิงโครนัสและรับรองว่าการส่งเสร็จสมบูรณ์

  • การตั้งค่านี้เหมาะกับการจัดการการเชื่อมต่อจำนวนมากพร้อมกัน เช่นบริการ TCP ขนาดเล็กหรือโปรโตคอลที่ไม่ซับซ้อน

การจัดการข้อมูล streaming ขนาดใหญ่

เมื่อประมวลผลไฟล์ขนาดใหญ่หรือ response ที่มีขนาดมาก ควรอ่านและเขียนทีละส่วน (chunk) เพื่อลดการใช้หน่วยความจำ ตัวอย่างด้านล่างคือการอ่านข้อมูลแบบ streaming โดยใช้ aiohttp

โค้ดต่อไปนี้จะประมวลผล HTTP response ทีละ chunk และบันทึกลงดิสก์ทันทีเมื่อได้รับข้อมูล

 1import aiohttp
 2import asyncio
 3import aiofiles
 4
 5async def stream_download(url: str, dest: str):
 6    # Stream download and write to file in chunks
 7    async with aiohttp.ClientSession() as session:
 8        async with session.get(url) as resp:
 9            async with aiofiles.open(dest, 'wb') as f:
10                async for chunk in resp.content.iter_chunked(1024 * 64):
11                    await f.write(chunk)
12
13if __name__ == "__main__":
14    asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))
  • โค้ดนี้จะไม่โหลดไฟล์ขนาดใหญ่ทั้งหมดในคราวเดียว แต่จะรับข้อมูลเป็นชิ้นเล็กๆ ทีละส่วน และเขียนลงไฟล์แบบอะซิงโครนัส ด้วยเหตุนี้จึงสามารถดาวน์โหลดไฟล์ได้อย่างรวดเร็วและมีประสิทธิภาพ ในขณะที่ใช้หน่วยความจำต่ำ aiohttp ใช้ดึงข้อมูลแบบอะซิงโครนัส และ aiofiles เขียนข้อมูลลงไฟล์โดยไม่ block ทำให้สามารถทำงานร่วมกับกระบวนการอื่นๆ ได้อย่างง่ายดาย

  • รูปแบบนี้เหมาะสำหรับการดาวน์โหลดและบันทึกไฟล์ขนาดใหญ่ให้มีประสิทธิภาพและใช้หน่วยความจำน้อยที่สุด

การรัน subprocess แบบอะซิงโครนัส

หากต้องการรันคำสั่งภายนอกแบบอะซิงโครนัสและอ่านข้อความที่ output ออกมาทันที สามารถใช้ asyncio.create_subprocess_exec ได้

ด้านล่างคือตัวอย่างการรันคำสั่งภายนอกและอ่านข้อความที่ส่งออกมาตามเวลาจริง

 1import asyncio
 2
 3async def run_cmd(cmd):
 4    # Run external command asynchronously and capture output line by line
 5    proc = await asyncio.create_subprocess_exec(
 6        *cmd,
 7        stdout=asyncio.subprocess.PIPE,
 8        stderr=asyncio.subprocess.PIPE
 9    )
10
11    async def read_stream(stream, name):
12        while True:
13            line = await stream.readline()
14            if not line:
15                break
16            print(f"[{name}] {line.decode().rstrip()}")
17
18    await asyncio.gather(
19        read_stream(proc.stdout, "stdout"),
20        read_stream(proc.stderr, "stderr"),
21    )
22    await proc.wait()
23    return proc.returncode
24
25if __name__ == "__main__":
26    asyncio.run(run_cmd(["python", "--version"]))
  • การควบคุม subprocess แบบอะซิงโครนัสช่วยให้คุณจัดการ log จากเครื่องมือภายนอกแบบเรียลไทม์หรือรัน process หลายตัวพร้อมกันได้

การจัดการการยกเลิกและการกำหนด timeout

งานแบบอะซิงโครนัสสามารถถูกยกเลิกได้ เวลาต้องกำหนด timeout ให้ใช้งาน asyncio.wait_for ได้อย่างง่ายดาย

ตัวอย่างต่อไปนี้เป็นการรันงานที่มี timeout กำกับ

 1import asyncio
 2
 3async def slow_task():
 4    await asyncio.sleep(5)
 5    return "done"
 6
 7async def main():
 8    try:
 9        result = await asyncio.wait_for(slow_task(), timeout=2.0)
10        print("Result:", result)
11    except asyncio.TimeoutError:
12        print("Task timed out")
13
14if __name__ == "__main__":
15    asyncio.run(main())
  • wait_for จะขึ้นข้อผิดพลาด TimeoutError หากเกินเวลาที่กำหนดและจะยกเลิก task หากจำเป็น ควรระวังเรื่องการยกเลิกงานที่เกี่ยวเนื่องและการเคลียร์ข้อมูลขณะ task ถูกยกเลิก

ควบคุมความขนาน (Semaphore)

การเปิดการเชื่อมต่อหรือส่งคำขอพร้อมกันเป็นจำนวนมากอาจทำให้ทรัพยากรหมดได้ ควรจำกัดความขนานด้วย asyncio.Semaphore

ตัวอย่างต่อไปนี้คือการจำกัดการดาวน์โหลดพร้อมกันโดยใช้ semaphore

 1import asyncio
 2import aiohttp
 3
 4semaphore = asyncio.Semaphore(3)  # allow up to 3 concurrent tasks
 5
 6async def limited_fetch(session, url):
 7    async with semaphore:
 8        async with session.get(url) as resp:
 9            return url, resp.status
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [limited_fetch(session, u) for u in urls]
14        results = await asyncio.gather(*tasks)
15        print(results)
16
17if __name__ == "__main__":
18    urls = ["https://codesparklab.com/json/example.json"] * 10
19    asyncio.run(main(urls))
  • ด้วยวิธีนี้จะช่วยให้เข้าถึงบริการภายนอกอย่างระมัดระวังและป้องกันไม่ให้ระบบของคุณล้นภาระ

การจัดการข้อผิดพลาดและการพยายามซ้ำ (Retry)

ในการประมวลผลแบบอะซิงโครนัสก็ยังมีข้อผิดพลาดเกิดขึ้นได้เสมอ ควรดักจับ exception อย่างเหมาะสมและนำกลยุทธ์ retry เช่น exponential backoff มาใช้

ด้านล่างคือตัวอย่างการ retry ง่าย ๆ ไม่เกิน N ครั้ง

 1import asyncio
 2import aiohttp
 3import random
 4
 5async def fetch_with_retry(session, url, retries=3):
 6    for attempt in range(1, retries + 1):
 7        try:
 8            async with session.get(url) as resp:
 9                resp.raise_for_status()
10                text = await resp.text()
11                return text
12        except Exception as e:
13            if attempt == retries:
14                raise
15            await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18    async with aiohttp.ClientSession() as session:
19        text = await fetch_with_retry(session, "https://codesparklab.com/")
20        print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • ตรรกะใน retry ที่เหมาะสมสำคัญต่อการรักษาความเสถียรและควบคุมปริมาณงาน

เคล็ดลับสำหรับการดีบักและ log

ในงานแบบอะซิงโครนัส แต่ละงานจะทำงานพร้อมกัน จึงควรตรวจสอบปัญหาให้ละเอียดเพราะอาจสืบค้นหาสาเหตุได้ยาก เพื่อแก้ไขปัญหาได้อย่างมีประสิทธิภาพ ควรคำนึงถึงจุดต่อไปนี้เพื่อช่วยในการดีบัก

  • ข้อผิดพลาดจาก asyncio.run() และ Task มักถูกมองข้าม ดังนั้นควรตั้งระบบ log สำหรับ exception ที่ไม่ได้รับการจัดการ
  • เมื่อใช้ logging หากใส่ชื่อ coroutine หรือ ใน Python 3.8 ขึ้นไป ใช้ task.get_name() ใน log จะช่วยให้ติดตามสถานะได้ง่ายขึ้น
  • คุณสามารถตรวจสอบสถานะปัจจุบันของ task ต่างๆ ได้ด้วย asyncio.Task.all_tasks() อย่างไรก็ดี API นี้ถูกออกแบบมาเพื่อจุดประสงค์สำหรับการดีบัก ควรใช้อย่างระมัดระวังในสภาพแวดล้อม production เพื่อหลีกเลี่ยงปัญหาด้านประสิทธิภาพหรือผลกระทบที่คาดไม่ถึง

ข้อควรระวังด้านประสิทธิภาพ

แม้การเขียนโปรแกรมแบบอะซิงโครนัสจะเหมาะกับงานที่ต้องรอ I/O แต่ถ้าใช้ไม่ถูกต้องอาจลดประสิทธิภาพของระบบได้ ควรเพิ่มประสิทธิภาพตามจุดต่อไปนี้:

  • การประมวลผลแบบอะซิงโครนัสเหมาะกับงานที่ต้องรอ I/O ไม่เหมาะกับงานที่ใช้ CPU มาก — ในกรณีนั้นควรใช้ process pool
  • เมื่อใช้ thread หรือ process pool ควรคำนึงถึงขนาดของ pool และลักษณะงาน
  • หากเปิด task จำนวนมากในครั้งเดียว overhead ของ event loop จะเพิ่มขึ้น — ควรใช้ batch หรือ semaphore เพื่อช่วยควบคุม

สรุป

ระบบ I/O แบบอะซิงโครนัสของ Python เป็นกลไกที่มีประสิทธิภาพสำหรับการใช้เวลารอ I/O และรันงานบนเครือข่ายหรือไฟล์ไปพร้อม ๆ กันอย่างมีประสิทธิภาพ โดยการผสมผสานเทคนิคต่าง ๆ เช่น asyncio, aiohttp, aiofiles และ run_in_executor จะช่วยให้คุณเขียนแอปพลิเคชันแบบอะซิงโครนัสที่ยืดหยุ่นและใช้ได้จริง การใช้ async with เพื่อบริหารจัดการการเริ่มต้นและการปล่อยทรัพยากรแบบอัตโนมัติจะช่วยจัดการ resource ต่าง ๆ เช่น ไฟล์, HTTP session, และ lock ได้อย่างปลอดภัยและเชื่อถือได้ ด้วยการผนวกการจัดการข้อผิดพลาดและดูแลความขนานอย่างเหมาะสม จะช่วยให้คุณรันโปรแกรมอะซิงโครนัสที่เชื่อถือได้สูงได้อย่างปลอดภัย

คุณสามารถติดตามบทความข้างต้นโดยใช้ Visual Studio Code บนช่อง YouTube ของเรา กรุณาตรวจสอบช่อง YouTube ด้วย

YouTube Video