असिंक्रोनस इनपुट/आउटपुट

असिंक्रोनस इनपुट/आउटपुट

यह लेख असिंक्रोनस इनपुट/आउटपुट को समझाता है।

यह गाइड आपको पाइथन में व्यावहारिक रूप से उपयोगी असिंक्रोनस इनपुट/आउटपुट की अवधारणाओं और पैटर्न्स को कदम दर कदम सरलता से समझाती है।

YouTube Video

असिंक्रोनस इनपुट/आउटपुट (I/O)

असिंक्रोनस I/O की संकल्पना

असिंक्रोनस I/O एक ऐसी प्रक्रिया है जो समय लेने वाले I/O (जैसे फाइल ऑपरेशन या नेटवर्क कम्युनिकेशन) का इंतजार करते हुए अन्य ऑपरेशनों को समानांतर में चलने की अनुमति देती है। पाइथन में, asyncio एक मानक असिंक्रोनस फ्रेमवर्क के रूप में प्रदान किया जाता है, और कई लाइब्रेरीज़ इसी मैकेनिज्म का पालन करने के लिए डिज़ाइन की गई हैं।

मूल बातें: async / await और इवेंट लूप

सबसे पहले, यहां मूल कोरूटीन लिखने का तरीका और asyncio.gather का उपयोग कर कई कोरूटीन एकसाथ चलाने का उदाहरण है।

नीचे दिया गया कोड एक न्यूनतम उदाहरण है जिसमें असिंक्रोनस फंक्शंस को एक साथ परिभाषित और चलाया गया है। sleep फंक्शन का उपयोग समानांतर निष्पादन को दिखाने के लिए किया गया है।

 1import asyncio
 2
 3async def worker(name: str, delay: float):
 4    # Simulate I/O-bound work by sleeping
 5    print(f"{name} started")
 6    await asyncio.sleep(delay)
 7    print(f"{name} finished after {delay}s")
 8    return name, delay
 9
