पाइथन असिंक्रोनस प्रोसेसिंग में समकालिकता नियंत्रण
यह लेख पाइथन असिंक्रोनस प्रोसेसिंग में समकालिकता नियंत्रण की व्याख्या करता है।
आप चरण-दर-चरण सीखेंगे, asyncio की मूल बातें से लेकर वे व्यावहारिक पैटर्न तक, जो सामान्यतः समकालSynchronization नियंत्रण के लिए उपयोग किए जाते हैं।
YouTube Video
पाइथन असिंक्रोनस प्रोसेसिंग में समकालिकता नियंत्रण
असिंक्रोनस प्रोसेसिंग में, आप आसानी से कई कार्य एक साथ चला सकते हैं। हालांकि, व्यवहार में, समकालिकता नियंत्रण, कार्यों का समन्वय, साझा संसाधनों का विशेष नियंत्रण, भारी सिंक्रोनस प्रोसेसिंग से निपटना, और रद्दीकरण के बाद सफाई जैसे और उन्नत समायोजन आवश्यक होते हैं।
यहाँ, हम asyncio की बुनियादी बातों से लेकर सिंक्रोनाइजेशन के लिए आमतौर पर उपयोग किए जाने वाले व्यावहारिक पैटर्न तक चरण दर चरण सीखेंगे।
परिचय: मूल बातें (async / await और create_task)
चलिए पहले कुछ न्यूनतम असिंक्रोनस कोड देखते हैं। await उस बिंदु पर तब तक प्रतीक्षा करता है जब तक कि बुलाया गया coroutine पूरा नहीं हो जाता, और asyncio.create_task किसी टास्क को समकालिक निष्पादन के लिए अनुसूचित करता है।
1import asyncio
2
3async def worker(name, delay):
4 # Simulate async work
5 await asyncio.sleep(delay)
6 return f"{name} done after {delay}s"
7
8async def main():
9 # Create two tasks that run concurrently.
10 t1 = asyncio.create_task(worker("A", 1))
11 t2 = asyncio.create_task(worker("B", 2))
12
13 # Await both results (this suspends until both are done).
14 result1 = await t1
15 result2 = await t2
16 print(result1, result2)
17
18if __name__ == "__main__":
19 asyncio.run(main())- यह कोड एक विशिष्ट पैटर्न है जहाँ कार्यों को स्पष्ट रूप से बनाया जाता है, समानांतर में चलाया जाता है, और परिणाम अंत में
awaitके साथ प्राप्त किए जाते हैं।create_taskसमकालिक निष्पादन को सक्षम बनाता है।
asyncio.gather, asyncio.wait, और asyncio.as_completed के बीच अंतर
जब आप एक साथ कई कोरूटीन चलाते हैं, तो परिणाम प्राप्त करने के तरीके के अनुसार आप उपयोग करने के लिए विकल्प चुनते हैं। gather सभी के समाप्त होने की प्रतीक्षा करता है और इनपुट क्रम में परिणाम लौटाता है, जबकि as_completed परिणाम प्राप्त होते ही उन्हें प्रोसेस करने की अनुमति देता है, क्रम की परवाह किए बिना।
1import asyncio
2import random
3
4async def job(i):
5 delay = random.random() * 2
6 await asyncio.sleep(delay)
7 return (i, delay)
8
9async def gather_example():
10 # gather waits for all tasks and returns results in the same order as input
11 results = await asyncio.gather(*(job(i) for i in range(5)))
12 print("gather order:", results)
13
14async def as_completed_example():
15 # as_completed yields results as they finish (useful to process early results)
16 tasks = [asyncio.create_task(job(i)) for i in range(5)]
17 for coro in asyncio.as_completed(tasks):
18 res = await coro
19 print("completed:", res)
20
21async def main():
22 await gather_example()
23 await as_completed_example()
24
25if __name__ == "__main__":
26 asyncio.run(main())- जैसा कि इस कोड में दिखाया गया है,
gatherपरिणाम इनपुट क्रम में लौटाता है, जिससे यह तब उपयोगी बनता है जब आप क्रम संरक्षित रखना चाहते हैं।as_completedतब उपयोग किया जाता है जब आप परिणामों को पूरा होते ही प्रोसेस करना चाहते हैं।
समकालिकता नियंत्रण: asyncio.Semaphore के साथ एक साथ निष्पादन को सीमित करना
जब बाहरी API रेट लिमिट या DB कनेक्शन लिमिट हो, तो आप Semaphore के माध्यम से समकालिक निष्पादन को नियंत्रित कर सकते हैं।
1import asyncio
2import random
3
4sem = asyncio.Semaphore(3) # allow up to 3 concurrent workers
5
6async def limited_worker(i):
7 async with sem:
8 # Only 3 tasks can be inside this block concurrently
9 delay = random.random() * 2
10 await asyncio.sleep(delay)
11 print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14 tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15 await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18 asyncio.run(main())async withके साथSemaphoreका उपयोग करके, आप एक साथ निष्पादन की संख्या आसानी से सीमित कर सकते हैं। यह बाहरी बाधाओं वाले हालात में प्रभावी है।
साझा संसाधनों का विशेष नियंत्रण: asyncio.Lock
Lock साझा डेटा में एक साथ अपडेट को रोकने के लिए उपयोग किया जाता है। asyncio.Lock असिंक्रोनस उपयोग के लिए एक विशिष्ट प्राथमिकता है।
1import asyncio
2
3counter = 0
4lock = asyncio.Lock()
5
6async def incrementer(n_times):
7 global counter
8 for _ in range(n_times):
9 # Acquire lock to update shared counter safely
10 async with lock:
11 tmp = counter
12 await asyncio.sleep(0) # yield control to increase race likelihood
13 counter = tmp + 1
14
15async def main():
16 tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17 await asyncio.gather(*tasks)
18 print("final counter:", counter)
19
20if __name__ == "__main__":
21 asyncio.run(main())- अगर कई कार्य एक साझा वेरिएबल (जैसे कि ग्लोबल
counter) को अपडेट करते हैं, तो टकराव हो सकता है।Lockके साथ ऑपरेशनों को समेट कर, आप एकरूपता बनाए रख सकते हैं।
टास्क समन्वय: asyncio.Event
Event तब उपयोग किया जाता है जब एक टास्क सिग्नल करता है कि वह तैयार है और अन्य कार्य इस सिग्नल की प्रतीक्षा करते हैं। यह कार्यों के बीच सिग्नल साझा करने और एक-दूसरे के साथ सिंक्रनाइज़ करने का एक आसान तरीका है।
1import asyncio
2
3event = asyncio.Event()
4
5async def waiter(name):
6 print(f"{name} is waiting for event")
7 await event.wait()
8 print(f"{name} resumed after event set")
9
10async def setter():
11 print("setter will sleep and then set the event")
12 await asyncio.sleep(1)
13 event.set()
14 print("event set by setter")
15
16async def main():
17 tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18 await asyncio.create_task(setter())
19 await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22 asyncio.run(main())Eventके पास एक बूलियन फ्लैग होता है, औरset()कॉल करने पर सभी प्रतीक्षा कर रहे टास्क फिर से शुरू हो जाते हैं। यह सादी सिंक्रनाइज़ेशन के लिए उपयोगी है।
प्रोड्यूसर-कंज़्यूमर पैटर्न: asyncio.Queue
Queue का उपयोग करके, डेटा बनाने वाले प्रोड्यूसर और डेटा प्रोसेस करने वाले कंज़्यूमर सुचारू और असिंक्रोनस रूप से समन्वय कर सकते हैं। साथ ही, जब queue भर जाती है तो प्रोड्यूसर स्वतः प्रतीक्षा करते हैं, इससे अतिरिक्त उत्पादन रोकने के लिए स्वाभाविक रूप से बैकप्रेशर लागू होता है।
1import asyncio
2import random
3
4async def producer(q, n_items):
5 for i in range(n_items):
6 await asyncio.sleep(random.random() * 0.5)
7 item = f"item-{i}"
8 await q.put(item)
9 print("produced", item)
10 # signal consumers to stop
11 await q.put(None)
12
13async def consumer(q, name):
14 while True:
15 item = await q.get()
16 if item is None:
17 # put sentinel back for other consumers and break
18 await q.put(None)
19 break
20 await asyncio.sleep(random.random() * 1)
21 print(name, "consumed", item)
22 q.task_done()
23
24async def main():
25 q = asyncio.Queue(maxsize=5) # bounded queue to apply backpressure
26 prod = asyncio.create_task(producer(q, 10))
27 cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28 await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31 asyncio.run(main())Queueप्रोड्यूसर और कंज़्यूमर के असिंक्रोनस समन्वय में मदद करता है। साथ ही,maxsizeसेट करने से जब queue पूरी भर जाती है, तो प्रोड्यूसरputपर प्रतीक्षा करता है, जिससे अतिरिक्त उत्पादन रुकता है।
समकालिक ब्लॉकिंग संचालन को संभालना: run_in_executor
CPU-केंद्रित प्रोसेसिंग या ऐसी लाइब्रेरी का उपयोग करते समय जो async समर्थित न हो, प्रोसेसिंग को किसी अन्य थ्रेड या प्रोसेस को सौंपने के लिए run_in_executor का उपयोग करें। ऐसा करने से मुख्य इवेंट लूप रुकता नहीं है, जिससे अन्य असिंक्रोनस टास्क आसानी से चल सकते हैं।
1import asyncio
2import time
3
4def blocking_io(x):
5 # simulate blocking I/O or CPU-bound work
6 time.sleep(2)
7 return x * x
8
9async def main():
10 loop = asyncio.get_running_loop()
11 # run blocking_io in default thread pool
12 result = await loop.run_in_executor(None, blocking_io, 3)
13 print("blocking result:", result)
14
15if __name__ == "__main__":
16 asyncio.run(main())- सिंक्रोनस फंक्शनों को सीधे कॉल करना इवेंट लूप को ब्लॉक कर देगा।
run_in_executorके साथ, कोड एक अलग थ्रेड में चलता है और असिंक्रोनस टास्क समानांतर में प्रगति कर सकते हैं।
उदाहरण: सीमित दर वाली API कॉल्स (Semaphore + run_in_executor का संयोजन)
निम्न एक उदाहरण है जहाँ API कॉल्स की दर सीमित है और परिणामों पर भारी प्रोसेसिंग की जाती है। Semaphore और run_in_executor का संयोजन प्रोसेसिंग को सुरक्षित और कुशलता से संचालित करने देता है।
1import asyncio
2import time
3import random
4
5sem = asyncio.Semaphore(5)
6
7def heavy_sync_processing(data):
8 # simulate heavy CPU-bound work
9 time.sleep(1)
10 return f"processed-{data}"
11
12async def api_call(i):
13 await asyncio.sleep(random.random() * 0.5) # simulate network latency
14 return f"data-{i}"
15
16async def worker(i):
17 async with sem:
18 data = await api_call(i)
19 # offload CPU-bound work to threadpool
20 loop = asyncio.get_running_loop()
21 result = await loop.run_in_executor(None, heavy_sync_processing, data)
22 print(result)
23
24async def main():
25 tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26 await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29 asyncio.run(main())- हम समकालिक API कॉल्स की संख्या सीमित करने के लिए
Semaphoreका उपयोग करते हैं, और परिणामस्वरूप डेटा पर भारी प्रोसेसिंग को एक थ्रेड पूल को सौंपते हैं। नेटवर्क और CPU प्रोसेसिंग को अलग करने से दक्षता बढ़ती है।
टास्क रद्द करना और सफाई
जब कोई टास्क रद्द हो, तब finally और asyncio.CancelledError को सही तरीके से संभालना बहुत जरूरी है। यह सुनिश्चित करता है कि फाइलें और कनेक्शन मुक्त हो जाएं और मध्यवर्ती अवस्थाएं सही ढंग से संभाल ली जाएं, जिससे एप्लीकेशन की एकरूपता बनी रहती है।
1import asyncio
2
3async def long_running():
4 try:
5 print("started long_running")
6 await asyncio.sleep(10)
7 print("finished long_running")
8 except asyncio.CancelledError:
9 print("long_running was cancelled, cleaning up")
10 # perform cleanup here
11 raise
12
13async def main():
14 task = asyncio.create_task(long_running())
15 await asyncio.sleep(1)
16 task.cancel()
17 try:
18 await task
19 except asyncio.CancelledError:
20 print("task cancelled in main")
21
22if __name__ == "__main__":
23 asyncio.run(main())- रद्द करने की सूचना एक अपवाद (
CancelledError) के रूप में दी जाती है, इसलिए आवश्यक सफाई कार्यexceptब्लॉक में करें और आवश्यक हो तो अपवाद को फिर से उठाएं।
व्यावहारिक डिज़ाइन के महत्वपूर्ण बिंदु
निम्नलिखित व्यावहारिक बिंदु असिंक्रोनस प्रोसेसिंग डिज़ाइन के लिए उपयोगी हैं।
-
समकालिकता को स्पष्ट रूप से नियंत्रित करें जब API या DB जैसे संसाधनों की सीमाएं हों, आप
Semaphoreके माध्यम से समकालिक निष्पादन की संख्या सीमित कर सकते हैं। -
साझा संसाधनों को सुरक्षित रूप से संभालें अगर आपको कई टास्क से स्टेट अपडेट करनी है, तो
Lockका उपयोग करें। साझा स्टेट को कम करना और अपरिवर्तनीय डेटा पर डिज़ाइन करना चीजों को और सुरक्षित बनाता है। -
परिणाम प्राप्त करने का तरीका चुनें अगर आप कार्यों को उनके समाप्त होते ही प्रोसेस करना चाहते हैं, तो
asyncio.as_completedका उपयोग करें; अगर आप इनपुट क्रम में प्रोसेस करना चाहते हैं, तोgatherका उपयोग करें। -
भारी सिंक्रोनस प्रोसेसिंग को अलग करें CPU-केंद्रित या सिंक्रोनस लाइब्रेरी कॉल के लिए, इवेंट लूप को ब्लॉक होने से बचाने हेतु
run_in_executorयाProcessPoolExecutorका प्रयोग करें। -
रद्दीकरण और अपवादों के लिए पूर्व योजना बनाएं सही अपवाद हैंडलिंग लिखें ताकि यदि कोई टास्क बीच में रद्द हो जाए तब भी संसाधनों की सुरक्षा के साथ सफाई हो सके।
-
टेस्टिंग को आसान बनाएं I/O, समय, और रैंडमनेस जैसे साइड इफेक्ट को संक्षिप्त करें ताकि उन्हें बदला जा सके और असिंक्रोनस कोड की टेस्टिंग आसान हो।
सारांश
asyncio शक्तिशाली है, लेकिन यदि आप केवल “चीजों को समानांतर में चलाने” पर ध्यान दें, तो साझा संसाधनों की होड़, संसाधन सीमा उल्लंघन या इवेंट लूप ब्लॉकिंग जैसी समस्याएं आ सकती हैं। Semaphore, Lock, Event, Queue, run_in_executor, और उचित कैंसिलेशन हैंडलिंग को मिला कर, आप सुरक्षित और कुशल असिंक्रोनस एप्लीकेशन डिज़ाइन कर सकते हैं। प्रोड्यूसर-कंज़्यूमर पैटर्न, समकालिकता सीमित करना या असिंक्रोनस और ब्लॉकिंग प्रोसेसिंग को अलग करना, जैसी प्रणालियों का उपयोग करके असिंक्रोनस कार्य-पद्धतियाँ और अधिक सुरक्षित व कुशलतापूर्वक तैयार की जा सकती हैं।
आप हमारे YouTube चैनल पर Visual Studio Code का उपयोग करके ऊपर दिए गए लेख के साथ आगे बढ़ सकते हैं। कृपया YouTube चैनल को भी देखें।