Nhập/Xuất không đồng bộ
Bài viết này giải thích về nhập/xuất không đồng bộ.
Hướng dẫn này giải thích một cách nhẹ nhàng, từng bước một về các khái niệm và mẫu của nhập/xuất không đồng bộ hữu ích khi sử dụng Python trên thực tế.
YouTube Video
Nhập/Xuất không đồng bộ (I/O)
Khái niệm về I/O không đồng bộ
I/O không đồng bộ là một cơ chế cho phép các thao tác khác chạy song song trong khi đợi các tác vụ I/O tốn thời gian, như thao tác tệp hoặc truyền thông mạng. Trong Python, asyncio được cung cấp như một framework tiêu chuẩn dành cho lập trình không đồng bộ, và nhiều thư viện cũng được thiết kế dựa trên cơ chế này.
Căn bản: async / await và vòng lặp sự kiện (Event Loop)
Đầu tiên, đây là cách viết coroutine cơ bản cùng ví dụ chạy nhiều coroutine đồng thời sử dụng asyncio.gather.
Đoạn mã dưới đây là ví dụ tối giản về việc định nghĩa và chạy các hàm không đồng bộ cùng lúc. Hàm sleep được sử dụng để mô phỏng việc thực thi song song.
1import asyncio
2
3async def worker(name: str, delay: float):
4 # Simulate I/O-bound work by sleeping
5 print(f"{name} started")
6 await asyncio.sleep(delay)
7 print(f"{name} finished after {delay}s")
8 return name, delay
9
10async def main():
11 # Run multiple coroutines concurrently
12 tasks = [
13 worker("A", 1.5),
14 worker("B", 1.0),
15 worker("C", 0.5),
16 ]
17 results = await asyncio.gather(*tasks)
18 print("Results:", results)
19
20if __name__ == "__main__":
21 asyncio.run(main())- Mã này khởi động vòng lặp sự kiện bằng
asyncio.run()và thực thi ba coroutine đồng thời.
async with và Bộ quản lý ngữ cảnh không đồng bộ
Trong xử lý không đồng bộ, việc quản lý tài nguyên như mở kết nối hay đóng tập tin có thể trở nên phức tạp. Lúc này, bộ quản lý ngữ cảnh không đồng bộ sử dụng async with trở nên hữu ích. Cú pháp này sử dụng tương tự như lệnh with đồng bộ, nhưng xử lý nội bộ là không đồng bộ cho phép tích hợp mạch lạc vào luồng async/await.
Có hai lý do chính để sử dụng async with:.
- Để đảm bảo dọn dẹp tài nguyên như kết nối, bộ xử lý tệp, hoặc phiên làm việc một cách đáng tin cậy. Bạn có thể yên tâm rằng tài nguyên sẽ được giải phóng đúng cách ngay cả khi xảy ra sự cố bất thường.
- Tự động hóa các tác vụ khởi tạo và dọn dẹp, như thiết lập hoặc đóng kết nối, xóa bộ nhớ đệm theo cách không đồng bộ. Giúp bạn giảm bớt việc viết mã thủ công và làm mã của bạn rõ ràng hơn.
Dưới đây là ví dụ tạo một bộ quản lý ngữ cảnh không đồng bộ đơn giản từ đầu.
1import asyncio
2
3class AsyncResource:
4 async def __aenter__(self):
5 print("Opening resource...")
6 await asyncio.sleep(0.5)
7 print("Resource opened")
8 return self
9
10 async def __aexit__(self, exc_type, exc, tb):
11 print("Closing resource...")
12 await asyncio.sleep(0.5)
13 print("Resource closed")
14
15async def main():
16 async with AsyncResource() as r:
17 print("Using resource...")
18
19if __name__ == "__main__":
20 asyncio.run(main())- Bằng cách định nghĩa
__aenter__và__aexit__, bạn có thể sử dụngasync with. - Xử lý khi vào và ra khỏi khối
async withsẽ được thực thi không đồng bộ và an toàn.
I/O tệp không đồng bộ (aiofiles)
Các thao tác tệp là ví dụ điển hình gây chặn. Nhờ sử dụng aiofiles, bạn có thể xử lý thao tác tệp một cách an toàn và không đồng bộ. Bên trong, nó sử dụng pool luồng để đảm bảo file được đóng đúng cách bằng async with.
Ví dụ sau đây minh họa việc đọc nhiều tệp song song bằng phương thức không đồng bộ. Bạn cần cài đặt aiofiles bằng lệnh pip install aiofiles trước khi chạy đoạn mã này.
1# pip install aiofiles
2import asyncio
3import aiofiles
4from pathlib import Path
5
6async def read_file(path: Path):
7 # Read file content asynchronously
8 async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
9 contents = await f.read()
10 return path.name, len(contents)
11
12async def main():
13 files = [
14 Path("example1.txt"),
15 Path("example2.txt"),
16 Path("example3.txt")
17 ]
18 tasks = [read_file(p) for p in files]
19 results = await asyncio.gather(*tasks)
20 for name, size in results:
21 print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24 asyncio.run(main())- Mã này song song hóa việc đọc từng tệp.
aiofilesthường sử dụng pool luồng bên trong, cho phép bạn xử lý I/O tệp gây chặn thông qua giao diện không đồng bộ.
HTTP Client không đồng bộ (aiohttp)
Như là ví dụ kinh điển về I/O mạng, đây là cách thực hiện các yêu cầu HTTP một cách không đồng bộ. Điều này đặc biệt mạnh mẽ khi bạn cần gửi nhiều yêu cầu HTTP song song.
Dưới đây là ví dụ về việc lấy nhiều URL song song với aiohttp. Bạn cần cài đặt aiohttp bằng lệnh pip install aiohttp.
1# pip install aiohttp
2import asyncio
3import aiohttp
4
5async def fetch(session: aiohttp.ClientSession, url: str):
6 # Fetch a URL asynchronously and return status and size
7 async with session.get(url) as resp:
8 text = await resp.text()
9 return url, resp.status, len(text)
10
11async def main(urls):
12 async with aiohttp.ClientSession() as session:
13 tasks = [fetch(session, u) for u in urls]
14 for coro in asyncio.as_completed(tasks):
15 url, status, size = await coro
16 print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19 urls = [
20 "https://codesparklab.com/json/example1.json",
21 "https://codesparklab.com/json/example2.json",
22 "https://codesparklab.com/json/example3.json",
23 ]
24 asyncio.run(main(urls))- Bằng cách sử dụng
asyncio.as_completed, bạn có thể xử lý kết quả theo thứ tự hoàn thành nhiệm vụ. Điều này rất hữu ích để xử lý nhiều yêu cầu một cách hiệu quả.
Sự đồng tồn tại với I/O bị chặn: run_in_executor
Khi xử lý các tác vụ cần nhiều CPU hoặc các API dạng blocking có sẵn trong mã không đồng bộ, hãy sử dụng ThreadPoolExecutor hoặc ProcessPoolExecutor thông qua loop.run_in_executor.
Đoạn mã dưới đây là ví dụ về việc chạy song song các tác vụ giả định I/O bị chặn bằng cách sử dụng thread pool.
1import asyncio
2from concurrent.futures import ThreadPoolExecutor
3import time
4
5def blocking_io(n):
6 # Simulate a blocking I/O or CPU-bound function
7 time.sleep(n)
8 return f"slept {n}s"
9
10async def main():
11 loop = asyncio.get_running_loop()
12 with ThreadPoolExecutor() as pool:
13 tasks = [
14 loop.run_in_executor(pool, blocking_io, 1),
15 loop.run_in_executor(pool, blocking_io, 2),
16 ]
17 results = await asyncio.gather(*tasks)
18 print("Blocking results:", results)
19
20if __name__ == "__main__":
21 asyncio.run(main())- Bằng cách sử dụng
run_in_executor, bạn có thể tích hợp mã đồng bộ hiện có vào luồng không đồng bộ mà không cần phải chỉnh sửa nhiều. Tuy nhiên, bạn nên chú ý đến số lượng luồng và tải CPU. ProcessPoolExecutorphù hợp với các tác vụ cần nhiều CPU.
Máy chủ không đồng bộ: Máy chủ Echo TCP sử dụng asyncio
Nếu bạn muốn làm việc trực tiếp với socket, bạn có thể dễ dàng xây dựng máy chủ không đồng bộ với asyncio.start_server.
Ví dụ dưới đây là một máy chủ echo đơn giản, trả về dữ liệu đúng như nhận từ client.
1import asyncio
2
3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
4 # Handle a single client: read data and echo it back
5 addr = writer.get_extra_info('peername')
6 print(f"Connection from {addr}")
7 while True:
8 data = await reader.read(1024)
9 if not data:
10 break
11 writer.write(data) # echo back
12 await writer.drain()
13 writer.close()
14 await writer.wait_closed()
15 print(f"Connection closed {addr}")
16
17async def main():
18 server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19 addr = server.sockets[0].getsockname()
20 print(f"Serving on {addr}")
21 async with server:
22 await server.serve_forever()
23
24if __name__ == "__main__":
25 asyncio.run(main())-
Trong giao tiếp TCP với
asyncio,StreamReadervàStreamWriterđóng vai trò trung tâm trong việc nhập xuất bất đồng bộ.StreamReaderđọc dữ liệu từ client một cách bất đồng bộ, cònStreamWriterđược dùng để gửi phản hồi từ server trở lại client. -
Ngay cả khi không tự xử lý các thao tác socket chi tiết, bạn có thể khởi chạy một máy chủ bất đồng bộ một cách đơn giản và hiệu quả bằng cách sử dụng
asyncio.start_server. -
Khi bạn truyền một hàm xử lý cho
asyncio.start_server, hàm đó sẽ nhậnreadervàwriterlàm tham số. Bằng cách sử dụng chúng, bạn có thể triển khai các quy trình giao tiếp theo cách an toàn và rõ ràng hơn so với việc trực tiếp xử lý các API socket cấp thấp. Ví dụ, bằng cách nhận dữ liệu vớireader.read()và kết hợpwriter.write()cùng vớiwriter.drain(), bạn có thể thực hiện gửi bất đồng bộ đảm bảo quá trình truyền được hoàn tất. -
Thiết lập này phù hợp để xử lý nhiều kết nối đồng thời và rất lý tưởng cho các giao thức đơn giản hoặc dịch vụ TCP quy mô nhỏ.
Xử lý dữ liệu truyền lớn (Large Streaming Data)
Khi xử lý các tệp lớn hoặc phản hồi tuần tự, nên đọc và ghi dữ liệu theo từng phần nhỏ (chunk) để tiết kiệm bộ nhớ. Dưới đây là ví dụ về việc đọc dữ liệu dạng dòng (streaming) dùng aiohttp.
Mã sau đây xử lý phản hồi HTTP thành các phân đoạn và ghi ra ổ đĩa khi nhận được dữ liệu.
1import aiohttp
2import asyncio
3import aiofiles
4
5async def stream_download(url: str, dest: str):
6 # Stream download and write to file in chunks
7 async with aiohttp.ClientSession() as session:
8 async with session.get(url) as resp:
9 async with aiofiles.open(dest, 'wb') as f:
10 async for chunk in resp.content.iter_chunked(1024 * 64):
11 await f.write(chunk)
12
13if __name__ == "__main__":
14 asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))-
Đoạn mã này không tải toàn bộ tệp lớn cùng một lúc; thay vào đó, nó nhận dữ liệu từng phần (các đoạn nhỏ) và ghi chúng vào tệp một cách bất đồng bộ. Kết quả là, nó có thể thực hiện tải về nhanh chóng và hiệu quả trong khi vẫn giữ mức sử dụng bộ nhớ ở mức thấp.
aiohttptruy xuất dữ liệu bất đồng bộ vàaiofilesghi vào tệp mà không bị chặn, giúp dễ dàng chạy đồng thời với các tiến trình khác. -
Mẫu này phù hợp để tải xuống và lưu trữ các tập tin lớn một cách hiệu quả, giảm thiểu việc sử dụng bộ nhớ.
Thực thi tiến trình con (Subprocess) không đồng bộ
Nếu bạn muốn thực thi lệnh ngoài một cách không đồng bộ và đọc kết quả đầu ra thời gian thực, asyncio.create_subprocess_exec rất hữu ích.
Dưới đây là ví dụ chạy lệnh ngoài và đọc đầu ra chuẩn theo thời gian thực.
1import asyncio
2
3async def run_cmd(cmd):
4 # Run external command asynchronously and capture output line by line
5 proc = await asyncio.create_subprocess_exec(
6 *cmd,
7 stdout=asyncio.subprocess.PIPE,
8 stderr=asyncio.subprocess.PIPE
9 )
10
11 async def read_stream(stream, name):
12 while True:
13 line = await stream.readline()
14 if not line:
15 break
16 print(f"[{name}] {line.decode().rstrip()}")
17
18 await asyncio.gather(
19 read_stream(proc.stdout, "stdout"),
20 read_stream(proc.stderr, "stderr"),
21 )
22 await proc.wait()
23 return proc.returncode
24
25if __name__ == "__main__":
26 asyncio.run(run_cmd(["python", "--version"]))- Nhờ điều khiển subprocess không đồng bộ, bạn có thể xử lý log từ công cụ ngoài theo thời gian thực hoặc chạy song song nhiều tiến trình.
Xử lý Hủy (Cancel) và Hết thời gian (Timeout)
Các tác vụ không đồng bộ có thể bị hủy. Khi cần timeout, bạn chỉ cần dùng asyncio.wait_for.
Dưới đây là ví dụ chạy tác vụ có timeout.
1import asyncio
2
3async def slow_task():
4 await asyncio.sleep(5)
5 return "done"
6
7async def main():
8 try:
9 result = await asyncio.wait_for(slow_task(), timeout=2.0)
10 print("Result:", result)
11 except asyncio.TimeoutError:
12 print("Task timed out")
13
14if __name__ == "__main__":
15 asyncio.run(main())wait_forsẽ đưa ra ngoại lệTimeoutErrornếu quá thời gian và hủy nhiệm vụ nếu cần thiết. Hãy cẩn thận với việc truyền và dọn dẹp sau khi hủy tác vụ.
Kiểm soát mức đồng thời (Concurrency) (Semaphore)
Vì việc tạo nhiều kết nối hoặc yêu cầu đồng thời có thể làm cạn kiệt tài nguyên, hãy hạn chế bằng asyncio.Semaphore.
Dưới đây là ví dụ giới hạn số lượt tải đồng thời bằng semaphore.
1import asyncio
2import aiohttp
3
4semaphore = asyncio.Semaphore(3) # allow up to 3 concurrent tasks
5
6async def limited_fetch(session, url):
7 async with semaphore:
8 async with session.get(url) as resp:
9 return url, resp.status
10
11async def main(urls):
12 async with aiohttp.ClientSession() as session:
13 tasks = [limited_fetch(session, u) for u in urls]
14 results = await asyncio.gather(*tasks)
15 print(results)
16
17if __name__ == "__main__":
18 urls = ["https://codesparklab.com/json/example.json"] * 10
19 asyncio.run(main(urls))- Thông qua cách này, bạn có thể truy cập các dịch vụ ngoài một cách nhẹ nhàng, tránh quá tải tiến trình của mình.
Xử lý lỗi và Chiến lược thử lại (Retry)
Lỗi là điều không thể tránh khỏi kể cả trong xử lý không đồng bộ. Hãy bắt ngoại lệ phù hợp và triển khai chiến lược thử lại như backoff theo cấp số nhân.
Dưới đây là ví dụ thử lại đơn giản tối đa N lần.
1import asyncio
2import aiohttp
3import random
4
5async def fetch_with_retry(session, url, retries=3):
6 for attempt in range(1, retries + 1):
7 try:
8 async with session.get(url) as resp:
9 resp.raise_for_status()
10 text = await resp.text()
11 return text
12 except Exception as e:
13 if attempt == retries:
14 raise
15 await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18 async with aiohttp.ClientSession() as session:
19 text = await fetch_with_retry(session, "https://codesparklab.com/")
20 print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23 asyncio.run(main())- Logic thử lại hợp lý rất quan trọng để cân bằng tính nhất quán và kiểm soát lưu lượng.
Mẹo cho Debugging và Logging
Trong xử lý không đồng bộ, các nhiệm vụ diễn ra đồng thời nên khó xác định nguyên nhân lỗi. Để theo dõi lỗi hiệu quả, ghi nhớ những điểm sau sẽ giúp việc debug dễ dàng hơn.
- Ngoại lệ từ
asyncio.run()vàTaskdễ bị bỏ sót, vì vậy hãy đảm bảo log lại các exception chưa xử lý. - Khi sử dụng
logging, thêm tên coroutine hoặc, trong Python 3.8 trở lên,task.get_name()vào log sẽ giúp việc theo dõi trở nên dễ dàng hơn. - Bạn có thể kiểm tra trạng thái hiện tại của các tác vụ bằng cách sử dụng
asyncio.Task.all_tasks(). Tuy nhiên, API này chỉ dành cho mục đích gỡ lỗi và nên được sử dụng thận trọng trong môi trường sản xuất để tránh các sự cố về hiệu suất hoặc can thiệp ngoài ý muốn.
Cân nhắc về hiệu năng
Lập trình không đồng bộ rất mạnh ở xử lý chờ I/O, nhưng sử dụng không đúng có thể làm giảm hiệu năng. Tối ưu hóa bằng cách lưu ý các điểm sau:.
- Xử lý không đồng bộ phù hợp với tác vụ chờ I/O, không hợp lý cho tác vụ ngốn CPU; hãy dùng pool tiến trình trong những trường hợp đó.
- Khi sử dụng pool luồng hoặc pool tiến trình, hãy cân nhắc kích cỡ pool và tính chất nhiệm vụ.
- Nếu bạn bắt đầu nhiều tác vụ nhỏ cùng lúc, overhead của vòng lặp sự kiện sẽ tăng—hãy dùng batching hoặc semaphore để cân bằng.
Tóm tắt
I/O không đồng bộ của Python là một cơ chế mạnh mẽ, tận dụng tối đa thời gian chờ I/O và thực hiện hiệu quả các thao tác mạng và tệp song song. Bằng cách kết hợp các kỹ thuật như asyncio, aiohttp, aiofiles và run_in_executor, bạn có thể xây dựng các ứng dụng không đồng bộ thực tiễn một cách linh hoạt. Sử dụng async with để tự động hóa việc chiếm và giải phóng tài nguyên giúp bạn quản lý an toàn, đáng tin cậy các tài nguyên không đồng bộ như tệp, phiên HTTP hoặc khóa. Bằng cách thêm xử lý lỗi và quản lý đồng thời đúng cách, bạn có thể vận hành các chương trình không đồng bộ với độ tin cậy cao một cách an toàn.
Bạn có thể làm theo bài viết trên bằng cách sử dụng Visual Studio Code trên kênh YouTube của chúng tôi. Vui lòng ghé thăm kênh YouTube.