10async def main():
11    # Run multiple coroutines concurrently
12    tasks = [
13        worker("A", 1.5),
14        worker("B", 1.0),
15        worker("C", 0.5),
16    ]
17    results = await asyncio.gather(*tasks)
18    print("Results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • यह कोड asyncio.run() के माध्यम से इवेंट लूप शुरू करता है और तीन कोरूटीन को एक साथ चलाता है।

async with और असिंक्रोनस कॉन्टेक्स्ट मैनेजर

असिंक्रोनस प्रोसेसिंग में कनेक्शन खोलना या फाइलें बंद करना जैसी रिसोर्स मैनेजमेंट आसानी से जटिल हो सकती है। यहां async with का उपयोग करने वाले असिंक्रोनस कॉन्टेक्स्ट मैनेजर उपयोगी हो जाते हैं। इस सिंटैक्स का उपयोग सिंक्रोनस with स्टेटमेंट की तरह ही किया जाता है, लेकिन आंतरिक प्रोसेसिंग असिंक्रोनस होती है, इसलिए यह async/await फ्लो में स्वाभाविक रूप से फिट होता है।

async with का उपयोग करने के दो मुख्य कारण हैं:।

  • कनेक्शन, फाइल हैंडल, या सेशन जैसे संसाधनों को विश्वसनीय रूप से साफ करने के लिए। आप निश्चिंत रह सकते हैं कि संसाधन सही तरीके से जारी किए जाएंगे, भले ही कोई असामान्य समाप्ति हो जाए।
  • कनेक्शन स्थापित करना या बंद करना और फ्लशिंग जैसी इनिशियलाइजेशन व क्लीनअप टास्क को असिंक्रोनस तरीके से स्वचालित करने के लिए। यह मैन्युअल कोडिंग की झंझट को बचाता है और आपके कोड को अधिक स्पष्ट बनाता है।

नीचे स्क्रैच से एक साधारण असिंक्रोनस कॉन्टेक्स्ट मैनेजर बनाने का उदाहरण है।

 1import asyncio
 2
 3class AsyncResource:
 4    async def __aenter__(self):
 5        print("Opening resource...")
 6        await asyncio.sleep(0.5)
 7        print("Resource opened")
 8        return self
 9
10    async def __aexit__(self, exc_type, exc, tb):
11        print("Closing resource...")
12        await asyncio.sleep(0.5)
13        print("Resource closed")
14
15async def main():
16    async with AsyncResource() as r:
17        print("Using resource...")
18
19if __name__ == "__main__":
20    asyncio.run(main())
  • __aenter__ और __aexit__ को परिभाषित करके, आप async with का उपयोग कर सकते हैं।
  • async with ब्लॉक में प्रवेश और निकासी के दौरान प्रोसेसिंग असिंक्रोनस और सुरक्षित रूप से होती है।

असिंक्रोनस फाइल I/O (aiofiles)

फाइल ऑपरेशन ब्लॉकिंग का एक क्लासिक उदाहरण हैं। aiofiles का उपयोग करके आप फाइल ऑपरेशन्स को असिंक्रोनस रूप से सुरक्षित तरीके से कर सकते हैं। आंतरिक रूप से, यह एक थ्रेड पूल का उपयोग करता है और सुनिश्चित करता है कि फ़ाइलें async with के माध्यम से सही तरीके से बंद हो जाएं।

निम्न उदाहरण एक साथ कई फाइलों को असिंक्रोनस रूप से पढ़ने का प्रदर्शन करता है। इस कोड को चलाने से पहले आपको pip install aiofiles के साथ aiofiles इंस्टॉल करना होगा।

 1# pip install aiofiles
 2import asyncio
 3import aiofiles
 4from pathlib import Path
 5
 6async def read_file(path: Path):
 7    # Read file content asynchronously
 8    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
 9        contents = await f.read()
10    return path.name, len(contents)
11
12async def main():
13    files = [
14        Path("example1.txt"),
15        Path("example2.txt"),
16        Path("example3.txt")
17    ]
18    tasks = [read_file(p) for p in files]
19    results = await asyncio.gather(*tasks)
20    for name, size in results:
21        print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24    asyncio.run(main())
  • यह कोड प्रत्येक फाइल की पढ़ाई को समानांतर बनाता है। aiofiles आमतौर पर आंतरिक रूप से थ्रेड पूल का उपयोग करता है, जिससे आप ब्लॉकिंग फाइल I/O को असिंक्रोनस इंटरफेस से नियंत्रित कर सकते हैं।

असिंक्रोनस HTTP क्लाइंट (aiohttp)

नेटवर्क I/O का एक क्लासिक उदाहरण है HTTP रिक्वेस्ट को असिंक्रोनस रूप में करना, जैसा कि यहां दिखाया गया है। जब आपको समानांतर में बड़ी संख्या में HTTP रिक्वेस्ट करनी होती है, तो यह विशेष रूप से शक्तिशाली है।

नीचे aiohttp का उपयोग करके कई URL समानांतर में प्राप्त करने का उदाहरण है। आपको pip install aiohttp के साथ aiohttp इंस्टॉल करना होगा।

 1# pip install aiohttp
 2import asyncio
 3import aiohttp
 4
 5async def fetch(session: aiohttp.ClientSession, url: str):
 6    # Fetch a URL asynchronously and return status and size
 7    async with session.get(url) as resp:
 8        text = await resp.text()
 9        return url, resp.status, len(text)
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [fetch(session, u) for u in urls]
14        for coro in asyncio.as_completed(tasks):
15            url, status, size = await coro
16            print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19    urls = [
20        "https://codesparklab.com/json/example1.json",
21        "https://codesparklab.com/json/example2.json",
22        "https://codesparklab.com/json/example3.json",
23    ]
24    asyncio.run(main(urls))
  • asyncio.as_completed का उपयोग करके आप परिणामों को वैसे ही प्रोसेस कर सकते हैं जैसे टास्क पूरे हों। यह कई अनुरोधों को कुशलता से संभालने के लिए उपयोगी है।

ब्लॉकिंग I/O के साथ सह-अस्तित्व: run_in_executor

जब आप असिंक्रोनस कोड में CPU-गहन कार्यों या मौजूदा ब्लॉकिंग API के साथ काम कर रहे हों, तो loop.run_in_executor के माध्यम से ThreadPoolExecutor या ProcessPoolExecutor का उपयोग करें।

निम्नलिखित कोड एक उदाहरण है जिसमें थ्रेड पूल का उपयोग करके ब्लॉकिंग I/O वाले कार्यों को एक साथ चलाया गया है।

 1import asyncio
 2from concurrent.futures import ThreadPoolExecutor
 3import time
 4
 5def blocking_io(n):
 6    # Simulate a blocking I/O or CPU-bound function
 7    time.sleep(n)
 8    return f"slept {n}s"
 9
10async def main():
11    loop = asyncio.get_running_loop()
12    with ThreadPoolExecutor() as pool:
13        tasks = [
14            loop.run_in_executor(pool, blocking_io, 1),
15            loop.run_in_executor(pool, blocking_io, 2),
16        ]
17        results = await asyncio.gather(*tasks)
18        print("Blocking results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • run_in_executor का उपयोग करके, आप मौजूदा सिंक्रोनस कोड को बिना विशेष बदलाव किए असिंक्रोनस फ्लो में शामिल कर सकते हैं। हालांकि, आपको थ्रेड्स की संख्या और CPU लोड पर ध्यान देना चाहिए।
  • ProcessPoolExecutor CPU-गहन कार्यों के लिए उपयुक्त है।

असिंक्रोनस सर्वर: asyncio आधारित TCP इको सर्वर

अगर आप सॉकेट्स को सीधे संभालना चाहते हैं, तो आप asyncio.start_server का उपयोग करके आसानी से असिंक्रोनस सर्वर बना सकते हैं।

निम्नलिखित उदाहरण एक साधारण इको सर्वर है जो क्लाइंट से प्राप्त डेटा को वैसा ही लौटाता है।

 1import asyncio
 2
 3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
 4    # Handle a single client: read data and echo it back
 5    addr = writer.get_extra_info('peername')
 6    print(f"Connection from {addr}")
 7    while True:
 8        data = await reader.read(1024)
 9        if not data:
10            break
11        writer.write(data)  # echo back
12        await writer.drain()
13    writer.close()
14    await writer.wait_closed()
15    print(f"Connection closed {addr}")
16
17async def main():
18    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19    addr = server.sockets[0].getsockname()
20    print(f"Serving on {addr}")
21    async with server:
22        await server.serve_forever()
23
24if __name__ == "__main__":
25    asyncio.run(main())
  • asyncio के साथ TCP संचार में, StreamReader और StreamWriter असिंक्रोनस इनपुट और आउटपुट में केंद्रीय भूमिका निभाते हैं। StreamReader क्लाइंट से भेजा गया डेटा असिंक्रोनस रूप से पढ़ता है, जबकि StreamWriter का उपयोग सर्वर से क्लाइंट को प्रतिक्रिया भेजने के लिए किया जाता है।

  • आप सॉकेट्स के विस्तार से संचालन को खुद संभाले बिना भी, asyncio.start_server का उपयोग करके आसानी से और प्रभावी तरीक़े से एक असिंक्रोनस सर्वर चालू कर सकते हैं।

  • जब आप asyncio.start_server को हैंडलर फ़ंक्शन पास करते हैं, तो वह फ़ंक्शन अपने आर्गुमेंट्स के रूप में reader और writer प्राप्त करता है। इनका उपयोग करके, आप सीधे लो-लेवल सॉकेट एपीआई को संभालने की तुलना में अधिक सुरक्षित और स्पष्ट तरीक़े से कम्यूनिकेशन प्रक्रिया लागू कर सकते हैं। उदाहरण के लिए, reader.read() से डेटा प्राप्त करके और writer.write() को writer.drain() के साथ जोड़कर, आप ऐसा असिंक्रोनस ट्रांसमिशन कर सकते हैं जिससे ट्रांसमिशन पूरी तरह से हो सके।

  • यह सेटअप बड़ी संख्या में एक साथ कनेक्शन संभालने के लिए उपयुक्त है और सरल प्रोटोकॉल या छोटे पैमाने की TCP सेवाओं के लिए आदर्श है।

बड़े स्ट्रीमिंग डेटा को संभालना

बड़ी फाइलों या रिस्पॉन्स को क्रमवार प्रोसेस करते समय, डेटा को टुकड़ों (chunks) में पढ़ें और लिखें ताकि मेमोरी उपयोग कम रहे। नीचे aiohttp का उपयोग करके स्ट्रीमिंग रीड का उदाहरण है।

निम्न कोड HTTP रिस्पॉन्स को टुकड़ों (chunks) में प्रोसेस करता है और डेटा मिलते ही उसे डिस्क पर लिखता है।

 1import aiohttp
 2import asyncio
 3import aiofiles
 4
 5async def stream_download(url: str, dest: str):
 6    # Stream download and write to file in chunks
 7    async with aiohttp.ClientSession() as session:
 8        async with session.get(url) as resp:
 9            async with aiofiles.open(dest, 'wb') as f:
10                async for chunk in resp.content.iter_chunked(1024 * 64):
11                    await f.write(chunk)
12
13if __name__ == "__main__":
14    asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))
  • यह कोड एक बड़ी फ़ाइल को एक साथ नहीं लोड करता; इसके बजाय, यह डेटा को छोटे-छोटे टुकड़ों (chunks) में प्राप्त करता है और उसे असिंक्रोनस रूप से फ़ाइल में लिखता है। इसका परिणाम यह है कि यह कम मेमोरी उपयोग के साथ तेज़ी और कुशलता से डाउनलोड कर सकता है। aiohttp असिंक्रोनस रूप से डेटा प्राप्त करता है और aiofiles बिना ब्लॉक किए फ़ाइल में लिखता है, जिससे अन्य प्रक्रियाओं के साथ इसे चलाना आसान होता है।

  • यह तरीका बड़ी फाइलों को कुशलता से डाउनलोड और सेव करने के लिए उपयुक्त है, और मेमोरी खपत को न्यूनतम करता है।

