পাইথনের `multiprocessing` মডিউল
এই নিবন্ধটি পাইথনের multiprocessing মডিউলটি ব্যাখ্যা করে।
এই প্রবন্ধে multiprocessing মডিউল ব্যবহার করে নিরাপদ ও দক্ষ সমান্তরাল প্রসেসিং কোড লেখার জন্য ব্যবহারিক টিপস উপস্থাপন করা হয়েছে।
YouTube Video
পাইথনের multiprocessing মডিউল
ভিত্তি: কেন multiprocessing ব্যবহার করবেন?
multiprocessing প্রসেস ভিত্তিক সমান্তরালতা সক্ষম করে, তাই আপনি পাইথনের GIL (গ্লোবাল ইন্টারপ্রেটার লক)-এর সীমাবদ্ধতা ছাড়াই CPU-বাউন্ড কাজ সমান্তরালভাবে চালাতে পারেন। I/O-বাউন্ড কাজের জন্য, threading বা asyncio আরও সহজ এবং উপযুক্ত হতে পারে।
Process-এর সহজ ব্যবহার
প্রথমে, এখানে Process ব্যবহার করে একটি ফাংশন পৃথক প্রসেসে চালানোর একটি সাধারণ উদাহরণ দেওয়া হয়েছে। এটি দেখায় কিভাবে একটি প্রসেস শুরু করতে, তার সমাপ্তির জন্য অপেক্ষা করতে, এবং আর্গুমেন্ট পাঠাতে হয়।
1# Explanation:
2# This example starts a separate process to run `worker` which prints messages.
3# It demonstrates starting, joining, and passing arguments.
4
5from multiprocessing import Process
6import time
7
8def worker(name, delay):
9 # English comment in code per user's preference
10 for i in range(3):
11 print(f"Worker {name}: iteration {i}")
12 time.sleep(delay)
13
14if __name__ == "__main__":
15 p = Process(target=worker, args=("A", 0.5))
16 p.start()
17 print("Main: waiting for worker to finish")
18 p.join()
19 print("Main: worker finished")- এই কোডটি দেখায় কিভাবে প্রধান প্রসেস একটি চাইল্ড প্রসেস
workerশুরু করে এবং তার সমাপ্তির জন্যjoin()ব্যবহার করে অপেক্ষা করে। আপনিargsব্যবহার করে আর্গুমেন্ট পাঠাতে পারেন।
Pool (উচ্চ-স্তরের API)-এর মাধ্যমে সহজ সমান্তরালতা
যখন আপনি একই ফাংশন একাধিক স্বতন্ত্র টাস্কে প্রয়োগ করতে চান তখন Pool.map খুবই উপযোগী। এটি আপনার জন্য অভ্যন্তরীণভাবে কর্মী প্রসেস পরিচালনা করে।
1# Explanation:
2# Use Pool.map to parallelize a CPU-bound function across available processes.
3# Good for "embarrassingly parallel" workloads.
4
5from multiprocessing import Pool, cpu_count
6import math
7import time
8
9def is_prime(n):
10 # Check primality (inefficient but CPU-heavy for demo)
11 if n < 2:
12 return False
13 for i in range(2, int(math.sqrt(n)) + 1):
14 if n % i == 0:
15 return False
16 return True
17
18if __name__ == "__main__":
19 nums = [10_000_000 + i for i in range(50)]
20 start = time.time()
21 with Pool(processes=cpu_count()) as pool:
22 results = pool.map(is_prime, nums)
23 end = time.time()
24 print(f"Found primes: {sum(results)} / {len(nums)} in {end-start:.2f}s")Poolস্বয়ংক্রিয়ভাবে কর্মীর সংখ্যা নিয়ন্ত্রণ করতে পারে, এবংmapমূল ক্রমে রেজাল্ট ফেরত দেয়।
ইন্টার-প্রসেস যোগাযোগ: Queue ব্যবহার করে প্রোডিউসার/কনজিউমার প্যাটার্ন
Queue হল একটি First-In-First-Out (FIFO) কিউ, যা প্রসেসগুলির মধ্যে অবজেক্ট নিরাপদে স্থানান্তর করে। নিম্নে কিছু সাধারণ প্যাটার্ন দেওয়া হল।
1# Explanation:
2# Demonstrates a producer putting items into a Queue
3# and consumer reading them.
4# This is useful for task pipelines between processes.
5
6from multiprocessing import Process, Queue
7import time
8import random
9
10def producer(q, n):
11 for i in range(n):
12 item = f"item-{i}"
13 print("Producer: putting", item)
14 q.put(item)
15 time.sleep(random.random() * 0.5)
16 q.put(None) # sentinel to signal consumer to stop
17
18def consumer(q):
19 while True:
20 item = q.get()
21 if item is None:
22 break
23 print("Consumer: got", item)
24 time.sleep(0.2)
25
26if __name__ == "__main__":
27 q = Queue()
28 p = Process(target=producer, args=(q, 5))
29 c = Process(target=consumer, args=(q,))
30 p.start()
31 c.start()
32 p.join()
33 c.join()
34 print("Main: done")Queueআপনাকে নিরাপদে প্রসেসগুলোর মধ্যে ডাটা আদান-প্রদান করতে দেয়। সমাপ্তি সংকেত দেওয়ার জন্যNoneএর মতো একটি বিশেষ মান ব্যবহার করা সাধারণ ব্যাপার।
শেয়ার্ড মেমোরি: Value এবং Array
আপনি যদি বিভিন্ন প্রসেসের মধ্যে ছোট সংখ্যা বা অ্যারে ভাগ করতে চান তাহলে Value এবং Array ব্যবহার করতে পারেন। সংঘাত এড়াতে আপনাকে লক ব্যবহার করতে হবে।
1# Explanation:
2# Use Value to share a single integer counter
3# and Array for a small numeric array.
4# Show how to use a Lock to avoid race conditions.
5
6from multiprocessing import Process, Value, Array, Lock
7import time
8
9def increment(counter, lock, times):
10 for _ in range(times):
11 with lock:
12 counter.value += 1
13
14def update_array(arr):
15 for i in range(len(arr)):
16 arr[i] = arr[i] + 1
17
18if __name__ == "__main__":
19 lock = Lock()
20 counter = Value('i', 0) # 'i' = signed int
21 shared_arr = Array('i', [0, 0, 0])
22
23 p1 = Process(target=increment, args=(counter, lock, 1000))
24 p2 = Process(target=increment, args=(counter, lock, 1000))
25 a = Process(target=update_array, args=(shared_arr,))
26
27 p1.start(); p2.start(); a.start()
28 p1.join(); p2.join(); a.join()
29
30 print("Counter:", counter.value)
31 print("Array:", list(shared_arr))ValueএবংArrayনিম্ন-স্তরের মেকানিজম (C ভাষার স্তরে শেয়ার্ড মেমোরি) ব্যবহার করে ডেটা শেয়ার করে, পাইথন নিজে নয়। অতএব, এটি দ্রুত অল্প পরিমাণ ডেটা পড়া ও লেখা জন্য উপযুক্ত, কিন্তু বড় পরিমাণ ডেটা পরিচালনার জন্য উপযুক্ত নয়।।
উন্নত ভাগাভাগি: Manager-এর মাধ্যমে ভাগ করা অবজেক্ট (ডিকশনারি, লিস্ট)
যদি আপনি আরও নমনীয় অবজেক্ট (লিস্ট, ডিকশনারি ইত্যাদি) শেয়ার করতে চান, তাহলে Manager() ব্যবহার করুন।
1# Explanation:
2# Manager provides proxy objects like dict/list
3# that can be shared across processes.
4# Good for moderate-size shared state and easier programming model.
5
6from multiprocessing import Process, Manager
7import time
8
9def worker(shared_dict, key, value):
10 shared_dict[key] = value
11
12if __name__ == "__main__":
13 with Manager() as manager:
14 d = manager.dict()
15 processes = []
16 for i in range(5):
17 p = Process(target=worker, args=(d, f"k{i}", i*i))
18 p.start()
19 processes.append(p)
20 for p in processes:
21 p.join()
22 print("Shared dict:", dict(d))Managerডিকশনারি এবং লিস্ট ভাগ করে নেওয়ার জন্য সুবিধাজনক, কিন্তু প্রতিবার অ্যাক্সেস করার সময় ডেটা প্রসেসগুলোর মধ্যে পাঠায় এবংpickleরূপান্তর প্রয়োজন হয়। অতএব, বড় পরিমাণ ডেটা বারবার আপডেট করা হলে প্রক্রিয়াকরণ ধীর হয়ে যাবে।
সিঙ্ক্রোনাইজেশন মেকানিজম: কিভাবে Lock এবং Semaphore ব্যবহার করবেন
Lock বা Semaphore ব্যবহার করে ভাগাভাগি করা রিসোর্সে একযোগে অ্যাক্সেস নিয়ন্ত্রণ করুন। আপনি এগুলোকে with স্টেটমেন্টের সাথে সংক্ষেপে ব্যবহার করতে পারেন।
1# Explanation:
2# Demonstrates using Lock to prevent simultaneous access to a critical section.
3# Locks are necessary when shared resources are not atomic.
4
5from multiprocessing import Process, Lock, Value
6
7def safe_add(counter, lock):
8 for _ in range(10000):
9 with lock:
10 counter.value += 1
11
12if __name__ == "__main__":
13 lock = Lock()
14 counter = Value('i', 0)
15 p1 = Process(target=safe_add, args=(counter, lock))
16 p2 = Process(target=safe_add, args=(counter, lock))
17 p1.start(); p2.start()
18 p1.join(); p2.join()
19 print("Counter:", counter.value)- লক ডেটা রেস প্রতিরোধ করে, কিন্তু যদি লক করা অংশটি খুব বড় হয়, তাহলে সমান্তরাল প্রক্রিয়াকরণের কর্মদক্ষতা কমে যাবে। শুধুমাত্র প্রয়োজনীয় অংশগুলি ক্রিটিক্যাল সেকশন হিসেবে সুরক্ষিত করা উচিত।
UNIX-এ fork এবং Windows-এ আচরণের পার্থক্য
UNIX সিস্টেমে প্রসেসগুলো ডিফল্টভাবে fork ব্যবহার করে ডুপ্লিকেট হয়, যা মেমোরির জন্য কপি-অন-রাইট কার্যকর করে তোলে। Windows এ প্রসেসগুলো spawn (মডিউলগুলি পুনরায় ইম্পোর্ট করে) ব্যবহার করে শুরু হয়, তাই এন্ট্রি পয়েন্ট নিরাপত্তা এবং গ্লোবাল ইনিশিয়ালাইজেশনে সতর্ক থাকতে হয়।
1# Explanation: Check start method (fork/spawn) and set it if needed.
2# Useful for debugging platform-dependent behavior.
3
4from multiprocessing import get_start_method, set_start_method
5
6if __name__ == "__main__":
7 print("Start method:", get_start_method())
8
9 # uncomment to force spawn on Unix for testing
10 # set_start_method('spawn')set_start_methodশুধুমাত্র একবার, প্রোগ্রামের শুরুতে কল করা যায়। লাইব্রেরির ভেতরে এটিকে ইচ্ছামতো পরিবর্তন না করাই নিরাপদ।
ব্যবহারিক উদাহরণ: CPU-বাউন্ড ওয়ার্কলোডের বেঞ্চমার্কিং (তুলনা)
নীচে একটি স্ক্রিপ্ট দেওয়া হলো যা কেবলমাত্র দেখাতে পারে multiprocessing ব্যবহার করে প্রসেসিং কীভাবে আরও দ্রুতগতিতে হতে পারে। এখানে আমরা Pool ব্যবহার করি।
1# Explanation:
2# Compare sequential vs parallel execution times for CPU-bound task.
3# Helps understand speedup and overhead.
4
5import time
6from multiprocessing import Pool, cpu_count
7import math
8
9def heavy_task(n):
10 s = 0
11 for i in range(1, n):
12 s += math.sqrt(i)
13 return s
14
15def run_sequential(nums):
16 return [heavy_task(n) for n in nums]
17
18def run_parallel(nums):
19 with Pool(processes=cpu_count()) as p:
20 return p.map(heavy_task, nums)
21
22if __name__ == "__main__":
23 nums = [2000000] * 8 # heavy tasks
24 t0 = time.time()
25 run_sequential(nums)
26 seq = time.time() - t0
27 t1 = time.time()
28 run_parallel(nums)
29 par = time.time() - t1
30 print(f"Sequential: {seq:.2f}s, Parallel: {par:.2f}s")- এই উদাহরণ দেখায় যে, টাস্ক লোড ও প্রসেস সংখ্যা নির্ভর করে, ওভারহেডের কারণে কখনো কখনো সমান্তরালতা অকার্যকর হতে পারে। কাজ যত বড় ও স্বতন্ত্র হবে, তত বেশি লাভ হবে।
গুরুত্বপূর্ণ মৌলিক নিয়মাবলি
multiprocessing নিরাপদ এবং কার্যকরভাবে ব্যবহারের মূল বিষয়গুলো নিচে দেওয়া হলো।
- Windows এ চাইল্ড প্রসেস শুরু হলে মডিউলগুলো পুনরায় ইম্পোর্ট হয়, তাই আপনার স্ক্রিপ্টের এন্ট্রি পয়েন্টে অবশ্যই
if __name__ == "__main__":ব্যবহার করুন। - ইন্টার-প্রসেস কমিউনিকেশন সিরিয়ালাইজড (
pickleরূপান্তরের মাধ্যমে) হয়, তাই বড় অবজেক্ট স্থানান্তর ব্যয়বহুল হয়ে যায়। multiprocessingপ্রসেস তৈরি করে, তাই সাধারণতmultiprocessing.cpu_count()অনুযায়ী প্রসেস সংখ্যা নির্ধারণ করা হয়।- একটি worker-এর মধ্যে আরেকটি
Poolতৈরি করা জটিল হয়ে যায়, তাই যতটা সম্ভবPoolএর nested instances তৈরি এড়ানো উচিত। - চাইল্ড প্রসেসে যে এক্সেপশনগুলি ঘটে, তা মূল প্রসেস থেকে সনাক্ত করা কঠিন, তাই স্পষ্টভাবে লগিং এবং এরর হ্যান্ডলিং বাস্তবায়ন করা প্রয়োজন।
- CPU অনুযায়ী প্রসেস সংখ্যা নির্ধারণ করুন এবং I/O-বাউন্ড কাজের জন্য থ্রেড ব্যবহার বিবেচনা করুন।
প্র্যাকটিক্যাল ডিজাইন পরামর্শ
সমান্তরাল প্রক্রিয়াকরণ ডিজাইন করার জন্য কিছু উপযোগী ধারণা ও প্যাটার্ন নিচে দেওয়া হল।
- 'পাইপলাইনিং'-এর মাধ্যমে ইনপুট পড়া (I/O), প্রিপ্রসেসিং (মাল্টি-CPU), এবং এগ্রিগেশন (সিরিয়াল)-এর মতো রোলে প্রসেসগুলো ভাগ করে নেওয়া কার্যকর।
- ডিবাগিং সহজ করতে, সমান্তরাল করার আগে একক প্রসেসে চলা যাচাই করুন।
- লগিংয়ের জন্য, প্রতি প্রসেস আলাদা লগ আউটপুট করুন (যেমন, ফাইল নেমে PID ব্যবহার করুন) যাতে সমস্যা চিহ্নিত করা সহজ হয়।
- প্রত্যাবর্তন (retry) এবং টাইমআউট মেকানিজম প্রস্তুত রাখুন যাতে কোনো প্রসেস হ্যাং করলে নিরাপদে পুনরুদ্ধার করা যায়।
সারসংক্ষেপ (তাৎক্ষণিক ব্যবহারের জন্য মূল পয়েন্টগুলো)
সমান্তরাল প্রসেসিং শক্তিশালী, তবে কাজের প্রকৃতি, ডেটার আকার ও ইন্টার-প্রসেস যোগাযোগের খরচ সঠিকভাবে মূল্যায়ন করা গুরুত্বপূর্ণ। multiprocessing CPU-বাউন্ড প্রসেসিংয়ের জন্য কার্যকর, তবে খারাপ ডিজাইন বা সমন্বয় সংক্রান্ত ভুলে কর্মদক্ষতা কমে যেতে পারে। আপনি যদি মৌলিক নিয়ম ও প্যাটার্ন অনুসরণ করেন, তাহলে আপনি নিরাপদ ও দক্ষ সমান্তরাল প্রোগ্রাম তৈরি করতে পারবেন।
আপনি আমাদের ইউটিউব চ্যানেলে ভিজ্যুয়াল স্টুডিও কোড ব্যবহার করে উপরের নিবন্ধটি অনুসরণ করতে পারেন। দয়া করে ইউটিউব চ্যানেলটিও দেখুন।