Kontrol Sinkronisasi dalam Pemrosesan Asinkron Python

Kontrol Sinkronisasi dalam Pemrosesan Asinkron Python

Artikel ini menjelaskan kontrol sinkronisasi dalam pemrosesan asinkron di Python.

Anda akan belajar langkah demi langkah, mulai dari dasar-dasar asyncio hingga pola-pola praktis yang umum digunakan untuk kontrol sinkronisasi.

YouTube Video

Kontrol Sinkronisasi dalam Pemrosesan Asinkron Python

Dalam pemrosesan asinkron, Anda dapat dengan mudah menjalankan beberapa tugas secara bersamaan. Namun, dalam praktiknya, penyesuaian yang lebih canggih diperlukan, seperti mengontrol konkurensi, mengoordinasikan tugas, kontrol eksklusif terhadap sumber daya bersama, menangani proses sinkron yang berat, dan pembersihan setelah pembatalan.

Di sini, kita akan belajar langkah demi langkah mulai dari dasar asyncio hingga pola praktis yang umum digunakan untuk sinkronisasi.

Pendahuluan: Dasar-dasar (async / await dan create_task)

Mari kita lihat terlebih dahulu beberapa kode asinkron minimal. await menunggu di titik itu sampai coroutine yang dipanggil selesai, dan asyncio.create_task menjadwalkan tugas untuk dijalankan secara bersamaan.

 1import asyncio
 2
 3async def worker(name, delay):
 4    # Simulate async work
 5    await asyncio.sleep(delay)
 6    return f"{name} done after {delay}s"
 7
 8async def main():
 9    # Create two tasks that run concurrently.
10    t1 = asyncio.create_task(worker("A", 1))
11    t2 = asyncio.create_task(worker("B", 2))
12
13    # Await both results (this suspends until both are done).
14    result1 = await t1
15    result2 = await t2
16    print(result1, result2)
17
18if __name__ == "__main__":
19    asyncio.run(main())
  • Kode ini adalah pola khas di mana tugas secara eksplisit dibuat, dijalankan secara paralel, dan hasilnya diterima di akhir dengan await. create_task memungkinkan eksekusi secara bersamaan.

Perbedaan antara asyncio.gather, asyncio.wait, dan asyncio.as_completed

Saat menjalankan beberapa coroutine secara bersamaan, Anda memilih mana yang akan digunakan tergantung pada bagaimana Anda ingin mengambil hasilnya. gather menunggu semua selesai dan mengembalikan hasil sesuai urutan input, sedangkan as_completed memungkinkan Anda memproses hasil segera setelah selesai, tanpa memperhatikan urutan.

 1import asyncio
 2import random
 3
 4async def job(i):
 5    delay = random.random() * 2
 6    await asyncio.sleep(delay)
 7    return (i, delay)
 8
 9async def gather_example():
10    # gather waits for all tasks and returns results in the same order as input
11    results = await asyncio.gather(*(job(i) for i in range(5)))
12    print("gather order:", results)
13
14async def as_completed_example():
15    # as_completed yields results as they finish (useful to process early results)
16    tasks = [asyncio.create_task(job(i)) for i in range(5)]
17    for coro in asyncio.as_completed(tasks):
18        res = await coro
19        print("completed:", res)
20
21async def main():
22    await gather_example()
23    await as_completed_example()
24
25if __name__ == "__main__":
26    asyncio.run(main())
  • Seperti ditunjukkan dalam kode ini, gather mengembalikan hasil sesuai urutan input, sehingga berguna ketika Anda ingin menjaga urutan. as_completed digunakan ketika Anda ingin memproses hasil segera setelah selesai.

Mengontrol konkurensi: Membatasi eksekusi bersamaan dengan asyncio.Semaphore

Jika terdapat batasan jumlah permintaan API eksternal atau koneksi DB, Anda dapat mengontrol eksekusi bersamaan menggunakan Semaphore.

 1import asyncio
 2import random
 3
 4sem = asyncio.Semaphore(3)  # allow up to 3 concurrent workers
 5
 6async def limited_worker(i):
 7    async with sem:
 8        # Only 3 tasks can be inside this block concurrently
 9        delay = random.random() * 2
10        await asyncio.sleep(delay)
11        print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14    tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15    await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18    asyncio.run(main())
  • Dengan menggunakan Semaphore bersama async with, Anda dapat dengan mudah membatasi jumlah eksekusi bersamaan. Ini efektif dalam situasi dengan batasan eksternal.

