पायथन में `मल्टीप्रोसेसिंग` मॉड्यूल

पायथन में `मल्टीप्रोसेसिंग` मॉड्यूल

यह लेख पायथन में मल्टीप्रोसेसिंग मॉड्यूल को समझाता है।

यह लेख multiprocessing मॉड्यूल का उपयोग करके सुरक्षित और प्रभावी समांतर प्रोसेसिंग कोड लिखने के लिए व्यावहारिक सुझाव प्रस्तुत करता है।

YouTube Video

पायथन में मल्टीप्रोसेसिंग मॉड्यूल

मूल बातें: multiprocessing का उपयोग क्यों करें?

multiprocessing प्रक्रिया-आधारित समानांतरता सक्षम करता है, जिससे आप Python के GIL (Global Interpreter Lock) की सीमा के बिना CPU-बद्ध कार्यों को समानांतर बना सकते हैं। I/O-बाउंड कार्यों के लिए, threading या asyncio अधिक सरल और उपयुक्त हो सकते हैं।

Process का सरल उपयोग

सबसे पहले, यहाँ एक अलग प्रक्रिया में Process का उपयोग करके फ़ंक्शन चलाने का एक मौलिक उदाहरण दिया गया है। यह दिखाता है कि प्रक्रिया कैसे शुरू करें, उसकी समाप्ति तक प्रतीक्षा करें, और तर्क पास करें।

 1# Explanation:
 2# This example starts a separate process to run `worker` which prints messages.
 3# It demonstrates starting, joining, and passing arguments.
 4
 5from multiprocessing import Process
 6import time
 7
 8def worker(name, delay):
 9    # English comment in code per user's preference
10    for i in range(3):
11        print(f"Worker {name}: iteration {i}")
12        time.sleep(delay)
13
14if __name__ == "__main__":
15    p = Process(target=worker, args=("A", 0.5))
16    p.start()
17    print("Main: waiting for worker to finish")
18    p.join()
19    print("Main: worker finished")
  • यह कोड उस प्रवाह को दिखाता है जहाँ मुख्य प्रक्रिया एक उप-प्रक्रिया worker शुरू करती है और उसके पूरा होने तक join() का उपयोग करके प्रतीक्षा करती है। आप args का उपयोग करके तर्क (arguments) पास कर सकते हैं।

Pool (हाई-लेवल API) के साथ सरल समानांतरता

Pool.map तब उपयोगी होता है जब आप एक ही फ़ंक्शन को कई स्वतंत्र कार्यों पर लागू करना चाहते हैं। यह आपके लिए कार्यकर्ता प्रक्रियाओं का प्रबंध खुद करता है।

 1# Explanation:
 2# Use Pool.map to parallelize a CPU-bound function across available processes.
 3# Good for "embarrassingly parallel" workloads.
 4
 5from multiprocessing import Pool, cpu_count
 6import math
 7import time
 8
 9def is_prime(n):
10    # Check primality (inefficient but CPU-heavy for demo)
11    if n < 2:
12        return False
13    for i in range(2, int(math.sqrt(n)) + 1):
14        if n % i == 0:
15            return False
16    return True
17
18if __name__ == "__main__":
19    nums = [10_000_000 + i for i in range(50)]
20    start = time.time()
21    with Pool(processes=cpu_count()) as pool:
22        results = pool.map(is_prime, nums)
23    end = time.time()
24    print(f"Found primes: {sum(results)} / {len(nums)} in {end-start:.2f}s")
  • Pool स्वचालित रूप से कार्यकर्ताओं की संख्या नियंत्रित कर सकता है, और map परिणामों को मूल क्रम में लौटाता है।

इंटरप्रोसेस संचार: Queue का उपयोग करके प्रोड्यूसर/कंज्यूमर पैटर्न

Queue एक First-In-First-Out (FIFO) कतार है जो प्रक्रियाओं के बीच वस्तुओं का सुरक्षित रूप से स्थानांतरण करती है। नीचे कुछ सामान्य पैटर्न दिए गए हैं।

 1# Explanation:
 2# Demonstrates a producer putting items into a Queue
 3# and consumer reading them.
 4# This is useful for task pipelines between processes.
 5
 6from multiprocessing import Process, Queue
 7import time
 8import random
 9
