Input/Output Asynchronous

Input/Output Asynchronous

Artikel ini menerangkan input/output asynchronous.

Panduan ini menerangkan secara berperingkat konsep dan corak input/output asynchronous yang berguna secara praktikal dalam Python.

YouTube Video

Input/Output (I/O) Asynchronous

Konsep I/O Asynchronous

I/O asynchronous ialah satu mekanisme yang membenarkan operasi lain dijalankan serentak semasa menunggu operasi I/O yang mengambil masa, seperti operasi fail atau komunikasi rangkaian. Dalam Python, asyncio disediakan sebagai rangka kerja asynchronous standard, dan banyak perpustakaan direka untuk mengikuti mekanisme ini.

Asas: async / await dan Gelung Acara (Event Loop)

Pertama, berikut adalah cara menulis coroutine asas dan contoh menjalankan beberapa coroutines serentak menggunakan asyncio.gather.

Kod di bawah adalah contoh minimum untuk mentakrif dan menjalankan fungsi asynchronous secara serentak. Fungsi sleep digunakan untuk menunjukkan pelaksanaan selari.

 1import asyncio
 2
 3async def worker(name: str, delay: float):
 4    # Simulate I/O-bound work by sleeping
 5    print(f"{name} started")
 6    await asyncio.sleep(delay)
 7    print(f"{name} finished after {delay}s")
 8    return name, delay
 9
