Input/Output Asinkron

Input/Output Asinkron

Artikel ini menjelaskan input/output asinkron.

Panduan ini menjelaskan secara perlahan, langkah demi langkah, konsep dan pola input/output asinkron yang berguna secara praktis di Python.

YouTube Video

Input/Output (I/O) Asinkron

Konsep I/O Asinkron

I/O asinkron adalah mekanisme yang memungkinkan operasi lain berjalan secara paralel saat menunggu I/O yang memakan waktu, seperti operasi file atau komunikasi jaringan. Di Python, asyncio disediakan sebagai kerangka kerja standar untuk asinkron, dan banyak pustaka didesain mengikuti mekanisme ini.

Dasar-dasar: async / await dan Event Loop

Pertama, berikut cara menulis coroutine dasar dan contoh menjalankan beberapa coroutine secara bersamaan menggunakan asyncio.gather.

Kode di bawah ini adalah contoh minimal mendefinisikan dan menjalankan fungsi asinkron secara bersamaan. Fungsi sleep digunakan untuk mendemonstrasikan eksekusi paralel.

 1import asyncio
 2
 3async def worker(name: str, delay: float):
 4    # Simulate I/O-bound work by sleeping
 5    print(f"{name} started")
 6    await asyncio.sleep(delay)
 7    print(f"{name} finished after {delay}s")
 8    return name, delay
 9