10def producer(q, n):
11    for i in range(n):
12        item = f"item-{i}"
13        print("Producer: putting", item)
14        q.put(item)
15        time.sleep(random.random() * 0.5)
16    q.put(None)  # sentinel to signal consumer to stop
17
18def consumer(q):
19    while True:
20        item = q.get()
21        if item is None:
22            break
23        print("Consumer: got", item)
24        time.sleep(0.2)
25
26if __name__ == "__main__":
27    q = Queue()
28    p = Process(target=producer, args=(q, 5))
29    c = Process(target=consumer, args=(q,))
30    p.start()
31    c.start()
32    p.join()
33    c.join()
34    print("Main: done")
  • Queue आपको प्रक्रियाओं के बीच डेटा सुरक्षित रूप से पास करने की सुविधा देता है। समाप्ति का संकेत देने के लिए None जैसी विशेष मान का उपयोग करना सामान्य है।

साझा मेमोरी: Value और Array

जब आप प्रक्रियाओं के बीच छोटे नंबर या एरे साझा करना चाहते हैं, तो आप Value और Array का उपयोग कर सकते हैं। संघर्ष से बचने के लिए आपको लॉक का उपयोग करना आवश्यक है।

 1# Explanation:
 2# Use Value to share a single integer counter
 3# and Array for a small numeric array.
 4# Show how to use a Lock to avoid race conditions.
 5
 6from multiprocessing import Process, Value, Array, Lock
 7import time
 8
 9def increment(counter, lock, times):
10    for _ in range(times):
11        with lock:
12            counter.value += 1
13
14def update_array(arr):
15    for i in range(len(arr)):
16        arr[i] = arr[i] + 1
17
18if __name__ == "__main__":
19    lock = Lock()
20    counter = Value('i', 0)  # 'i' = signed int
21    shared_arr = Array('i', [0, 0, 0])
22
23    p1 = Process(target=increment, args=(counter, lock, 1000))
24    p2 = Process(target=increment, args=(counter, lock, 1000))
25    a = Process(target=update_array, args=(shared_arr,))
26
27    p1.start(); p2.start(); a.start()
28    p1.join(); p2.join(); a.join()
29
30    print("Counter:", counter.value)
31    print("Array:", list(shared_arr))
  • Value और Array डेटा को प्रक्रियाओं के बीच निचले-स्तरीय यंत्रणाओं (C भाषा स्तर पर साझा मेमोरी) से साझा करते हैं, न कि स्वयं पाइथन से। इसलिए, यह थोड़ी मात्रा में डेटा को तेजी से पढ़ने और लिखने के लिए उपयुक्त है, लेकिन बड़ी मात्रा में डेटा को संभालने के लिए उपयुक्त नहीं है।

उन्नत साझाकरण: Manager के साथ साझा ऑब्जेक्ट्स (dicts, lists)

अगर आप सूची या डिक्शनरी जैसे और अधिक लचीले साझा ऑब्जेक्ट का उपयोग करना चाहते हैं, तो Manager() का उपयोग करें।

 1# Explanation:
 2# Manager provides proxy objects like dict/list
 3# that can be shared across processes.
 4# Good for moderate-size shared state and easier programming model.
 5
 6from multiprocessing import Process, Manager
 7import time
 8
 9def worker(shared_dict, key, value):
10    shared_dict[key] = value
11
12if __name__ == "__main__":
13    with Manager() as manager:
14        d = manager.dict()
15        processes = []
16        for i in range(5):
17            p = Process(target=worker, args=(d, f"k{i}", i*i))
18            p.start()
19            processes.append(p)
20        for p in processes:
21            p.join()
22        print("Shared dict:", dict(d))
  • Manager शब्दकोशों और सूचियों को साझा करने के लिए सुविधाजनक है, लेकिन हर बार उपयोग के दौरान डेटा प्रक्रियाओं के बीच भेजा जाता है और pickle रूपांतरण की आवश्यकता होती है। इसलिए, बार-बार बड़ी मात्रा में डेटा को अद्यतन करने से प्रोसेसिंग धीमी हो जाएगी।