10async def main():
11    # Run multiple coroutines concurrently
12    tasks = [
13        worker("A", 1.5),
14        worker("B", 1.0),
15        worker("C", 0.5),
16    ]
17    results = await asyncio.gather(*tasks)
18    print("Results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Kod ini memulakan gelung acara dengan asyncio.run() dan menjalankan tiga coroutine secara serentak.

async with dan Pengurus Konteks Asynchronous

Dalam pemprosesan asynchronous, pengurusan sumber seperti membuka sambungan dan menutup fail boleh menjadi rumit. Di sinilah pengurus konteks asynchronous menggunakan async with sangat berguna. Sintaks ini digunakan seperti pernyataan with selari (synchronous), tetapi pemprosesan dalaman bersifat asynchronous, jadi ia sesuai secara semula jadi dalam aliran async/await.

Terdapat dua sebab utama untuk menggunakan async with:.

  • Untuk membersihkan sumber seperti sambungan, pemegang fail, atau sesi dengan terjamin. Anda boleh yakin bahawa sumber akan dilepaskan dengan betul walaupun berlaku penamatan luar biasa.
  • Untuk mengautomasikan tugas permulaan (init) dan pembersihan seperti mewujudkan atau menutup sambungan dan pengosongan, secara asynchronous. Ini menjimatkan usaha pengekodan manual dan menjadikan kod anda lebih jelas.

Di bawah adalah contoh membuat pengurus konteks asynchronous yang ringkas dari awal.

 1import asyncio
 2
 3class AsyncResource:
 4    async def __aenter__(self):
 5        print("Opening resource...")
 6        await asyncio.sleep(0.5)
 7        print("Resource opened")
 8        return self
 9
10    async def __aexit__(self, exc_type, exc, tb):
11        print("Closing resource...")
12        await asyncio.sleep(0.5)
13        print("Resource closed")
14
15async def main():
16    async with AsyncResource() as r:
17        print("Using resource...")
18
19if __name__ == "__main__":
20    asyncio.run(main())
  • Dengan mentakrifkan __aenter__ dan __aexit__, anda boleh menggunakan async with.
  • Pemprosesan ketika memasuki dan meninggalkan blok async with dilaksanakan secara asynchronous dan selamat.

Fail I/O Asynchronous (aiofiles)

Operasi fail adalah contoh klasik bagi sekatan (blocking). Dengan menggunakan aiofiles, anda boleh mengendalikan operasi fail secara asynchronous dengan selamat. Secara dalaman, ia menggunakan kumpulan benang (thread pool) dan memastikan fail ditutup dengan betul menggunakan async with.

Contoh berikut menunjukkan pembacaan fail secara asynchronous serentak. Anda perlu memasang aiofiles dengan pip install aiofiles sebelum menjalankan kod ini.

 1# pip install aiofiles
 2import asyncio
 3import aiofiles
 4from pathlib import Path
 5
 6async def read_file(path: Path):
 7    # Read file content asynchronously
 8    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
 9        contents = await f.read()
10    return path.name, len(contents)
11
12async def main():
13    files = [
14        Path("example1.txt"),
15        Path("example2.txt"),
16        Path("example3.txt")
17    ]
18    tasks = [read_file(p) for p in files]
19    results = await asyncio.gather(*tasks)
20    for name, size in results:
21        print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24    asyncio.run(main())
  • Kod ini membolehkan pembacaan setiap fail dijalankan serentak. aiofiles biasanya menggunakan kumpulan benang secara dalaman, membolehkan anda mengendalikan I/O fail yang sekatan melalui antara muka asynchronous.

Klien HTTP Asynchronous (aiohttp)

Sebagai contoh klasik I/O rangkaian, berikut adalah cara untuk melakukan permintaan HTTP secara asynchronous. Ia amat berkuasa apabila anda perlu membuat banyak permintaan HTTP secara serentak.

Di bawah ialah contoh mendapatkan pelbagai URL secara serentak menggunakan aiohttp. Anda perlu memasang aiohttp dengan pip install aiohttp.

 1# pip install aiohttp
 2import asyncio
 3import aiohttp
 4
 5async def fetch(session: aiohttp.ClientSession, url: str):
 6    # Fetch a URL asynchronously and return status and size
 7    async with session.get(url) as resp:
 8        text = await resp.text()
 9        return url, resp.status, len(text)
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [fetch(session, u) for u in urls]
14        for coro in asyncio.as_completed(tasks):
15            url, status, size = await coro
16            print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19    urls = [
20        "https://codesparklab.com/json/example1.json",
21        "https://codesparklab.com/json/example2.json",
22        "https://codesparklab.com/json/example3.json",
23    ]
24    asyncio.run(main(urls))
  • Dengan menggunakan asyncio.as_completed, anda boleh memproses keputusan mengikut turutan tugasan siap. Ini berguna untuk mengendalikan banyak permintaan dengan cekap.

Kewujudan Bersama dengan I/O Penyumbat: run_in_executor

Apabila berurusan dengan tugasan yang memerlukan CPU tinggi atau API sedia ada yang melakukan blocking dalam kod asynchronous, gunakan ThreadPoolExecutor atau ProcessPoolExecutor melalui loop.run_in_executor.

Kod berikut adalah contoh menjalankan tugasan yang melibatkan blocking I/O secara serentak menggunakan thread pool.

 1import asyncio
 2from concurrent.futures import ThreadPoolExecutor
 3import time
 4
 5def blocking_io(n):
 6    # Simulate a blocking I/O or CPU-bound function
 7    time.sleep(n)
 8    return f"slept {n}s"
 9
10async def main():
11    loop = asyncio.get_running_loop()
12    with ThreadPoolExecutor() as pool:
13        tasks = [
14            loop.run_in_executor(pool, blocking_io, 1),
15            loop.run_in_executor(pool, blocking_io, 2),
16        ]
17        results = await asyncio.gather(*tasks)
18        print("Blocking results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Dengan menggunakan run_in_executor, anda boleh memasukkan kod synchronous sedia ada ke dalam aliran asynchronous tanpa perlu menulis semula secara besar-besaran. Namun, anda perlu memberi perhatian kepada bilangan benang dan beban CPU.
  • ProcessPoolExecutor sesuai untuk tugasan yang bergantung kepada CPU.

Pelayan Asynchronous: Pelayan Echo TCP berasaskan asyncio

Jika anda ingin mengendalikan soket secara terus, anda boleh membina pelayan asynchronous dengan mudah menggunakan asyncio.start_server.

Contoh berikut ialah pelayan echo ringkas yang memulangkan data sebagaimana diterima dari klien.

 1import asyncio
 2
 3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
 4    # Handle a single client: read data and echo it back
 5    addr = writer.get_extra_info('peername')
 6    print(f"Connection from {addr}")
 7    while True:
 8        data = await reader.read(1024)
 9        if not data:
10            break
11        writer.write(data)  # echo back
12        await writer.drain()
13    writer.close()
14    await writer.wait_closed()
15    print(f"Connection closed {addr}")
16
17async def main():
18    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19    addr = server.sockets[0].getsockname()
20    print(f"Serving on {addr}")
21    async with server:
22        await server.serve_forever()
23
24if __name__ == "__main__":
25    asyncio.run(main())
  • Dalam komunikasi TCP menggunakan asyncio, StreamReader dan StreamWriter memainkan peranan utama dalam input dan output secara tak segerak. StreamReader membaca data yang dihantar daripada klien secara tak segerak, manakala StreamWriter digunakan untuk menghantar respons daripada pelayan kembali kepada klien.

  • Walaupun tanpa mengendalikan operasi socket secara terperinci, anda boleh memulakan pelayan tak segerak dengan mudah dan cekap menggunakan asyncio.start_server.

  • Apabila anda memberikan fungsi pengendali kepada asyncio.start_server, fungsi tersebut menerima reader dan writer sebagai argumennya. Dengan menggunakan ini, anda boleh melaksanakan proses komunikasi dengan cara yang lebih selamat dan jelas berbanding mengendalikan API socket aras rendah secara langsung. Sebagai contoh, dengan menerima data menggunakan reader.read() dan menggabungkan writer.write() bersama writer.drain(), anda boleh melaksanakan penghantaran secara tak segerak yang memastikan penghantaran selesai.

  • Persediaan ini sesuai untuk mengendalikan banyak sambungan serentak dan sangat ideal untuk protokol ringkas atau perkhidmatan TCP skala kecil.

Pengendalian Data Penstriman Besar

Apabila memproses fail besar atau respons secara berurutan, baca dan tulis data secara kepingan (chunks) untuk mengekalkan penggunaan memori rendah. Di bawah adalah contoh pembacaan penstriman menggunakan aiohttp.

Kod berikut memproses respons HTTP secara kepingan dan menulis ke cakera apabila data diterima.

 1import aiohttp
 2import asyncio
 3import aiofiles
 4
 5async def stream_download(url: str, dest: str):
 6    # Stream download and write to file in chunks
 7    async with aiohttp.ClientSession() as session:
 8        async with session.get(url) as resp:
 9            async with aiofiles.open(dest, 'wb') as f:
10                async for chunk in resp.content.iter_chunked(1024 * 64):
11                    await f.write(chunk)
12
13if __name__ == "__main__":
14    asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))
  • Kod ini tidak memuatkan fail besar sekaligus; sebaliknya, ia menerima data secara berkeping-keping (kepingan kecil) dan menulis ke dalam fail secara tak segerak. Hasilnya, ia boleh melaksanakan muat turun dengan cepat dan cekap sambil mengekalkan penggunaan memori yang rendah. aiohttp mendapatkan data secara tak segerak dan aiofiles menulis ke dalam fail tanpa menyekat, memudahkan ia dijalankan serentak dengan proses lain.

  • Corak ini sesuai untuk memuat turun dan menyimpan fail besar dengan cekap sambil meminimumkan penggunaan memori.