Kontrol eksklusif sumber daya bersama: asyncio.Lock

Lock digunakan untuk mencegah pembaruan data bersama secara bersamaan. asyncio.Lock adalah primitif eksklusif untuk penggunaan asinkron.

 1import asyncio
 2
 3counter = 0
 4lock = asyncio.Lock()
 5
 6async def incrementer(n_times):
 7    global counter
 8    for _ in range(n_times):
 9        # Acquire lock to update shared counter safely
10        async with lock:
11            tmp = counter
12            await asyncio.sleep(0)  # yield control to increase race likelihood
13            counter = tmp + 1
14
15async def main():
16    tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17    await asyncio.gather(*tasks)
18    print("final counter:", counter)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Jika beberapa tugas memperbarui variabel bersama seperti counter global, konflik dapat terjadi. Dengan membungkus operasi menggunakan Lock, Anda dapat menjaga konsistensi.

Koordinasi tugas: asyncio.Event

Event digunakan ketika satu tugas memberi sinyal bahwa itu sudah siap, dan tugas lain menunggu sinyal ini. Ini adalah cara sederhana bagi tugas-tugas untuk berbagi sinyal dan sinkronisasi satu sama lain.

 1import asyncio
 2
 3event = asyncio.Event()
 4
 5async def waiter(name):
 6    print(f"{name} is waiting for event")
 7    await event.wait()
 8    print(f"{name} resumed after event set")
 9
10async def setter():
11    print("setter will sleep and then set the event")
12    await asyncio.sleep(1)
13    event.set()
14    print("event set by setter")
15
16async def main():
17    tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18    await asyncio.create_task(setter())
19    await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22    asyncio.run(main())
  • Event memiliki flag boolean, dan memanggil set() akan melanjutkan semua tugas yang sedang menunggu. Ini berguna untuk sinkronisasi sederhana.

Pola produsen-konsumen: asyncio.Queue

Dengan menggunakan Queue, produsen (yang membuat data) dan konsumen (yang memproses data) dapat berkoordinasi dengan lancar dan secara asinkron. Selain itu, ketika antrian penuh, produsen secara otomatis menunggu, secara alami menerapkan backpressure untuk mencegah kelebihan produksi.

 1import asyncio
 2import random
 3
 4async def producer(q, n_items):
 5    for i in range(n_items):
 6        await asyncio.sleep(random.random() * 0.5)
 7        item = f"item-{i}"
 8        await q.put(item)
 9        print("produced", item)
10    # signal consumers to stop
11    await q.put(None)
12
13async def consumer(q, name):
14    while True:
15        item = await q.get()
16        if item is None:
17            # put sentinel back for other consumers and break
18            await q.put(None)
19            break
20        await asyncio.sleep(random.random() * 1)
21        print(name, "consumed", item)
22        q.task_done()
23
24async def main():
25    q = asyncio.Queue(maxsize=5)  # bounded queue to apply backpressure
26    prod = asyncio.create_task(producer(q, 10))
27    cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28    await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31    asyncio.run(main())
  • Queue membantu mengoordinasikan produsen dan konsumen secara asinkron. Selain itu, pengaturan maxsize membuat produsen menunggu pada put saat antrian penuh, sehingga mencegah kelebihan produksi.

Menangani Operasi Sinkron yang Memblokir: run_in_executor

Untuk pemrosesan yang membebani CPU atau saat menggunakan pustaka yang tidak mendukung async, gunakan run_in_executor untuk mendelegasikan pemrosesan ke thread atau proses lain. Melakukan ini mencegah event loop utama berhenti, sehingga tugas asinkron lainnya dapat berjalan dengan lancar.

 1import asyncio
 2import time
 3
 4def blocking_io(x):
 5    # simulate blocking I/O or CPU-bound work
 6    time.sleep(2)
 7    return x * x
 8
 9async def main():
10    loop = asyncio.get_running_loop()
11    # run blocking_io in default thread pool
12    result = await loop.run_in_executor(None, blocking_io, 3)
13    print("blocking result:", result)
14
15if __name__ == "__main__":
16    asyncio.run(main())
  • Memanggil fungsi sinkron langsung akan memblok event loop. Dengan run_in_executor, kode dijalankan di thread terpisah dan tugas asinkron dapat terus berjalan secara bersamaan.

Contoh: Memanggil API dengan batas kecepatan (menggabungkan Semaphore + run_in_executor)

