Kawalan Penyegerakan dalam Pemprosesan Asinkron Python

Kawalan Penyegerakan dalam Pemprosesan Asinkron Python

Artikel ini menerangkan kawalan penyegerakan dalam pemprosesan asinkron Python.

Anda akan belajar langkah demi langkah, dari asas asyncio hingga corak praktikal yang biasa digunakan untuk kawalan penyegerakan.

YouTube Video

Kawalan Penyegerakan dalam Pemprosesan Asinkron Python

Dalam pemprosesan asinkron, anda boleh menjalankan beberapa tugasan secara serentak dengan mudah. Walau bagaimanapun, dalam praktik, pelarasan yang lebih maju diperlukan seperti mengawal serentak, menyelaraskan tugasan, kawalan eksklusif sumber dikongsi, menangani proses segerak yang berat, dan pembersihan selepas pembatalan.

Di sini, kita akan belajar langkah demi langkah daripada asas asyncio hingga corak praktikal yang biasa digunakan untuk penyegerakan.

Pengenalan: Asas (async / await dan create_task)

Mari kita lihat dahulu kod asinkron yang minimum. await menunggu di titik itu sehingga korutin yang dipanggil selesai, dan asyncio.create_task menjadualkan tugasan untuk pelaksanaan serentak.

 1import asyncio
 2
 3async def worker(name, delay):
 4    # Simulate async work
 5    await asyncio.sleep(delay)
 6    return f"{name} done after {delay}s"
 7
 8async def main():
 9    # Create two tasks that run concurrently.
10    t1 = asyncio.create_task(worker("A", 1))
11    t2 = asyncio.create_task(worker("B", 2))
12
13    # Await both results (this suspends until both are done).
14    result1 = await t1
15    result2 = await t2
16    print(result1, result2)
17
18if __name__ == "__main__":
19    asyncio.run(main())
  • Kod ini adalah corak tipikal di mana tugasan dicipta secara eksplisit, dijalankan secara selari dan hasilnya diterima di akhir dengan await. create_task membolehkan pelaksanaan serentak.

Perbezaan antara asyncio.gather, asyncio.wait, dan asyncio.as_completed

Apabila menjalankan beberapa korutin secara serentak, anda memilih mana yang ingin digunakan bergantung pada bagaimana anda ingin memperoleh hasil. gather menunggu semua selesai dan mengembalikan hasil mengikut urutan input, manakala as_completed membolehkan pemprosesan hasil sebaik sahaja ianya selesai, tanpa mengira urutan.

 1import asyncio
 2import random
 3
 4async def job(i):
 5    delay = random.random() * 2
 6    await asyncio.sleep(delay)
 7    return (i, delay)
 8
 9async def gather_example():
10    # gather waits for all tasks and returns results in the same order as input
11    results = await asyncio.gather(*(job(i) for i in range(5)))
12    print("gather order:", results)
13
14async def as_completed_example():
15    # as_completed yields results as they finish (useful to process early results)
16    tasks = [asyncio.create_task(job(i)) for i in range(5)]
17    for coro in asyncio.as_completed(tasks):
18        res = await coro
19        print("completed:", res)
20
21async def main():
22    await gather_example()
23    await as_completed_example()
24
25if __name__ == "__main__":
26    asyncio.run(main())
  • Seperti yang ditunjukkan dalam kod ini, gather mengembalikan hasil mengikut urutan input, sangat berguna jika anda ingin mengekalkan urutan. as_completed digunakan jika anda ingin memproses hasil sebaik sahaja ia selesai.

Mengawal serentak: Mengehadkan pelaksanaan serentak dengan asyncio.Semaphore

Apabila terdapat had kadar API luar atau had sambungan DB, anda boleh mengawal pelaksanaan serentak dengan Semaphore.

 1import asyncio
 2import random
 3
 4sem = asyncio.Semaphore(3)  # allow up to 3 concurrent workers
 5
 6async def limited_worker(i):
 7    async with sem:
 8        # Only 3 tasks can be inside this block concurrently
 9        delay = random.random() * 2
10        await asyncio.sleep(delay)
11        print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14    tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15    await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18    asyncio.run(main())
  • Dengan menggunakan Semaphore bersama async with, anda boleh mengehadkan bilangan pelaksanaan serentak dengan mudah. Ini berkesan dalam situasi dengan kekangan luar.

Kawalan eksklusif sumber dikongsi: asyncio.Lock

