Input/Output Asinkron
Artikel ini menjelaskan input/output asinkron.
Panduan ini menjelaskan secara perlahan, langkah demi langkah, konsep dan pola input/output asinkron yang berguna secara praktis di Python.
YouTube Video
Input/Output (I/O) Asinkron
Konsep I/O Asinkron
I/O asinkron adalah mekanisme yang memungkinkan operasi lain berjalan secara paralel saat menunggu I/O yang memakan waktu, seperti operasi file atau komunikasi jaringan. Di Python, asyncio disediakan sebagai kerangka kerja standar untuk asinkron, dan banyak pustaka didesain mengikuti mekanisme ini.
Dasar-dasar: async / await dan Event Loop
Pertama, berikut cara menulis coroutine dasar dan contoh menjalankan beberapa coroutine secara bersamaan menggunakan asyncio.gather.
Kode di bawah ini adalah contoh minimal mendefinisikan dan menjalankan fungsi asinkron secara bersamaan. Fungsi sleep digunakan untuk mendemonstrasikan eksekusi paralel.
1import asyncio
2
3async def worker(name: str, delay: float):
4 # Simulate I/O-bound work by sleeping
5 print(f"{name} started")
6 await asyncio.sleep(delay)
7 print(f"{name} finished after {delay}s")
8 return name, delay
9
10async def main():
11 # Run multiple coroutines concurrently
12 tasks = [
13 worker("A", 1.5),
14 worker("B", 1.0),
15 worker("C", 0.5),
16 ]
17 results = await asyncio.gather(*tasks)
18 print("Results:", results)
19
20if __name__ == "__main__":
21 asyncio.run(main())- Kode ini memulai event loop menggunakan
asyncio.run()dan menjalankan tiga coroutine secara bersamaan.
async with dan Asynchronous Context Manager
Dalam pemrosesan asinkron, manajemen sumber daya seperti membuka koneksi dan menutup file bisa menjadi kompleks. Di sinilah asynchronous context manager dengan async with menjadi sangat berguna. Sintaks ini digunakan seperti statement with yang sinkron, tetapi pemrosesan di dalamnya berjalan asinkron sehingga sangat cocok dengan alur async/await.
Ada dua alasan utama menggunakan async with:.
- Untuk memastikan sumber daya seperti koneksi, file handle, atau sesi dibersihkan dengan baik. Anda bisa yakin sumber daya akan dilepaskan dengan benar meskipun terjadi penghentian yang tidak normal.
- Untuk mengotomatisasi inisialisasi dan pembersihan seperti membuat atau menutup koneksi dan flushing, secara asinkron. Ini menghemat coding manual dan membuat kode Anda lebih jelas.
Berikut adalah contoh membuat asynchronous context manager sederhana dari awal.
1import asyncio
2
3class AsyncResource:
4 async def __aenter__(self):
5 print("Opening resource...")
6 await asyncio.sleep(0.5)
7 print("Resource opened")
8 return self
9
10 async def __aexit__(self, exc_type, exc, tb):
11 print("Closing resource...")
12 await asyncio.sleep(0.5)
13 print("Resource closed")
14
15async def main():
16 async with AsyncResource() as r:
17 print("Using resource...")
18
19if __name__ == "__main__":
20 asyncio.run(main())- Dengan mendefinisikan
__aenter__dan__aexit__, Anda dapat menggunakanasync with. - Pemrosesan saat masuk dan keluar blok
async withdijalankan secara asinkron dan aman.
I/O File Asinkron (aiofiles)
Operasi file adalah contoh klasik yang bersifat blocking. Dengan menggunakan aiofiles, Anda dapat menangani operasi file secara asinkron dengan aman. Secara internal, ini menggunakan thread pool dan memastikan file ditutup dengan benar menggunakan async with.
Contoh berikut menunjukkan pembacaan file secara asinkron dan paralel dari beberapa file. Anda harus menginstal aiofiles dengan pip install aiofiles sebelum menjalankan kode ini.
1# pip install aiofiles
2import asyncio
3import aiofiles
4from pathlib import Path
5
6async def read_file(path: Path):
7 # Read file content asynchronously
8 async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
9 contents = await f.read()
10 return path.name, len(contents)
11
12async def main():
13 files = [
14 Path("example1.txt"),
15 Path("example2.txt"),
16 Path("example3.txt")
17 ]
18 tasks = [read_file(p) for p in files]
19 results = await asyncio.gather(*tasks)
20 for name, size in results:
21 print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24 asyncio.run(main())- Kode ini memparalelkan pembacaan setiap file.
aiofilessering menggunakan thread pool secara internal, memungkinkan Anda menangani file I/O blocking melalui antarmuka asinkron.
Klien HTTP Asinkron (aiohttp)
Sebagai contoh klasik I/O jaringan, berikut cara melakukan permintaan HTTP secara asinkron. Ini sangat bermanfaat jika Anda perlu melakukan banyak permintaan HTTP secara paralel.
Berikut adalah contoh pengambilan beberapa URL secara paralel menggunakan aiohttp. Anda perlu menginstal aiohttp dengan pip install aiohttp.
1# pip install aiohttp
2import asyncio
3import aiohttp
4
5async def fetch(session: aiohttp.ClientSession, url: str):
6 # Fetch a URL asynchronously and return status and size
7 async with session.get(url) as resp:
8 text = await resp.text()
9 return url, resp.status, len(text)
10
11async def main(urls):
12 async with aiohttp.ClientSession() as session:
13 tasks = [fetch(session, u) for u in urls]
14 for coro in asyncio.as_completed(tasks):
15 url, status, size = await coro
16 print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19 urls = [
20 "https://codesparklab.com/json/example1.json",
21 "https://codesparklab.com/json/example2.json",
22 "https://codesparklab.com/json/example3.json",
23 ]
24 asyncio.run(main(urls))- Dengan menggunakan
asyncio.as_completed, Anda dapat memproses hasil sesuai urutan tugas selesai. Ini berguna untuk menangani banyak permintaan secara efisien.
Koeksistensi dengan I/O yang Memblokir: run_in_executor
Ketika menangani tugas yang memerlukan CPU tinggi atau API yang bersifat blocking dalam kode asinkron, gunakan ThreadPoolExecutor atau ProcessPoolExecutor melalui loop.run_in_executor.
Kode berikut adalah contoh menjalankan tugas yang mengasumsikan I/O blocking secara bersamaan menggunakan thread pool.
1import asyncio
2from concurrent.futures import ThreadPoolExecutor
3import time
4
5def blocking_io(n):
6 # Simulate a blocking I/O or CPU-bound function
7 time.sleep(n)
8 return f"slept {n}s"
9
10async def main():
11 loop = asyncio.get_running_loop()
12 with ThreadPoolExecutor() as pool:
13 tasks = [
14 loop.run_in_executor(pool, blocking_io, 1),
15 loop.run_in_executor(pool, blocking_io, 2),
16 ]
17 results = await asyncio.gather(*tasks)
18 print("Blocking results:", results)
19
20if __name__ == "__main__":
21 asyncio.run(main())- Dengan memanfaatkan
run_in_executor, Anda dapat memasukkan kode sinkron yang sudah ada ke dalam alur asinkron tanpa penulisan ulang yang signifikan. Namun, Anda harus memperhatikan jumlah thread dan beban CPU. ProcessPoolExecutorcocok untuk tugas yang terikat pada CPU.
Server Asinkron: Server Echo TCP berbasis asyncio
Jika Anda ingin menangani socket secara langsung, Anda dapat dengan mudah membangun server asinkron menggunakan asyncio.start_server.
Contoh berikut adalah server echo sederhana yang mengembalikan data persis seperti yang diterima dari klien.
1import asyncio
2
3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
4 # Handle a single client: read data and echo it back
5 addr = writer.get_extra_info('peername')
6 print(f"Connection from {addr}")
7 while True:
8 data = await reader.read(1024)
9 if not data:
10 break
11 writer.write(data) # echo back
12 await writer.drain()
13 writer.close()
14 await writer.wait_closed()
15 print(f"Connection closed {addr}")
16
17async def main():
18 server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19 addr = server.sockets[0].getsockname()
20 print(f"Serving on {addr}")
21 async with server:
22 await server.serve_forever()
23
24if __name__ == "__main__":
25 asyncio.run(main())-
Dalam komunikasi TCP dengan
asyncio,StreamReaderdanStreamWritermemainkan peran utama dalam input dan output secara asinkron.StreamReadermembaca data yang dikirim dari klien secara asinkron, sementaraStreamWriterdigunakan untuk mengirim respons dari server kembali ke klien. -
Bahkan tanpa menangani operasi detail socket sendiri, Anda dapat menjalankan server asinkron dengan mudah dan efisien menggunakan
asyncio.start_server. -
Saat Anda memberikan fungsi handler ke
asyncio.start_server, fungsi tersebut akan menerimareaderdanwritersebagai argumennya. Dengan menggunakan ini, Anda dapat mengimplementasikan proses komunikasi dengan cara yang lebih aman dan jelas dibandingkan menangani API socket level rendah secara langsung. Misalnya, dengan menerima data menggunakanreader.read()dan menggabungkanwriter.write()denganwriter.drain(), Anda dapat mengimplementasikan pengiriman asinkron yang memastikan transmisi selesai. -
Pengaturan ini cocok untuk menangani banyak koneksi serentak dan ideal untuk protokol sederhana atau layanan TCP skala kecil.
Menangani Data Streaming Besar
Saat memproses file besar atau respons secara berurutan, baca dan tulis data dalam potongan kecil untuk menjaga penggunaan memori tetap rendah. Berikut adalah contoh pembacaan streaming menggunakan aiohttp.
Kode berikut memproses respon HTTP sedikit demi sedikit dan menulis ke disk saat data diterima.
1import aiohttp
2import asyncio
3import aiofiles
4
5async def stream_download(url: str, dest: str):
6 # Stream download and write to file in chunks
7 async with aiohttp.ClientSession() as session:
8 async with session.get(url) as resp:
9 async with aiofiles.open(dest, 'wb') as f:
10 async for chunk in resp.content.iter_chunked(1024 * 64):
11 await f.write(chunk)
12
13if __name__ == "__main__":
14 asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))-
Kode ini tidak memuat file besar sekaligus; sebaliknya, ia menerima data dalam beberapa bagian (potongan kecil) dan menulisnya ke file secara asinkron. Akibatnya, proses pengunduhan dapat berlangsung dengan cepat dan efisien sambil menjaga penggunaan memori tetap rendah.
aiohttpmengambil data secara asinkron danaiofilesmenulis ke file tanpa memblokir, sehingga mudah dijalankan bersamaan dengan proses lain. -
Pola ini sangat cocok untuk mengunduh dan menyimpan file besar secara efisien dengan penggunaan memori minimal.
Eksekusi Subproses Asinkron
Jika Anda ingin menjalankan perintah eksternal secara asinkron dan membaca outputnya secara real time, asyncio.create_subprocess_exec sangat bermanfaat.
Di bawah ini adalah contoh menjalankan perintah eksternal dan membaca output standarnya secara real time.
1import asyncio
2
3async def run_cmd(cmd):
4 # Run external command asynchronously and capture output line by line
5 proc = await asyncio.create_subprocess_exec(
6 *cmd,
7 stdout=asyncio.subprocess.PIPE,
8 stderr=asyncio.subprocess.PIPE
9 )
10
11 async def read_stream(stream, name):
12 while True:
13 line = await stream.readline()
14 if not line:
15 break
16 print(f"[{name}] {line.decode().rstrip()}")
17
18 await asyncio.gather(
19 read_stream(proc.stdout, "stdout"),
20 read_stream(proc.stderr, "stderr"),
21 )
22 await proc.wait()
23 return proc.returncode
24
25if __name__ == "__main__":
26 asyncio.run(run_cmd(["python", "--version"]))- Dengan mengontrol subproses secara asinkron, Anda dapat menangani log dari alat eksternal secara real time atau menjalankan beberapa proses secara paralel.
Menangani Pembatalan dan Timeout
Tugas asinkron dapat dibatalkan. Saat menerapkan timeout, Anda dapat menggunakan asyncio.wait_for dengan mudah.
Di bawah ini adalah contoh menjalankan tugas dengan timeout.
1import asyncio
2
3async def slow_task():
4 await asyncio.sleep(5)
5 return "done"
6
7async def main():
8 try:
9 result = await asyncio.wait_for(slow_task(), timeout=2.0)
10 print("Result:", result)
11 except asyncio.TimeoutError:
12 print("Task timed out")
13
14if __name__ == "__main__":
15 asyncio.run(main())wait_forakan melemparTimeoutErrorjika waktu habis dan membatalkan tugas bila perlu. Berhati-hatilah dengan propagasi pembatalan tugas dan pembersihan.
Mengontrol Konkurensi (Semaphore)
Karena banyak koneksi atau permintaan bersamaan dapat menguras sumber daya, batasi konkurensi dengan asyncio.Semaphore.
Berikut adalah contoh membatasi unduhan secara bersamaan dengan semaphore.
1import asyncio
2import aiohttp
3
4semaphore = asyncio.Semaphore(3) # allow up to 3 concurrent tasks
5
6async def limited_fetch(session, url):
7 async with semaphore:
8 async with session.get(url) as resp:
9 return url, resp.status
10
11async def main(urls):
12 async with aiohttp.ClientSession() as session:
13 tasks = [limited_fetch(session, u) for u in urls]
14 results = await asyncio.gather(*tasks)
15 print(results)
16
17if __name__ == "__main__":
18 urls = ["https://codesparklab.com/json/example.json"] * 10
19 asyncio.run(main(urls))- Dengan cara ini, Anda dapat mengakses layanan eksternal dengan bijak dan menghindari kelebihan beban proses Anda sendiri.
Penanganan Error dan Strategi Retry
Kesalahan pasti terjadi bahkan dalam pemrosesan asinkron. Tangkap exception dengan tepat dan terapkan strategi retry seperti pemunduran eksponensial (exponential backoff).
Di bawah ini adalah contoh implementasi retry sederhana hingga N kali.
1import asyncio
2import aiohttp
3import random
4
5async def fetch_with_retry(session, url, retries=3):
6 for attempt in range(1, retries + 1):
7 try:
8 async with session.get(url) as resp:
9 resp.raise_for_status()
10 text = await resp.text()
11 return text
12 except Exception as e:
13 if attempt == retries:
14 raise
15 await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18 async with aiohttp.ClientSession() as session:
19 text = await fetch_with_retry(session, "https://codesparklab.com/")
20 print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23 asyncio.run(main())- Logika retry yang tepat penting untuk menyeimbangkan konsistensi dan kontrol trafik.
Tips untuk Debugging dan Logging
Dalam pemrosesan asinkron, tugas berjalan bersamaan, sehingga sulit mengidentifikasi penyebab masalah. Untuk melacak masalah secara efisien, perhatikan poin-poin berikut agar debugging lebih lancar.
- Exception dari
asyncio.run()danTaskmudah terlewat, jadi pastikan untuk mencatat exception yang tidak tertangkap. - Saat menggunakan
logging, menyertakan nama coroutine atau, pada Python 3.8 ke atas,task.get_name()dalam log Anda membuat pelacakan lebih mudah. - Anda dapat memeriksa status saat ini dari tugas-tugas menggunakan
asyncio.Task.all_tasks(). Namun, API ini ditujukan untuk tujuan debugging dan harus digunakan dengan hati-hati di lingkungan produksi untuk menghindari masalah performa atau gangguan yang tidak terduga.
Pertimbangan Kinerja
Meskipun pemrograman asinkron unggul dalam menangani waktu tunggu I/O, penggunaan yang tidak tepat justru dapat menurunkan kinerja. Optimalkan dengan memperhatikan poin-poin berikut:.
- Pemrosesan asinkron sangat cocok untuk tugas I/O-bound tetapi kurang cocok untuk tugas CPU-bound; gunakan process pool dalam kasus tersebut.
- Saat menggunakan thread atau process pool, pertimbangkan ukuran pool dan sifat tugasnya.
- Jika Anda memulai banyak tugas kecil sekaligus, overhead event loop akan meningkat—gunakan batching atau semaphore untuk menyesuaikan.
Ringkasan
I/O asinkron di Python adalah mekanisme yang kuat yang memanfaatkan waktu tunggu I/O secara efektif dan mengeksekusi operasi jaringan serta file secara efisien dan bersamaan. Dengan menggabungkan teknik seperti asyncio, aiohttp, aiofiles, dan run_in_executor, Anda dapat membangun aplikasi asinkron yang praktis dan fleksibel. Dengan memanfaatkan async with untuk mengotomatisasi perolehan dan pelepasan sumber daya, Anda dapat mengelola sumber daya asinkron seperti file, sesi HTTP, dan lock secara aman dan andal. Dengan memasukkan penanganan error dan manajemen konkurensi yang tepat, Anda dapat menjalankan program asinkron yang andal dengan aman.
Anda dapat mengikuti artikel di atas menggunakan Visual Studio Code di saluran YouTube kami. Silakan periksa juga saluran YouTube kami.