पाइथन असिंक्रोनस प्रोसेसिंग में समकालिकता नियंत्रण

पाइथन असिंक्रोनस प्रोसेसिंग में समकालिकता नियंत्रण

यह लेख पाइथन असिंक्रोनस प्रोसेसिंग में समकालिकता नियंत्रण की व्याख्या करता है।

आप चरण-दर-चरण सीखेंगे, asyncio की मूल बातें से लेकर वे व्यावहारिक पैटर्न तक, जो सामान्यतः समकालSynchronization नियंत्रण के लिए उपयोग किए जाते हैं।

YouTube Video

पाइथन असिंक्रोनस प्रोसेसिंग में समकालिकता नियंत्रण

असिंक्रोनस प्रोसेसिंग में, आप आसानी से कई कार्य एक साथ चला सकते हैं। हालांकि, व्यवहार में, समकालिकता नियंत्रण, कार्यों का समन्वय, साझा संसाधनों का विशेष नियंत्रण, भारी सिंक्रोनस प्रोसेसिंग से निपटना, और रद्दीकरण के बाद सफाई जैसे और उन्नत समायोजन आवश्यक होते हैं।

यहाँ, हम asyncio की बुनियादी बातों से लेकर सिंक्रोनाइजेशन के लिए आमतौर पर उपयोग किए जाने वाले व्यावहारिक पैटर्न तक चरण दर चरण सीखेंगे।

परिचय: मूल बातें (async / await और create_task)

चलिए पहले कुछ न्यूनतम असिंक्रोनस कोड देखते हैं। await उस बिंदु पर तब तक प्रतीक्षा करता है जब तक कि बुलाया गया coroutine पूरा नहीं हो जाता, और asyncio.create_task किसी टास्क को समकालिक निष्पादन के लिए अनुसूचित करता है।

 1import asyncio
 2
 3async def worker(name, delay):
 4    # Simulate async work
 5    await asyncio.sleep(delay)
 6    return f"{name} done after {delay}s"
 7
 8async def main():
 9    # Create two tasks that run concurrently.
10    t1 = asyncio.create_task(worker("A", 1))
11    t2 = asyncio.create_task(worker("B", 2))
12
13    # Await both results (this suspends until both are done).
14    result1 = await t1
15    result2 = await t2
16    print(result1, result2)
17
18if __name__ == "__main__":
19    asyncio.run(main())
  • यह कोड एक विशिष्ट पैटर्न है जहाँ कार्यों को स्पष्ट रूप से बनाया जाता है, समानांतर में चलाया जाता है, और परिणाम अंत में await के साथ प्राप्त किए जाते हैं। create_task समकालिक निष्पादन को सक्षम बनाता है।

asyncio.gather, asyncio.wait, और asyncio.as_completed के बीच अंतर

जब आप एक साथ कई कोरूटीन चलाते हैं, तो परिणाम प्राप्त करने के तरीके के अनुसार आप उपयोग करने के लिए विकल्प चुनते हैं। gather सभी के समाप्त होने की प्रतीक्षा करता है और इनपुट क्रम में परिणाम लौटाता है, जबकि as_completed परिणाम प्राप्त होते ही उन्हें प्रोसेस करने की अनुमति देता है, क्रम की परवाह किए बिना।

 1import asyncio
 2import random
 3
 4async def job(i):
 5    delay = random.random() * 2
 6    await asyncio.sleep(delay)
 7    return (i, delay)
 8
 9async def gather_example():
10    # gather waits for all tasks and returns results in the same order as input
11    results = await asyncio.gather(*(job(i) for i in range(5)))
12    print("gather order:", results)
13
14async def as_completed_example():
15    # as_completed yields results as they finish (useful to process early results)
16    tasks = [asyncio.create_task(job(i)) for i in range(5)]
17    for coro in asyncio.as_completed(tasks):
18        res = await coro
19        print("completed:", res)
20
21async def main():
22    await gather_example()
23    await as_completed_example()
24
25if __name__ == "__main__":
26    asyncio.run(main())
  • जैसा कि इस कोड में दिखाया गया है, gather परिणाम इनपुट क्रम में लौटाता है, जिससे यह तब उपयोगी बनता है जब आप क्रम संरक्षित रखना चाहते हैं। as_completed तब उपयोग किया जाता है जब आप परिणामों को पूरा होते ही प्रोसेस करना चाहते हैं।

