Python-এ `threading` মডিউল
এই প্রবন্ধটি Python-এ threading মডিউলটি বর্ণনা করে।
YouTube Video
Python-এ threading মডিউল
Python-এর threading মডিউল একটি স্ট্যান্ডার্ড লাইব্রেরি যা মাল্টিথ্রেডিং প্রোগ্রামিং সমর্থন করে। থ্রেড ব্যবহার করে একাধিক প্রক্রিয়া একসাথে চালানো সম্ভব, যা প্রোগ্রামের পারফরম্যান্স উন্নত করতে পারে, বিশেষত ব্লকিং অপারেশনগুলি যেমন I/O ওয়েটের ক্ষেত্রে। পাইথনের গ্লোবাল ইন্টারপ্রেটার লক (GIL) এর কারণে, সিপিইউ-বাউন্ড কাজের জন্য মাল্টিথ্রেডিংয়ের কার্যকারিতা সীমিত, তবে এটি I/O-বাউন্ড কাজের জন্য দক্ষতার সাথে কাজ করে।
পরবর্তী অংশগুলোতে threading মডিউল ব্যবহার এবং থ্রেড নিয়ন্ত্রনের মৌলিক বিষয়গুলো বর্ণনা করা হয়েছে।
থ্রেডের মৌলিক ব্যবহার
থ্রেড তৈরি এবং চালানো
থ্রেড তৈরি করতে এবং একসাথে প্রসেসিং সম্পাদনের জন্য threading.Thread ক্লাসটি ব্যবহার করুন। একটি থ্রেড তৈরি করতে এবং তা কার্যকর করতে টার্গেট ফাংশন নির্ধারণ করুন।
1import threading
2import time
3
4# Function to be executed in a thread
5def worker():
6 print("Worker thread started")
7 time.sleep(2)
8 print("Worker thread finished")
9
10# Create and start the thread
11thread = threading.Thread(target=worker)
12thread.start()
13
14# Processing in the main thread
15print("Main thread continues to run")
16
17# Wait for the thread to finish
18thread.join()
19print("Main thread finished")- এই উদাহরণে,
workerফাংশনটি একটি পৃথক থ্রেডে কার্যকর হওয়া হয়, যখন মূল থ্রেডটি নিরবচ্ছিন্নভাবে কার্যক্রম চালিয়ে যায়।join()মেথড ডেকে, মূল থ্রেডটি সাব-থ্রেড শেষ হওয়া পর্যন্ত অপেক্ষা করে।
থ্রেডের নামকরণ
থ্রেডগুলিকে অর্থবহ নাম দেওয়া লগিং এবং ডিবাগিংকে সহজ করে তোলে। আপনি এটি name আর্গুমেন্ট দিয়ে নির্ধারণ করতে পারেন।
1import threading
2import time
3
4# Function to be executed in a thread
5def worker():
6 print("Worker thread started")
7 time.sleep(2)
8 print("Worker thread finished")
9
10t = threading.Thread(
11 target=worker,
12 args=("named-worker", 0.3),
13 name="MyWorkerThread"
14)
15
16t.start()
17
18print("Active threads:", threading.active_count())
19for th in threading.enumerate():
20 print(" -", th.name)
21
22t.join()threading.enumerate()বর্তমানে চলমান সকল থ্রেডের একটি তালিকা প্রদান করে, যা ডিবাগিং এবং অবস্থা পর্যবেক্ষণের জন্য সহায়ক।
থ্রেড ক্লাস উত্তরাধিকার করা
যদি আপনি থ্রেড কার্যকরকারী ক্লাসটি কাস্টমাইজ করতে চান, তবে আপনি threading.Thread ক্লাসটি উত্তরাধিকার করে একটি নতুন ক্লাস সংজ্ঞায়িত করতে পারেন।
1import threading
2import time
3
4# Inherit from the Thread class
5class WorkerThread(threading.Thread):
6 def __init__(self, name, delay, repeat=3):
7 super().__init__(name=name)
8 self.delay = delay
9 self.repeat = repeat
10 self.results = []
11
12 def run(self):
13 for i in range(self.repeat):
14 msg = f"{self.name} step {i+1}"
15 print(msg)
16 self.results.append(msg)
17 time.sleep(self.delay)
18
19# Create and start the threads
20t1 = WorkerThread("Worker-A", delay=0.4, repeat=3)
21t2 = WorkerThread("Worker-B", delay=0.2, repeat=5)
22
23t1.start()
24t2.start()
25
26t1.join()
27t2.join()
28
29print("Results A:", t1.results)
30print("Results B:", t2.results)- এই উদাহরণে, থ্রেডের আচরণ নির্ধারণ করতে
run()মেথডটি ওভাররাইড করা হয়েছে, যাতে প্রতিটি থ্রেড তাদের নিজস্ব ডেটা সংরক্ষণ করতে পারে। এটি তখন উপকারী যখন থ্রেডগুলো জটিল প্রক্রিয়াকরণ করে বা আপনি চান প্রতিটি থ্রেডের নিজস্ব স্বতন্ত্র ডেটা থাকুক।
থ্রেডগুলির মধ্যে সমন্বয়
যখন একাধিক থ্রেড একযোগে ভাগ করা রিসোর্সে অ্যাক্সেস করে, তখন ডেটা রেস ঘটতে পারে। এটি প্রতিরোধ করার জন্য, threading মডিউল বিভিন্ন সিঙ্ক্রোনাইজেশন মেকানিজম সরবরাহ করে।
লক (Lock)
Lock অবজেক্টটি থ্রেডগুলির মধ্যে রিসোর্সগুলির উপর একচেটিয়া নিয়ন্ত্রণ বাস্তবায়নের জন্য ব্যবহৃত হয়। একটি থ্রেড রিসোর্স লক করার সময় অন্যান্য থ্রেডগুলি সেই রিসোর্সে অ্যাক্সেস করতে পারে না।
1import threading
2
3lock = threading.Lock()
4shared_resource = 0
5
6def worker():
7 global shared_resource
8 with lock: # Acquire the lock
9 local_copy = shared_resource
10 local_copy += 1
11 shared_resource = local_copy
12
13threads = [threading.Thread(target=worker) for _ in range(5)]
14
15for t in threads:
16 t.start()
17
18for t in threads:
19 t.join()
20
21print(f"Final value of shared resource: {shared_resource}")- এই উদাহরণে, পাঁচটি থ্রেড একটি ভাগ করা রিসোর্সে অ্যাক্সেস করে, কিন্তু
Lockব্যবহার করা হয় একাধিক থ্রেড দ্বারা একই সাথে ডেটা সংশোধন প্রতিরোধ করতে।
পুনঃপ্রবেশযোগ্য লক (RLock)
যদি একই থ্রেডকে বারবার লক অর্জন করতে হয়, তাহলে RLock (পুনঃপ্রবেশযোগ্য লক) ব্যবহার করুন। এটি রিকারসিভ কল বা এমন লাইব্রেরি কলের জন্য সহায়ক, যেখানে বিভিন্ন কলের মধ্যে লক অর্জন করা হতে পারে।
1import threading
2
3rlock = threading.RLock()
4shared = []
5
6def outer():
7 with rlock:
8 shared.append("outer")
9 inner()
10
11def inner():
12 with rlock:
13 shared.append("inner")
14
15t = threading.Thread(target=outer)
16t.start()
17t.join()
18print(shared)RLockব্যবহারে একই থ্রেড বারবার একই লক অর্জন করতে পারে, যার ফলে নেস্টেড লক অর্জনের সময় ডেডলক এড়ানো যায়।
শর্তাবস্থা (Condition)
Condition থ্রেডগুলিকে একটি নির্দিষ্ট শর্ত পূরণ হওয়া পর্যন্ত অপেক্ষা করতে ব্যবহার করা হয়। যখন একটি থ্রেড কোনো শর্ত পূরণ করে, আপনি অন্য থ্রেডকে জানাতে notify() অথবা সমস্ত অপেক্ষমাণ থ্রেডকে জানাতে notify_all() কল করতে পারেন।
নিচে একটি Condition ব্যবহার করে প্রডিউসার ও কনজিউমারের একটি উদাহরণ দেওয়া হলো।
1import threading
2
3condition = threading.Condition()
4shared_data = []
5
6def producer():
7 with condition:
8 shared_data.append(1)
9 print("Produced an item")
10 condition.notify() # Notify the consumer
11
12def consumer():
13 with condition:
14 condition.wait() # Wait until the condition is met
15
16 item = shared_data.pop(0)
17 print(f"Consumed an item: {item}")
18
19# Create the threads
20producer_thread = threading.Thread(target=producer)
21consumer_thread = threading.Thread(target=consumer)
22
23consumer_thread.start()
24producer_thread.start()
25
26producer_thread.join()
27consumer_thread.join()- এই কোডে একটি
Conditionব্যবহার করা হয়েছে যাতে ডেটা যোগ হলে প্রডিউসার নোটিফাই করে এবং কনজিউমার সেই নোটিফিকেশনের জন্য অপেক্ষা করে ডেটা নেয়, ফলে সমন্বয় (সিঙ্ক্রোনাইজেশন) নিশ্চিত হয়।
থ্রেড ডেমনাইজেশন
মূল থ্রেডের সমাপ্তির সময় ডেমন থ্রেডগুলো জোরপূর্বক বন্ধ করা হয়। যখন সাধারণ থ্রেডগুলিকে সমাপ্তির জন্য অপেক্ষা করতে হয়, ডেমন থ্রেডগুলো স্বয়ংক্রিয়ভাবে বন্ধ হয়।
1import threading
2import time
3
4def worker():
5 while True:
6 print("Working...")
7 time.sleep(1)
8
9# Create a daemon thread
10thread = threading.Thread(target=worker)
11thread.daemon = True # Set as a daemon thread
12
13thread.start()
14
15# Processing in the main thread
16time.sleep(3)
17print("Main thread finished")- এই উদাহরণে,
workerথ্রেডটিকে ডেমনাইজ করা হয়েছে, তাই এটি মূল থ্রেডের সমাপ্তির সময় জোরপূর্বক বন্ধ হয়।
ThreadPoolExecutor এর মাধ্যমে থ্রেড পরিচালনা
threading মডিউল ছাড়াও, আপনি concurrent.futures মডিউলের ThreadPoolExecutor ব্যবহার করে থ্রেড পুল পরিচালনা এবং প্যারালাল টাস্ক সম্পাদন করতে পারেন।
1from concurrent.futures import ThreadPoolExecutor
2import time
3
4def worker(seconds):
5 print(f"Sleeping for {seconds} second(s)")
6 time.sleep(seconds)
7 return f"Finished sleeping for {seconds} second(s)"
8
9with ThreadPoolExecutor(max_workers=3) as executor:
10 futures = [executor.submit(worker, i) for i in range(1, 4)]
11 for future in futures:
12 print(future.result())ThreadPoolExecutorএকটি থ্রেড পুল তৈরি করে এবং দক্ষতার সাথে টাস্ক প্রক্রিয়াকরণ করে।max_workersদিয়ে একসঙ্গে চলমান থ্রেডের সংখ্যা নির্দিষ্ট করুন।
থ্রেডগুলির মধ্যে ইভেন্ট যোগাযোগ
threading.Event ব্যবহার করে, আপনি থ্রেডগুলির মধ্যে একটি ফ্ল্যাগ সেট করতে পারেন যাতে অন্যান্য থ্রেডদের ইভেন্টটি সম্পন্ন হওয়ার বিষয়ে জানানো যায়।
1import threading
2import time
3
4event = threading.Event()
5
6def worker():
7 print("Waiting for event to be set")
8 event.wait() # Wait until the event is set
9
10 print("Event received, continuing work")
11
12thread = threading.Thread(target=worker)
13thread.start()
14
15time.sleep(2)
16print("Setting the event")
17event.set() # Set the event and notify the thread- এই কোডটি দেখায় কিভাবে ওয়ার্কার থ্রেড
Eventসিগনালের জন্য অপেক্ষা করে এবং যখন মেইন থ্রেডevent.set()কল করে, তখন প্রসেসিং পুনরায় শুরু হয়।
থ্রেডে এক্সেপশন হ্যান্ডলিং এবং থ্রেড টার্মিনেশন
যখন থ্রেডে এক্সেপশন ঘটে, তখন তা সরাসরি মেইন থ্রেডে প্রকাশিত হয় না, তাই এক্সেপশন ক্যাপচার ও আদান-প্রদানের একটি প্যাটার্ন প্রয়োজন।
1import threading
2import queue
3
4def worker(err_q):
5 try:
6 raise ValueError("Something bad")
7 except Exception as e:
8 err_q.put(e)
9
10q = queue.Queue()
11t = threading.Thread(target=worker, args=(q,))
12t.start()
13t.join()
14if not q.empty():
15 exc = q.get()
16 print("Worker raised:", exc)Queue-এর মধ্যে এক্সসেপশনগুলি রেখে, এবং সেগুলি মূল থ্রেডে নিয়ে এসে, আপনি সহজেই ত্রুটি (failure) সনাক্ত করতে পারেন। যদি আপনিconcurrent.futures.ThreadPoolExecutorব্যবহার করেন, তবে এক্সেপশনগুলিfuture.result()এর মাধ্যমে পুনরায় তোলা হয়, ফলে এগুলি সহজে হ্যান্ডেল করা যায়।
GIL (গ্লোবাল ইন্টারপ্রেটার লক) এবং এর প্রভাব
CPython-এ GIL (গ্লোবাল ইন্টারপ্রেটার লক) এর কারণে, একই প্রসেসে একাধিক পাইথন বাইটকোড একসাথে চলে না। যেসব টাস্ক CPU-নির্ভর, যেমন ভারী গণনা, সেগুলোর জন্য multiprocessing ব্যবহার করার পরামর্শ দেওয়া হয়। অন্যদিকে, ফাইল পড়া বা নেটওয়ার্ক কমিউনিকেশনের মতো I/O-নির্ভর টাস্কের জন্য threading কার্যকরভাবে কাজ করে।
সারসংক্ষেপ
Python-এর threading মডিউল ব্যবহার করে, আপনি মাল্টিথ্রেডেড প্রোগ্রাম বাস্তবায়ন করতে পারেন এবং একাধিক প্রক্রিয়া একসঙ্গে সম্পাদন করতে পারেন। Lock এবং Condition-এর মত সিঙ্ক্রোনাইজেশন মেকানিজম ব্যবহার করে, আপনি শেয়ার্ড রিসোর্সগুলিতে নিরাপদে অ্যাক্সেস করতে এবং জটিল সিঙ্ক্রোনাইজেশন সম্পাদিত করতে পারেন। একই সঙ্গে, ডেমন থ্রেড বা ThreadPoolExecutor ব্যবহার করে থ্রেড পরিচালনা এবং দক্ষ প্যারালাল প্রক্রিয়াকরণ আরও সহজ হয়ে যায়।
আপনি আমাদের ইউটিউব চ্যানেলে ভিজ্যুয়াল স্টুডিও কোড ব্যবহার করে উপরের নিবন্ধটি অনুসরণ করতে পারেন। দয়া করে ইউটিউব চ্যানেলটিও দেখুন।