10async def main():
11    # Run multiple coroutines concurrently
12    tasks = [
13        worker("A", 1.5),
14        worker("B", 1.0),
15        worker("C", 0.5),
16    ]
17    results = await asyncio.gather(*tasks)
18    print("Results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Kode ini memulai event loop menggunakan asyncio.run() dan menjalankan tiga coroutine secara bersamaan.

async with dan Asynchronous Context Manager

Dalam pemrosesan asinkron, manajemen sumber daya seperti membuka koneksi dan menutup file bisa menjadi kompleks. Di sinilah asynchronous context manager dengan async with menjadi sangat berguna. Sintaks ini digunakan seperti statement with yang sinkron, tetapi pemrosesan di dalamnya berjalan asinkron sehingga sangat cocok dengan alur async/await.

Ada dua alasan utama menggunakan async with:.

  • Untuk memastikan sumber daya seperti koneksi, file handle, atau sesi dibersihkan dengan baik. Anda bisa yakin sumber daya akan dilepaskan dengan benar meskipun terjadi penghentian yang tidak normal.
  • Untuk mengotomatisasi inisialisasi dan pembersihan seperti membuat atau menutup koneksi dan flushing, secara asinkron. Ini menghemat coding manual dan membuat kode Anda lebih jelas.

Berikut adalah contoh membuat asynchronous context manager sederhana dari awal.

 1import asyncio
 2
 3class AsyncResource:
 4    async def __aenter__(self):
 5        print("Opening resource...")
 6        await asyncio.sleep(0.5)
 7        print("Resource opened")
 8        return self
 9
10    async def __aexit__(self, exc_type, exc, tb):
11        print("Closing resource...")
12        await asyncio.sleep(0.5)
13        print("Resource closed")
14
15async def main():
16    async with AsyncResource() as r:
17        print("Using resource...")
18
19if __name__ == "__main__":
20    asyncio.run(main())
  • Dengan mendefinisikan __aenter__ dan __aexit__, Anda dapat menggunakan async with.
  • Pemrosesan saat masuk dan keluar blok async with dijalankan secara asinkron dan aman.

I/O File Asinkron (aiofiles)

Operasi file adalah contoh klasik yang bersifat blocking. Dengan menggunakan aiofiles, Anda dapat menangani operasi file secara asinkron dengan aman. Secara internal, ini menggunakan thread pool dan memastikan file ditutup dengan benar menggunakan async with.

Contoh berikut menunjukkan pembacaan file secara asinkron dan paralel dari beberapa file. Anda harus menginstal aiofiles dengan pip install aiofiles sebelum menjalankan kode ini.

 1# pip install aiofiles
 2import asyncio
 3import aiofiles
 4from pathlib import Path
 5
 6async def read_file(path: Path):
 7    # Read file content asynchronously
 8    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
 9        contents = await f.read()
10    return path.name, len(contents)
11
12async def main():
13    files = [
14        Path("example1.txt"),
15        Path("example2.txt"),
16        Path("example3.txt")
17    ]
18    tasks = [read_file(p) for p in files]
19    results = await asyncio.gather(*tasks)
20    for name, size in results:
21        print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24    asyncio.run(main())
  • Kode ini memparalelkan pembacaan setiap file. aiofiles sering menggunakan thread pool secara internal, memungkinkan Anda menangani file I/O blocking melalui antarmuka asinkron.

Klien HTTP Asinkron (aiohttp)

Sebagai contoh klasik I/O jaringan, berikut cara melakukan permintaan HTTP secara asinkron. Ini sangat bermanfaat jika Anda perlu melakukan banyak permintaan HTTP secara paralel.

Berikut adalah contoh pengambilan beberapa URL secara paralel menggunakan aiohttp. Anda perlu menginstal aiohttp dengan pip install aiohttp.

 1# pip install aiohttp
 2import asyncio
 3import aiohttp
 4
 5async def fetch(session: aiohttp.ClientSession, url: str):
 6    # Fetch a URL asynchronously and return status and size
 7    async with session.get(url) as resp:
 8        text = await resp.text()
 9        return url, resp.status, len(text)
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [fetch(session, u) for u in urls]
14        for coro in asyncio.as_completed(tasks):
15            url, status, size = await coro
16            print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19    urls = [
20        "https://codesparklab.com/json/example1.json",
21        "https://codesparklab.com/json/example2.json",
22        "https://codesparklab.com/json/example3.json",
23    ]
24    asyncio.run(main(urls))
  • Dengan menggunakan asyncio.as_completed, Anda dapat memproses hasil sesuai urutan tugas selesai. Ini berguna untuk menangani banyak permintaan secara efisien.

Koeksistensi dengan I/O yang Memblokir: run_in_executor

Ketika menangani tugas yang memerlukan CPU tinggi atau API yang bersifat blocking dalam kode asinkron, gunakan ThreadPoolExecutor atau ProcessPoolExecutor melalui loop.run_in_executor.

Kode berikut adalah contoh menjalankan tugas yang mengasumsikan I/O blocking secara bersamaan menggunakan thread pool.

 1import asyncio
 2from concurrent.futures import ThreadPoolExecutor
 3import time
 4
 5def blocking_io(n):
 6    # Simulate a blocking I/O or CPU-bound function
 7    time.sleep(n)
 8    return f"slept {n}s"
 9
10async def main():
11    loop = asyncio.get_running_loop()
12    with ThreadPoolExecutor() as pool:
13        tasks = [
14            loop.run_in_executor(pool, blocking_io, 1),
15            loop.run_in_executor(pool, blocking_io, 2),
16        ]
17        results = await asyncio.gather(*tasks)
18        print("Blocking results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Dengan memanfaatkan run_in_executor, Anda dapat memasukkan kode sinkron yang sudah ada ke dalam alur asinkron tanpa penulisan ulang yang signifikan. Namun, Anda harus memperhatikan jumlah thread dan beban CPU.
  • ProcessPoolExecutor cocok untuk tugas yang terikat pada CPU.

Server Asinkron: Server Echo TCP berbasis asyncio

Jika Anda ingin menangani socket secara langsung, Anda dapat dengan mudah membangun server asinkron menggunakan asyncio.start_server.

Contoh berikut adalah server echo sederhana yang mengembalikan data persis seperti yang diterima dari klien.

 1import asyncio
 2
 3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
 4    # Handle a single client: read data and echo it back
 5    addr = writer.get_extra_info('peername')
 6    print(f"Connection from {addr}")
 7    while True:
 8        data = await reader.read(1024)
 9        if not data:
10            break
11        writer.write(data)  # echo back
12        await writer.drain()
13    writer.close()
14    await writer.wait_closed()
15    print(f"Connection closed {addr}")
16
17async def main():
18    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19    addr = server.sockets[0].getsockname()
20    print(f"Serving on {addr}")
21    async with server:
22        await server.serve_forever()
23
24if __name__ == "__main__":
25    asyncio.run(main())
  • Dalam komunikasi TCP dengan asyncio, StreamReader dan StreamWriter memainkan peran utama dalam input dan output secara asinkron. StreamReader membaca data yang dikirim dari klien secara asinkron, sementara StreamWriter digunakan untuk mengirim respons dari server kembali ke klien.

  • Bahkan tanpa menangani operasi detail socket sendiri, Anda dapat menjalankan server asinkron dengan mudah dan efisien menggunakan asyncio.start_server.

  • Saat Anda memberikan fungsi handler ke asyncio.start_server, fungsi tersebut akan menerima reader dan writer sebagai argumennya. Dengan menggunakan ini, Anda dapat mengimplementasikan proses komunikasi dengan cara yang lebih aman dan jelas dibandingkan menangani API socket level rendah secara langsung. Misalnya, dengan menerima data menggunakan reader.read() dan menggabungkan writer.write() dengan writer.drain(), Anda dapat mengimplementasikan pengiriman asinkron yang memastikan transmisi selesai.

  • Pengaturan ini cocok untuk menangani banyak koneksi serentak dan ideal untuk protokol sederhana atau layanan TCP skala kecil.

Menangani Data Streaming Besar

Saat memproses file besar atau respons secara berurutan, baca dan tulis data dalam potongan kecil untuk menjaga penggunaan memori tetap rendah. Berikut adalah contoh pembacaan streaming menggunakan aiohttp.

Kode berikut memproses respon HTTP sedikit demi sedikit dan menulis ke disk saat data diterima.

 1import aiohttp
 2import asyncio
 3import aiofiles
 4
 5async def stream_download(url: str, dest: str):
 6    # Stream download and write to file in chunks
 7    async with aiohttp.ClientSession() as session:
 8        async with session.get(url) as resp:
 9            async with aiofiles.open(dest, 'wb') as f:
10                async for chunk in resp.content.iter_chunked(1024 * 64):
11                    await f.write(chunk)
12
13if __name__ == "__main__":
14    asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))
  • Kode ini tidak memuat file besar sekaligus; sebaliknya, ia menerima data dalam beberapa bagian (potongan kecil) dan menulisnya ke file secara asinkron. Akibatnya, proses pengunduhan dapat berlangsung dengan cepat dan efisien sambil menjaga penggunaan memori tetap rendah. aiohttp mengambil data secara asinkron dan aiofiles menulis ke file tanpa memblokir, sehingga mudah dijalankan bersamaan dengan proses lain.

  • Pola ini sangat cocok untuk mengunduh dan menyimpan file besar secara efisien dengan penggunaan memori minimal.