Lock digunakan untuk mengelakkan kemas kini serentak pada data dikongsi. asyncio.Lock adalah primitif eksklusif untuk penggunaan asinkron.

 1import asyncio
 2
 3counter = 0
 4lock = asyncio.Lock()
 5
 6async def incrementer(n_times):
 7    global counter
 8    for _ in range(n_times):
 9        # Acquire lock to update shared counter safely
10        async with lock:
11            tmp = counter
12            await asyncio.sleep(0)  # yield control to increase race likelihood
13            counter = tmp + 1
14
15async def main():
16    tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17    await asyncio.gather(*tasks)
18    print("final counter:", counter)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Jika beberapa tugasan mengemas kini pembolehubah dikongsi seperti counter global, konflik boleh berlaku. Dengan melingkari operasi dengan Lock, anda boleh mengekalkan konsistensi.

Penyelarasan tugasan: asyncio.Event

Event digunakan apabila satu tugasan memberi isyarat bahawa ia sedia, dan tugasan lain menunggu isyarat ini. Ini adalah cara mudah bagi tugasan untuk berkongsi isyarat dan menyelaras antara satu sama lain.

 1import asyncio
 2
 3event = asyncio.Event()
 4
 5async def waiter(name):
 6    print(f"{name} is waiting for event")
 7    await event.wait()
 8    print(f"{name} resumed after event set")
 9
10async def setter():
11    print("setter will sleep and then set the event")
12    await asyncio.sleep(1)
13    event.set()
14    print("event set by setter")
15
16async def main():
17    tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18    await asyncio.create_task(setter())
19    await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22    asyncio.run(main())
  • Event mempunyai penanda boolean dan apabila memanggil set(), semua tugasan yang menunggu disambung semula. Ia berguna untuk penyegerakan mudah.

Corak pengeluar-pengguna: asyncio.Queue

Dengan menggunakan Queue, pengeluar (yang mencipta data) dan pengguna (yang memproses data) boleh menyelaras dengan lancar dan secara asinkron. Juga, apabila barisan penuh, pengeluar secara automatik akan menunggu, sekaligus melaksanakan tekanan balikan secara semula jadi untuk mengelakkan pengeluaran berlebihan.

 1import asyncio
 2import random
 3
 4async def producer(q, n_items):
 5    for i in range(n_items):
 6        await asyncio.sleep(random.random() * 0.5)
 7        item = f"item-{i}"
 8        await q.put(item)
 9        print("produced", item)
10    # signal consumers to stop
11    await q.put(None)
12
13async def consumer(q, name):
14    while True:
15        item = await q.get()
16        if item is None:
17            # put sentinel back for other consumers and break
18            await q.put(None)
19            break
20        await asyncio.sleep(random.random() * 1)
21        print(name, "consumed", item)
22        q.task_done()
23
24async def main():
25    q = asyncio.Queue(maxsize=5)  # bounded queue to apply backpressure
26    prod = asyncio.create_task(producer(q, 10))
27    cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28    await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31    asyncio.run(main())
  • Queue membantu menyelaras pengeluar dan pengguna secara asinkron. Selain itu, menetapkan maxsize menjadikan pengeluar menunggu pada put apabila barisan penuh, mengelakkan pengeluaran berlebihan.

Mengendalikan Operasi Penyumbat Sejajar: run_in_executor

Untuk pemprosesan intensif CPU atau apabila menggunakan pustaka yang tidak menyokong async, gunakan run_in_executor untuk mewakilkan pemprosesan ke thread atau proses lain. Ini akan mengelakkan gelung peristiwa utama terhenti, membolehkan tugasan asinkron lain berjalan dengan lancar.

 1import asyncio
 2import time
 3
 4def blocking_io(x):
 5    # simulate blocking I/O or CPU-bound work
 6    time.sleep(2)
 7    return x * x
 8
 9async def main():
10    loop = asyncio.get_running_loop()
11    # run blocking_io in default thread pool
12    result = await loop.run_in_executor(None, blocking_io, 3)
13    print("blocking result:", result)
14
15if __name__ == "__main__":
16    asyncio.run(main())
  • Memanggil fungsi segerak secara langsung akan menghalang gelung peristiwa. Dengan run_in_executor, kod dijalankan dalam thread berasingan dan tugasan asinkron boleh terus berjalan serentak.

Contoh: Panggilan API dengan had kadar (gabungan Semaphore + run_in_executor)

