Kawalan Penyegerakan dalam Pemprosesan Asinkron Python
Artikel ini menerangkan kawalan penyegerakan dalam pemprosesan asinkron Python.
Anda akan belajar langkah demi langkah, dari asas asyncio hingga corak praktikal yang biasa digunakan untuk kawalan penyegerakan.
YouTube Video
Kawalan Penyegerakan dalam Pemprosesan Asinkron Python
Dalam pemprosesan asinkron, anda boleh menjalankan beberapa tugasan secara serentak dengan mudah. Walau bagaimanapun, dalam praktik, pelarasan yang lebih maju diperlukan seperti mengawal serentak, menyelaraskan tugasan, kawalan eksklusif sumber dikongsi, menangani proses segerak yang berat, dan pembersihan selepas pembatalan.
Di sini, kita akan belajar langkah demi langkah daripada asas asyncio hingga corak praktikal yang biasa digunakan untuk penyegerakan.
Pengenalan: Asas (async / await dan create_task)
Mari kita lihat dahulu kod asinkron yang minimum. await menunggu di titik itu sehingga korutin yang dipanggil selesai, dan asyncio.create_task menjadualkan tugasan untuk pelaksanaan serentak.
1import asyncio
2
3async def worker(name, delay):
4 # Simulate async work
5 await asyncio.sleep(delay)
6 return f"{name} done after {delay}s"
7
8async def main():
9 # Create two tasks that run concurrently.
10 t1 = asyncio.create_task(worker("A", 1))
11 t2 = asyncio.create_task(worker("B", 2))
12
13 # Await both results (this suspends until both are done).
14 result1 = await t1
15 result2 = await t2
16 print(result1, result2)
17
18if __name__ == "__main__":
19 asyncio.run(main())- Kod ini adalah corak tipikal di mana tugasan dicipta secara eksplisit, dijalankan secara selari dan hasilnya diterima di akhir dengan
await.create_taskmembolehkan pelaksanaan serentak.
Perbezaan antara asyncio.gather, asyncio.wait, dan asyncio.as_completed
Apabila menjalankan beberapa korutin secara serentak, anda memilih mana yang ingin digunakan bergantung pada bagaimana anda ingin memperoleh hasil. gather menunggu semua selesai dan mengembalikan hasil mengikut urutan input, manakala as_completed membolehkan pemprosesan hasil sebaik sahaja ianya selesai, tanpa mengira urutan.
1import asyncio
2import random
3
4async def job(i):
5 delay = random.random() * 2
6 await asyncio.sleep(delay)
7 return (i, delay)
8
9async def gather_example():
10 # gather waits for all tasks and returns results in the same order as input
11 results = await asyncio.gather(*(job(i) for i in range(5)))
12 print("gather order:", results)
13
14async def as_completed_example():
15 # as_completed yields results as they finish (useful to process early results)
16 tasks = [asyncio.create_task(job(i)) for i in range(5)]
17 for coro in asyncio.as_completed(tasks):
18 res = await coro
19 print("completed:", res)
20
21async def main():
22 await gather_example()
23 await as_completed_example()
24
25if __name__ == "__main__":
26 asyncio.run(main())- Seperti yang ditunjukkan dalam kod ini,
gathermengembalikan hasil mengikut urutan input, sangat berguna jika anda ingin mengekalkan urutan.as_completeddigunakan jika anda ingin memproses hasil sebaik sahaja ia selesai.
Mengawal serentak: Mengehadkan pelaksanaan serentak dengan asyncio.Semaphore
Apabila terdapat had kadar API luar atau had sambungan DB, anda boleh mengawal pelaksanaan serentak dengan Semaphore.
1import asyncio
2import random
3
4sem = asyncio.Semaphore(3) # allow up to 3 concurrent workers
5
6async def limited_worker(i):
7 async with sem:
8 # Only 3 tasks can be inside this block concurrently
9 delay = random.random() * 2
10 await asyncio.sleep(delay)
11 print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14 tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15 await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18 asyncio.run(main())- Dengan menggunakan
Semaphorebersamaasync with, anda boleh mengehadkan bilangan pelaksanaan serentak dengan mudah. Ini berkesan dalam situasi dengan kekangan luar.
Kawalan eksklusif sumber dikongsi: asyncio.Lock
Lock digunakan untuk mengelakkan kemas kini serentak pada data dikongsi. asyncio.Lock adalah primitif eksklusif untuk penggunaan asinkron.
1import asyncio
2
3counter = 0
4lock = asyncio.Lock()
5
6async def incrementer(n_times):
7 global counter
8 for _ in range(n_times):
9 # Acquire lock to update shared counter safely
10 async with lock:
11 tmp = counter
12 await asyncio.sleep(0) # yield control to increase race likelihood
13 counter = tmp + 1
14
15async def main():
16 tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17 await asyncio.gather(*tasks)
18 print("final counter:", counter)
19
20if __name__ == "__main__":
21 asyncio.run(main())- Jika beberapa tugasan mengemas kini pembolehubah dikongsi seperti
counterglobal, konflik boleh berlaku. Dengan melingkari operasi denganLock, anda boleh mengekalkan konsistensi.
Penyelarasan tugasan: asyncio.Event
Event digunakan apabila satu tugasan memberi isyarat bahawa ia sedia, dan tugasan lain menunggu isyarat ini. Ini adalah cara mudah bagi tugasan untuk berkongsi isyarat dan menyelaras antara satu sama lain.
1import asyncio
2
3event = asyncio.Event()
4
5async def waiter(name):
6 print(f"{name} is waiting for event")
7 await event.wait()
8 print(f"{name} resumed after event set")
9
10async def setter():
11 print("setter will sleep and then set the event")
12 await asyncio.sleep(1)
13 event.set()
14 print("event set by setter")
15
16async def main():
17 tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18 await asyncio.create_task(setter())
19 await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22 asyncio.run(main())Eventmempunyai penanda boolean dan apabila memanggilset(), semua tugasan yang menunggu disambung semula. Ia berguna untuk penyegerakan mudah.
Corak pengeluar-pengguna: asyncio.Queue
Dengan menggunakan Queue, pengeluar (yang mencipta data) dan pengguna (yang memproses data) boleh menyelaras dengan lancar dan secara asinkron. Juga, apabila barisan penuh, pengeluar secara automatik akan menunggu, sekaligus melaksanakan tekanan balikan secara semula jadi untuk mengelakkan pengeluaran berlebihan.
1import asyncio
2import random
3
4async def producer(q, n_items):
5 for i in range(n_items):
6 await asyncio.sleep(random.random() * 0.5)
7 item = f"item-{i}"
8 await q.put(item)
9 print("produced", item)
10 # signal consumers to stop
11 await q.put(None)
12
13async def consumer(q, name):
14 while True:
15 item = await q.get()
16 if item is None:
17 # put sentinel back for other consumers and break
18 await q.put(None)
19 break
20 await asyncio.sleep(random.random() * 1)
21 print(name, "consumed", item)
22 q.task_done()
23
24async def main():
25 q = asyncio.Queue(maxsize=5) # bounded queue to apply backpressure
26 prod = asyncio.create_task(producer(q, 10))
27 cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28 await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31 asyncio.run(main())Queuemembantu menyelaras pengeluar dan pengguna secara asinkron. Selain itu, menetapkanmaxsizemenjadikan pengeluar menunggu padaputapabila barisan penuh, mengelakkan pengeluaran berlebihan.
Mengendalikan Operasi Penyumbat Sejajar: run_in_executor
Untuk pemprosesan intensif CPU atau apabila menggunakan pustaka yang tidak menyokong async, gunakan run_in_executor untuk mewakilkan pemprosesan ke thread atau proses lain. Ini akan mengelakkan gelung peristiwa utama terhenti, membolehkan tugasan asinkron lain berjalan dengan lancar.
1import asyncio
2import time
3
4def blocking_io(x):
5 # simulate blocking I/O or CPU-bound work
6 time.sleep(2)
7 return x * x
8
9async def main():
10 loop = asyncio.get_running_loop()
11 # run blocking_io in default thread pool
12 result = await loop.run_in_executor(None, blocking_io, 3)
13 print("blocking result:", result)
14
15if __name__ == "__main__":
16 asyncio.run(main())- Memanggil fungsi segerak secara langsung akan menghalang gelung peristiwa. Dengan
run_in_executor, kod dijalankan dalam thread berasingan dan tugasan asinkron boleh terus berjalan serentak.
Contoh: Panggilan API dengan had kadar (gabungan Semaphore + run_in_executor)
Berikut adalah senario contoh di mana panggilan API dihadkan kadar dan pemprosesan berat dilakukan ke atas hasilnya. Menggabungkan Semaphore dan run_in_executor membolehkan pemprosesan berjalan dengan selamat dan cekap.
1import asyncio
2import time
3import random
4
5sem = asyncio.Semaphore(5)
6
7def heavy_sync_processing(data):
8 # simulate heavy CPU-bound work
9 time.sleep(1)
10 return f"processed-{data}"
11
12async def api_call(i):
13 await asyncio.sleep(random.random() * 0.5) # simulate network latency
14 return f"data-{i}"
15
16async def worker(i):
17 async with sem:
18 data = await api_call(i)
19 # offload CPU-bound work to threadpool
20 loop = asyncio.get_running_loop()
21 result = await loop.run_in_executor(None, heavy_sync_processing, data)
22 print(result)
23
24async def main():
25 tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26 await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29 asyncio.run(main())- Kami menggunakan
Semaphoreuntuk mengehadkan bilangan panggilan API serentak, dan pemprosesan berat ke atas data hasilnya diserahkan ke kolam thread. Memisahkan pemprosesan rangkaian dan CPU meningkatkan kecekapan.
Pembatalan dan pembersihan tugasan
Apabila tugasan dibatalkan, mengendalikan finally dan asyncio.CancelledError dengan betul adalah sangat penting. Ini memastikan fail dan sambungan dilepaskan serta keadaan pertengahan dikendalikan dengan baik, mengekalkan konsistensi dalam aplikasi.
1import asyncio
2
3async def long_running():
4 try:
5 print("started long_running")
6 await asyncio.sleep(10)
7 print("finished long_running")
8 except asyncio.CancelledError:
9 print("long_running was cancelled, cleaning up")
10 # perform cleanup here
11 raise
12
13async def main():
14 task = asyncio.create_task(long_running())
15 await asyncio.sleep(1)
16 task.cancel()
17 try:
18 await task
19 except asyncio.CancelledError:
20 print("task cancelled in main")
21
22if __name__ == "__main__":
23 asyncio.run(main())- Pembatalan dihantar sebagai pengecualian (
CancelledError), jadi lakukan pembersihan yang diperlukan dalam blokexceptdan naikkan semula pengecualian jika perlu.
Perkara utama untuk reka bentuk praktikal
Berikut adalah perkara praktikal yang berguna untuk mereka bentuk pemprosesan asinkron.
-
Kawal serentak secara eksplisit Apabila terdapat had sumber seperti API atau DB, anda boleh mengehadkan bilangan pelaksanaan serentak dengan
Semaphore. -
Kendalikan sumber dikongsi dengan selamat Jika anda perlu mengemas kini keadaan daripada beberapa tugasan, gunakan
Lock. Mengurangkan keadaan dikongsi dan mereka bentuk berasaskan data tidak boleh diubah menjadikan segalanya lebih selamat. -
Pilih cara menerima hasil Jika anda ingin memproses tugasan apabila ia siap, gunakan
asyncio.as_completed; jika anda ingin memproses hasil mengikut urutan input, gunakangather. -
Asingkan pemprosesan segerak yang berat Untuk panggilan pustaka yang memerlukan banyak CPU atau segerak, gunakan
run_in_executoratauProcessPoolExecutoruntuk mengelakkan sekatan pada gelung peristiwa. -
Rancang untuk pembatalan dan pengecualian Tulis pengendalian pengecualian yang betul untuk membersihkan sumber walaupun tugasan dibatalkan di pertengahan jalan.
-
Permudahkan ujian Abstrakkan kesan sampingan seperti I/O, masa, dan kerandoman supaya ia boleh digantikan, memudahkan ujian untuk kod asinkron.
Ringkasan
asyncio sangat berkuasa, tetapi jika anda hanya menumpukan kepada “menjalankan sesuatu secara selari”, anda mungkin akan menghadapi isu seperti perebutan sumber dikongsi, pelanggaran had sumber, atau sekatan gelung peristiwa. Dengan menggabungkan Semaphore, Lock, Event, Queue, run_in_executor, dan pengendalian pembatalan yang betul, anda boleh mereka bentuk aplikasi asinkron yang selamat dan cekap. Dengan menggunakan mekanisme seperti corak pengeluar-pengguna, had serentak, atau memisahkan pemprosesan asinkron dan yang menyekat, aliran kerja asinkron boleh dibina dengan lebih selamat dan cekap.
Anda boleh mengikuti artikel di atas menggunakan Visual Studio Code di saluran YouTube kami. Sila lihat juga saluran YouTube kami.