Eksekusi Subproses Asinkron

Jika Anda ingin menjalankan perintah eksternal secara asinkron dan membaca outputnya secara real time, asyncio.create_subprocess_exec sangat bermanfaat.

Di bawah ini adalah contoh menjalankan perintah eksternal dan membaca output standarnya secara real time.

 1import asyncio
 2
 3async def run_cmd(cmd):
 4    # Run external command asynchronously and capture output line by line
 5    proc = await asyncio.create_subprocess_exec(
 6        *cmd,
 7        stdout=asyncio.subprocess.PIPE,
 8        stderr=asyncio.subprocess.PIPE
 9    )
10
11    async def read_stream(stream, name):
12        while True:
13            line = await stream.readline()
14            if not line:
15                break
16            print(f"[{name}] {line.decode().rstrip()}")
17
18    await asyncio.gather(
19        read_stream(proc.stdout, "stdout"),
20        read_stream(proc.stderr, "stderr"),
21    )
22    await proc.wait()
23    return proc.returncode
24
25if __name__ == "__main__":
26    asyncio.run(run_cmd(["python", "--version"]))
  • Dengan mengontrol subproses secara asinkron, Anda dapat menangani log dari alat eksternal secara real time atau menjalankan beberapa proses secara paralel.

Menangani Pembatalan dan Timeout

Tugas asinkron dapat dibatalkan. Saat menerapkan timeout, Anda dapat menggunakan asyncio.wait_for dengan mudah.

Di bawah ini adalah contoh menjalankan tugas dengan timeout.

 1import asyncio
 2
 3async def slow_task():
 4    await asyncio.sleep(5)
 5    return "done"
 6
 7async def main():
 8    try:
 9        result = await asyncio.wait_for(slow_task(), timeout=2.0)
10        print("Result:", result)
11    except asyncio.TimeoutError:
12        print("Task timed out")
13
14if __name__ == "__main__":
15    asyncio.run(main())
  • wait_for akan melempar TimeoutError jika waktu habis dan membatalkan tugas bila perlu. Berhati-hatilah dengan propagasi pembatalan tugas dan pembersihan.

Mengontrol Konkurensi (Semaphore)