समकालिकता नियंत्रण: asyncio.Semaphore के साथ एक साथ निष्पादन को सीमित करना

जब बाहरी API रेट लिमिट या DB कनेक्शन लिमिट हो, तो आप Semaphore के माध्यम से समकालिक निष्पादन को नियंत्रित कर सकते हैं।

 1import asyncio
 2import random
 3
 4sem = asyncio.Semaphore(3)  # allow up to 3 concurrent workers
 5
 6async def limited_worker(i):
 7    async with sem:
 8        # Only 3 tasks can be inside this block concurrently
 9        delay = random.random() * 2
10        await asyncio.sleep(delay)
11        print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14    tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15    await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18    asyncio.run(main())
  • async with के साथ Semaphore का उपयोग करके, आप एक साथ निष्पादन की संख्या आसानी से सीमित कर सकते हैं। यह बाहरी बाधाओं वाले हालात में प्रभावी है।

साझा संसाधनों का विशेष नियंत्रण: asyncio.Lock

Lock साझा डेटा में एक साथ अपडेट को रोकने के लिए उपयोग किया जाता है। asyncio.Lock असिंक्रोनस उपयोग के लिए एक विशिष्ट प्राथमिकता है।

 1import asyncio
 2
 3counter = 0
 4lock = asyncio.Lock()
 5
 6async def incrementer(n_times):
 7    global counter
 8    for _ in range(n_times):
 9        # Acquire lock to update shared counter safely
10        async with lock:
11            tmp = counter
12            await asyncio.sleep(0)  # yield control to increase race likelihood
13            counter = tmp + 1
14
15async def main():
16    tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17    await asyncio.gather(*tasks)
18    print("final counter:", counter)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • अगर कई कार्य एक साझा वेरिएबल (जैसे कि ग्लोबल counter) को अपडेट करते हैं, तो टकराव हो सकता है। Lock के साथ ऑपरेशनों को समेट कर, आप एकरूपता बनाए रख सकते हैं।

टास्क समन्वय: asyncio.Event

Event तब उपयोग किया जाता है जब एक टास्क सिग्नल करता है कि वह तैयार है और अन्य कार्य इस सिग्नल की प्रतीक्षा करते हैं। यह कार्यों के बीच सिग्नल साझा करने और एक-दूसरे के साथ सिंक्रनाइज़ करने का एक आसान तरीका है।

 1import asyncio
 2
 3event = asyncio.Event()
 4
 5async def waiter(name):
 6    print(f"{name} is waiting for event")
 7    await event.wait()
 8    print(f"{name} resumed after event set")
 9
10async def setter():
11    print("setter will sleep and then set the event")
12    await asyncio.sleep(1)
13    event.set()
14    print("event set by setter")
15
16async def main():
17    tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18    await asyncio.create_task(setter())
19    await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22    asyncio.run(main())
  • Event के पास एक बूलियन फ्लैग होता है, और set() कॉल करने पर सभी प्रतीक्षा कर रहे टास्क फिर से शुरू हो जाते हैं। यह सादी सिंक्रनाइज़ेशन के लिए उपयोगी है।

प्रोड्यूसर-कंज़्यूमर पैटर्न: asyncio.Queue

Queue का उपयोग करके, डेटा बनाने वाले प्रोड्यूसर और डेटा प्रोसेस करने वाले कंज़्यूमर सुचारू और असिंक्रोनस रूप से समन्वय कर सकते हैं। साथ ही, जब queue भर जाती है तो प्रोड्यूसर स्वतः प्रतीक्षा करते हैं, इससे अतिरिक्त उत्पादन रोकने के लिए स्वाभाविक रूप से बैकप्रेशर लागू होता है।

 1import asyncio
 2import random
 3
 4async def producer(q, n_items):
 5    for i in range(n_items):
 6        await asyncio.sleep(random.random() * 0.5)
 7        item = f"item-{i}"
 8        await q.put(item)
 9        print("produced", item)
