পাইথন অ্যাসিনক্রোনাস প্রসেসিংয়ে সামঞ্জস্য নিয়ন্ত্রণ
এই নিবন্ধটি পাইথন অ্যাসিনক্রোনাস প্রসেসিংয়ে সামঞ্জস্য নিয়ন্ত্রণ ব্যাখ্যা করে।
আপনি ধাপে ধাপে শিখবেন, asyncio-এর বুনিয়াদি বিষয় থেকে শুরু করে, সিঙ্ক্রোনাইজেশন কন্ট্রোলে সাধারণত ব্যবহৃত ব্যবহারিক নিদর্শন পর্যন্ত।
YouTube Video
পাইথন অ্যাসিনক্রোনাস প্রসেসিংয়ে সামঞ্জস্য নিয়ন্ত্রণ
অ্যাসিনক্রোনাস প্রসেসিংয়ে আপনি সহজেই একাধিক টাস্ক একসাথে চালাতে পারেন। তবে, বাস্তবে, আরও উন্নত সমন্বয় প্রয়োজন যেমন কনকারেন্সি নিয়ন্ত্রণ, টাস্ক সমন্বয়, শেয়ারকৃত রিসোর্সের একচেটিয়া নিয়ন্ত্রণ, ভারী সিনক্রোনাস প্রসেস পরিচালনা এবং বাতিলকরণের পরে ক্লিনআপ।
এখানে, আমরা asyncio-র মৌলিক দিক থেকে শুরু করে সামঞ্জস্যের জন্য ব্যবহৃত বাস্তবিক প্যাটার্ন ধাপে ধাপে শিখবো।
পরিচিতি: মৌলিক বিষয়সমূহ (async / await এবং create_task)
প্রথমে চলুন কিছু মিনিমাল অ্যাসিনক্রোনাস কোড দেখি। await ঐ স্থানে অপেক্ষা করে যতক্ষণ না ডাকা coroutine শেষ হয়, এবং asyncio.create_task কনকারেন্ট এক্সিকিউশনের জন্য টাস্ক নির্ধারণ করে।
1import asyncio
2
3async def worker(name, delay):
4 # Simulate async work
5 await asyncio.sleep(delay)
6 return f"{name} done after {delay}s"
7
8async def main():
9 # Create two tasks that run concurrently.
10 t1 = asyncio.create_task(worker("A", 1))
11 t2 = asyncio.create_task(worker("B", 2))
12
13 # Await both results (this suspends until both are done).
14 result1 = await t1
15 result2 = await t2
16 print(result1, result2)
17
18if __name__ == "__main__":
19 asyncio.run(main())- এই কোডটি একটি সাধারণ প্যাটার্ন যেখানে টাস্কগুলি স্পষ্টভাবে তৈরি হয়, সমান্তরালে চলে, এবং শেষে
awaitদিয়ে আউটপুট পাওয়া যায়।create_taskকনকারেন্ট এক্সিকিউশন সক্ষম করে।
asyncio.gather, asyncio.wait, এবং asyncio.as_completed-এর মধ্যে পার্থক্য
একাধিক coroutine একসাথে চালানোর সময়, আপনি ফলাফল কিভাবে পেতে চান তার উপর নির্ভর করে কোনটি ব্যবহার করবেন তা নির্ধারণ করুন। gather সব শেষ না হওয়া পর্যন্ত অপেক্ষা করে এবং ইনপুট ক্রমানুসারে ফলাফল ফেরত দেয়, অপরদিকে as_completed যেকোনো ক্রমে শেষ হওয়ার সাথে সাথে ফলাফল প্রসেস করার সুযোগ দেয়।
1import asyncio
2import random
3
4async def job(i):
5 delay = random.random() * 2
6 await asyncio.sleep(delay)
7 return (i, delay)
8
9async def gather_example():
10 # gather waits for all tasks and returns results in the same order as input
11 results = await asyncio.gather(*(job(i) for i in range(5)))
12 print("gather order:", results)
13
14async def as_completed_example():
15 # as_completed yields results as they finish (useful to process early results)
16 tasks = [asyncio.create_task(job(i)) for i in range(5)]
17 for coro in asyncio.as_completed(tasks):
18 res = await coro
19 print("completed:", res)
20
21async def main():
22 await gather_example()
23 await as_completed_example()
24
25if __name__ == "__main__":
26 asyncio.run(main())- এই কোডে দেখানো হয়েছে,
gatherইনপুট ক্রমে রেজাল্ট দেয়, ফলে এটি কার্যকর যখন আপনি ক্রম সংরক্ষণ করতে চান।as_completedব্যবহার হয় যখন আপনি কোনো টাস্ক শেষ হতেই ফলাফল প্রসেস করতে চান।
কনকারেন্সি নিয়ন্ত্রণ: asyncio.Semaphore দিয়ে একসাথে এক্সিকিউশন সীমাবদ্ধ করা
যখন এক্সটার্নাল API রেট লিমিট বা DB সংযোগের সীমা থাকে, তখন Semaphore ব্যবহার করে কনকারেন্ট এক্সিকিউশন নিয়ন্ত্রণ করতে পারেন।
1import asyncio
2import random
3
4sem = asyncio.Semaphore(3) # allow up to 3 concurrent workers
5
6async def limited_worker(i):
7 async with sem:
8 # Only 3 tasks can be inside this block concurrently
9 delay = random.random() * 2
10 await asyncio.sleep(delay)
11 print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14 tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15 await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18 asyncio.run(main())async with-এর সাথেSemaphoreব্যবহার করে সহজেই একসাথে চলার সংখ্যা সীমাবদ্ধ করতে পারবেন। এটি বাহ্যিক সীমাবদ্ধতা থাকলে কার্যকর।
শেয়ারকৃত রিসোর্সের একচেটিয়া নিয়ন্ত্রণ: asyncio.Lock
Lock ব্যবহার হয় শেয়ারকৃত ডেটায় একসাথে আপডেট হওয়া রোধ করতে। asyncio.Lock অ্যাসিনক্রোনাস ব্যবহারের জন্য একচেটিয়া প্রিমিটিভ।
1import asyncio
2
3counter = 0
4lock = asyncio.Lock()
5
6async def incrementer(n_times):
7 global counter
8 for _ in range(n_times):
9 # Acquire lock to update shared counter safely
10 async with lock:
11 tmp = counter
12 await asyncio.sleep(0) # yield control to increase race likelihood
13 counter = tmp + 1
14
15async def main():
16 tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17 await asyncio.gather(*tasks)
18 print("final counter:", counter)
19
20if __name__ == "__main__":
21 asyncio.run(main())- যদি একাধিক টাস্ক শেয়ার করা ভ্যারিয়েবল (যেমন একটি গ্লোবাল
counter) আপডেট করে, কনফ্লিক্ট হতে পারে।Lockদিয়ে অপারেশন ঘিরে রাখলে সামঞ্জস্য বজায় রাখতে পারবেন।
টাস্ক সমন্বয়: asyncio.Event
যখন একটি টাস্ক প্রস্তুত হওয়ার সঙ্কেত দেয় এবং অন্যরা ঐ সঙ্কেতের জন্য অপেক্ষা করে, তখন Event ব্যবহার হয়। এটি টাস্কগুলোর মধ্যে সঙ্কেত ভাগাভাগি ও পারস্পরিক সামঞ্জস্যের একটি সহজ উপায়।
1import asyncio
2
3event = asyncio.Event()
4
5async def waiter(name):
6 print(f"{name} is waiting for event")
7 await event.wait()
8 print(f"{name} resumed after event set")
9
10async def setter():
11 print("setter will sleep and then set the event")
12 await asyncio.sleep(1)
13 event.set()
14 print("event set by setter")
15
16async def main():
17 tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18 await asyncio.create_task(setter())
19 await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22 asyncio.run(main())Event-এ একটি বুলিয়ান ফ্ল্যাগ থাকে, এবংset()কল করলে সকল অপেক্ষমাণ টাস্ক আবার শুরু হয়। এটি সাধারণ সামঞ্জস্যের জন্য উপযোগী।
প্রোডিউসার- কনজিউমার প্যাটার্ন: asyncio.Queue
Queue ব্যবহার করে প্রোডিউসার (ডেটা তৈরি করে) এবং কনজিউমার (ডেটা প্রসেস করে) সহজে ও অ্যাসিনক্রোনাসভাবে সমন্বয় করতে পারে। এছাড়াও, যখন queue ফুল হয়, প্রোডিউসার স্বয়ংক্রিয়ভাবে অপেক্ষা করে, ফলে অতিরিক্ত উৎপাদন বন্ধে স্বাভাবিকভাবে চাপ প্রয়োগ হয়।
1import asyncio
2import random
3
4async def producer(q, n_items):
5 for i in range(n_items):
6 await asyncio.sleep(random.random() * 0.5)
7 item = f"item-{i}"
8 await q.put(item)
9 print("produced", item)
10 # signal consumers to stop
11 await q.put(None)
12
13async def consumer(q, name):
14 while True:
15 item = await q.get()
16 if item is None:
17 # put sentinel back for other consumers and break
18 await q.put(None)
19 break
20 await asyncio.sleep(random.random() * 1)
21 print(name, "consumed", item)
22 q.task_done()
23
24async def main():
25 q = asyncio.Queue(maxsize=5) # bounded queue to apply backpressure
26 prod = asyncio.create_task(producer(q, 10))
27 cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28 await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31 asyncio.run(main())Queueঅ্যাসিনক্রোনাসভাবে প্রোডিউসার ও কনজিউমার সমন্বয় করতে সহায়তা করে। এছাড়া,maxsizeসেট করলে queue ফুল হলে producer কেputএ অপেক্ষা করায় এবং অতিরিক্ত উৎপাদন প্রতিরোধ হয়।
সিঙ্ক্রোনাস ব্লকিং অপারেশন পরিচালনা: run_in_executor
CPU-গুরুত্বপূর্ণ প্রসেসিং বা async সাপোর্ট না করা লাইব্রেরি ব্যবহারের ক্ষেত্রে, run_in_executor ব্যবহার করে প্রসেসিং অন্য থ্রেড বা প্রসেসে পাঠান। এতে মূল event loop থামা থেকে রক্ষা পায় এবং অন্যান্য অ্যাসিনক্রোনাস টাস্ক নির্বিঘ্নে চলে।
1import asyncio
2import time
3
4def blocking_io(x):
5 # simulate blocking I/O or CPU-bound work
6 time.sleep(2)
7 return x * x
8
9async def main():
10 loop = asyncio.get_running_loop()
11 # run blocking_io in default thread pool
12 result = await loop.run_in_executor(None, blocking_io, 3)
13 print("blocking result:", result)
14
15if __name__ == "__main__":
16 asyncio.run(main())- সরাসরি সিনক্রোনাস ফাংশন কল করলে event loop ব্লক হয়ে যাবে।
run_in_executorব্যবহারে কোড পৃথক থ্রেডে চলে এবং অ্যাসিনক্রোনাস টাস্ক কনকারেন্ট ভাবে চলতে পারে।
উদাহরণ: রেট-লিমিটেড API কল (Semaphore + run_in_executor এর সমন্বয়)
নিম্নে একটি নমুনা পরিস্থিতি দেওয়া হলো, যেখানে API কল রেট-লিমিটেড এবং ফলাফলের উপর ভারী প্রসেসিং করা হচ্ছে। Semaphore এবং run_in_executor একত্রে ব্যবহার করলে প্রসেসিং নিরাপদ ও দক্ষতার সাথে এগোয়।
1import asyncio
2import time
3import random
4
5sem = asyncio.Semaphore(5)
6
7def heavy_sync_processing(data):
8 # simulate heavy CPU-bound work
9 time.sleep(1)
10 return f"processed-{data}"
11
12async def api_call(i):
13 await asyncio.sleep(random.random() * 0.5) # simulate network latency
14 return f"data-{i}"
15
16async def worker(i):
17 async with sem:
18 data = await api_call(i)
19 # offload CPU-bound work to threadpool
20 loop = asyncio.get_running_loop()
21 result = await loop.run_in_executor(None, heavy_sync_processing, data)
22 print(result)
23
24async def main():
25 tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26 await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29 asyncio.run(main())- আমরা কনকারেন্ট API কলের সংখ্যা সীমিত করতে
Semaphoreব্যবহার করি এবং ফলাফলের ভারী প্রসেসিং থ্রেড পুলে পাঠাই। নেটওয়ার্ক ও CPU প্রসেসিং আলাদা রাখলে দক্ষতা বাড়ে।
টাস্ক বাতিল এবং ক্লিনআপ
টাস্ক বাতিল হলে, finally ও asyncio.CancelledError সঠিকভাবে হ্যান্ডেল করা খুবই গুরুত্বপূর্ণ। এটি নিশ্চিত করে যে ফাইল ও কানেকশন মুক্ত হয় এবং মধ্যবর্তী অবস্থা সঠিকভাবে হ্যান্ডেল হয়, ফলে অ্যাপ্লিকেশনের সামঞ্জস্য বজায় থাকে।
1import asyncio
2
3async def long_running():
4 try:
5 print("started long_running")
6 await asyncio.sleep(10)
7 print("finished long_running")
8 except asyncio.CancelledError:
9 print("long_running was cancelled, cleaning up")
10 # perform cleanup here
11 raise
12
13async def main():
14 task = asyncio.create_task(long_running())
15 await asyncio.sleep(1)
16 task.cancel()
17 try:
18 await task
19 except asyncio.CancelledError:
20 print("task cancelled in main")
21
22if __name__ == "__main__":
23 asyncio.run(main())- বাতিলকরণ একটি এক্সসেপশন হিসেবে (
CancelledError) আসে, তাই প্রয়োজনীয় ক্লিনআপexceptব্লকে করুন এবং দরকার হলে এক্সসেপশন আবার তুলুন।
বাস্তব ডিজাইনের মূল পয়েন্ট
নিম্নের বিষয়গুলো অ্যাসিনক্রোনাস প্রসেসিং ডিজাইন করার ক্ষেত্রে সহায়ক।
-
স্পষ্টভাবে কনকারেন্সি নিয়ন্ত্রণ করুন API বা DB-র মতো রিসোর্স সীমা থাকলে
Semaphoreব্যবহার করে কনকারেন্ট এক্সিকিউশন সীমিত করতে পারেন। -
শেয়ারকৃত রিসোর্স নিরাপদে পরিচালনা করুন একাধিক টাস্ক থেকে স্টেট আপডেট করতে হলে
Lockব্যবহার করুন। শেয়ারড স্টেট কমানো ও অপরিবর্তনীয় ডেটার চারপাশে ডিজাইন করলে আরও নিরাপদ হয়। -
ফলাফল কিভাবে নেবেন তা নির্বাচন করুন যদি টাস্ক শেষ হওয়ার সাথে সাথে প্রসেস করতে চান,
asyncio.as_completedব্যবহার করুন; ইনপুট ক্রমে প্রসেস করতে চাইলেgatherব্যবহার করুন। -
ভারী সিনক্রোনাস প্রসেসিং আলাদা করুন CPU-গুরুত্বপূর্ণ বা সিনক্রোনাস লাইব্রেরি কলের জন্য event loop ব্লক হওয়া এড়াতে
run_in_executorঅথবাProcessPoolExecutorব্যবহার করুন। -
বাতিলকরণ ও এক্সসেপশন পরিকল্পনা করুন মাঝপথে কোনো টাস্ক বাতিল হলেও রিসোর্স সুরক্ষিতভাবে ক্লিন আপ করার জন্য উপযুক্ত এক্সসেপশন হ্যান্ডলিং লিখুন।
-
পরীক্ষণ সহজ করুন I/O, সময়, র্যান্ডমনেসের মতো সাইড ইফেক্ট অ্যাবস্ট্রাক্ট করুন, যাতে এগুলো প্রতিস্থাপন করা সহজ হয় এবং অ্যাসিনক্রোনাস কোড টেস্ট করা সহজ হয়।
সারসংক্ষেপ
asyncio শক্তিশালী, কিন্তু শুধু "সমান্তরালে চালানো" তে মনোযোগ দিলে শেয়ারকৃত রিসোর্স দ্বন্দ্ব, রিসোর্স সীমা লঙ্ঘন বা event loop ব্লকিং-এর মতো সমস্যা হতে পারে। Semaphore, Lock, Event, Queue, run_in_executor এবং সঠিক বাতিলকরণ পরিচালনা একত্রিত করে নিরাপদ ও দক্ষ অ্যাসিনক্রোনাস অ্যাপ্লিকেশন ডিজাইন করা যায়। প্রোডিউসার-কনজিউমার প্যাটার্ন, কনকারেন্সি সীমা বা অ্যাসিনক্রোনাস ও ব্লকিং প্রসেস আলাদা করার মতো ব্যবস্থাগুলি ব্যবহার করে অ্যাসিনক্রোনাস ওয়ার্কফ্লো আরও নিরাপদ ও দক্ষভাবে তৈরি করা যায়।
আপনি আমাদের ইউটিউব চ্যানেলে ভিজ্যুয়াল স্টুডিও কোড ব্যবহার করে উপরের নিবন্ধটি অনুসরণ করতে পারেন। দয়া করে ইউটিউব চ্যানেলটিও দেখুন।