सबप्रोसेस का असिंक्रोनस निष्पादन

अगर आप बाहरी कमांड्स को असिंक्रोनस रूप से चलाना चाहते हैं और उनके आउटपुट को वास्तविक समय में पढ़ना चाहते हैं, तो asyncio.create_subprocess_exec उपयोगी है।

नीचे एक बाहरी कमांड शुरू करने और उसके स्टैंडर्ड आउटपुट को असली समय में पढ़ने का उदाहरण है।

 1import asyncio
 2
 3async def run_cmd(cmd):
 4    # Run external command asynchronously and capture output line by line
 5    proc = await asyncio.create_subprocess_exec(
 6        *cmd,
 7        stdout=asyncio.subprocess.PIPE,
 8        stderr=asyncio.subprocess.PIPE
 9    )
10
11    async def read_stream(stream, name):
12        while True:
13            line = await stream.readline()
14            if not line:
15                break
16            print(f"[{name}] {line.decode().rstrip()}")
17
18    await asyncio.gather(
19        read_stream(proc.stdout, "stdout"),
20        read_stream(proc.stderr, "stderr"),
21    )
22    await proc.wait()
23    return proc.returncode
24
25if __name__ == "__main__":
26    asyncio.run(run_cmd(["python", "--version"]))
  • सबप्रोसेस को असिंक्रोनस रूप से नियंत्रित करके, आप बाहरी टूल्स के लॉग वास्तविक समय में संभाल सकते हैं या कई प्रोसेसों को एक साथ चला सकते हैं।

