อินพุต/เอาต์พุตแบบอะซิงโครนัส
บทความนี้อธิบายเกี่ยวกับอินพุต/เอาต์พุตแบบอะซิงโครนัส
คู่มือนี้จะอธิบายแนวคิดและรูปแบบของอินพุต/เอาต์พุตแบบอะซิงโครนัส ซึ่งเป็นโปรแกรมที่มีประโยชน์ในการใช้งาน Python อย่างเป็นขั้นตอน
YouTube Video
การรับส่งข้อมูลแบบอะซิงโครนัส (I/O)
แนวคิดของการรับส่งข้อมูลแบบอะซิงโครนัส (I/O)
การรับส่งข้อมูลแบบอะซิงโครนัส (I/O) เป็นกลไกที่ช่วยให้งานอื่น ๆ สามารถทำงานไปพร้อมกันขณะที่รอกระบวนการ I/O ที่ใช้เวลานาน เช่น การอ่านเขียนไฟล์หรือสื่อสารผ่านเครือข่าย ใน Python มี asyncio เป็นเฟรมเวิร์คมาตรฐานสำหรับการเขียนโปรแกรมแบบอะซิงโครนัส และไลบรารีหลายรายการก็ออกแบบมาให้ใช้กลไกนี้
พื้นฐาน: async / await และ Event Loop
อันดับแรก นี่คือการเขียน coroutine เบื้องต้นและตัวอย่างการรัน coroutine หลายตัวพร้อมกันโดยใช้ asyncio.gather
โค้ดด้านล่างเป็นตัวอย่างสั้น ๆ สำหรับการนิยามและรันฟังก์ชันแบบอะซิงโครนัสพร้อมกัน ฟังก์ชัน sleep ถูกใช้เพื่อสาธิตการทำงานแบบขนาน
1import asyncio
2
3async def worker(name: str, delay: float):
4 # Simulate I/O-bound work by sleeping
5 print(f"{name} started")
6 await asyncio.sleep(delay)
7 print(f"{name} finished after {delay}s")
8 return name, delay
9
10async def main():
11 # Run multiple coroutines concurrently
12 tasks = [
13 worker("A", 1.5),
14 worker("B", 1.0),
15 worker("C", 0.5),
16 ]
17 results = await asyncio.gather(*tasks)
18 print("Results:", results)
19
20if __name__ == "__main__":
21 asyncio.run(main())- โค้ดนี้เริ่ม event loop โดยใช้
asyncio.run()และรัน coroutine สามตัวพร้อมกัน
async with และ Asynchronous Context Manager
ในการประมวลผลแบบอะซิงโครนัส การจัดการทรัพยากรเช่นการเปิดปิดการเชื่อมต่อหรือไฟล์อาจซับซ้อนได้ง่าย ที่นี่เองที่ asynchronous context manager ซึ่งใช้งานผ่าน async with เข้ามาช่วยได้มาก ไวยากรณ์นี้คล้ายกับการใช้ with แบบซิงโครนัส แต่การประมวลผลภายในจะเป็นแบบอะซิงโครนัส จึงเข้ากับ flow ของ async/await ได้เป็นอย่างดี
มีเหตุผลหลักสองประการที่ควรใช้ async with:
- เพื่อให้สามารถเคลียร์ทรัพยากรเช่น การเชื่อมต่อ, file handle หรือ session ได้อย่างแน่นอน คุณจะมั่นใจได้ว่าทรัพยากรจะถูกปล่อยคืนอย่างถูกต้องแม้เกิดข้อผิดพลาดระหว่างการทำงาน
- เพื่อทำให้การเริ่มต้นและการปิด เช่น การเชื่อมต่อและการเขียนข้อมูลจบในลักษณะอัตโนมัติแบบอะซิงโครนัส สิ่งนี้ช่วยให้ไม่ต้องเขียนโค้ดการจัดการเหล่านี้เองและทำให้โค้ดดูเข้าใจง่ายขึ้น
ด้านล่างคือตัวอย่างการสร้าง asynchronous context manager แบบง่ายจากศูนย์
1import asyncio
2
3class AsyncResource:
4 async def __aenter__(self):
5 print("Opening resource...")
6 await asyncio.sleep(0.5)
7 print("Resource opened")
8 return self
9
10 async def __aexit__(self, exc_type, exc, tb):
11 print("Closing resource...")
12 await asyncio.sleep(0.5)
13 print("Resource closed")
14
15async def main():
16 async with AsyncResource() as r:
17 print("Using resource...")
18
19if __name__ == "__main__":
20 asyncio.run(main())- โดยการกำหนด
__aenter__และ__aexit__จะสามารถใช้async withได้ - กระบวนการขณะเข้า-ออก block
async withจะถูกจัดการแบบอะซิงโครนัสและปลอดภัย
การรับส่งข้อมูลไฟล์แบบอะซิงโครนัส (aiofiles)
การทำงานกับไฟล์เป็นตัวอย่างของงานที่บล็อกโปรแกรมหลักแบบชัดเจน ด้วย aiofiles คุณสามารถจัดการไฟล์แบบอะซิงโครนัสได้อย่างปลอดภัย ภายในไลบรารีจะใช้ thread pool และปิดไฟล์อย่างถูกต้องผ่าน async with
ตัวอย่างต่อไปนี้แสดงวิธีอ่านไฟล์หลายไฟล์แบบอะซิงโครนัสขนานกัน คุณต้องติดตั้ง aiofiles ด้วย pip install aiofiles ก่อนจึงจะรันโค้ดนี้ได้
1# pip install aiofiles
2import asyncio
3import aiofiles
4from pathlib import Path
5
6async def read_file(path: Path):
7 # Read file content asynchronously
8 async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
9 contents = await f.read()
10 return path.name, len(contents)
11
12async def main():
13 files = [
14 Path("example1.txt"),
15 Path("example2.txt"),
16 Path("example3.txt")
17 ]
18 tasks = [read_file(p) for p in files]
19 results = await asyncio.gather(*tasks)
20 for name, size in results:
21 print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24 asyncio.run(main())- โค้ดนี้ทำให้การอ่านไฟล์แต่ละไฟล์เป็นไปแบบขนาน
aiofilesมักใช้ thread pool ภายในเพื่อให้คุณจัดการ I/O ของไฟล์แบบขวางกั้นผ่านอินเทอร์เฟซแบบอะซิงโครนัส
ไคลเอนต์ HTTP แบบอะซิงโครนัส (aiohttp)
ตัวอย่างคลาสสิกของ network I/O คือการทำคำขอ HTTP แบบอะซิงโครนัส วิธีนี้มีประสิทธิภาพมากโดยเฉพาะเมื่อต้องส่ง HTTP request จำนวนมากพร้อมกัน
ด้านล่างเป็นตัวอย่างการดึงข้อมูลจาก URL หลายอันพร้อมกันโดยใช้ aiohttp คุณต้องติดตั้ง aiohttp ด้วยคำสั่ง pip install aiohttp
1# pip install aiohttp
2import asyncio
3import aiohttp
4
5async def fetch(session: aiohttp.ClientSession, url: str):
6 # Fetch a URL asynchronously and return status and size
7 async with session.get(url) as resp:
8 text = await resp.text()
9 return url, resp.status, len(text)
10
11async def main(urls):
12 async with aiohttp.ClientSession() as session:
13 tasks = [fetch(session, u) for u in urls]
14 for coro in asyncio.as_completed(tasks):
15 url, status, size = await coro
16 print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19 urls = [
20 "https://codesparklab.com/json/example1.json",
21 "https://codesparklab.com/json/example2.json",
22 "https://codesparklab.com/json/example3.json",
23 ]
24 asyncio.run(main(urls))- โดยการใช้
asyncio.as_completedคุณจะสามารถประมวลผลผลลัพธ์ตามลำดับที่แต่ละงานเสร็จสมบูรณ์ สิ่งนี้มีประโยชน์สำหรับการจัดการคำขอจำนวนมากอย่างมีประสิทธิภาพ
การทำงานร่วมกับ Blocking I/O: run_in_executor
เมื่อคุณต้องจัดการกับงานที่ใช้ CPU สูง หรือ API ที่บล็อกการทำงานในโค้ดอะซิงโครนัส ให้ใช้ ThreadPoolExecutor หรือ ProcessPoolExecutor ผ่าน loop.run_in_executor
โค้ดตัวอย่างต่อไปนี้เป็นการรันงานที่มี I/O แบบบล็อกแบบขนานกันโดยใช้ thread pool
1import asyncio
2from concurrent.futures import ThreadPoolExecutor
3import time
4
5def blocking_io(n):
6 # Simulate a blocking I/O or CPU-bound function
7 time.sleep(n)
8 return f"slept {n}s"
9
10async def main():
11 loop = asyncio.get_running_loop()
12 with ThreadPoolExecutor() as pool:
13 tasks = [
14 loop.run_in_executor(pool, blocking_io, 1),
15 loop.run_in_executor(pool, blocking_io, 2),
16 ]
17 results = await asyncio.gather(*tasks)
18 print("Blocking results:", results)
19
20if __name__ == "__main__":
21 asyncio.run(main())- โดยการใช้
run_in_executorคุณสามารถนำโค้ด synchronous ที่มีอยู่มาใช้ร่วมกับโค้ดแบบอะซิงโครนัสได้โดยไม่ต้องแก้ไขมาก อย่างไรก็ตาม คุณควรระวังจำนวน thread และภาระของ CPU ProcessPoolExecutorเหมาะสำหรับงานที่ต้องใช้ CPU สูง
เซิร์ฟเวอร์แบบอะซิงโครนัส: TCP Echo Server ที่ใช้ asyncio
หากคุณต้องการควบคุม socket โดยตรง สามารถสร้างเซิร์ฟเวอร์แบบอะซิงโครนัสได้ง่าย ๆ ด้วย asyncio.start_server
ตัวอย่างต่อไปนี้เป็น echo server แบบง่ายที่ส่งข้อมูลกลับไปหา client ตามที่ได้รับมา
1import asyncio
2
3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
4 # Handle a single client: read data and echo it back
5 addr = writer.get_extra_info('peername')
6 print(f"Connection from {addr}")
7 while True:
8 data = await reader.read(1024)
9 if not data:
10 break
11 writer.write(data) # echo back
12 await writer.drain()
13 writer.close()
14 await writer.wait_closed()
15 print(f"Connection closed {addr}")
16
17async def main():
18 server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19 addr = server.sockets[0].getsockname()
20 print(f"Serving on {addr}")
21 async with server:
22 await server.serve_forever()
23
24if __name__ == "__main__":
25 asyncio.run(main())-
ในการสื่อสาร TCP ด้วย
asyncioนั้นStreamReaderและStreamWriterมีบทบาทสำคัญในการรับส่งข้อมูลแบบอะซิงโครนัสStreamReaderใช้อ่านข้อมูลที่ถูกส่งจากไคลเอนต์แบบอะซิงโครนัส ส่วนStreamWriterใช้สำหรับส่งข้อมูลตอบกลับจากเซิร์ฟเวอร์ไปยังไคลเอนต์ -
แม้จะไม่ต้องจัดการรายละเอียดการทำงานของ socket ด้วยตนเอง ก็สามารถเปิดเซิร์ฟเวอร์แบบอะซิงโครนัสได้อย่างง่ายและมีประสิทธิภาพโดยใช้
asyncio.start_server -
เมื่อคุณส่งฟังก์ชัน handler ให้กับ
asyncio.start_serverฟังก์ชันนั้นจะได้รับreaderและwriterเป็นอาร์กิวเมนต์ โดยการใช้สิ่งเหล่านี้ คุณสามารถพัฒนากระบวนการสื่อสารได้อย่างปลอดภัยและเข้าใจกว่าการจัดการกับ API ของ socket ระดับต่ำโดยตรง ตัวอย่างเช่น คุณสามารถรับข้อมูลด้วยreader.read()และใช้writer.write()ร่วมกับwriter.drain()เพื่อส่งข้อมูลแบบอะซิงโครนัสและรับรองว่าการส่งเสร็จสมบูรณ์ -
การตั้งค่านี้เหมาะกับการจัดการการเชื่อมต่อจำนวนมากพร้อมกัน เช่นบริการ TCP ขนาดเล็กหรือโปรโตคอลที่ไม่ซับซ้อน
การจัดการข้อมูล streaming ขนาดใหญ่
เมื่อประมวลผลไฟล์ขนาดใหญ่หรือ response ที่มีขนาดมาก ควรอ่านและเขียนทีละส่วน (chunk) เพื่อลดการใช้หน่วยความจำ ตัวอย่างด้านล่างคือการอ่านข้อมูลแบบ streaming โดยใช้ aiohttp
โค้ดต่อไปนี้จะประมวลผล HTTP response ทีละ chunk และบันทึกลงดิสก์ทันทีเมื่อได้รับข้อมูล
1import aiohttp
2import asyncio
3import aiofiles
4
5async def stream_download(url: str, dest: str):
6 # Stream download and write to file in chunks
7 async with aiohttp.ClientSession() as session:
8 async with session.get(url) as resp:
9 async with aiofiles.open(dest, 'wb') as f:
10 async for chunk in resp.content.iter_chunked(1024 * 64):
11 await f.write(chunk)
12
13if __name__ == "__main__":
14 asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))-
โค้ดนี้จะไม่โหลดไฟล์ขนาดใหญ่ทั้งหมดในคราวเดียว แต่จะรับข้อมูลเป็นชิ้นเล็กๆ ทีละส่วน และเขียนลงไฟล์แบบอะซิงโครนัส ด้วยเหตุนี้จึงสามารถดาวน์โหลดไฟล์ได้อย่างรวดเร็วและมีประสิทธิภาพ ในขณะที่ใช้หน่วยความจำต่ำ
aiohttpใช้ดึงข้อมูลแบบอะซิงโครนัส และaiofilesเขียนข้อมูลลงไฟล์โดยไม่ block ทำให้สามารถทำงานร่วมกับกระบวนการอื่นๆ ได้อย่างง่ายดาย -
รูปแบบนี้เหมาะสำหรับการดาวน์โหลดและบันทึกไฟล์ขนาดใหญ่ให้มีประสิทธิภาพและใช้หน่วยความจำน้อยที่สุด
การรัน subprocess แบบอะซิงโครนัส
หากต้องการรันคำสั่งภายนอกแบบอะซิงโครนัสและอ่านข้อความที่ output ออกมาทันที สามารถใช้ asyncio.create_subprocess_exec ได้
ด้านล่างคือตัวอย่างการรันคำสั่งภายนอกและอ่านข้อความที่ส่งออกมาตามเวลาจริง
1import asyncio
2
3async def run_cmd(cmd):
4 # Run external command asynchronously and capture output line by line
5 proc = await asyncio.create_subprocess_exec(
6 *cmd,
7 stdout=asyncio.subprocess.PIPE,
8 stderr=asyncio.subprocess.PIPE
9 )
10
11 async def read_stream(stream, name):
12 while True:
13 line = await stream.readline()
14 if not line:
15 break
16 print(f"[{name}] {line.decode().rstrip()}")
17
18 await asyncio.gather(
19 read_stream(proc.stdout, "stdout"),
20 read_stream(proc.stderr, "stderr"),
21 )
22 await proc.wait()
23 return proc.returncode
24
25if __name__ == "__main__":
26 asyncio.run(run_cmd(["python", "--version"]))- การควบคุม subprocess แบบอะซิงโครนัสช่วยให้คุณจัดการ log จากเครื่องมือภายนอกแบบเรียลไทม์หรือรัน process หลายตัวพร้อมกันได้
การจัดการการยกเลิกและการกำหนด timeout
งานแบบอะซิงโครนัสสามารถถูกยกเลิกได้ เวลาต้องกำหนด timeout ให้ใช้งาน asyncio.wait_for ได้อย่างง่ายดาย
ตัวอย่างต่อไปนี้เป็นการรันงานที่มี timeout กำกับ
1import asyncio
2
3async def slow_task():
4 await asyncio.sleep(5)
5 return "done"
6
7async def main():
8 try:
9 result = await asyncio.wait_for(slow_task(), timeout=2.0)
10 print("Result:", result)
11 except asyncio.TimeoutError:
12 print("Task timed out")
13
14if __name__ == "__main__":
15 asyncio.run(main())wait_forจะขึ้นข้อผิดพลาดTimeoutErrorหากเกินเวลาที่กำหนดและจะยกเลิก task หากจำเป็น ควรระวังเรื่องการยกเลิกงานที่เกี่ยวเนื่องและการเคลียร์ข้อมูลขณะ task ถูกยกเลิก
ควบคุมความขนาน (Semaphore)
การเปิดการเชื่อมต่อหรือส่งคำขอพร้อมกันเป็นจำนวนมากอาจทำให้ทรัพยากรหมดได้ ควรจำกัดความขนานด้วย asyncio.Semaphore
ตัวอย่างต่อไปนี้คือการจำกัดการดาวน์โหลดพร้อมกันโดยใช้ semaphore
1import asyncio
2import aiohttp
3
4semaphore = asyncio.Semaphore(3) # allow up to 3 concurrent tasks
5
6async def limited_fetch(session, url):
7 async with semaphore:
8 async with session.get(url) as resp:
9 return url, resp.status
10
11async def main(urls):
12 async with aiohttp.ClientSession() as session:
13 tasks = [limited_fetch(session, u) for u in urls]
14 results = await asyncio.gather(*tasks)
15 print(results)
16
17if __name__ == "__main__":
18 urls = ["https://codesparklab.com/json/example.json"] * 10
19 asyncio.run(main(urls))- ด้วยวิธีนี้จะช่วยให้เข้าถึงบริการภายนอกอย่างระมัดระวังและป้องกันไม่ให้ระบบของคุณล้นภาระ
การจัดการข้อผิดพลาดและการพยายามซ้ำ (Retry)
ในการประมวลผลแบบอะซิงโครนัสก็ยังมีข้อผิดพลาดเกิดขึ้นได้เสมอ ควรดักจับ exception อย่างเหมาะสมและนำกลยุทธ์ retry เช่น exponential backoff มาใช้
ด้านล่างคือตัวอย่างการ retry ง่าย ๆ ไม่เกิน N ครั้ง
1import asyncio
2import aiohttp
3import random
4
5async def fetch_with_retry(session, url, retries=3):
6 for attempt in range(1, retries + 1):
7 try:
8 async with session.get(url) as resp:
9 resp.raise_for_status()
10 text = await resp.text()
11 return text
12 except Exception as e:
13 if attempt == retries:
14 raise
15 await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18 async with aiohttp.ClientSession() as session:
19 text = await fetch_with_retry(session, "https://codesparklab.com/")
20 print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23 asyncio.run(main())- ตรรกะใน retry ที่เหมาะสมสำคัญต่อการรักษาความเสถียรและควบคุมปริมาณงาน
เคล็ดลับสำหรับการดีบักและ log
ในงานแบบอะซิงโครนัส แต่ละงานจะทำงานพร้อมกัน จึงควรตรวจสอบปัญหาให้ละเอียดเพราะอาจสืบค้นหาสาเหตุได้ยาก เพื่อแก้ไขปัญหาได้อย่างมีประสิทธิภาพ ควรคำนึงถึงจุดต่อไปนี้เพื่อช่วยในการดีบัก
- ข้อผิดพลาดจาก
asyncio.run()และTaskมักถูกมองข้าม ดังนั้นควรตั้งระบบ log สำหรับ exception ที่ไม่ได้รับการจัดการ - เมื่อใช้
loggingหากใส่ชื่อ coroutine หรือ ใน Python 3.8 ขึ้นไป ใช้task.get_name()ใน log จะช่วยให้ติดตามสถานะได้ง่ายขึ้น - คุณสามารถตรวจสอบสถานะปัจจุบันของ task ต่างๆ ได้ด้วย
asyncio.Task.all_tasks()อย่างไรก็ดี API นี้ถูกออกแบบมาเพื่อจุดประสงค์สำหรับการดีบัก ควรใช้อย่างระมัดระวังในสภาพแวดล้อม production เพื่อหลีกเลี่ยงปัญหาด้านประสิทธิภาพหรือผลกระทบที่คาดไม่ถึง
ข้อควรระวังด้านประสิทธิภาพ
แม้การเขียนโปรแกรมแบบอะซิงโครนัสจะเหมาะกับงานที่ต้องรอ I/O แต่ถ้าใช้ไม่ถูกต้องอาจลดประสิทธิภาพของระบบได้ ควรเพิ่มประสิทธิภาพตามจุดต่อไปนี้:
- การประมวลผลแบบอะซิงโครนัสเหมาะกับงานที่ต้องรอ I/O ไม่เหมาะกับงานที่ใช้ CPU มาก — ในกรณีนั้นควรใช้ process pool
- เมื่อใช้ thread หรือ process pool ควรคำนึงถึงขนาดของ pool และลักษณะงาน
- หากเปิด task จำนวนมากในครั้งเดียว overhead ของ event loop จะเพิ่มขึ้น — ควรใช้ batch หรือ semaphore เพื่อช่วยควบคุม
สรุป
ระบบ I/O แบบอะซิงโครนัสของ Python เป็นกลไกที่มีประสิทธิภาพสำหรับการใช้เวลารอ I/O และรันงานบนเครือข่ายหรือไฟล์ไปพร้อม ๆ กันอย่างมีประสิทธิภาพ โดยการผสมผสานเทคนิคต่าง ๆ เช่น asyncio, aiohttp, aiofiles และ run_in_executor จะช่วยให้คุณเขียนแอปพลิเคชันแบบอะซิงโครนัสที่ยืดหยุ่นและใช้ได้จริง การใช้ async with เพื่อบริหารจัดการการเริ่มต้นและการปล่อยทรัพยากรแบบอัตโนมัติจะช่วยจัดการ resource ต่าง ๆ เช่น ไฟล์, HTTP session, และ lock ได้อย่างปลอดภัยและเชื่อถือได้ ด้วยการผนวกการจัดการข้อผิดพลาดและดูแลความขนานอย่างเหมาะสม จะช่วยให้คุณรันโปรแกรมอะซิงโครนัสที่เชื่อถือได้สูงได้อย่างปลอดภัย
คุณสามารถติดตามบทความข้างต้นโดยใช้ Visual Studio Code บนช่อง YouTube ของเรา กรุณาตรวจสอบช่อง YouTube ด้วย