Pelaksanaan Proses Turutan (Subprocess) secara Asynchronous

Jika anda ingin menjalankan arahan luaran secara asynchronous dan membaca output mereka dalam masa nyata, asyncio.create_subprocess_exec amat berguna.

Di bawah adalah contoh memulakan arahan luaran dan membaca output standard dalam masa nyata.

 1import asyncio
 2
 3async def run_cmd(cmd):
 4    # Run external command asynchronously and capture output line by line
 5    proc = await asyncio.create_subprocess_exec(
 6        *cmd,
 7        stdout=asyncio.subprocess.PIPE,
 8        stderr=asyncio.subprocess.PIPE
 9    )
10
11    async def read_stream(stream, name):
12        while True:
13            line = await stream.readline()
14            if not line:
15                break
16            print(f"[{name}] {line.decode().rstrip()}")
17
18    await asyncio.gather(
19        read_stream(proc.stdout, "stdout"),
20        read_stream(proc.stderr, "stderr"),
21    )
22    await proc.wait()
23    return proc.returncode
24
25if __name__ == "__main__":
26    asyncio.run(run_cmd(["python", "--version"]))
  • Dengan mengawal proses turutan secara asynchronous, anda boleh mengendalikan log dari alat luaran dalam masa nyata atau menjalankan pelbagai proses secara serentak.

Pengendalian Pembatalan dan Tempoh Masa Tamat (Timeout)

Tugasan asynchronous boleh dibatalkan. Apabila melaksanakan tempoh masa tamat, mudah untuk menggunakan asyncio.wait_for.

Di bawah adalah contoh menjalankan tugasan dengan tempoh masa tamat.

 1import asyncio
 2
 3async def slow_task():
 4    await asyncio.sleep(5)
 5    return "done"
 6
 7async def main():
 8    try:
 9        result = await asyncio.wait_for(slow_task(), timeout=2.0)
10        print("Result:", result)
11    except asyncio.TimeoutError:
12        print("Task timed out")
13
14if __name__ == "__main__":
15    asyncio.run(main())
  • wait_for akan melemparkan TimeoutError jika masa tamat dicapai dan membatalkan tugasan jika perlu. Berhati-hati dengan penyebaran pembatalan tugasan dan pembersihan.

Mengawal Kebersamaan (Concurrency) (Semaphore)