समकालिकरण तंत्र: Lock और Semaphore का उपयोग कैसे करें

साझा संसाधनों तक समवर्ती पहुँच को नियंत्रित करने के लिए Lock या Semaphore का उपयोग करें। आप इन्हें with स्टेटमेंट के साथ संक्षिप्त रूप से उपयोग कर सकते हैं।

 1# Explanation:
 2# Demonstrates using Lock to prevent simultaneous access to a critical section.
 3# Locks are necessary when shared resources are not atomic.
 4
 5from multiprocessing import Process, Lock, Value
 6
 7def safe_add(counter, lock):
 8    for _ in range(10000):
 9        with lock:
10            counter.value += 1
11
12if __name__ == "__main__":
13    lock = Lock()
14    counter = Value('i', 0)
15    p1 = Process(target=safe_add, args=(counter, lock))
16    p2 = Process(target=safe_add, args=(counter, lock))
17    p1.start(); p2.start()
18    p1.join(); p2.join()
19    print("Counter:", counter.value)
  • लॉक्स डेटा रेस को रोकते हैं, लेकिन यदि लॉक किया गया क्षेत्र बहुत बड़ा है, तो समानांतर प्रोसेसिंग की प्रदर्शन क्षमता घट जाएगी। केवल आवश्यक भागों को ही क्रिटिकल सेक्शन के रूप में संरक्षित किया जाना चाहिए।

UNIX पर fork और Windows पर व्यवहार में अंतर

UNIX सिस्टमों पर, प्रक्रियाओं को डिफ़ॉल्ट रूप से fork के माध्यम से डुप्लिकेट किया जाता है, जिससे मेमोरी कॉपी ऑन राइट कुशल हो जाती है। Windows में प्रक्रियाएँ spawn का उपयोग करके शुरू होती हैं (जो मॉड्यूल्स को पुनः आयात करता है), इसलिए आपको प्रवेश बिंदु सुरक्षा और ग्लोबल इनिशियलाइज़ेशन का ध्यान रखना चाहिए।

 1# Explanation: Check start method (fork/spawn) and set it if needed.
 2# Useful for debugging platform-dependent behavior.
 3
 4from multiprocessing import get_start_method, set_start_method
 5
 6if __name__ == "__main__":
 7    print("Start method:", get_start_method())
 8
 9    # uncomment to force spawn on Unix for testing
10    # set_start_method('spawn')
  • set_start_method को केवल आपके प्रोग्राम की शुरुआत में एक बार ही कॉल किया जा सकता है। लाइब्रेरी के अंदर इसे मनमाने ढंग से बदलना सुरक्षित नहीं है।

व्यावहारिक उदाहरण: CPU-बाउंड कार्यभारों का बेंचमार्किंग (तुलना)

नीचे एक स्क्रिप्ट है जो यह आसानी से तुलना करती है कि multiprocessing का उपयोग कर समानांतर प्रसंस्करण कितना तेज हो सकता है। यहाँ, हम Pool का उपयोग करते हैं।

 1# Explanation:
 2# Compare sequential vs parallel execution times for CPU-bound task.
 3# Helps understand speedup and overhead.
 4
 5import time
 6from multiprocessing import Pool, cpu_count
 7import math
 8
 9def heavy_task(n):
10    s = 0
11    for i in range(1, n):
12        s += math.sqrt(i)
13    return s
14
15def run_sequential(nums):
16    return [heavy_task(n) for n in nums]
17
18def run_parallel(nums):
19    with Pool(processes=cpu_count()) as p:
20        return p.map(heavy_task, nums)
21
22if __name__ == "__main__":
23    nums = [2000000] * 8  # heavy tasks
24    t0 = time.time()
25    run_sequential(nums)
26    seq = time.time() - t0
27    t1 = time.time()
28    run_parallel(nums)
29    par = time.time() - t1
30    print(f"Sequential: {seq:.2f}s, Parallel: {par:.2f}s")
  • यह उदाहरण दिखाता है कि कार्यभार और प्रक्रियाओं की संख्या के आधार पर, ओवरहेड के कारण कभी-कभी समानांतरता अप्रभावी हो सकती है। कार्य जितने बड़े (“भारी”) और स्वतंत्र होंगे, लाभ उतना अधिक होगा।

