Asynchronous na Input/Output
Ipinapaliwanag ng artikulong ito ang asynchronous na input/output.
Ang gabay na ito ay malinaw na nagpapaliwanag, hakbang-hakbang, ng mga konsepto at pattern ng asynchronous input/output na praktikal na magagamit sa Python.
YouTube Video
Asynchronous Input/Output (I/O)
Konsepto ng Asynchronous I/O
Ang Asynchronous I/O ay isang mekanismo na nagpapahintulot sa ibang mga operasyon na tumakbo nang sabay habang naghihintay para sa matagal na I/O, tulad ng mga file operation o komunikasyon sa network. Sa Python, ang asyncio ay inilaang pamantayang asynchronous framework, at maraming mga library ang dinisenyo upang sundan ang mekanismong ito.
Mga Batayan: async / await at ang Event Loop
Una, narito kung paano magsulat ng pangunahing mga coroutine at isang halimbawa ng pagpapatakbo ng maramihang coroutine nang sabay gamit ang asyncio.gather.
Ang code sa ibaba ay isang minimal na halimbawa ng pagdedeklara at pagpapatakbo ng asynchronous na mga function nang sabay. Ginagamit ang sleep function upang ipakita ang sabayang pagpapatakbo.
1import asyncio
2
3async def worker(name: str, delay: float):
4 # Simulate I/O-bound work by sleeping
5 print(f"{name} started")
6 await asyncio.sleep(delay)
7 print(f"{name} finished after {delay}s")
8 return name, delay
9
10async def main():
11 # Run multiple coroutines concurrently
12 tasks = [
13 worker("A", 1.5),
14 worker("B", 1.0),
15 worker("C", 0.5),
16 ]
17 results = await asyncio.gather(*tasks)
18 print("Results:", results)
19
20if __name__ == "__main__":
21 asyncio.run(main())- Sinisimulan ng code na ito ang event loop gamit ang
asyncio.run()at pinapatakbo nang sabay-sabay ang tatlong coroutine.
async with at Mga Asynchronous Context Manager
Sa asynchronous na pagpoproseso, ang resource management tulad ng pagbubukas ng koneksyon at pagsasara ng mga file ay madaling maging komplikado. Dito nagiging kapaki-pakinabang ang mga asynchronous context manager na gumagamit ng async with. Ginagamit ang syntax na ito tulad ng synchronous na with statement, ngunit ang panloob na proseso ay asynchronous, kaya natural itong umaangkop sa async/await na daloy.
May dalawang pangunahing dahilan upang gamitin ang async with:.
- Upang maaasahang malinis ang mga resources tulad ng koneksyon, file handles, o mga session. Makakatiyak ka na maayos na mara-release ang mga resources kahit magkaroon ng hindi inaasahang pagtigil.
- Upang i-automate ang initialization at cleanup na mga gawain, tulad ng pagpapasimula o pagsasara ng koneksyon at pag-flush, sa asynchronous na paraan. Ipinapadali nito ang gawain at nagpapalinaw ng iyong code.
Narito ang isang halimbawa ng paggawa ng isang simpleng asynchronous context manager mula sa simula.
1import asyncio
2
3class AsyncResource:
4 async def __aenter__(self):
5 print("Opening resource...")
6 await asyncio.sleep(0.5)
7 print("Resource opened")
8 return self
9
10 async def __aexit__(self, exc_type, exc, tb):
11 print("Closing resource...")
12 await asyncio.sleep(0.5)
13 print("Resource closed")
14
15async def main():
16 async with AsyncResource() as r:
17 print("Using resource...")
18
19if __name__ == "__main__":
20 asyncio.run(main())- Sa pamamagitan ng pagdedeklara ng
__aenter__at__aexit__, magagamit mo angasync with. - Ang pagpoproseso sa pagpasok at paglabas sa
async withna block ay isinasagawa nang asynchronous at ligtas.
Asynchronous File I/O (aiofiles)
Ang mga file operation ay isang karaniwang halimbawa ng blocking. Gamit ang aiofiles, maaari mong pamahalaan nang ligtas ang mga file operation sa asynchronous na paraan. Sa loob, gumagamit ito ng thread pool at tinitiyak na maayos na nasasara ang mga file gamit ang async with.
Ang sumusunod na halimbawa ay nagpapakita ng sabayang asynchronous na pagbabasa ng maraming file. Kailangan mong i-install ang aiofiles gamit ang pip install aiofiles bago patakbuhin ang code na ito.
1# pip install aiofiles
2import asyncio
3import aiofiles
4from pathlib import Path
5
6async def read_file(path: Path):
7 # Read file content asynchronously
8 async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
9 contents = await f.read()
10 return path.name, len(contents)
11
12async def main():
13 files = [
14 Path("example1.txt"),
15 Path("example2.txt"),
16 Path("example3.txt")
17 ]
18 tasks = [read_file(p) for p in files]
19 results = await asyncio.gather(*tasks)
20 for name, size in results:
21 print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24 asyncio.run(main())- Ang code na ito ay pansabayin ang pagbabasa ng bawat file. Ang
aiofilesay madalas gumamit ng thread pool sa loob, kaya't maaari mong hawakan ang blocking file I/O gamit ang asynchronous na interface.
Asynchronous HTTP Client (aiohttp)
Bilang klasikal na halimbawa ng network I/O, narito kung paano magsagawa ng HTTP requests nang asynchronous. Lalo itong malakas kapag kailangan mong gumawa ng maraming HTTP request nang sabay-sabay.
Narito ang isang halimbawa ng pagkuha ng maraming URL nang sabay-sabay gamit ang aiohttp. Kailangan mong i-install ang aiohttp gamit ang pip install aiohttp.
1# pip install aiohttp
2import asyncio
3import aiohttp
4
5async def fetch(session: aiohttp.ClientSession, url: str):
6 # Fetch a URL asynchronously and return status and size
7 async with session.get(url) as resp:
8 text = await resp.text()
9 return url, resp.status, len(text)
10
11async def main(urls):
12 async with aiohttp.ClientSession() as session:
13 tasks = [fetch(session, u) for u in urls]
14 for coro in asyncio.as_completed(tasks):
15 url, status, size = await coro
16 print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19 urls = [
20 "https://codesparklab.com/json/example1.json",
21 "https://codesparklab.com/json/example2.json",
22 "https://codesparklab.com/json/example3.json",
23 ]
24 asyncio.run(main(urls))- Gamit ang
asyncio.as_completed, maaari mong iproseso ang resulta ayon sa pagkakatapos ng mga task. Ito ay kapaki-pakinabang para sa episyenteng paghawak ng maraming request.
Pag-iral Kasama ang Blocking I/O: run_in_executor
Kapag humaharap sa mga gawain na nangangailangan ng marami sa CPU o lumang blocking APIs sa asynchronous na code, gamitin ang ThreadPoolExecutor o ProcessPoolExecutor sa pamamagitan ng loop.run_in_executor.
Ang sumusunod na code ay isang halimbawa ng pagpapatakbo ng mga gawain na pinapalagay ay blocking I/O nang sabay-sabay gamit ang isang thread pool.
1import asyncio
2from concurrent.futures import ThreadPoolExecutor
3import time
4
5def blocking_io(n):
6 # Simulate a blocking I/O or CPU-bound function
7 time.sleep(n)
8 return f"slept {n}s"
9
10async def main():
11 loop = asyncio.get_running_loop()
12 with ThreadPoolExecutor() as pool:
13 tasks = [
14 loop.run_in_executor(pool, blocking_io, 1),
15 loop.run_in_executor(pool, blocking_io, 2),
16 ]
17 results = await asyncio.gather(*tasks)
18 print("Blocking results:", results)
19
20if __name__ == "__main__":
21 asyncio.run(main())- Sa paggamit ng
run_in_executor, maaari mong isama ang umiiral na synchronous na code sa asynchronous na daloy nang hindi kailangang baguhin nang malaki ang code. Gayunpaman, dapat mong bigyang pansin ang bilang ng mga thread at CPU load. - Ang
ProcessPoolExecutoray angkop para sa mga gawain na nangangailangan ng marami sa CPU.
Asynchronous Server: TCP Echo Server na nakabase sa asyncio
Kung gusto mong direktang pamahalaan ang sockets, madali kang makakagawa ng asynchronous na server gamit ang asyncio.start_server.
Ang sumusunod na halimbawa ay isang simpleng echo server na nagbabalik ng data na eksaktong natanggap mula sa client.
1import asyncio
2
3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
4 # Handle a single client: read data and echo it back
5 addr = writer.get_extra_info('peername')
6 print(f"Connection from {addr}")
7 while True:
8 data = await reader.read(1024)
9 if not data:
10 break
11 writer.write(data) # echo back
12 await writer.drain()
13 writer.close()
14 await writer.wait_closed()
15 print(f"Connection closed {addr}")
16
17async def main():
18 server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19 addr = server.sockets[0].getsockname()
20 print(f"Serving on {addr}")
21 async with server:
22 await server.serve_forever()
23
24if __name__ == "__main__":
25 asyncio.run(main())-
Sa komunikasyon ng TCP gamit ang
asyncio, mahalagang papel ang ginagampanan ngStreamReaderatStreamWritersa asynchronous na input at output. AngStreamReaderay nagbabasa ng datos mula sa kliyente nang asynchronous, habang angStreamWriternaman ay ginagamit upang magpadala ng tugon mula sa server pabalik sa kliyente. -
Kahit hindi mo manu-manong hawakan ang detalyadong operasyon ng sockets, maaari kang magpatakbo ng isang asynchronous na server nang simple at mahusay gamit ang
asyncio.start_server. -
Kapag nagbigay ka ng handler function sa
asyncio.start_server, tatanggapin ng function na iyon angreaderatwriterbilang mga argumento nito. Sa paggamit ng mga ito, maaari mong maisakatuparan ang proseso ng komunikasyon nang mas ligtas at mas malinaw kaysa sa direktang paghawak ng mga low-level na socket API. Halimbawa, sa pagtanggap ng datos gamit angreader.read()at pagsasama ngwriter.write()sawriter.drain(), maaari kang magsagawa ng asynchronous na pagpapadala na tinitiyak na tapos na ang transmission. -
Ang set up na ito ay angkop para sa paghawak ng maraming sabayang koneksyon at perpekto para sa simpleng mga protocol o maliliit na TCP service.
Paghawak ng Malalaking Streaming Data
Kapag nagpoproseso ng malalaking file o tugon nang sunud-sunod, basahin at isulat ang data nang paisa-isang bahagi upang mapanatiling mababa ang paggamit ng memorya. Narito ang isang halimbawa ng streaming reads gamit ang aiohttp.
Ang sumusunod na code ay nagpoproseso ng HTTP responses nang pira-piraso at nagsusulat sa disk habang natatanggap ang data.
1import aiohttp
2import asyncio
3import aiofiles
4
5async def stream_download(url: str, dest: str):
6 # Stream download and write to file in chunks
7 async with aiohttp.ClientSession() as session:
8 async with session.get(url) as resp:
9 async with aiofiles.open(dest, 'wb') as f:
10 async for chunk in resp.content.iter_chunked(1024 * 64):
11 await f.write(chunk)
12
13if __name__ == "__main__":
14 asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))-
Hindi nito ikinakarga ang malaking file ng sabay-sabay; sa halip, tinatanggap nito ang datos ng paunti-unti (chunks) at isinusulat ito sa file nang asynchronous. Bilang resulta, mabilis at mahusay itong makakapag-download habang mababa ang paggamit ng memory. Kinukuha ng
aiohttpang datos nang asynchronous at isinusulat ngaiofilesang datos sa file nang hindi nagba-block, kaya't madali itong isabay sa ibang proseso. -
Ang pattern na ito ay angkop para sa episyenteng pagda-download at pag-save ng malalaking file habang minimimi ang paggamit ng memorya.
Asynchronous na Pagpapatakbo ng Mga Subprocess
Kung gusto mong magpatakbo ng external na command nang asynchronous at basahin ang kanilang output sa real time, kapaki-pakinabang ang asyncio.create_subprocess_exec.
Narito ang isang halimbawa ng pagpapasimula ng external na command at pagbabasa ng standard output nito sa real time.
1import asyncio
2
3async def run_cmd(cmd):
4 # Run external command asynchronously and capture output line by line
5 proc = await asyncio.create_subprocess_exec(
6 *cmd,
7 stdout=asyncio.subprocess.PIPE,
8 stderr=asyncio.subprocess.PIPE
9 )
10
11 async def read_stream(stream, name):
12 while True:
13 line = await stream.readline()
14 if not line:
15 break
16 print(f"[{name}] {line.decode().rstrip()}")
17
18 await asyncio.gather(
19 read_stream(proc.stdout, "stdout"),
20 read_stream(proc.stderr, "stderr"),
21 )
22 await proc.wait()
23 return proc.returncode
24
25if __name__ == "__main__":
26 asyncio.run(run_cmd(["python", "--version"]))- Sa pamamagitan ng asynchronous na pagkontrol ng subprocesses, maaari mong iproseso ang logs mula sa external tools nang real time o magpatakbo ng maraming proseso nang sabay.
Pamamahala ng Pagka-cancel at Timeout
Maaaring i-cancel ang mga asynchronous na gawain. Kapag magpapatupad ng timeout, madali lang gamitin ang asyncio.wait_for.
Narito ang isang halimbawa ng pagpapatakbo ng gawain na may timeout.
1import asyncio
2
3async def slow_task():
4 await asyncio.sleep(5)
5 return "done"
6
7async def main():
8 try:
9 result = await asyncio.wait_for(slow_task(), timeout=2.0)
10 print("Result:", result)
11 except asyncio.TimeoutError:
12 print("Task timed out")
13
14if __name__ == "__main__":
15 asyncio.run(main())- Ang
wait_foray naglalabas ngTimeoutErrorkapag naabot ang timeout at kino-cancel ang gawain kung kinakailangan. Mag-ingat sa pagpapalaganap ng task cancellation at cleanup.
Pagkontrol ng Concurrency (Semaphore)
Dahil ang paggawa ng maraming sabayang koneksyon o request ay maaaring makaubos ng resources, limitahan ang concurrency gamit ang asyncio.Semaphore.
Ang sumusunod ay isang halimbawa ng paglilimita ng sabayang pag-download gamit ang semaphore.
1import asyncio
2import aiohttp
3
4semaphore = asyncio.Semaphore(3) # allow up to 3 concurrent tasks
5
6async def limited_fetch(session, url):
7 async with semaphore:
8 async with session.get(url) as resp:
9 return url, resp.status
10
11async def main(urls):
12 async with aiohttp.ClientSession() as session:
13 tasks = [limited_fetch(session, u) for u in urls]
14 results = await asyncio.gather(*tasks)
15 print(results)
16
17if __name__ == "__main__":
18 urls = ["https://codesparklab.com/json/example.json"] * 10
19 asyncio.run(main(urls))- Sa pamamaraang ito, maaari mong ma-access ang external services nang maingat at maiwasan ang pag-overload ng sarili mong proseso.
Pamamahala ng Error at Diskarte sa Pag-ulit (Retry)
Hindi maiiwasan ang mga error kahit sa asynchronous na pagpoproseso. Mahuli ng tama ang mga exception at magpatupad ng mga diskarte sa pag-ulit tulad ng exponential backoff.
Narito ang halimbawa ng implementasyon ng simpleng pag-ulit hanggang N na beses.
1import asyncio
2import aiohttp
3import random
4
5async def fetch_with_retry(session, url, retries=3):
6 for attempt in range(1, retries + 1):
7 try:
8 async with session.get(url) as resp:
9 resp.raise_for_status()
10 text = await resp.text()
11 return text
12 except Exception as e:
13 if attempt == retries:
14 raise
15 await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18 async with aiohttp.ClientSession() as session:
19 text = await fetch_with_retry(session, "https://codesparklab.com/")
20 print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23 asyncio.run(main())- Mahalaga ang tamang retry logic para sa tamang balanse ng consistency at traffic control.
Mga Tip para sa Debugging at Logging
Sa asynchronous na pagpoproseso, sabay-sabay ang mga gawain, kaya maaaring mahirap matukoy ang sanhi ng mga problema. Para masubaybayan ang mga isyu nang episyente, ang pag-alala sa mga sumusunod na punto ay magpapadali ng debugging.
- Madaling hindi makita ang exceptions mula sa
asyncio.run()atTask, kaya siguraduhing i-log ang mga hindi nahuhuling exception. - Kapag gumagamit ka ng
logging, ang pagsama ng pangalan ng coroutine o, sa Python 3.8 pataas, angtask.get_name()sa iyong mga log ay nagpapadali ng pagsubaybay. - Maaari mong suriin ang kasalukuyang estado ng mga task gamit ang
asyncio.Task.all_tasks(). Gayunpaman, ang API na ito ay para lamang sa debugging at dapat gamitin nang may pag-iingat sa production environment upang maiwasan ang mga isyu sa performance o hindi inaasahang abala.
Pag-isip-isip sa Performance
Bagamat mahusay ang asynchronous programming sa paghawak ng I/O waits, maaaring bumagal ang performance kung mali ang paggamit. Mag-optimize sa pamamagitan ng pag-alala sa sumusunod na mga punto:.
- Epektibo ang asynchronous processing sa I/O-bound na gawain ngunit hindi ito angkop para sa CPU-bound na gawain; gumamit ng process pool sa ganoong mga kaso.
- Kapag gumagamit ng thread o process pools, isaalang-alang ang dami ng pool at uri ng gawain.
- Kung magsisimula ka ng maraming maliliit na gawain ng sabay-sabay, tataas ang overhead ng event loop—kaya gumamit ng batching o semaphores para i-adjust.
Buod
Ang asynchronous I/O ng Python ay isang makapangyarihang mekanismo na nagagamit nang episyente ang panahon ng paghihintay sa I/O at mahusay sa sabayang pagpapatupad ng network at file operations. Sa pagsasama ng mga teknika tulad ng asyncio, aiohttp, aiofiles, at run_in_executor, maaari kang bumuo ng flexible at praktikal na asynchronous na mga aplikasyon. Sa paggamit ng async with para i-automate ang pagkuha at pag-release ng resources, maaari mong pamahalaan nang ligtas at maaasahan ang mga asynchronous na resource tulad ng files, HTTP sessions, at locks. Sa pagsasama ng tamang error handling at concurrency management, maaari kang magpatakbo ng mataas na pagiging maaasahan ng asynchronous na mga programa nang ligtas.
Maaari mong sundan ang artikulo sa itaas gamit ang Visual Studio Code sa aming YouTube channel. Paki-check din ang aming YouTube channel.