Karena banyak koneksi atau permintaan bersamaan dapat menguras sumber daya, batasi konkurensi dengan asyncio.Semaphore.

Berikut adalah contoh membatasi unduhan secara bersamaan dengan semaphore.

 1import asyncio
 2import aiohttp
 3
 4semaphore = asyncio.Semaphore(3)  # allow up to 3 concurrent tasks
 5
 6async def limited_fetch(session, url):
 7    async with semaphore:
 8        async with session.get(url) as resp:
 9            return url, resp.status
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [limited_fetch(session, u) for u in urls]
14        results = await asyncio.gather(*tasks)
15        print(results)
16
17if __name__ == "__main__":
18    urls = ["https://codesparklab.com/json/example.json"] * 10
19    asyncio.run(main(urls))
  • Dengan cara ini, Anda dapat mengakses layanan eksternal dengan bijak dan menghindari kelebihan beban proses Anda sendiri.

Penanganan Error dan Strategi Retry

Kesalahan pasti terjadi bahkan dalam pemrosesan asinkron. Tangkap exception dengan tepat dan terapkan strategi retry seperti pemunduran eksponensial (exponential backoff).

Di bawah ini adalah contoh implementasi retry sederhana hingga N kali.

 1import asyncio
 2import aiohttp
 3import random
 4
 5async def fetch_with_retry(session, url, retries=3):
 6    for attempt in range(1, retries + 1):
 7        try:
 8            async with session.get(url) as resp:
 9                resp.raise_for_status()
10                text = await resp.text()
11                return text
12        except Exception as e:
13            if attempt == retries:
14                raise
15            await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18    async with aiohttp.ClientSession() as session:
19        text = await fetch_with_retry(session, "https://codesparklab.com/")
20        print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • Logika retry yang tepat penting untuk menyeimbangkan konsistensi dan kontrol trafik.

Tips untuk Debugging dan Logging

Dalam pemrosesan asinkron, tugas berjalan bersamaan, sehingga sulit mengidentifikasi penyebab masalah. Untuk melacak masalah secara efisien, perhatikan poin-poin berikut agar debugging lebih lancar.

  • Exception dari asyncio.run() dan Task mudah terlewat, jadi pastikan untuk mencatat exception yang tidak tertangkap.
  • Saat menggunakan logging, menyertakan nama coroutine atau, pada Python 3.8 ke atas, task.get_name() dalam log Anda membuat pelacakan lebih mudah.
  • Anda dapat memeriksa status saat ini dari tugas-tugas menggunakan asyncio.Task.all_tasks(). Namun, API ini ditujukan untuk tujuan debugging dan harus digunakan dengan hati-hati di lingkungan produksi untuk menghindari masalah performa atau gangguan yang tidak terduga.

Pertimbangan Kinerja

Meskipun pemrograman asinkron unggul dalam menangani waktu tunggu I/O, penggunaan yang tidak tepat justru dapat menurunkan kinerja. Optimalkan dengan memperhatikan poin-poin berikut:.

  • Pemrosesan asinkron sangat cocok untuk tugas I/O-bound tetapi kurang cocok untuk tugas CPU-bound; gunakan process pool dalam kasus tersebut.
  • Saat menggunakan thread atau process pool, pertimbangkan ukuran pool dan sifat tugasnya.
  • Jika Anda memulai banyak tugas kecil sekaligus, overhead event loop akan meningkat—gunakan batching atau semaphore untuk menyesuaikan.

Ringkasan

I/O asinkron di Python adalah mekanisme yang kuat yang memanfaatkan waktu tunggu I/O secara efektif dan mengeksekusi operasi jaringan serta file secara efisien dan bersamaan. Dengan menggabungkan teknik seperti asyncio, aiohttp, aiofiles, dan run_in_executor, Anda dapat membangun aplikasi asinkron yang praktis dan fleksibel. Dengan memanfaatkan async with untuk mengotomatisasi perolehan dan pelepasan sumber daya, Anda dapat mengelola sumber daya asinkron seperti file, sesi HTTP, dan lock secara aman dan andal. Dengan memasukkan penanganan error dan manajemen konkurensi yang tepat, Anda dapat menjalankan program asinkron yang andal dengan aman.

Anda dapat mengikuti artikel di atas menggunakan Visual Studio Code di saluran YouTube kami. Silakan periksa juga saluran YouTube kami.

YouTube Video