महत्वपूर्ण मौलिक नियम

नीचे multiprocessing को सुरक्षित और प्रभावी रूप से उपयोग करने के मूल बिंदु दिए गए हैं।

  • Windows में, जब चाइल्ड प्रोसेस शुरू होता है, तब मॉड्यूल्स पुनः आयात होते हैं, इसलिए आपको अपनी स्क्रिप्ट का एंट्री प्वाइंट if __name__ == "__main__": से सुरक्षित करना चाहिए।
  • इंटर-प्रोसेस संचार को क्रमबद्ध (serialize) किया जाता है (pickle रूपांतरण के साथ), इसलिए बड़े ऑब्जेक्ट्स का स्थानांतरण महंगा हो जाता है।
  • multiprocessing प्रक्रियाएँ बनाता है, इसलिए प्रक्रियाओं की संख्या का निर्धारण अक्सर multiprocessing.cpu_count() के आधार पर किया जाता है।
  • वर्कर के भीतर एक और Pool बनाना जटिल हो जाता है, इसलिए आपको जितना संभव हो सके Pool उदाहरणों को नेस्ट करने से बचना चाहिए।
  • चाइल्ड प्रक्रियाओं में होने वाले अपवादों का मुख्य प्रक्रिया से पता लगाना कठिन होता है, इसलिए लॉगिंग और त्रुटि प्रबंधन को स्पष्ट रूप से लागू करना आवश्यक है।
  • प्रक्रियाओं की संख्या CPU के अनुसार निर्धारित करें, और I/O-बाउंड कार्यों के लिए थ्रेड्स का उपयोग करने पर विचार करें।

व्यावहारिक डिज़ाइन परामर्श

समानांतर प्रोसेसिंग डिज़ाइन करने के लिए नीचे कुछ उपयोगी अवधारणाएँ और पैटर्न दिए गए हैं।

  • 'पाइपलाइनिंग' के माध्यम से प्रक्रियाओं को इनपुट रीडिंग (I/O), पूर्वप्रसंस्करण (मल्टी-CPU) और एग्रीगेशन (सीरियल) जैसी भूमिकाओं में विभाजित करना कुशल है।
  • डिबगिंग को सरल बनाने के लिए, समानांतर बनाए बिना पहले एकल प्रक्रिया में संचालन की जांच करें।
  • लॉगिंग के लिए, प्रत्येक प्रक्रिया का लॉग आउटपुट अलग रखें (जैसे, फाइल नामों में PID जोड़ें) ताकि समस्याओं को अलग-अलग पहचानना आसान हो।
  • पुनः प्रयास और टाइमआउट तंत्र तैयार रखें ताकि यदि कोई प्रक्रिया अटक जाए तो आप सुरक्षित रूप से पुनर्प्राप्त कर सकें।

सारांश (मुख्य बिंदु जिन्हें आप तुरंत उपयोग कर सकते हैं)

समानांतर प्रोसेसिंग शक्तिशाली है, लेकिन कार्यों की प्रकृति, डेटा का आकार और इंटर-प्रोसेस संचार लागत का सही आकलन करना महत्वपूर्ण है। multiprocessing CPU-बाउंड प्रोसेसिंग के लिए प्रभावी है, लेकिन खराब डिज़ाइन या सिंक्रोनाइज़ेशन में गलतियाँ प्रदर्शन घटा सकती हैं। यदि आप मूल नियमों और पैटर्न का पालन करते हैं, तो आप सुरक्षित और प्रभावी समानांतर प्रोग्राम बना सकते हैं।

आप हमारे YouTube चैनल पर Visual Studio Code का उपयोग करके ऊपर दिए गए लेख के साथ आगे बढ़ सकते हैं। कृपया YouTube चैनल को भी देखें।

YouTube Video