Pagkontrol ng Synchronization sa Asynchronous na Pagproseso ng Python
Ipinaliliwanag ng artikulong ito ang pagkokontrol ng synchronization sa asynchronous na pagproseso gamit ang Python.
Matututo ka nang paunti-unti, mula sa mga batayan ng asyncio hanggang sa mga praktikal na pattern na karaniwang ginagamit para sa synchronization control.
YouTube Video
Pagkontrol ng Synchronization sa Asynchronous na Pagproseso ng Python
Sa asynchronous na pagproseso, madaling makatakbo ng sabay-sabay ang maraming gawain. Gayunpaman, sa aktwal na paggamit, kinakailangan ng mas masusing mga pagsasaayos tulad ng pagkokontrol ng concurrency, pagco-coordinate ng mga gawain, eksklusibong kontrol sa mga shared na resources, paghawak sa mabibigat na synchronous na proseso, at paglilinis matapos ang pagkakansela.
Dito, matututuhan natin nang sunud-sunod mula sa mga batayan ng asyncio hanggang sa mga praktikal na pattern na madalas gamitin sa synchronization.
Panimula: Mga Batayan (async / await at create_task)
Tingnan muna natin ang ilang pinakapayak na asynchronous na code. Ang await ay maghihintay sa puntong iyon hanggang matapos ang tinawag na coroutine, at ang asyncio.create_task naman ay nag-seschedule ng gawain para tumakbo ng sabay-sabay.
1import asyncio
2
3async def worker(name, delay):
4 # Simulate async work
5 await asyncio.sleep(delay)
6 return f"{name} done after {delay}s"
7
8async def main():
9 # Create two tasks that run concurrently.
10 t1 = asyncio.create_task(worker("A", 1))
11 t2 = asyncio.create_task(worker("B", 2))
12
13 # Await both results (this suspends until both are done).
14 result1 = await t1
15 result2 = await t2
16 print(result1, result2)
17
18if __name__ == "__main__":
19 asyncio.run(main())- Ang code na ito ay karaniwang pattern kung saan tuwirang nililikha ang mga gawain, pinapatakbo nang magkasabay, at tinatanggap ang resulta sa huli gamit ang
await. Pinapaganang mag-execute nang sabay-sabay angcreate_task.
Mga Pagkakaiba ng asyncio.gather, asyncio.wait, at asyncio.as_completed
Kapag nagpapatakbo ng maraming coroutines nang sabay-sabay, pipiliin mo kung alin ang gagamitin depende kung paano mo gustong kunin ang mga resulta. Ang gather ay naghihintay na matapos lahat at ibinabalik ang mga resulta ayon sa pagkakasunod ng input, habang pinapayagan naman ng as_completed na iproseso ang resulta agad-agad ayon sa pagkakatapos, anuman ang order.
1import asyncio
2import random
3
4async def job(i):
5 delay = random.random() * 2
6 await asyncio.sleep(delay)
7 return (i, delay)
8
9async def gather_example():
10 # gather waits for all tasks and returns results in the same order as input
11 results = await asyncio.gather(*(job(i) for i in range(5)))
12 print("gather order:", results)
13
14async def as_completed_example():
15 # as_completed yields results as they finish (useful to process early results)
16 tasks = [asyncio.create_task(job(i)) for i in range(5)]
17 for coro in asyncio.as_completed(tasks):
18 res = await coro
19 print("completed:", res)
20
21async def main():
22 await gather_example()
23 await as_completed_example()
24
25if __name__ == "__main__":
26 asyncio.run(main())- Gaya ng ipinapakita sa code na ito, ibinabalik ng
gatherang mga resulta batay sa pagkakaayos ng input, na mainam kung nais mong mapanatili ang order. Ginagamit angas_completedkung gusto mong iproseso agad ang mga resulta kapag natapos na.
Pagkokontrol ng Concurrency: Paglilimita ng sabayang pagpapatakbo gamit ang asyncio.Semaphore
Kapag may limitasyon sa rate ng external API o DB connection, maaari mong kontrolin ang sabayang pagtakbo sa pamamagitan ng Semaphore.
1import asyncio
2import random
3
4sem = asyncio.Semaphore(3) # allow up to 3 concurrent workers
5
6async def limited_worker(i):
7 async with sem:
8 # Only 3 tasks can be inside this block concurrently
9 delay = random.random() * 2
10 await asyncio.sleep(delay)
11 print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14 tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15 await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18 asyncio.run(main())- Gamit ang
Semaphorekasabay angasync with, madaling malilimitahan ang bilang ng sabayang pagpapatakbo. Epektibo ito sa mga pagkakataong may panlabas na mga limitasyon.
Eksklusibong kontrol sa mga shared na resources: asyncio.Lock
Ginagamit ang Lock upang maiwasan ang sabayan na pag-update ng mga shared na data. Ang asyncio.Lock ay eksklusibong primitive para sa asynchronous na paggamit.
1import asyncio
2
3counter = 0
4lock = asyncio.Lock()
5
6async def incrementer(n_times):
7 global counter
8 for _ in range(n_times):
9 # Acquire lock to update shared counter safely
10 async with lock:
11 tmp = counter
12 await asyncio.sleep(0) # yield control to increase race likelihood
13 counter = tmp + 1
14
15async def main():
16 tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17 await asyncio.gather(*tasks)
18 print("final counter:", counter)
19
20if __name__ == "__main__":
21 asyncio.run(main())- Kung maraming gawain ang nag-u-update ng isang shared variable tulad ng global na
counter, maaaring magkaroon ng conflict. Sa pagbabalot ng operasyon gamit angLock, mapapanatili mo ang consistency.
Pagkokoordina ng Gawain: asyncio.Event
Ginagamit ang Event kapag nangangailangan na sumenyas ang isang gawain na ito ay handa na, at maghihintay ang ibang gawain sa senyal na ito. Ito ay simpleng paraan upang magbahaginan ng senyas ang mga gawain at mag-synchronize sa isa’t isa.
1import asyncio
2
3event = asyncio.Event()
4
5async def waiter(name):
6 print(f"{name} is waiting for event")
7 await event.wait()
8 print(f"{name} resumed after event set")
9
10async def setter():
11 print("setter will sleep and then set the event")
12 await asyncio.sleep(1)
13 event.set()
14 print("event set by setter")
15
16async def main():
17 tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18 await asyncio.create_task(setter())
19 await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22 asyncio.run(main())- May boolean flag ang
Event, at sa pagtawag ngset(), magpapatuloy ang lahat ng naghihintay na gawain. Kapaki-pakinabang ito para sa simpleng synchronization.
Producer-consumer Pattern: asyncio.Queue
Sa paggamit ng Queue, maaaring magka-coordina nang maayos at asynchronous ang mga producer (lumilikha ng data) at consumer (nagpo-proseso ng data). Kapag puno ang queue, kusa ring maghihintay ang mga producer, kaya natural na naipapatupad ang backpressure para maiwasan ang overproduction.
1import asyncio
2import random
3
4async def producer(q, n_items):
5 for i in range(n_items):
6 await asyncio.sleep(random.random() * 0.5)
7 item = f"item-{i}"
8 await q.put(item)
9 print("produced", item)
10 # signal consumers to stop
11 await q.put(None)
12
13async def consumer(q, name):
14 while True:
15 item = await q.get()
16 if item is None:
17 # put sentinel back for other consumers and break
18 await q.put(None)
19 break
20 await asyncio.sleep(random.random() * 1)
21 print(name, "consumed", item)
22 q.task_done()
23
24async def main():
25 q = asyncio.Queue(maxsize=5) # bounded queue to apply backpressure
26 prod = asyncio.create_task(producer(q, 10))
27 cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28 await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31 asyncio.run(main())- Tinutulungan ng
Queuena magka-coordina ang producer at consumer sa asynchronous na paraan. Bukod dito, kapag inilagay angmaxsize, maghihintay ang producer saputkapag puno ang queue, kaya naiiwasan ang overproduction.
Paghawak ng Mga Synchronous Blocking Operation: run_in_executor
Para sa CPU-intensive na pagproseso o paggamit ng mga library na hindi sumusuporta sa async, gamitin ang run_in_executor upang ipasa ang pagproseso sa ibang thread o proseso. Sa ganitong paraan, naiwasan ang pagtigil ng main event loop kaya tuloy-tuloy pa ring tatakbo ang iba pang asynchronous na gawain.
1import asyncio
2import time
3
4def blocking_io(x):
5 # simulate blocking I/O or CPU-bound work
6 time.sleep(2)
7 return x * x
8
9async def main():
10 loop = asyncio.get_running_loop()
11 # run blocking_io in default thread pool
12 result = await loop.run_in_executor(None, blocking_io, 3)
13 print("blocking result:", result)
14
15if __name__ == "__main__":
16 asyncio.run(main())- Kapag direktang tinawag ang synchronous na mga function, mae-stack o mabablock ang event loop. Sa pamamagitan ng
run_in_executor, ang code ay tatakbo sa hiwalay na thread at magpapatuloy nang sabay-sabay ang mga asynchronous na gawain.
Halimbawa: Rate-limited na API calls (pinagsamang Semaphore at run_in_executor)
Narito ang isang halimbawa kung saan may rate limit ang API calls at may mabigat na pagproseso sa resulta. Sa pagsasama ng Semaphore at run_in_executor, maaaring tumuloy ang pagproseso nang ligtas at mahusay.
1import asyncio
2import time
3import random
4
5sem = asyncio.Semaphore(5)
6
7def heavy_sync_processing(data):
8 # simulate heavy CPU-bound work
9 time.sleep(1)
10 return f"processed-{data}"
11
12async def api_call(i):
13 await asyncio.sleep(random.random() * 0.5) # simulate network latency
14 return f"data-{i}"
15
16async def worker(i):
17 async with sem:
18 data = await api_call(i)
19 # offload CPU-bound work to threadpool
20 loop = asyncio.get_running_loop()
21 result = await loop.run_in_executor(None, heavy_sync_processing, data)
22 print(result)
23
24async def main():
25 tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26 await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29 asyncio.run(main())- Gumagamit tayo ng
Semaphorepara limitahan ang bilang ng sabayang API calls, at ang mabigat na pagproseso ng resulta ay ipinapasa sa thread pool. Ang paghihiwalay ng network processing at CPU processing ay nagpapahusay ng kahusayan.
Pagkansela ng Gawain at Paglilinis
Kapag kinansela ang isang gawain, mahalagang maayos na hawakan ang finally at asyncio.CancelledError. Ito ay magsisiguro na mare-release ang mga file at koneksyon, at maayos na mahahawakan ang intermediate na estado, upang mapanatili ang consistency ng application.
1import asyncio
2
3async def long_running():
4 try:
5 print("started long_running")
6 await asyncio.sleep(10)
7 print("finished long_running")
8 except asyncio.CancelledError:
9 print("long_running was cancelled, cleaning up")
10 # perform cleanup here
11 raise
12
13async def main():
14 task = asyncio.create_task(long_running())
15 await asyncio.sleep(1)
16 task.cancel()
17 try:
18 await task
19 except asyncio.CancelledError:
20 print("task cancelled in main")
21
22if __name__ == "__main__":
23 asyncio.run(main())- Ang pagkansela ay dumarating bilang exception (
CancelledError), kaya’t gawin ang kinakailangang paglilinis saexceptblock at i-re-raise ang exception kung kinakailangan.
Mga Mahalagang Punto para sa Praktikal na Disenyo
Narito ang mga praktikal na puntos na magagamit sa pagdisenyo ng asynchronous na pagproseso.
-
Tuwirang kontrolin ang concurrency Kapag may mga limitasyon ang resources gaya ng API o DB, maaari mong limitahan ang bilang ng sabayang pagpapatakbo gamit ang
Semaphore. -
Hawakan nang ligtas ang mga shared na resources Kung kailangang mag-update ng estado mula sa maraming gawain, gumamit ng
Lock. Ang pagbabawas ng shared state at pagdidisenyo gamit ang immutable na data ay nagpapaligtas pa lalo. -
Pumili kung paano tatanggapin ang mga resulta Kung gusto mong iproseso ang mga gawain kapag natatapos na, gamitin ang
asyncio.as_completed; kung gusto mo namang iproseso batay sa input order, gamitin anggather. -
Ihiwalay ang mabibigat na synchronous na pagproseso Para sa CPU-intensive o synchronous na tawag sa library, gamitin ang
run_in_executoroProcessPoolExecutorupang hindi ma-block ang event loop. -
Magplano para sa pagkansela at mga exception Sumulat ng tamang exception handling para malinis na mapalaya ang mga resources kahit makansela ang gawain sa kalagitnaan.
-
Gawing madali ang pagsusulit I-abstract ang mga side effect tulad ng I/O, oras, at randomness upang madaling mapalitan at mapadali ang pag-test sa asynchronous na code.
Buod
Makapangyarihan ang asyncio, ngunit kung magpo-focus ka lang sa 'pagpapatakbo ng mga bagay nang sabay-sabay', maaari kang makaharap ng mga isyu gaya ng agawan sa shared resources, paglabag sa limitasyon ng resources, o pagbabara ng event loop. Sa pagsasama ng Semaphore, Lock, Event, Queue, run_in_executor, at tamang paghawak sa pagkansela, makakabuo ka ng ligtas at mahusay na asynchronous na mga application. Sa paggamit ng mga mekanismo tulad ng producer-consumer pattern, paglimita ng concurrency, o paghihiwalay ng asynchronous at blocking na pagproseso, mas ligtas at mahusay ang pagbuo ng mga asynchronous na workflow.
Maaari mong sundan ang artikulo sa itaas gamit ang Visual Studio Code sa aming YouTube channel. Paki-check din ang aming YouTube channel.