असिंक्रोनस इनपुट/आउटपुट
यह लेख असिंक्रोनस इनपुट/आउटपुट को समझाता है।
यह गाइड आपको पाइथन में व्यावहारिक रूप से उपयोगी असिंक्रोनस इनपुट/आउटपुट की अवधारणाओं और पैटर्न्स को कदम दर कदम सरलता से समझाती है।
YouTube Video
असिंक्रोनस इनपुट/आउटपुट (I/O)
असिंक्रोनस I/O की संकल्पना
असिंक्रोनस I/O एक ऐसी प्रक्रिया है जो समय लेने वाले I/O (जैसे फाइल ऑपरेशन या नेटवर्क कम्युनिकेशन) का इंतजार करते हुए अन्य ऑपरेशनों को समानांतर में चलने की अनुमति देती है। पाइथन में, asyncio एक मानक असिंक्रोनस फ्रेमवर्क के रूप में प्रदान किया जाता है, और कई लाइब्रेरीज़ इसी मैकेनिज्म का पालन करने के लिए डिज़ाइन की गई हैं।
मूल बातें: async / await और इवेंट लूप
सबसे पहले, यहां मूल कोरूटीन लिखने का तरीका और asyncio.gather का उपयोग कर कई कोरूटीन एकसाथ चलाने का उदाहरण है।
नीचे दिया गया कोड एक न्यूनतम उदाहरण है जिसमें असिंक्रोनस फंक्शंस को एक साथ परिभाषित और चलाया गया है। sleep फंक्शन का उपयोग समानांतर निष्पादन को दिखाने के लिए किया गया है।
1import asyncio
2
3async def worker(name: str, delay: float):
4 # Simulate I/O-bound work by sleeping
5 print(f"{name} started")
6 await asyncio.sleep(delay)
7 print(f"{name} finished after {delay}s")
8 return name, delay
9
10async def main():
11 # Run multiple coroutines concurrently
12 tasks = [
13 worker("A", 1.5),
14 worker("B", 1.0),
15 worker("C", 0.5),
16 ]
17 results = await asyncio.gather(*tasks)
18 print("Results:", results)
19
20if __name__ == "__main__":
21 asyncio.run(main())- यह कोड
asyncio.run()के माध्यम से इवेंट लूप शुरू करता है और तीन कोरूटीन को एक साथ चलाता है।
async with और असिंक्रोनस कॉन्टेक्स्ट मैनेजर
असिंक्रोनस प्रोसेसिंग में कनेक्शन खोलना या फाइलें बंद करना जैसी रिसोर्स मैनेजमेंट आसानी से जटिल हो सकती है। यहां async with का उपयोग करने वाले असिंक्रोनस कॉन्टेक्स्ट मैनेजर उपयोगी हो जाते हैं। इस सिंटैक्स का उपयोग सिंक्रोनस with स्टेटमेंट की तरह ही किया जाता है, लेकिन आंतरिक प्रोसेसिंग असिंक्रोनस होती है, इसलिए यह async/await फ्लो में स्वाभाविक रूप से फिट होता है।
async with का उपयोग करने के दो मुख्य कारण हैं:।
- कनेक्शन, फाइल हैंडल, या सेशन जैसे संसाधनों को विश्वसनीय रूप से साफ करने के लिए। आप निश्चिंत रह सकते हैं कि संसाधन सही तरीके से जारी किए जाएंगे, भले ही कोई असामान्य समाप्ति हो जाए।
- कनेक्शन स्थापित करना या बंद करना और फ्लशिंग जैसी इनिशियलाइजेशन व क्लीनअप टास्क को असिंक्रोनस तरीके से स्वचालित करने के लिए। यह मैन्युअल कोडिंग की झंझट को बचाता है और आपके कोड को अधिक स्पष्ट बनाता है।
नीचे स्क्रैच से एक साधारण असिंक्रोनस कॉन्टेक्स्ट मैनेजर बनाने का उदाहरण है।
1import asyncio
2
3class AsyncResource:
4 async def __aenter__(self):
5 print("Opening resource...")
6 await asyncio.sleep(0.5)
7 print("Resource opened")
8 return self
9
10 async def __aexit__(self, exc_type, exc, tb):
11 print("Closing resource...")
12 await asyncio.sleep(0.5)
13 print("Resource closed")
14
15async def main():
16 async with AsyncResource() as r:
17 print("Using resource...")
18
19if __name__ == "__main__":
20 asyncio.run(main())__aenter__और__aexit__को परिभाषित करके, आपasync withका उपयोग कर सकते हैं।async withब्लॉक में प्रवेश और निकासी के दौरान प्रोसेसिंग असिंक्रोनस और सुरक्षित रूप से होती है।
असिंक्रोनस फाइल I/O (aiofiles)
फाइल ऑपरेशन ब्लॉकिंग का एक क्लासिक उदाहरण हैं। aiofiles का उपयोग करके आप फाइल ऑपरेशन्स को असिंक्रोनस रूप से सुरक्षित तरीके से कर सकते हैं। आंतरिक रूप से, यह एक थ्रेड पूल का उपयोग करता है और सुनिश्चित करता है कि फ़ाइलें async with के माध्यम से सही तरीके से बंद हो जाएं।
निम्न उदाहरण एक साथ कई फाइलों को असिंक्रोनस रूप से पढ़ने का प्रदर्शन करता है। इस कोड को चलाने से पहले आपको pip install aiofiles के साथ aiofiles इंस्टॉल करना होगा।
1# pip install aiofiles
2import asyncio
3import aiofiles
4from pathlib import Path
5
6async def read_file(path: Path):
7 # Read file content asynchronously
8 async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
9 contents = await f.read()
10 return path.name, len(contents)
11
12async def main():
13 files = [
14 Path("example1.txt"),
15 Path("example2.txt"),
16 Path("example3.txt")
17 ]
18 tasks = [read_file(p) for p in files]
19 results = await asyncio.gather(*tasks)
20 for name, size in results:
21 print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24 asyncio.run(main())- यह कोड प्रत्येक फाइल की पढ़ाई को समानांतर बनाता है।
aiofilesआमतौर पर आंतरिक रूप से थ्रेड पूल का उपयोग करता है, जिससे आप ब्लॉकिंग फाइल I/O को असिंक्रोनस इंटरफेस से नियंत्रित कर सकते हैं।
असिंक्रोनस HTTP क्लाइंट (aiohttp)
नेटवर्क I/O का एक क्लासिक उदाहरण है HTTP रिक्वेस्ट को असिंक्रोनस रूप में करना, जैसा कि यहां दिखाया गया है। जब आपको समानांतर में बड़ी संख्या में HTTP रिक्वेस्ट करनी होती है, तो यह विशेष रूप से शक्तिशाली है।
नीचे aiohttp का उपयोग करके कई URL समानांतर में प्राप्त करने का उदाहरण है। आपको pip install aiohttp के साथ aiohttp इंस्टॉल करना होगा।
1# pip install aiohttp
2import asyncio
3import aiohttp
4
5async def fetch(session: aiohttp.ClientSession, url: str):
6 # Fetch a URL asynchronously and return status and size
7 async with session.get(url) as resp:
8 text = await resp.text()
9 return url, resp.status, len(text)
10
11async def main(urls):
12 async with aiohttp.ClientSession() as session:
13 tasks = [fetch(session, u) for u in urls]
14 for coro in asyncio.as_completed(tasks):
15 url, status, size = await coro
16 print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19 urls = [
20 "https://codesparklab.com/json/example1.json",
21 "https://codesparklab.com/json/example2.json",
22 "https://codesparklab.com/json/example3.json",
23 ]
24 asyncio.run(main(urls))asyncio.as_completedका उपयोग करके आप परिणामों को वैसे ही प्रोसेस कर सकते हैं जैसे टास्क पूरे हों। यह कई अनुरोधों को कुशलता से संभालने के लिए उपयोगी है।
ब्लॉकिंग I/O के साथ सह-अस्तित्व: run_in_executor
जब आप असिंक्रोनस कोड में CPU-गहन कार्यों या मौजूदा ब्लॉकिंग API के साथ काम कर रहे हों, तो loop.run_in_executor के माध्यम से ThreadPoolExecutor या ProcessPoolExecutor का उपयोग करें।
निम्नलिखित कोड एक उदाहरण है जिसमें थ्रेड पूल का उपयोग करके ब्लॉकिंग I/O वाले कार्यों को एक साथ चलाया गया है।
1import asyncio
2from concurrent.futures import ThreadPoolExecutor
3import time
4
5def blocking_io(n):
6 # Simulate a blocking I/O or CPU-bound function
7 time.sleep(n)
8 return f"slept {n}s"
9
10async def main():
11 loop = asyncio.get_running_loop()
12 with ThreadPoolExecutor() as pool:
13 tasks = [
14 loop.run_in_executor(pool, blocking_io, 1),
15 loop.run_in_executor(pool, blocking_io, 2),
16 ]
17 results = await asyncio.gather(*tasks)
18 print("Blocking results:", results)
19
20if __name__ == "__main__":
21 asyncio.run(main())run_in_executorका उपयोग करके, आप मौजूदा सिंक्रोनस कोड को बिना विशेष बदलाव किए असिंक्रोनस फ्लो में शामिल कर सकते हैं। हालांकि, आपको थ्रेड्स की संख्या और CPU लोड पर ध्यान देना चाहिए।ProcessPoolExecutorCPU-गहन कार्यों के लिए उपयुक्त है।
असिंक्रोनस सर्वर: asyncio आधारित TCP इको सर्वर
अगर आप सॉकेट्स को सीधे संभालना चाहते हैं, तो आप asyncio.start_server का उपयोग करके आसानी से असिंक्रोनस सर्वर बना सकते हैं।
निम्नलिखित उदाहरण एक साधारण इको सर्वर है जो क्लाइंट से प्राप्त डेटा को वैसा ही लौटाता है।
1import asyncio
2
3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
4 # Handle a single client: read data and echo it back
5 addr = writer.get_extra_info('peername')
6 print(f"Connection from {addr}")
7 while True:
8 data = await reader.read(1024)
9 if not data:
10 break
11 writer.write(data) # echo back
12 await writer.drain()
13 writer.close()
14 await writer.wait_closed()
15 print(f"Connection closed {addr}")
16
17async def main():
18 server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19 addr = server.sockets[0].getsockname()
20 print(f"Serving on {addr}")
21 async with server:
22 await server.serve_forever()
23
24if __name__ == "__main__":
25 asyncio.run(main())-
asyncioके साथ TCP संचार में,StreamReaderऔरStreamWriterअसिंक्रोनस इनपुट और आउटपुट में केंद्रीय भूमिका निभाते हैं।StreamReaderक्लाइंट से भेजा गया डेटा असिंक्रोनस रूप से पढ़ता है, जबकिStreamWriterका उपयोग सर्वर से क्लाइंट को प्रतिक्रिया भेजने के लिए किया जाता है। -
आप सॉकेट्स के विस्तार से संचालन को खुद संभाले बिना भी,
asyncio.start_serverका उपयोग करके आसानी से और प्रभावी तरीक़े से एक असिंक्रोनस सर्वर चालू कर सकते हैं। -
जब आप
asyncio.start_serverको हैंडलर फ़ंक्शन पास करते हैं, तो वह फ़ंक्शन अपने आर्गुमेंट्स के रूप मेंreaderऔरwriterप्राप्त करता है। इनका उपयोग करके, आप सीधे लो-लेवल सॉकेट एपीआई को संभालने की तुलना में अधिक सुरक्षित और स्पष्ट तरीक़े से कम्यूनिकेशन प्रक्रिया लागू कर सकते हैं। उदाहरण के लिए,reader.read()से डेटा प्राप्त करके औरwriter.write()कोwriter.drain()के साथ जोड़कर, आप ऐसा असिंक्रोनस ट्रांसमिशन कर सकते हैं जिससे ट्रांसमिशन पूरी तरह से हो सके। -
यह सेटअप बड़ी संख्या में एक साथ कनेक्शन संभालने के लिए उपयुक्त है और सरल प्रोटोकॉल या छोटे पैमाने की TCP सेवाओं के लिए आदर्श है।
बड़े स्ट्रीमिंग डेटा को संभालना
बड़ी फाइलों या रिस्पॉन्स को क्रमवार प्रोसेस करते समय, डेटा को टुकड़ों (chunks) में पढ़ें और लिखें ताकि मेमोरी उपयोग कम रहे। नीचे aiohttp का उपयोग करके स्ट्रीमिंग रीड का उदाहरण है।
निम्न कोड HTTP रिस्पॉन्स को टुकड़ों (chunks) में प्रोसेस करता है और डेटा मिलते ही उसे डिस्क पर लिखता है।
1import aiohttp
2import asyncio
3import aiofiles
4
5async def stream_download(url: str, dest: str):
6 # Stream download and write to file in chunks
7 async with aiohttp.ClientSession() as session:
8 async with session.get(url) as resp:
9 async with aiofiles.open(dest, 'wb') as f:
10 async for chunk in resp.content.iter_chunked(1024 * 64):
11 await f.write(chunk)
12
13if __name__ == "__main__":
14 asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))-
यह कोड एक बड़ी फ़ाइल को एक साथ नहीं लोड करता; इसके बजाय, यह डेटा को छोटे-छोटे टुकड़ों (chunks) में प्राप्त करता है और उसे असिंक्रोनस रूप से फ़ाइल में लिखता है। इसका परिणाम यह है कि यह कम मेमोरी उपयोग के साथ तेज़ी और कुशलता से डाउनलोड कर सकता है।
aiohttpअसिंक्रोनस रूप से डेटा प्राप्त करता है औरaiofilesबिना ब्लॉक किए फ़ाइल में लिखता है, जिससे अन्य प्रक्रियाओं के साथ इसे चलाना आसान होता है। -
यह तरीका बड़ी फाइलों को कुशलता से डाउनलोड और सेव करने के लिए उपयुक्त है, और मेमोरी खपत को न्यूनतम करता है।
सबप्रोसेस का असिंक्रोनस निष्पादन
अगर आप बाहरी कमांड्स को असिंक्रोनस रूप से चलाना चाहते हैं और उनके आउटपुट को वास्तविक समय में पढ़ना चाहते हैं, तो asyncio.create_subprocess_exec उपयोगी है।
नीचे एक बाहरी कमांड शुरू करने और उसके स्टैंडर्ड आउटपुट को असली समय में पढ़ने का उदाहरण है।
1import asyncio
2
3async def run_cmd(cmd):
4 # Run external command asynchronously and capture output line by line
5 proc = await asyncio.create_subprocess_exec(
6 *cmd,
7 stdout=asyncio.subprocess.PIPE,
8 stderr=asyncio.subprocess.PIPE
9 )
10
11 async def read_stream(stream, name):
12 while True:
13 line = await stream.readline()
14 if not line:
15 break
16 print(f"[{name}] {line.decode().rstrip()}")
17
18 await asyncio.gather(
19 read_stream(proc.stdout, "stdout"),
20 read_stream(proc.stderr, "stderr"),
21 )
22 await proc.wait()
23 return proc.returncode
24
25if __name__ == "__main__":
26 asyncio.run(run_cmd(["python", "--version"]))- सबप्रोसेस को असिंक्रोनस रूप से नियंत्रित करके, आप बाहरी टूल्स के लॉग वास्तविक समय में संभाल सकते हैं या कई प्रोसेसों को एक साथ चला सकते हैं।
रद्द करना और टाइमआउट संभालना
असिंक्रोनस टास्क रद्द किए जा सकते हैं। टाइमआउट लागू करने के लिए asyncio.wait_for का उपयोग करना आसान है।
नीचे एक टास्क को टाइमआउट के साथ चलाने का उदाहरण है।
1import asyncio
2
3async def slow_task():
4 await asyncio.sleep(5)
5 return "done"
6
7async def main():
8 try:
9 result = await asyncio.wait_for(slow_task(), timeout=2.0)
10 print("Result:", result)
11 except asyncio.TimeoutError:
12 print("Task timed out")
13
14if __name__ == "__main__":
15 asyncio.run(main())wait_forसमय सीमा पार होने परTimeoutErrorदेता है और आवश्यकता होने पर टास्क को रद्द कर देता है। टास्क कैंसलेशन के प्रचार और क्लीनअप में सावधानी बरतें।
कनकरेंसी नियंत्रित करना (Semaphore)
क्योंकि बहुत सारे एकसाथ कनेक्शन या अनुरोध संसाधनों को थका सकते हैं, इसलिए asyncio.Semaphore से कनकरेंसी सीमित करें।
नीचे एक सेमाफोर का उपयोग करके एक साथ डाउनलोड की सीमा निर्धारित करने का उदाहरण है।
1import asyncio
2import aiohttp
3
4semaphore = asyncio.Semaphore(3) # allow up to 3 concurrent tasks
5
6async def limited_fetch(session, url):
7 async with semaphore:
8 async with session.get(url) as resp:
9 return url, resp.status
10
11async def main(urls):
12 async with aiohttp.ClientSession() as session:
13 tasks = [limited_fetch(session, u) for u in urls]
14 results = await asyncio.gather(*tasks)
15 print(results)
16
17if __name__ == "__main__":
18 urls = ["https://codesparklab.com/json/example.json"] * 10
19 asyncio.run(main(urls))- इस विधि से, आप बाहरी सेवाओं तक धीरे-धीरे पहुँच सकते हैं और अपने प्रोसेस पर अधिक लोड होने से बच सकते हैं।
एरर हैंडलिंग और पुन: प्रयास (रिट्राई) रणनीतियाँ
गलतियाँ तो असिंक्रोनस प्रोसेसिंग में भी स्वाभाविक रूप से हो सकती हैं। अपवादों को सही ढंग से पकड़ें और एक्सपोनेंशियल बैकऑफ जैसी पुन: प्रयास रणनीतियाँ लागू करें।
नीचे N बार तक साधारण पुन: प्रयास (रिट्राई) की एक उदाहरण कार्यान्वयन है।
1import asyncio
2import aiohttp
3import random
4
5async def fetch_with_retry(session, url, retries=3):
6 for attempt in range(1, retries + 1):
7 try:
8 async with session.get(url) as resp:
9 resp.raise_for_status()
10 text = await resp.text()
11 return text
12 except Exception as e:
13 if attempt == retries:
14 raise
15 await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18 async with aiohttp.ClientSession() as session:
19 text = await fetch_with_retry(session, "https://codesparklab.com/")
20 print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23 asyncio.run(main())- संगति और ट्रैफिक नियंत्रण के संतुलन के लिए उचित पुन: प्रयास लॉजिक महत्वपूर्ण है।
डिबगिंग और लॉगिंग के लिए सुझाव
असिंक्रोनस प्रोसेसिंग में टास्क्स एक साथ चलते हैं, जिससे समस्याओं का कारण पता लगाना मुश्किल हो सकता है। समस्याओं को कुशलता से ट्रैक करने के लिए निम्नलिखित बिंदुओं को ध्यान में रखना डिबगिंग को आसान बनाता है।
asyncio.run()औरTaskसे अपवादों को नज़रअंदाज़ करना आसान है, इसलिए बिना संभाले गए अपवादों को लॉग जरूर करें।loggingका उपयोग करते समय, अपने लॉग्स में कोरूटीन का नाम या (Python 3.8 और ऊपर में)task.get_name()शामिल करने से ट्रैकिंग आसान हो जाती है।- आप
asyncio.Task.all_tasks()का उपयोग करके कार्यों की वर्तमान स्थिति देख सकते हैं। हालाँकि, यह एपीआई केवल डिबगिंग के लिए है, और प्रदर्शन की समस्याओं या अप्रत्याशित हस्तक्षेप से बचने के लिए इसे प्रोडक्शन में सावधानी से इस्तेमाल करना चाहिए।
प्रदर्शन (परफॉर्मेंस) संबंधी विचार
जहाँ असिंक्रोनस प्रोग्रामिंग I/O इंतजार को संभालने में उत्कृष्ट है, वहीं अनुचित उपयोग से प्रदर्शन खराब भी हो सकता है। निम्नलिखित बिंदुओं को ध्यान में रखकर ऑप्टिमाइज़ करें:।
- असिंक्रोनस प्रोसेसिंग I/O-बाउंड कार्यों में तो बेहतरीन है, लेकिन CPU-बाउंड टास्क के लिए उपयुक्त नहीं है; ऐसे मामलों में प्रोसेस पूल का उपयोग करें।
- थ्रेड या प्रोसेस पूल का उपयोग करते समय, पूल का आकार और टास्क की प्रकृति पर ध्यान दें।
- अगर आप एक साथ बहुत सारे छोटे टास्क्स शुरू करते हैं तो इवेंट लूप ओवरहेड बढ़ता है—इसलिए समायोजन के लिए बैचिंग या सेमाफ़ोर का उपयोग करें।
सारांश
पाइथन का असिंक्रोनस I/O एक शक्तिशाली तंत्र है जो I/O इंतजार के समय का प्रभावी उपयोग करता है और नेट्वर्क व फाइल ऑपरेशन्स को एक साथ कुशलता से निष्पादित करता है। asyncio, aiohttp, aiofiles और run_in_executor जैसी तकनीकों को मिलाकर, आप लचीले ढंग से व्यावहारिक असिंक्रोनस एप्लिकेशन बना सकते हैं। रिसोर्स के अधिग्रहण और मुक्तिकरण को स्वचालित करने के लिए async with का उपयोग करने से आप फाइल, HTTP सत्र और लॉक जैसी असिंक्रोनस संसाधनों को सुरक्षित और विश्वसनीय तरीके से प्रबंधित कर सकते हैं। उचित एरर हैंडलिंग और कनकरेंसी मैनेजमेंट को शामिल करके, आप उच्च विश्वसनीयता वाले असिंक्रोनस प्रोग्राम को सुरक्षित रूप से चला सकते हैं।
आप हमारे YouTube चैनल पर Visual Studio Code का उपयोग करके ऊपर दिए गए लेख के साथ आगे बढ़ सकते हैं। कृपया YouTube चैनल को भी देखें।