रद्द करना और टाइमआउट संभालना

असिंक्रोनस टास्क रद्द किए जा सकते हैं। टाइमआउट लागू करने के लिए asyncio.wait_for का उपयोग करना आसान है।

नीचे एक टास्क को टाइमआउट के साथ चलाने का उदाहरण है।

 1import asyncio
 2
 3async def slow_task():
 4    await asyncio.sleep(5)
 5    return "done"
 6
 7async def main():
 8    try:
 9        result = await asyncio.wait_for(slow_task(), timeout=2.0)
10        print("Result:", result)
11    except asyncio.TimeoutError:
12        print("Task timed out")
13
14if __name__ == "__main__":
15    asyncio.run(main())
  • wait_for समय सीमा पार होने पर TimeoutError देता है और आवश्यकता होने पर टास्क को रद्द कर देता है। टास्क कैंसलेशन के प्रचार और क्लीनअप में सावधानी बरतें।

कनकरेंसी नियंत्रित करना (Semaphore)

क्योंकि बहुत सारे एकसाथ कनेक्शन या अनुरोध संसाधनों को थका सकते हैं, इसलिए asyncio.Semaphore से कनकरेंसी सीमित करें।

नीचे एक सेमाफोर का उपयोग करके एक साथ डाउनलोड की सीमा निर्धारित करने का उदाहरण है।

 1import asyncio
 2import aiohttp
 3
 4semaphore = asyncio.Semaphore(3)  # allow up to 3 concurrent tasks
 5
 6async def limited_fetch(session, url):
 7    async with semaphore:
 8        async with session.get(url) as resp:
 9            return url, resp.status
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [limited_fetch(session, u) for u in urls]
14        results = await asyncio.gather(*tasks)
15        print(results)
16
17if __name__ == "__main__":
18    urls = ["https://codesparklab.com/json/example.json"] * 10
19    asyncio.run(main(urls))
  • इस विधि से, आप बाहरी सेवाओं तक धीरे-धीरे पहुँच सकते हैं और अपने प्रोसेस पर अधिक लोड होने से बच सकते हैं।

एरर हैंडलिंग और पुन: प्रयास (रिट्राई) रणनीतियाँ