10    # signal consumers to stop
11    await q.put(None)
12
13async def consumer(q, name):
14    while True:
15        item = await q.get()
16        if item is None:
17            # put sentinel back for other consumers and break
18            await q.put(None)
19            break
20        await asyncio.sleep(random.random() * 1)
21        print(name, "consumed", item)
22        q.task_done()
23
24async def main():
25    q = asyncio.Queue(maxsize=5)  # bounded queue to apply backpressure
26    prod = asyncio.create_task(producer(q, 10))
27    cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28    await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31    asyncio.run(main())
  • Queue प्रोड्यूसर और कंज़्यूमर के असिंक्रोनस समन्वय में मदद करता है। साथ ही, maxsize सेट करने से जब queue पूरी भर जाती है, तो प्रोड्यूसर put पर प्रतीक्षा करता है, जिससे अतिरिक्त उत्पादन रुकता है।

समकालिक ब्लॉकिंग संचालन को संभालना: run_in_executor

CPU-केंद्रित प्रोसेसिंग या ऐसी लाइब्रेरी का उपयोग करते समय जो async समर्थित न हो, प्रोसेसिंग को किसी अन्य थ्रेड या प्रोसेस को सौंपने के लिए run_in_executor का उपयोग करें। ऐसा करने से मुख्य इवेंट लूप रुकता नहीं है, जिससे अन्य असिंक्रोनस टास्क आसानी से चल सकते हैं।

 1import asyncio
 2import time
 3
 4def blocking_io(x):
 5    # simulate blocking I/O or CPU-bound work
 6    time.sleep(2)
 7    return x * x
 8
 9async def main():
10    loop = asyncio.get_running_loop()
11    # run blocking_io in default thread pool
12    result = await loop.run_in_executor(None, blocking_io, 3)
13    print("blocking result:", result)
14
15if __name__ == "__main__":
16    asyncio.run(main())
  • सिंक्रोनस फंक्शनों को सीधे कॉल करना इवेंट लूप को ब्लॉक कर देगा। run_in_executor के साथ, कोड एक अलग थ्रेड में चलता है और असिंक्रोनस टास्क समानांतर में प्रगति कर सकते हैं।

उदाहरण: सीमित दर वाली API कॉल्स (Semaphore + run_in_executor का संयोजन)

निम्न एक उदाहरण है जहाँ API कॉल्स की दर सीमित है और परिणामों पर भारी प्रोसेसिंग की जाती है। Semaphore और run_in_executor का संयोजन प्रोसेसिंग को सुरक्षित और कुशलता से संचालित करने देता है।

 1import asyncio
 2import time
 3import random
 4
 5sem = asyncio.Semaphore(5)
 6
 7def heavy_sync_processing(data):
 8    # simulate heavy CPU-bound work
 9    time.sleep(1)
10    return f"processed-{data}"
11
12async def api_call(i):
13    await asyncio.sleep(random.random() * 0.5)  # simulate network latency
14    return f"data-{i}"
15
16async def worker(i):
17    async with sem:
18        data = await api_call(i)
19        # offload CPU-bound work to threadpool
20        loop = asyncio.get_running_loop()
21        result = await loop.run_in_executor(None, heavy_sync_processing, data)
22        print(result)
23
24async def main():
25    tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26    await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29    asyncio.run(main())
  • हम समकालिक API कॉल्स की संख्या सीमित करने के लिए Semaphore का उपयोग करते हैं, और परिणामस्वरूप डेटा पर भारी प्रोसेसिंग को एक थ्रेड पूल को सौंपते हैं। नेटवर्क और CPU प्रोसेसिंग को अलग करने से दक्षता बढ़ती है।

टास्क रद्द करना और सफाई