Kerana membuat banyak sambungan atau permintaan serentak boleh menghabiskan sumber, hadkan kebersamaan dengan asyncio.Semaphore.

Berikut adalah contoh menghadkan muat turun serentak menggunakan semaphore.

 1import asyncio
 2import aiohttp
 3
 4semaphore = asyncio.Semaphore(3)  # allow up to 3 concurrent tasks
 5
 6async def limited_fetch(session, url):
 7    async with semaphore:
 8        async with session.get(url) as resp:
 9            return url, resp.status
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [limited_fetch(session, u) for u in urls]
14        results = await asyncio.gather(*tasks)
15        print(results)
16
17if __name__ == "__main__":
18    urls = ["https://codesparklab.com/json/example.json"] * 10
19    asyncio.run(main(urls))
  • Dengan kaedah ini, anda boleh mengakses perkhidmatan luaran secara berhemah dan mengelakkan bebanan berlebihan pada proses anda sendiri.

Pengendalian Ralat dan Strategi Percubaan Semula (Retry)

Ralat pasti berlaku walaupun dalam pemprosesan asynchronous. Tangkap pengecualian secara sesuai dan laksanakan strategi percubaan semula seperti pengunduran eksponen (exponential backoff).

Di bawah adalah contoh pelaksanaan percubaan semula ringkas sehingga N kali.

 1import asyncio
 2import aiohttp
 3import random
 4
 5async def fetch_with_retry(session, url, retries=3):
 6    for attempt in range(1, retries + 1):
 7        try:
 8            async with session.get(url) as resp:
 9                resp.raise_for_status()
10                text = await resp.text()
11                return text
12        except Exception as e:
13            if attempt == retries:
14                raise
15            await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18    async with aiohttp.ClientSession() as session:
19        text = await fetch_with_retry(session, "https://codesparklab.com/")
20        print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • Logik percubaan semula yang betul adalah penting untuk mengimbangi konsistensi dan kawalan trafik.

Petua untuk Penyahpepijatan (Debugging) dan Logging

Dalam pemprosesan asynchronous, tugasan dijalankan serentak, yang boleh menyukarkan untuk mengenal pasti punca masalah. Untuk menjejaki isu dengan cekap, ingat perkara berikut untuk memudahkan penyahpepijatan.

  • Pengecualian dari asyncio.run() dan Task mudah terlepas pandang, jadi pastikan untuk mencatat pengecualian yang tidak dikendalikan.
  • Apabila menggunakan logging, memasukkan nama coroutine atau, dalam Python 3.8 ke atas, task.get_name() dalam log anda akan memudahkan penjejakan.
  • Anda boleh menyemak status semasa tugasan menggunakan asyncio.Task.all_tasks(). Namun, API ini adalah untuk tujuan penyahpepijatan dan harus digunakan dengan berhati-hati dalam persekitaran produksi untuk mengelakkan isu prestasi atau gangguan yang tidak dijangka.

Pertimbangan Prestasi

Walaupun pengaturcaraan asynchronous cemerlang dalam mengendalikan masa menunggu I/O, penggunaan yang tidak betul boleh merosakkan prestasi. Optimumkan dengan mengambil kira perkara berikut:.

  • Pemprosesan asynchronous sangat baik untuk tugasan terikat I/O tetapi tidak sesuai untuk tugasan terikat CPU; gunakan kumpulan proses (process pool) dalam kes tersebut.
  • Apabila menggunakan kumpulan benang atau kumpulan proses, pertimbangkan saiz kumpulan dan sifat tugasan.
  • Jika anda memulakan banyak tugasan kecil sekali gus, beban gelung acara meningkat—jadi gunakan pengegasan (batching) atau semaphore untuk pelarasan.

Ringkasan

Asynchronous I/O Python ialah satu mekanisme berkuasa yang memanfaatkan masa menunggu I/O dan melaksanakan operasi rangkaian serta fail secara cekap dan serentak. Dengan menggabungkan teknik seperti asyncio, aiohttp, aiofiles, dan run_in_executor, anda boleh membina aplikasi asynchronous yang praktikal dan fleksibel. Penggunaan async with untuk mengautomasi pemerolehan dan pelepasan sumber membolehkan anda mengurus sumber asynchronous seperti fail, sesi HTTP dan kunci dengan selamat dan boleh dipercayai. Dengan menggabungkan pengendalian ralat dan pengurusan kebersamaan yang betul, anda boleh menjalankan program asynchronous berkebolehpercayaan tinggi dengan selamat.

Anda boleh mengikuti artikel di atas menggunakan Visual Studio Code di saluran YouTube kami. Sila lihat juga saluran YouTube kami.

YouTube Video