गलतियाँ तो असिंक्रोनस प्रोसेसिंग में भी स्वाभाविक रूप से हो सकती हैं। अपवादों को सही ढंग से पकड़ें और एक्सपोनेंशियल बैकऑफ जैसी पुन: प्रयास रणनीतियाँ लागू करें।

नीचे N बार तक साधारण पुन: प्रयास (रिट्राई) की एक उदाहरण कार्यान्वयन है।

 1import asyncio
 2import aiohttp
 3import random
 4
 5async def fetch_with_retry(session, url, retries=3):
 6    for attempt in range(1, retries + 1):
 7        try:
 8            async with session.get(url) as resp:
 9                resp.raise_for_status()
10                text = await resp.text()
11                return text
12        except Exception as e:
13            if attempt == retries:
14                raise
15            await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18    async with aiohttp.ClientSession() as session:
19        text = await fetch_with_retry(session, "https://codesparklab.com/")
20        print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • संगति और ट्रैफिक नियंत्रण के संतुलन के लिए उचित पुन: प्रयास लॉजिक महत्वपूर्ण है।

डिबगिंग और लॉगिंग के लिए सुझाव

असिंक्रोनस प्रोसेसिंग में टास्क्स एक साथ चलते हैं, जिससे समस्याओं का कारण पता लगाना मुश्किल हो सकता है। समस्याओं को कुशलता से ट्रैक करने के लिए निम्नलिखित बिंदुओं को ध्यान में रखना डिबगिंग को आसान बनाता है।

  • asyncio.run() और Task से अपवादों को नज़रअंदाज़ करना आसान है, इसलिए बिना संभाले गए अपवादों को लॉग जरूर करें।
  • logging का उपयोग करते समय, अपने लॉग्स में कोरूटीन का नाम या (Python 3.8 और ऊपर में) task.get_name() शामिल करने से ट्रैकिंग आसान हो जाती है।
  • आप asyncio.Task.all_tasks() का उपयोग करके कार्यों की वर्तमान स्थिति देख सकते हैं। हालाँकि, यह एपीआई केवल डिबगिंग के लिए है, और प्रदर्शन की समस्याओं या अप्रत्याशित हस्तक्षेप से बचने के लिए इसे प्रोडक्शन में सावधानी से इस्तेमाल करना चाहिए।

प्रदर्शन (परफॉर्मेंस) संबंधी विचार

जहाँ असिंक्रोनस प्रोग्रामिंग I/O इंतजार को संभालने में उत्कृष्ट है, वहीं अनुचित उपयोग से प्रदर्शन खराब भी हो सकता है। निम्नलिखित बिंदुओं को ध्यान में रखकर ऑप्टिमाइज़ करें:।

  • असिंक्रोनस प्रोसेसिंग I/O-बाउंड कार्यों में तो बेहतरीन है, लेकिन CPU-बाउंड टास्क के लिए उपयुक्त नहीं है; ऐसे मामलों में प्रोसेस पूल का उपयोग करें।
  • थ्रेड या प्रोसेस पूल का उपयोग करते समय, पूल का आकार और टास्क की प्रकृति पर ध्यान दें।
  • अगर आप एक साथ बहुत सारे छोटे टास्क्स शुरू करते हैं तो इवेंट लूप ओवरहेड बढ़ता है—इसलिए समायोजन के लिए बैचिंग या सेमाफ़ोर का उपयोग करें।

सारांश

पाइथन का असिंक्रोनस I/O एक शक्तिशाली तंत्र है जो I/O इंतजार के समय का प्रभावी उपयोग करता है और नेट्वर्क व फाइल ऑपरेशन्स को एक साथ कुशलता से निष्पादित करता है। asyncio, aiohttp, aiofiles और run_in_executor जैसी तकनीकों को मिलाकर, आप लचीले ढंग से व्यावहारिक असिंक्रोनस एप्लिकेशन बना सकते हैं। रिसोर्स के अधिग्रहण और मुक्तिकरण को स्वचालित करने के लिए async with का उपयोग करने से आप फाइल, HTTP सत्र और लॉक जैसी असिंक्रोनस संसाधनों को सुरक्षित और विश्वसनीय तरीके से प्रबंधित कर सकते हैं। उचित एरर हैंडलिंग और कनकरेंसी मैनेजमेंट को शामिल करके, आप उच्च विश्वसनीयता वाले असिंक्रोनस प्रोग्राम को सुरक्षित रूप से चला सकते हैं।

आप हमारे YouTube चैनल पर Visual Studio Code का उपयोग करके ऊपर दिए गए लेख के साथ आगे बढ़ सकते हैं। कृपया YouTube चैनल को भी देखें।

YouTube Video