जब कोई टास्क रद्द हो, तब finally और asyncio.CancelledError को सही तरीके से संभालना बहुत जरूरी है। यह सुनिश्चित करता है कि फाइलें और कनेक्शन मुक्त हो जाएं और मध्यवर्ती अवस्थाएं सही ढंग से संभाल ली जाएं, जिससे एप्लीकेशन की एकरूपता बनी रहती है।

 1import asyncio
 2
 3async def long_running():
 4    try:
 5        print("started long_running")
 6        await asyncio.sleep(10)
 7        print("finished long_running")
 8    except asyncio.CancelledError:
 9        print("long_running was cancelled, cleaning up")
10        # perform cleanup here
11        raise
12
13async def main():
14    task = asyncio.create_task(long_running())
15    await asyncio.sleep(1)
16    task.cancel()
17    try:
18        await task
19    except asyncio.CancelledError:
20        print("task cancelled in main")
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • रद्द करने की सूचना एक अपवाद (CancelledError) के रूप में दी जाती है, इसलिए आवश्यक सफाई कार्य except ब्लॉक में करें और आवश्यक हो तो अपवाद को फिर से उठाएं।

व्यावहारिक डिज़ाइन के महत्वपूर्ण बिंदु

निम्नलिखित व्यावहारिक बिंदु असिंक्रोनस प्रोसेसिंग डिज़ाइन के लिए उपयोगी हैं।

  • समकालिकता को स्पष्ट रूप से नियंत्रित करें जब API या DB जैसे संसाधनों की सीमाएं हों, आप Semaphore के माध्यम से समकालिक निष्पादन की संख्या सीमित कर सकते हैं।

  • साझा संसाधनों को सुरक्षित रूप से संभालें अगर आपको कई टास्क से स्टेट अपडेट करनी है, तो Lock का उपयोग करें। साझा स्टेट को कम करना और अपरिवर्तनीय डेटा पर डिज़ाइन करना चीजों को और सुरक्षित बनाता है।

  • परिणाम प्राप्त करने का तरीका चुनें अगर आप कार्यों को उनके समाप्त होते ही प्रोसेस करना चाहते हैं, तो asyncio.as_completed का उपयोग करें; अगर आप इनपुट क्रम में प्रोसेस करना चाहते हैं, तो gather का उपयोग करें।

  • भारी सिंक्रोनस प्रोसेसिंग को अलग करें CPU-केंद्रित या सिंक्रोनस लाइब्रेरी कॉल के लिए, इवेंट लूप को ब्लॉक होने से बचाने हेतु run_in_executor या ProcessPoolExecutor का प्रयोग करें।

  • रद्दीकरण और अपवादों के लिए पूर्व योजना बनाएं सही अपवाद हैंडलिंग लिखें ताकि यदि कोई टास्क बीच में रद्द हो जाए तब भी संसाधनों की सुरक्षा के साथ सफाई हो सके।

  • टेस्टिंग को आसान बनाएं I/O, समय, और रैंडमनेस जैसे साइड इफेक्ट को संक्षिप्त करें ताकि उन्हें बदला जा सके और असिंक्रोनस कोड की टेस्टिंग आसान हो।

सारांश

asyncio शक्तिशाली है, लेकिन यदि आप केवल “चीजों को समानांतर में चलाने” पर ध्यान दें, तो साझा संसाधनों की होड़, संसाधन सीमा उल्लंघन या इवेंट लूप ब्लॉकिंग जैसी समस्याएं आ सकती हैं। Semaphore, Lock, Event, Queue, run_in_executor, और उचित कैंसिलेशन हैंडलिंग को मिला कर, आप सुरक्षित और कुशल असिंक्रोनस एप्लीकेशन डिज़ाइन कर सकते हैं। प्रोड्यूसर-कंज़्यूमर पैटर्न, समकालिकता सीमित करना या असिंक्रोनस और ब्लॉकिंग प्रोसेसिंग को अलग करना, जैसी प्रणालियों का उपयोग करके असिंक्रोनस कार्य-पद्धतियाँ और अधिक सुरक्षित व कुशलतापूर्वक तैयार की जा सकती हैं।

आप हमारे YouTube चैनल पर Visual Studio Code का उपयोग करके ऊपर दिए गए लेख के साथ आगे बढ़ सकते हैं। कृपया YouTube चैनल को भी देखें।

YouTube Video