Asynchronous Input/Output
This guide explains, step by step, the concepts and patterns of asynchronous input/output that are useful in practice in Python.
Asynchronous Input/Output (I/O)
Concept of Asynchronous I/O
Asynchronous I/O is a mechanism that lets other operations make progress while the program waits for time-consuming I/O, such as file operations or network communication. The work is interleaved on a single thread (concurrency) rather than executed simultaneously on several CPU cores (parallelism). In Python, asyncio is provided as the standard asynchronous framework, and many libraries are designed to follow this mechanism.
Basics: async / await and the Event Loop
First, here is how to write basic coroutines and an example of running multiple coroutines simultaneously using asyncio.gather.
The code below is a minimal example of defining asynchronous functions and running them concurrently. asyncio.sleep stands in for an I/O wait so that the concurrent execution is visible.
import asyncio

async def worker(name: str, delay: float):
    # Simulate I/O-bound work by sleeping
    print(f"{name} started")
    await asyncio.sleep(delay)
    print(f"{name} finished after {delay}s")
    return name, delay

async def main():
    # Run multiple coroutines concurrently
    tasks = [
        worker("A", 1.5),
        worker("B", 1.0),
        worker("C", 0.5),
    ]
    results = await asyncio.gather(*tasks)
    print("Results:", results)

if __name__ == "__main__":
    asyncio.run(main())
- This code starts the event loop using asyncio.run() and runs three coroutines concurrently.
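One detail of asyncio.gather is easy to overlook: results come back in the order the awaitables were passed in, not in the order they finish. The following is a minimal sketch of that behavior; the finish_order list and the short delays are assumptions made only for illustration.

```python
import asyncio

async def worker(name: str, delay: float, finish_order: list) -> str:
    # Simulate I/O-bound work; the shortest delay finishes first
    await asyncio.sleep(delay)
    finish_order.append(name)
    return name

async def main():
    finish_order = []
    # create_task schedules each coroutine on the event loop right away
    tasks = [
        asyncio.create_task(worker("A", 0.3, finish_order)),
        asyncio.create_task(worker("B", 0.2, finish_order)),
        asyncio.create_task(worker("C", 0.1, finish_order)),
    ]
    # gather returns results in argument order, not completion order
    results = await asyncio.gather(*tasks)
    return results, finish_order

if __name__ == "__main__":
    results, finish_order = asyncio.run(main())
    print("gather results:", results)
    print("finish order:  ", finish_order)
```

Because the result order is stable, you can pair each result with its input by position even when the tasks complete out of order.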
async with and Asynchronous Context Managers
In asynchronous processing, resource management such as opening connections and closing files can easily become complex. This is where asynchronous context managers using async with become useful. This syntax is used just like the synchronous with statement, but the internal processing is asynchronous, so it fits naturally into the async/await flow.
There are two main reasons to use async with:
- To reliably clean up resources such as connections, file handles, or sessions. You can rest assured that resources will be properly released even if an abnormal termination occurs.
- To automate initialization and cleanup tasks, such as establishing or closing connections and flushing, in an asynchronous manner. This saves the trouble of manual coding and makes your code clearer.
Below is an example of creating a simple asynchronous context manager from scratch.
import asyncio

class AsyncResource:
    async def __aenter__(self):
        print("Opening resource...")
        await asyncio.sleep(0.5)
        print("Resource opened")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        print("Closing resource...")
        await asyncio.sleep(0.5)
        print("Resource closed")

async def main():
    async with AsyncResource() as r:
        print("Using resource...")

if __name__ == "__main__":
    asyncio.run(main())
- By defining __aenter__ and __aexit__, you can use async with.
- Processing when entering and exiting the async with block is executed asynchronously and safely.
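For simple cases, the standard library's contextlib.asynccontextmanager decorator offers a shorter way to get the same behavior from a single async generator. The sketch below assumes a hypothetical open_resource function and an events list used only to make the order of operations visible; it also shows that the cleanup code runs when the block raises.

```python
import asyncio
from contextlib import asynccontextmanager

events = []  # hypothetical log, used only to make the order visible

@asynccontextmanager
async def open_resource(name: str):
    events.append(f"open {name}")
    await asyncio.sleep(0.01)      # stands in for an asynchronous connect
    try:
        yield name                 # the value bound by "as"
    finally:
        await asyncio.sleep(0.01)  # stands in for an asynchronous close
        events.append(f"close {name}")

async def main():
    try:
        async with open_resource("db") as r:
            events.append(f"use {r}")
            raise RuntimeError("boom")  # simulate a failure inside the block
    except RuntimeError:
        events.append("error handled")

if __name__ == "__main__":
    asyncio.run(main())
    print(events)
```

The code before yield plays the role of __aenter__, and the finally block plays the role of __aexit__.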
Asynchronous File I/O (aiofiles)
File operations are a classic example of blocking. By using aiofiles, you can safely handle file operations asynchronously. Internally, it uses a thread pool and ensures that files are properly closed using async with.
The following example reads multiple files concurrently. You need to install aiofiles with pip install aiofiles before running this code.
# pip install aiofiles
import asyncio
import aiofiles
from pathlib import Path

async def read_file(path: Path):
    # Read file content asynchronously
    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
        contents = await f.read()
    # In text mode, len() counts characters, not bytes
    return path.name, len(contents)

async def main():
    files = [
        Path("example1.txt"),
        Path("example2.txt"),
        Path("example3.txt")
    ]
    tasks = [read_file(p) for p in files]
    results = await asyncio.gather(*tasks)
    for name, size in results:
        print(f"{name}: {size} characters")

if __name__ == "__main__":
    asyncio.run(main())
- This code reads the files concurrently. aiofiles delegates the blocking file operations to a thread pool, so you can handle them through an asynchronous interface.
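The thread-pool idea can be illustrated with the standard library alone. The sketch below is not how aiofiles is implemented; it only shows the general technique of moving a blocking read into a worker thread with asyncio.to_thread (available from Python 3.9). The read_size helper and the temporary files are assumptions made for the example.

```python
import asyncio
import tempfile
from pathlib import Path

def read_size(path: Path) -> int:
    # Ordinary blocking read; it runs in a worker thread below
    return len(path.read_text(encoding="utf-8"))

async def read_all(paths):
    # asyncio.to_thread runs each blocking call in the default thread pool
    return await asyncio.gather(*(asyncio.to_thread(read_size, p) for p in paths))

async def main():
    with tempfile.TemporaryDirectory() as tmp:
        paths = []
        # Setup only: create three small files to read back
        for i, text in enumerate(["a", "bb", "ccc"]):
            p = Path(tmp) / f"example{i}.txt"
            p.write_text(text, encoding="utf-8")
            paths.append(p)
        return await read_all(paths)

if __name__ == "__main__":
    print(asyncio.run(main()))
```

This approach is handy when you only need a few file operations and would rather not add a dependency.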
Asynchronous HTTP Client (aiohttp)
As a classic example of network I/O, here is how to perform HTTP requests asynchronously. It is particularly powerful when you need to make a large number of HTTP requests concurrently.
Below is an example of fetching multiple URLs concurrently using aiohttp. You will need to install aiohttp with pip install aiohttp.
# pip install aiohttp
import asyncio
import aiohttp

async def fetch(session: aiohttp.ClientSession, url: str):
    # Fetch a URL asynchronously and return status and body length
    async with session.get(url) as resp:
        text = await resp.text()
        # len() of the decoded text counts characters, not bytes
        return url, resp.status, len(text)

async def main(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, u) for u in urls]
        for coro in asyncio.as_completed(tasks):
            url, status, size = await coro
            print(f"{url} -> {status}, {size} characters")

if __name__ == "__main__":
    urls = [
        "https://codesparklab.com/json/example1.json",
        "https://codesparklab.com/json/example2.json",
        "https://codesparklab.com/json/example3.json",
    ]
    asyncio.run(main(urls))
- By using asyncio.as_completed, you can process results in the order that tasks complete. This is useful for efficiently dealing with many requests.
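The completion-order behavior of asyncio.as_completed can be observed without any network access. In this sketch, fake_fetch is a hypothetical stand-in for a request, and the delays are chosen so that the last job submitted finishes first.

```python
import asyncio

async def fake_fetch(label: str, delay: float) -> str:
    # Stands in for an HTTP request that takes `delay` seconds
    await asyncio.sleep(delay)
    return label

async def main():
    jobs = [
        fake_fetch("slow", 0.3),
        fake_fetch("medium", 0.2),
        fake_fetch("fast", 0.1),
    ]
    order = []
    # Each item yielded by as_completed resolves to the next finished result
    for next_done in asyncio.as_completed(jobs):
        order.append(await next_done)
    return order

if __name__ == "__main__":
    print(asyncio.run(main()))
```

Compared with asyncio.gather, this lets you start handling early results while slower requests are still in flight.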
Coexistence with Blocking I/O: run_in_executor
When dealing with CPU-intensive tasks or existing blocking APIs in asynchronous code, use ThreadPoolExecutor or ProcessPoolExecutor via loop.run_in_executor.
The following code runs blocking functions concurrently in a thread pool.
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time

def blocking_io(n):
    # Simulate a blocking I/O or CPU-bound function
    time.sleep(n)
    return f"slept {n}s"

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        tasks = [
            loop.run_in_executor(pool, blocking_io, 1),
            loop.run_in_executor(pool, blocking_io, 2),
        ]
        results = await asyncio.gather(*tasks)
        print("Blocking results:", results)

if __name__ == "__main__":
    asyncio.run(main())
- By utilizing run_in_executor, you can incorporate existing synchronous code into asynchronous flows without significant rewrites. However, you should pay attention to the number of threads and CPU load.
- ProcessPoolExecutor is suitable for CPU-bound tasks.
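The practical benefit of run_in_executor is that the event loop stays responsive while the blocking call runs. The sketch below makes that visible with a hypothetical heartbeat coroutine that records a tick every 50 ms; passing None as the executor selects the loop's default thread pool.

```python
import asyncio
import time

def blocking_io(seconds: float) -> str:
    # A blocking call that would freeze the event loop if called directly
    time.sleep(seconds)
    return "done"

async def heartbeat(ticks: list):
    # Records a tick every 50 ms for as long as the event loop is free to run it
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.05)

async def main():
    loop = asyncio.get_running_loop()
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))
    # None selects the loop's default ThreadPoolExecutor
    result = await loop.run_in_executor(None, blocking_io, 0.3)
    beat.cancel()
    await asyncio.gather(beat, return_exceptions=True)  # wait for the cancellation
    return result, len(ticks)

if __name__ == "__main__":
    result, tick_count = asyncio.run(main())
    print(f"result={result}, heartbeat ticks while blocked={tick_count}")
```

If blocking_io were called directly inside main, the heartbeat could not record any tick until the call returned.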
Asynchronous Server: asyncio-based TCP Echo Server
If you want to handle sockets directly, you can easily build an asynchronous server using asyncio.start_server.
The following example is a simple echo server that returns data exactly as received from the client.
import asyncio

async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    # Handle a single client: read data and echo it back
    addr = writer.get_extra_info('peername')
    print(f"Connection from {addr}")
    while True:
        data = await reader.read(1024)
        if not data:
            break
        writer.write(data)  # echo back
        await writer.drain()
    writer.close()
    await writer.wait_closed()
    print(f"Connection closed {addr}")

async def main():
    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
    addr = server.sockets[0].getsockname()
    print(f"Serving on {addr}")
    async with server:
        await server.serve_forever()

if __name__ == "__main__":
    asyncio.run(main())
- In TCP communication with asyncio, StreamReader and StreamWriter play a central role in asynchronous input and output. StreamReader asynchronously reads data sent from the client, while StreamWriter is used to send responses from the server back to the client.
- Even without handling the detailed operations of sockets yourself, you can launch an asynchronous server simply and efficiently using asyncio.start_server.
- When you pass a handler function to asyncio.start_server, that function receives reader and writer as its arguments. By using these, you can implement communication processes in a safer and clearer way than handling low-level socket APIs directly. For example, you receive data with reader.read() and send it with writer.write() followed by writer.drain(). drain() applies flow control: it waits until the write buffer has emptied enough for writing to resume, so a slow client cannot make the buffer grow without bound.
- This setup is suitable for handling large numbers of simultaneous connections and is ideal for simple protocols or small-scale TCP services.
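The client side of such a connection can be written with asyncio.open_connection, which returns the same reader and writer pair. The following is a minimal round-trip sketch in which the server and the client run in one process; the echo_once helper and the use of port 0 (asking the operating system for any free port) are assumptions made for the example.

```python
import asyncio

async def handle_echo(reader, writer):
    # Echo everything back until the client closes the connection
    while True:
        data = await reader.read(1024)
        if not data:
            break
        writer.write(data)
        await writer.drain()
    writer.close()
    await writer.wait_closed()

async def echo_once(message: bytes) -> bytes:
    # Port 0 lets the OS pick a free port, which avoids clashes in a demo
    server = await asyncio.start_server(handle_echo, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        writer.write(message)
        await writer.drain()
        # readexactly waits until the full echoed message has arrived
        reply = await reader.readexactly(len(message))
        writer.close()
        await writer.wait_closed()
    return reply

if __name__ == "__main__":
    print(asyncio.run(echo_once(b"hello")))
```

Using readexactly on the client avoids assuming that the whole reply arrives in a single read, since TCP does not preserve message boundaries.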
Handling Large Streaming Data
When processing large files or responses sequentially, read and write data in chunks to keep memory usage low. Below is an example of streaming reads using aiohttp.
The following code processes HTTP responses in chunks and writes to the disk as data is received.
import aiohttp
import asyncio
import aiofiles

async def stream_download(url: str, dest: str):
    # Stream download and write to file in chunks
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            async with aiofiles.open(dest, 'wb') as f:
                async for chunk in resp.content.iter_chunked(1024 * 64):
                    await f.write(chunk)

if __name__ == "__main__":
    asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))
- This code does not load a large file all at once; instead, it receives the data in chunks (small pieces) and writes it to a file asynchronously. As a result, memory usage stays close to the chunk size regardless of the file size. aiohttp asynchronously retrieves data and aiofiles writes to the file without blocking, making it easy to run alongside other processes.
- This pattern is suitable for downloading and saving large files efficiently while minimizing memory usage.
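The async for loop used above works with any asynchronous iterator, and you can write your own with an async generator. The sketch below applies the same chunked pattern to a local file using only the standard library; iter_file_chunks and count_bytes are hypothetical helpers, and asyncio.to_thread requires Python 3.9 or later.

```python
import asyncio
import tempfile
from pathlib import Path

async def iter_file_chunks(path: Path, chunk_size: int):
    # Async generator: each blocking read runs in a worker thread
    with open(path, "rb") as f:
        while True:
            chunk = await asyncio.to_thread(f.read, chunk_size)
            if not chunk:
                break
            yield chunk

async def count_bytes(path: Path, chunk_size: int = 64 * 1024):
    total = 0
    largest = 0
    # Only one chunk is held in memory at a time
    async for chunk in iter_file_chunks(path, chunk_size):
        total += len(chunk)
        largest = max(largest, len(chunk))
    return total, largest

async def main():
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "data.bin"
        path.write_bytes(b"x" * 200_000)  # setup only: a 200,000-byte test file
        return await count_bytes(path)

if __name__ == "__main__":
    total, largest = asyncio.run(main())
    print(f"total={total} bytes, largest chunk={largest} bytes")
```

The largest chunk never exceeds chunk_size, which is what keeps memory usage bounded no matter how large the file is.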
Asynchronous Execution of Subprocesses
If you want to run external commands asynchronously and read their output in real time, asyncio.create_subprocess_exec is useful.
Below is an example of starting an external command and reading its standard output in real time.
import asyncio

async def run_cmd(cmd):
    # Run external command asynchronously and capture output line by line
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )

    async def read_stream(stream, name):
        while True:
            line = await stream.readline()
            if not line:
                break
            print(f"[{name}] {line.decode().rstrip()}")

    await asyncio.gather(
        read_stream(proc.stdout, "stdout"),
        read_stream(proc.stderr, "stderr"),
    )
    await proc.wait()
    return proc.returncode

if __name__ == "__main__":
    asyncio.run(run_cmd(["python", "--version"]))
- By controlling subprocesses asynchronously, you can handle logs from external tools in real time or run multiple processes in parallel.
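When you only need the final output rather than a live stream, proc.communicate() is a simpler option, and several subprocesses can be awaited together with asyncio.gather. This sketch assumes a hypothetical run_and_capture helper and uses sys.executable so that the child processes run the same Python interpreter as the script.

```python
import asyncio
import sys

async def run_and_capture(*args: str):
    proc = await asyncio.create_subprocess_exec(
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    # communicate() reads both pipes to EOF and waits for the process to exit
    stdout, _stderr = await proc.communicate()
    return proc.returncode, stdout.decode().strip()

async def main():
    # sys.executable is the interpreter that is running this script
    cmds = [(sys.executable, "-c", f"print({n} * 2)") for n in (1, 2, 3)]
    # The three child processes run at the same time
    return await asyncio.gather(*(run_and_capture(*c) for c in cmds))

if __name__ == "__main__":
    for returncode, output in asyncio.run(main()):
        print(returncode, output)
```

communicate() buffers the whole output in memory, so the line-by-line approach remains the better fit for long-running or very chatty commands.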
Handling Cancel and Timeout
Asynchronous tasks can be cancelled. When implementing a timeout, it is simple to use asyncio.wait_for.
Below is an example of running a task with a timeout.
import asyncio

async def slow_task():
    await asyncio.sleep(5)
    return "done"

async def main():
    try:
        result = await asyncio.wait_for(slow_task(), timeout=2.0)
        print("Result:", result)
    except asyncio.TimeoutError:
        print("Task timed out")

if __name__ == "__main__":
    asyncio.run(main())
- When the timeout expires, wait_for cancels the task and raises TimeoutError. Be careful with task cancellation propagation and cleanup.
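Cancellation is delivered to a coroutine as an asyncio.CancelledError raised at its current await, which gives it a chance to clean up. The following minimal sketch shows one way to handle that; the log list is an assumption used only to record the order of events.

```python
import asyncio

async def worker(log: list):
    log.append("started")
    try:
        await asyncio.sleep(10)
        log.append("finished")   # not reached in this sketch
    except asyncio.CancelledError:
        log.append("cleanup")    # release resources here
        raise                    # re-raise so the task is marked as cancelled

async def main():
    log = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0.05)    # give the worker time to start
    task.cancel()                # request cancellation
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log, task.cancelled()

if __name__ == "__main__":
    print(asyncio.run(main()))
```

Re-raising CancelledError after cleanup matters: swallowing it makes the task look as if it completed normally, which can confuse callers such as wait_for.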
Controlling Concurrency (Semaphore)
Because making many concurrent connections or requests can exhaust resources, limit concurrency with asyncio.Semaphore.
The following is an example of limiting simultaneous downloads using a semaphore.
import asyncio
import aiohttp

async def limited_fetch(semaphore, session, url):
    # At most 3 coroutines can be inside this block at the same time
    async with semaphore:
        async with session.get(url) as resp:
            return url, resp.status

async def main(urls):
    # Create the semaphore inside the running event loop; before Python 3.10,
    # a module-level semaphore could end up bound to a different loop
    semaphore = asyncio.Semaphore(3)  # allow up to 3 concurrent requests
    async with aiohttp.ClientSession() as session:
        tasks = [limited_fetch(semaphore, session, u) for u in urls]
        results = await asyncio.gather(*tasks)
        print(results)

if __name__ == "__main__":
    urls = ["https://codesparklab.com/json/example.json"] * 10
    asyncio.run(main(urls))
- With this method, you can access external services gently and avoid overloading your own process.
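The effect of the limit can be checked without network access by counting how many jobs are inside the semaphore at once. In this sketch, limited_job and the state dictionary are hypothetical, and asyncio.sleep stands in for a request.

```python
import asyncio

async def limited_job(sem: asyncio.Semaphore, state: dict):
    async with sem:
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
        await asyncio.sleep(0.05)  # stands in for a network request
        state["running"] -= 1

async def main(limit: int = 3, jobs: int = 10):
    sem = asyncio.Semaphore(limit)  # created inside the running loop
    state = {"running": 0, "peak": 0}
    await asyncio.gather(*(limited_job(sem, state) for _ in range(jobs)))
    return state["peak"]

if __name__ == "__main__":
    print("peak concurrency:", asyncio.run(main()))
```

All ten jobs are started at once, but the peak number running inside the semaphore never exceeds the limit.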
Error Handling and Retry Strategies
Errors inevitably occur even in asynchronous processing. Catch exceptions appropriately and implement retry strategies such as exponential backoff.
Below is an example implementation of simple retries up to N times.
import asyncio
import aiohttp
import random

async def fetch_with_retry(session, url, retries=3):
    for attempt in range(1, retries + 1):
        try:
            async with session.get(url) as resp:
                resp.raise_for_status()
                text = await resp.text()
                return text
        except (aiohttp.ClientError, asyncio.TimeoutError):
            # Give up after the last attempt
            if attempt == retries:
                raise
            # Wait longer after each failure, plus random jitter
            await asyncio.sleep(0.5 * attempt + random.random())

async def main():
    async with aiohttp.ClientSession() as session:
        text = await fetch_with_retry(session, "https://codesparklab.com/")
        print("Fetched length:", len(text))

if __name__ == "__main__":
    asyncio.run(main())
- Retrying with an increasing delay and random jitter lets transient failures recover without flooding the server. In this example the delay grows linearly; doubling it on each attempt gives exponential backoff.
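Exponential backoff itself can be sketched independently of any HTTP library. In the code below, retry is a hypothetical generic helper and FlakyService is a stand-in that fails a fixed number of times; the choice of ConnectionError as the retryable exception and the very small base delay are assumptions made to keep the example fast.

```python
import asyncio
import random

async def retry(make_call, retries: int = 4, base_delay: float = 0.01):
    for attempt in range(retries):
        try:
            return await make_call()
        except ConnectionError:
            if attempt == retries - 1:
                raise  # out of attempts: propagate the last error
            # The delay doubles each time: base, 2*base, 4*base, ... plus jitter
            delay = base_delay * (2 ** attempt)
            await asyncio.sleep(delay + random.uniform(0, base_delay))

class FlakyService:
    """Hypothetical stand-in that fails a fixed number of times."""

    def __init__(self, failures: int):
        self.failures = failures
        self.calls = 0

    async def call(self) -> str:
        self.calls += 1
        if self.calls <= self.failures:
            raise ConnectionError("temporary failure")
        return "ok"

async def main():
    service = FlakyService(failures=2)
    result = await retry(service.call)
    return result, service.calls

if __name__ == "__main__":
    print(asyncio.run(main()))
```

The jitter spreads out retries from many clients so that they do not all hit the server again at the same moment.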
Tips for Debugging and Logging
In asynchronous processing, tasks proceed concurrently, which can make it difficult to identify the cause of problems. Keeping the following points in mind makes debugging smoother.
- Exceptions raised inside a Task are easy to miss: unless the task is awaited or its exception is retrieved, asyncio reports it only when the task object is garbage-collected. Make sure to log unhandled exceptions.
- When using logging, including the coroutine name or, in Python 3.8 and above, task.get_name() in your logs makes tracking easier.
- You can check the current status of tasks using asyncio.all_tasks() (the older asyncio.Task.all_tasks() was removed in Python 3.9). However, this API is intended for debugging purposes and should be used with caution in production environments to avoid performance issues or unexpected interference.
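These points can be combined in a small sketch: name the task when creating it, attach a done callback that logs any exception, and list the running tasks with asyncio.all_tasks(). The task name, the report callback, and the failures list are hypothetical and exist only for illustration.

```python
import asyncio
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("tasks")

failures = []  # hypothetical record of failed task names

def report(task: asyncio.Task):
    # Done callback: retrieve and log the exception so that it is not lost
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        failures.append(task.get_name())
        log.error("task %s failed: %r", task.get_name(), exc)

async def fails():
    await asyncio.sleep(0.01)
    raise ValueError("bad input")

async def main():
    # The name argument of create_task is available from Python 3.8
    task = asyncio.create_task(fails(), name="fetch-user-42")
    task.add_done_callback(report)
    # Snapshot of unfinished tasks in this loop (includes main's own task)
    names = sorted(t.get_name() for t in asyncio.all_tasks())
    await asyncio.gather(task, return_exceptions=True)
    return names

if __name__ == "__main__":
    print(asyncio.run(main()))
```

Meaningful task names make both the log line and the all_tasks() snapshot much easier to read than the default Task-N names.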
Performance Considerations
While asynchronous programming excels at handling I/O waits, improper use can actually degrade performance. Optimize by keeping the following points in mind:
- Asynchronous processing excels at I/O-bound tasks but is not suited for CPU-bound tasks; use a process pool in such cases.
- When using thread or process pools, consider the pool size and task nature.
- If you start many small tasks at once, event loop overhead increases, so use batching or semaphores to limit how many run at a time.
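One way to avoid creating a task per item is a fixed pool of workers that pull items from an asyncio.Queue. This is a minimal sketch of that pattern, not the only way to batch work; the worker function, the doubling operation, and the pool size of 4 are assumptions made for the example.

```python
import asyncio

async def worker(queue: asyncio.Queue, results: list):
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.001)  # stands in for a small I/O operation
            results.append(item * 2)
        finally:
            queue.task_done()           # mark the item as processed

async def main(items, num_workers: int = 4):
    queue = asyncio.Queue()
    results = []
    for item in items:
        queue.put_nowait(item)
    # Only num_workers tasks exist, regardless of how many items there are
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(num_workers)]
    await queue.join()                  # wait until every item is processed
    for w in workers:
        w.cancel()                      # the workers loop forever, so stop them
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(results)

if __name__ == "__main__":
    print(asyncio.run(main(range(10))))
```

With this structure, the number of tasks known to the event loop stays constant while the queue absorbs the backlog.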
Summary
Python's asynchronous I/O is a powerful mechanism that makes effective use of I/O wait times and efficiently executes network and file operations concurrently. By combining techniques such as asyncio, aiohttp, aiofiles, and run_in_executor, you can flexibly build practical asynchronous applications. Utilizing async with to automate resource acquisition and release allows you to safely and reliably manage asynchronous resources such as files, HTTP sessions, and locks. By incorporating proper error handling and concurrency management, you can run high-reliability asynchronous programs safely.
You can follow along with the above article using Visual Studio Code on our YouTube channel. Please also check out the YouTube channel.