Berikut adalah contoh skenario di mana pemanggilan API dibatasi kecepatannya dan pemrosesan berat dilakukan pada hasilnya. Menggabungkan Semaphore dan run_in_executor memungkinkan pemrosesan berjalan dengan aman dan efisien.

 1import asyncio
 2import time
 3import random
 4
 5sem = asyncio.Semaphore(5)
 6
 7def heavy_sync_processing(data):
 8    # simulate heavy CPU-bound work
 9    time.sleep(1)
10    return f"processed-{data}"
11
12async def api_call(i):
13    await asyncio.sleep(random.random() * 0.5)  # simulate network latency
14    return f"data-{i}"
15
16async def worker(i):
17    async with sem:
18        data = await api_call(i)
19        # offload CPU-bound work to threadpool
20        loop = asyncio.get_running_loop()
21        result = await loop.run_in_executor(None, heavy_sync_processing, data)
22        print(result)
23
24async def main():
25    tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26    await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29    asyncio.run(main())
  • Kami menggunakan Semaphore untuk membatasi jumlah pemanggilan API bersamaan, dan pemrosesan berat pada data hasilnya didelegasikan ke thread pool. Memisahkan pemrosesan jaringan dan CPU meningkatkan efisiensi.

Pembatalan tugas dan pembersihan

Ketika sebuah tugas dibatalkan, menangani finally dan asyncio.CancelledError dengan baik sangatlah penting. Hal ini memastikan file dan koneksi dilepaskan serta status antara ditangani dengan benar, menjaga konsistensi aplikasi.

 1import asyncio
 2
 3async def long_running():
 4    try:
 5        print("started long_running")
 6        await asyncio.sleep(10)
 7        print("finished long_running")
 8    except asyncio.CancelledError:
 9        print("long_running was cancelled, cleaning up")
10        # perform cleanup here
11        raise
12
13async def main():
14    task = asyncio.create_task(long_running())
15    await asyncio.sleep(1)
16    task.cancel()
17    try:
18        await task
19    except asyncio.CancelledError:
20        print("task cancelled in main")
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • Pembatalan dikirimkan sebagai exception (CancelledError), jadi lakukan pembersihan yang diperlukan di blok except dan angkat lagi exception jika diperlukan.

Poin kunci untuk desain praktis

Berikut adalah poin-poin praktis yang berguna untuk merancang pemrosesan asinkron.

  • Kontrol konkurensi secara eksplisit Jika ada batasan sumber daya seperti API atau DB, Anda dapat membatasi jumlah eksekusi bersamaan dengan Semaphore.

  • Tangani sumber daya bersama dengan aman Jika Anda perlu memperbarui status dari beberapa tugas, gunakan Lock. Mengurangi status bersama dan merancang dengan data yang tidak dapat diubah membuat segala sesuatu lebih aman.

  • Pilih cara menerima hasil Jika Anda ingin memproses tugas begitu selesai, gunakan asyncio.as_completed; jika ingin memproses hasil sesuai urutan input, gunakan gather.

  • Isolasikan pemrosesan sinkron yang berat Untuk panggilan pustaka yang membebani CPU atau sinkron, gunakan run_in_executor atau ProcessPoolExecutor agar event loop tidak terblokir.

  • Rencanakan pembatalan dan penanganan exception Tuliskan penanganan exception yang baik untuk membersihkan sumber daya dengan aman, bahkan jika tugas dibatalkan di tengah jalan.

  • Permudah pengujian Abstraksikan efek samping seperti I/O, waktu, dan keacakan sehingga dapat diganti, membuat pengujian kode asinkron lebih mudah.

Ringkasan

asyncio sangat kuat, namun jika Anda hanya berfokus pada "menjalankan tugas secara paralel", Anda bisa menghadapi masalah seperti perebutan sumber daya bersama, pelanggaran batas sumber daya, atau pemblokiran event loop. Dengan menggabungkan Semaphore, Lock, Event, Queue, run_in_executor, dan penanganan pembatalan yang tepat, Anda dapat merancang aplikasi asinkron yang aman dan efisien. Dengan memanfaatkan mekanisme seperti pola produsen-konsumen, pembatasan konkurensi, atau memisahkan pemrosesan asinkron dan pemrosesan yang memblokir, alur kerja asinkron dapat dibangun dengan lebih aman dan efisien.

Anda dapat mengikuti artikel di atas menggunakan Visual Studio Code di saluran YouTube kami. Silakan periksa juga saluran YouTube kami.

YouTube Video