Berikut adalah senario contoh di mana panggilan API dihadkan kadar dan pemprosesan berat dilakukan ke atas hasilnya. Menggabungkan Semaphore dan run_in_executor membolehkan pemprosesan berjalan dengan selamat dan cekap.

 1import asyncio
 2import time
 3import random
 4
 5sem = asyncio.Semaphore(5)
 6
 7def heavy_sync_processing(data):
 8    # simulate heavy CPU-bound work
 9    time.sleep(1)
10    return f"processed-{data}"
11
12async def api_call(i):
13    await asyncio.sleep(random.random() * 0.5)  # simulate network latency
14    return f"data-{i}"
15
16async def worker(i):
17    async with sem:
18        data = await api_call(i)
19        # offload CPU-bound work to threadpool
20        loop = asyncio.get_running_loop()
21        result = await loop.run_in_executor(None, heavy_sync_processing, data)
22        print(result)
23
24async def main():
25    tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26    await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29    asyncio.run(main())
  • Kami menggunakan Semaphore untuk mengehadkan bilangan panggilan API serentak, dan pemprosesan berat ke atas data hasilnya diserahkan ke kolam thread. Memisahkan pemprosesan rangkaian dan CPU meningkatkan kecekapan.

Pembatalan dan pembersihan tugasan

Apabila tugasan dibatalkan, mengendalikan finally dan asyncio.CancelledError dengan betul adalah sangat penting. Ini memastikan fail dan sambungan dilepaskan serta keadaan pertengahan dikendalikan dengan baik, mengekalkan konsistensi dalam aplikasi.

 1import asyncio
 2
 3async def long_running():
 4    try:
 5        print("started long_running")
 6        await asyncio.sleep(10)
 7        print("finished long_running")
 8    except asyncio.CancelledError:
 9        print("long_running was cancelled, cleaning up")
10        # perform cleanup here
11        raise
12
13async def main():
14    task = asyncio.create_task(long_running())
15    await asyncio.sleep(1)
16    task.cancel()
17    try:
18        await task
19    except asyncio.CancelledError:
20        print("task cancelled in main")
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • Pembatalan dihantar sebagai pengecualian (CancelledError), jadi lakukan pembersihan yang diperlukan dalam blok except dan naikkan semula pengecualian jika perlu.

Perkara utama untuk reka bentuk praktikal

Berikut adalah perkara praktikal yang berguna untuk mereka bentuk pemprosesan asinkron.

  • Kawal serentak secara eksplisit Apabila terdapat had sumber seperti API atau DB, anda boleh mengehadkan bilangan pelaksanaan serentak dengan Semaphore.

  • Kendalikan sumber dikongsi dengan selamat Jika anda perlu mengemas kini keadaan daripada beberapa tugasan, gunakan Lock. Mengurangkan keadaan dikongsi dan mereka bentuk berasaskan data tidak boleh diubah menjadikan segalanya lebih selamat.

  • Pilih cara menerima hasil Jika anda ingin memproses tugasan apabila ia siap, gunakan asyncio.as_completed; jika anda ingin memproses hasil mengikut urutan input, gunakan gather.

  • Asingkan pemprosesan segerak yang berat Untuk panggilan pustaka yang memerlukan banyak CPU atau segerak, gunakan run_in_executor atau ProcessPoolExecutor untuk mengelakkan sekatan pada gelung peristiwa.

  • Rancang untuk pembatalan dan pengecualian Tulis pengendalian pengecualian yang betul untuk membersihkan sumber walaupun tugasan dibatalkan di pertengahan jalan.

  • Permudahkan ujian Abstrakkan kesan sampingan seperti I/O, masa, dan kerandoman supaya ia boleh digantikan, memudahkan ujian untuk kod asinkron.

Ringkasan

asyncio sangat berkuasa, tetapi jika anda hanya menumpukan kepada “menjalankan sesuatu secara selari”, anda mungkin akan menghadapi isu seperti perebutan sumber dikongsi, pelanggaran had sumber, atau sekatan gelung peristiwa. Dengan menggabungkan Semaphore, Lock, Event, Queue, run_in_executor, dan pengendalian pembatalan yang betul, anda boleh mereka bentuk aplikasi asinkron yang selamat dan cekap. Dengan menggunakan mekanisme seperti corak pengeluar-pengguna, had serentak, atau memisahkan pemprosesan asinkron dan yang menyekat, aliran kerja asinkron boleh dibina dengan lebih selamat dan cekap.

Anda boleh mengikuti artikel di atas menggunakan Visual Studio Code di saluran YouTube kami. Sila lihat juga saluran YouTube kami.

YouTube Video