Kontrol Sinkronisasi dalam Pemrosesan Asinkron Python
Artikel ini menjelaskan kontrol sinkronisasi dalam pemrosesan asinkron di Python.
Anda akan belajar langkah demi langkah, mulai dari dasar-dasar asyncio hingga pola-pola praktis yang umum digunakan untuk kontrol sinkronisasi.
YouTube Video
Kontrol Sinkronisasi dalam Pemrosesan Asinkron Python
Dalam pemrosesan asinkron, Anda dapat dengan mudah menjalankan beberapa tugas secara bersamaan. Namun, dalam praktiknya, penyesuaian yang lebih canggih diperlukan, seperti mengontrol konkurensi, mengoordinasikan tugas, kontrol eksklusif terhadap sumber daya bersama, menangani proses sinkron yang berat, dan pembersihan setelah pembatalan.
Di sini, kita akan belajar langkah demi langkah mulai dari dasar asyncio hingga pola praktis yang umum digunakan untuk sinkronisasi.
Pendahuluan: Dasar-dasar (async / await dan create_task)
Mari kita lihat terlebih dahulu beberapa kode asinkron minimal. await menunggu di titik itu sampai coroutine yang dipanggil selesai, dan asyncio.create_task menjadwalkan tugas untuk dijalankan secara bersamaan.
1import asyncio
2
3async def worker(name, delay):
4 # Simulate async work
5 await asyncio.sleep(delay)
6 return f"{name} done after {delay}s"
7
8async def main():
9 # Create two tasks that run concurrently.
10 t1 = asyncio.create_task(worker("A", 1))
11 t2 = asyncio.create_task(worker("B", 2))
12
13 # Await both results (this suspends until both are done).
14 result1 = await t1
15 result2 = await t2
16 print(result1, result2)
17
18if __name__ == "__main__":
19 asyncio.run(main())- Kode ini adalah pola khas di mana tugas secara eksplisit dibuat, dijalankan secara paralel, dan hasilnya diterima di akhir dengan
await.create_taskmemungkinkan eksekusi secara bersamaan.
Perbedaan antara asyncio.gather, asyncio.wait, dan asyncio.as_completed
Saat menjalankan beberapa coroutine secara bersamaan, Anda memilih mana yang akan digunakan tergantung pada bagaimana Anda ingin mengambil hasilnya. gather menunggu semua selesai dan mengembalikan hasil sesuai urutan input, sedangkan as_completed memungkinkan Anda memproses hasil segera setelah selesai, tanpa memperhatikan urutan.
1import asyncio
2import random
3
4async def job(i):
5 delay = random.random() * 2
6 await asyncio.sleep(delay)
7 return (i, delay)
8
9async def gather_example():
10 # gather waits for all tasks and returns results in the same order as input
11 results = await asyncio.gather(*(job(i) for i in range(5)))
12 print("gather order:", results)
13
14async def as_completed_example():
15 # as_completed yields results as they finish (useful to process early results)
16 tasks = [asyncio.create_task(job(i)) for i in range(5)]
17 for coro in asyncio.as_completed(tasks):
18 res = await coro
19 print("completed:", res)
20
21async def main():
22 await gather_example()
23 await as_completed_example()
24
25if __name__ == "__main__":
26 asyncio.run(main())- Seperti ditunjukkan dalam kode ini,
gathermengembalikan hasil sesuai urutan input, sehingga berguna ketika Anda ingin menjaga urutan.as_completeddigunakan ketika Anda ingin memproses hasil segera setelah selesai.
Mengontrol konkurensi: Membatasi eksekusi bersamaan dengan asyncio.Semaphore
Jika terdapat batasan jumlah permintaan API eksternal atau koneksi DB, Anda dapat mengontrol eksekusi bersamaan menggunakan Semaphore.
1import asyncio
2import random
3
4sem = asyncio.Semaphore(3) # allow up to 3 concurrent workers
5
6async def limited_worker(i):
7 async with sem:
8 # Only 3 tasks can be inside this block concurrently
9 delay = random.random() * 2
10 await asyncio.sleep(delay)
11 print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14 tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15 await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18 asyncio.run(main())- Dengan menggunakan
Semaphorebersamaasync with, Anda dapat dengan mudah membatasi jumlah eksekusi bersamaan. Ini efektif dalam situasi dengan batasan eksternal.
Kontrol eksklusif sumber daya bersama: asyncio.Lock
Lock digunakan untuk mencegah pembaruan data bersama secara bersamaan. asyncio.Lock adalah primitif eksklusif untuk penggunaan asinkron.
1import asyncio
2
3counter = 0
4lock = asyncio.Lock()
5
6async def incrementer(n_times):
7 global counter
8 for _ in range(n_times):
9 # Acquire lock to update shared counter safely
10 async with lock:
11 tmp = counter
12 await asyncio.sleep(0) # yield control to increase race likelihood
13 counter = tmp + 1
14
15async def main():
16 tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17 await asyncio.gather(*tasks)
18 print("final counter:", counter)
19
20if __name__ == "__main__":
21 asyncio.run(main())- Jika beberapa tugas memperbarui variabel bersama seperti
counterglobal, konflik dapat terjadi. Dengan membungkus operasi menggunakanLock, Anda dapat menjaga konsistensi.
Koordinasi tugas: asyncio.Event
Event digunakan ketika satu tugas memberi sinyal bahwa itu sudah siap, dan tugas lain menunggu sinyal ini. Ini adalah cara sederhana bagi tugas-tugas untuk berbagi sinyal dan sinkronisasi satu sama lain.
1import asyncio
2
3event = asyncio.Event()
4
5async def waiter(name):
6 print(f"{name} is waiting for event")
7 await event.wait()
8 print(f"{name} resumed after event set")
9
10async def setter():
11 print("setter will sleep and then set the event")
12 await asyncio.sleep(1)
13 event.set()
14 print("event set by setter")
15
16async def main():
17 tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18 await asyncio.create_task(setter())
19 await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22 asyncio.run(main())Eventmemiliki flag boolean, dan memanggilset()akan melanjutkan semua tugas yang sedang menunggu. Ini berguna untuk sinkronisasi sederhana.
Pola produsen-konsumen: asyncio.Queue
Dengan menggunakan Queue, produsen (yang membuat data) dan konsumen (yang memproses data) dapat berkoordinasi dengan lancar dan secara asinkron. Selain itu, ketika antrian penuh, produsen secara otomatis menunggu, secara alami menerapkan backpressure untuk mencegah kelebihan produksi.
1import asyncio
2import random
3
4async def producer(q, n_items):
5 for i in range(n_items):
6 await asyncio.sleep(random.random() * 0.5)
7 item = f"item-{i}"
8 await q.put(item)
9 print("produced", item)
10 # signal consumers to stop
11 await q.put(None)
12
13async def consumer(q, name):
14 while True:
15 item = await q.get()
16 if item is None:
17 # put sentinel back for other consumers and break
18 await q.put(None)
19 break
20 await asyncio.sleep(random.random() * 1)
21 print(name, "consumed", item)
22 q.task_done()
23
24async def main():
25 q = asyncio.Queue(maxsize=5) # bounded queue to apply backpressure
26 prod = asyncio.create_task(producer(q, 10))
27 cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28 await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31 asyncio.run(main())Queuemembantu mengoordinasikan produsen dan konsumen secara asinkron. Selain itu, pengaturanmaxsizemembuat produsen menunggu padaputsaat antrian penuh, sehingga mencegah kelebihan produksi.
Menangani Operasi Sinkron yang Memblokir: run_in_executor
Untuk pemrosesan yang membebani CPU atau saat menggunakan pustaka yang tidak mendukung async, gunakan run_in_executor untuk mendelegasikan pemrosesan ke thread atau proses lain. Melakukan ini mencegah event loop utama berhenti, sehingga tugas asinkron lainnya dapat berjalan dengan lancar.
1import asyncio
2import time
3
4def blocking_io(x):
5 # simulate blocking I/O or CPU-bound work
6 time.sleep(2)
7 return x * x
8
9async def main():
10 loop = asyncio.get_running_loop()
11 # run blocking_io in default thread pool
12 result = await loop.run_in_executor(None, blocking_io, 3)
13 print("blocking result:", result)
14
15if __name__ == "__main__":
16 asyncio.run(main())- Memanggil fungsi sinkron langsung akan memblok event loop. Dengan
run_in_executor, kode dijalankan di thread terpisah dan tugas asinkron dapat terus berjalan secara bersamaan.
Contoh: Memanggil API dengan batas kecepatan (menggabungkan Semaphore + run_in_executor)
Berikut adalah contoh skenario di mana pemanggilan API dibatasi kecepatannya dan pemrosesan berat dilakukan pada hasilnya. Menggabungkan Semaphore dan run_in_executor memungkinkan pemrosesan berjalan dengan aman dan efisien.
1import asyncio
2import time
3import random
4
5sem = asyncio.Semaphore(5)
6
7def heavy_sync_processing(data):
8 # simulate heavy CPU-bound work
9 time.sleep(1)
10 return f"processed-{data}"
11
12async def api_call(i):
13 await asyncio.sleep(random.random() * 0.5) # simulate network latency
14 return f"data-{i}"
15
16async def worker(i):
17 async with sem:
18 data = await api_call(i)
19 # offload CPU-bound work to threadpool
20 loop = asyncio.get_running_loop()
21 result = await loop.run_in_executor(None, heavy_sync_processing, data)
22 print(result)
23
24async def main():
25 tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26 await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29 asyncio.run(main())- Kami menggunakan
Semaphoreuntuk membatasi jumlah pemanggilan API bersamaan, dan pemrosesan berat pada data hasilnya didelegasikan ke thread pool. Memisahkan pemrosesan jaringan dan CPU meningkatkan efisiensi.
Pembatalan tugas dan pembersihan
Ketika sebuah tugas dibatalkan, menangani finally dan asyncio.CancelledError dengan baik sangatlah penting. Hal ini memastikan file dan koneksi dilepaskan serta status antara ditangani dengan benar, menjaga konsistensi aplikasi.
1import asyncio
2
3async def long_running():
4 try:
5 print("started long_running")
6 await asyncio.sleep(10)
7 print("finished long_running")
8 except asyncio.CancelledError:
9 print("long_running was cancelled, cleaning up")
10 # perform cleanup here
11 raise
12
13async def main():
14 task = asyncio.create_task(long_running())
15 await asyncio.sleep(1)
16 task.cancel()
17 try:
18 await task
19 except asyncio.CancelledError:
20 print("task cancelled in main")
21
22if __name__ == "__main__":
23 asyncio.run(main())- Pembatalan dikirimkan sebagai exception (
CancelledError), jadi lakukan pembersihan yang diperlukan di blokexceptdan angkat lagi exception jika diperlukan.
Poin kunci untuk desain praktis
Berikut adalah poin-poin praktis yang berguna untuk merancang pemrosesan asinkron.
-
Kontrol konkurensi secara eksplisit Jika ada batasan sumber daya seperti API atau DB, Anda dapat membatasi jumlah eksekusi bersamaan dengan
Semaphore. -
Tangani sumber daya bersama dengan aman Jika Anda perlu memperbarui status dari beberapa tugas, gunakan
Lock. Mengurangi status bersama dan merancang dengan data yang tidak dapat diubah membuat segala sesuatu lebih aman. -
Pilih cara menerima hasil Jika Anda ingin memproses tugas begitu selesai, gunakan
asyncio.as_completed; jika ingin memproses hasil sesuai urutan input, gunakangather. -
Isolasikan pemrosesan sinkron yang berat Untuk panggilan pustaka yang membebani CPU atau sinkron, gunakan
run_in_executoratauProcessPoolExecutoragar event loop tidak terblokir. -
Rencanakan pembatalan dan penanganan exception Tuliskan penanganan exception yang baik untuk membersihkan sumber daya dengan aman, bahkan jika tugas dibatalkan di tengah jalan.
-
Permudah pengujian Abstraksikan efek samping seperti I/O, waktu, dan keacakan sehingga dapat diganti, membuat pengujian kode asinkron lebih mudah.
Ringkasan
asyncio sangat kuat, namun jika Anda hanya berfokus pada "menjalankan tugas secara paralel", Anda bisa menghadapi masalah seperti perebutan sumber daya bersama, pelanggaran batas sumber daya, atau pemblokiran event loop. Dengan menggabungkan Semaphore, Lock, Event, Queue, run_in_executor, dan penanganan pembatalan yang tepat, Anda dapat merancang aplikasi asinkron yang aman dan efisien. Dengan memanfaatkan mekanisme seperti pola produsen-konsumen, pembatasan konkurensi, atau memisahkan pemrosesan asinkron dan pemrosesan yang memblokir, alur kerja asinkron dapat dibangun dengan lebih aman dan efisien.
Anda dapat mengikuti artikel di atas menggunakan Visual Studio Code di saluran YouTube kami. Silakan periksa juga saluran YouTube kami.