Le module `multiprocessing` en Python

Le module `multiprocessing` en Python

Cet article explique le module multiprocessing en Python.

Cet article présente des conseils pratiques pour écrire un code de traitement parallèle sûr et efficace en utilisant le module multiprocessing.

YouTube Video

Le module multiprocessing en Python

Bases : Pourquoi utiliser multiprocessing ?

multiprocessing permet la parallélisation au niveau des processus, vous pouvez donc paralléliser des tâches gourmandes en CPU sans être limité par le GIL (Global Interpreter Lock) de Python. Pour les tâches liées à l'I/O, threading ou asyncio peuvent être plus simples et plus adaptés.

Utilisation simple de Process

Tout d'abord, voici un exemple de base d'exécution d'une fonction dans un processus séparé en utilisant Process. Cela montre comment démarrer un processus, attendre sa fin et passer des arguments.

 1# Explanation:
 2# This example starts a separate process to run `worker` which prints messages.
 3# It demonstrates starting, joining, and passing arguments.
 4
 5from multiprocessing import Process
 6import time
 7
 8def worker(name, delay):
 9    # English comment in code per user's preference
10    for i in range(3):
11        print(f"Worker {name}: iteration {i}")
12        time.sleep(delay)
13
14if __name__ == "__main__":
15    p = Process(target=worker, args=("A", 0.5))
16    p.start()
17    print("Main: waiting for worker to finish")
18    p.join()
19    print("Main: worker finished")
  • Ce code montre le déroulement où le processus principal lance un processus enfant worker et attend sa fin avec join(). Vous pouvez passer des arguments avec args.

Parallélisation simple avec Pool (API de haut niveau)

Pool.map est utile lorsque vous souhaitez appliquer la même fonction à plusieurs tâches indépendantes. Il gère les processus de travail en interne pour vous.

 1# Explanation:
 2# Use Pool.map to parallelize a CPU-bound function across available processes.
 3# Good for "embarrassingly parallel" workloads.
 4
 5from multiprocessing import Pool, cpu_count
 6import math
 7import time
 8
 9def is_prime(n):
10    # Check primality (inefficient but CPU-heavy for demo)
11    if n < 2:
12        return False
13    for i in range(2, int(math.sqrt(n)) + 1):
14        if n % i == 0:
15            return False
16    return True
17
18if __name__ == "__main__":
19    nums = [10_000_000 + i for i in range(50)]
20    start = time.time()
21    with Pool(processes=cpu_count()) as pool:
22        results = pool.map(is_prime, nums)
23    end = time.time()
24    print(f"Found primes: {sum(results)} / {len(nums)} in {end-start:.2f}s")
  • Pool peut contrôler automatiquement le nombre de travailleurs, et map renvoie les résultats dans l'ordre d'origine.

Communication inter-processus : modèle Producteur/Consommateur utilisant Queue

Queue est une file d'attente Premier Entré, Premier Sorti (FIFO) qui transfère en toute sécurité des objets entre les processus. Voici quelques modèles typiques.

 1# Explanation:
 2# Demonstrates a producer putting items into a Queue
 3# and consumer reading them.
 4# This is useful for task pipelines between processes.
 5
 6from multiprocessing import Process, Queue
 7import time
 8import random
 9
10def producer(q, n):
11    for i in range(n):
12        item = f"item-{i}"
13        print("Producer: putting", item)
14        q.put(item)
15        time.sleep(random.random() * 0.5)
16    q.put(None)  # sentinel to signal consumer to stop
17
18def consumer(q):
19    while True:
20        item = q.get()
21        if item is None:
22            break
23        print("Consumer: got", item)
24        time.sleep(0.2)
25
26if __name__ == "__main__":
27    q = Queue()
28    p = Process(target=producer, args=(q, 5))
29    c = Process(target=consumer, args=(q,))
30    p.start()
31    c.start()
32    p.join()
33    c.join()
34    print("Main: done")
  • Queue permet de transmettre les données en toute sécurité entre les processus. Il est courant d'utiliser une valeur spéciale comme None pour signaler la terminaison.

Mémoire partagée : Value et Array

Vous pouvez utiliser Value et Array lorsque vous souhaitez partager des nombres ou des tableaux entre les processus. Vous devez utiliser des verrous pour éviter les conflits.

 1# Explanation:
 2# Use Value to share a single integer counter
 3# and Array for a small numeric array.
 4# Show how to use a Lock to avoid race conditions.
 5
 6from multiprocessing import Process, Value, Array, Lock
 7import time
 8
 9def increment(counter, lock, times):
10    for _ in range(times):
11        with lock:
12            counter.value += 1
13
14def update_array(arr):
15    for i in range(len(arr)):
16        arr[i] = arr[i] + 1
17
18if __name__ == "__main__":
19    lock = Lock()
20    counter = Value('i', 0)  # 'i' = signed int
21    shared_arr = Array('i', [0, 0, 0])
22
23    p1 = Process(target=increment, args=(counter, lock, 1000))
24    p2 = Process(target=increment, args=(counter, lock, 1000))
25    a = Process(target=update_array, args=(shared_arr,))
26
27    p1.start(); p2.start(); a.start()
28    p1.join(); p2.join(); a.join()
29
30    print("Counter:", counter.value)
31    print("Array:", list(shared_arr))
  • Value et Array partagent les données entre processus en utilisant des mécanismes de bas niveau (mémoire partagée au niveau du langage C), et non Python lui-même. Par conséquent, il convient pour lire et écrire rapidement de petites quantités de données, mais il n'est pas adapté au traitement de grandes quantités de données..

Partage avancé : objets partagés (dict, list) avec Manager

Si vous souhaitez utiliser des objets partagés plus flexibles comme des listes ou des dictionnaires, utilisez Manager().

 1# Explanation:
 2# Manager provides proxy objects like dict/list
 3# that can be shared across processes.
 4# Good for moderate-size shared state and easier programming model.
 5
 6from multiprocessing import Process, Manager
 7import time
 8
 9def worker(shared_dict, key, value):
10    shared_dict[key] = value
11
12if __name__ == "__main__":
13    with Manager() as manager:
14        d = manager.dict()
15        processes = []
16        for i in range(5):
17            p = Process(target=worker, args=(d, f"k{i}", i*i))
18            p.start()
19            processes.append(p)
20        for p in processes:
21            p.join()
22        print("Shared dict:", dict(d))
  • Manager est pratique pour partager des dictionnaires et des listes, mais chaque accès envoie des données entre les processus et nécessite une conversion avec pickle. Par conséquent, la mise à jour fréquente de grandes quantités de données ralentira le traitement.

Mécanismes de synchronisation : comment utiliser Lock et Semaphore

Utilisez Lock ou Semaphore pour contrôler l'accès concurrent aux ressources partagées. Vous pouvez les utiliser de manière concise avec l'instruction with.

 1# Explanation:
 2# Demonstrates using Lock to prevent simultaneous access to a critical section.
 3# Locks are necessary when shared resources are not atomic.
 4
 5from multiprocessing import Process, Lock, Value
 6
 7def safe_add(counter, lock):
 8    for _ in range(10000):
 9        with lock:
10            counter.value += 1
11
12if __name__ == "__main__":
13    lock = Lock()
14    counter = Value('i', 0)
15    p1 = Process(target=safe_add, args=(counter, lock))
16    p2 = Process(target=safe_add, args=(counter, lock))
17    p1.start(); p2.start()
18    p1.join(); p2.join()
19    print("Counter:", counter.value)
  • Les verrous empêchent les conditions de concurrence sur les données, mais si la zone verrouillée est trop grande, la performance du traitement parallèle diminuera. Seules les parties nécessaires doivent être protégées en tant que section critique.

Différences entre fork sous UNIX et le comportement sous Windows

Sur les systèmes UNIX, les processus sont dupliqués par défaut avec fork, ce qui rend la gestion copy-on-write de la mémoire efficace. Windows lance les processus avec spawn (qui réimporte les modules), il faut donc faire attention à la protection du point d'entrée et à l'initialisation globale.

 1# Explanation: Check start method (fork/spawn) and set it if needed.
 2# Useful for debugging platform-dependent behavior.
 3
 4from multiprocessing import get_start_method, set_start_method
 5
 6if __name__ == "__main__":
 7    print("Start method:", get_start_method())
 8
 9    # uncomment to force spawn on Unix for testing
10    # set_start_method('spawn')
  • set_start_method ne peut être appelé qu'une seule fois, au début de votre programme. Il est plus sûr de ne pas modifier cela arbitrairement dans les bibliothèques.

Exemple pratique : Benchmark de charges de travail CPU-bound (comparaison)

Ci-dessous se trouve un script qui compare simplement à quel point le traitement peut être plus rapide avec la parallélisation utilisant multiprocessing. Ici, nous utilisons Pool.

 1# Explanation:
 2# Compare sequential vs parallel execution times for CPU-bound task.
 3# Helps understand speedup and overhead.
 4
 5import time
 6from multiprocessing import Pool, cpu_count
 7import math
 8
 9def heavy_task(n):
10    s = 0
11    for i in range(1, n):
12        s += math.sqrt(i)
13    return s
14
15def run_sequential(nums):
16    return [heavy_task(n) for n in nums]
17
18def run_parallel(nums):
19    with Pool(processes=cpu_count()) as p:
20        return p.map(heavy_task, nums)
21
22if __name__ == "__main__":
23    nums = [2000000] * 8  # heavy tasks
24    t0 = time.time()
25    run_sequential(nums)
26    seq = time.time() - t0
27    t1 = time.time()
28    run_parallel(nums)
29    par = time.time() - t1
30    print(f"Sequential: {seq:.2f}s, Parallel: {par:.2f}s")
  • Cet exemple montre qu'en fonction de la charge de travail des tâches et du nombre de processus, la parallélisation peut parfois être inefficace à cause de la surcharge. Plus les tâches sont lourdes et indépendantes, plus le gain est important.

Règles de base importantes

Voici les points de base pour utiliser multiprocessing de manière sûre et efficace.

  • Sous Windows, les modules sont réimportés lors du démarrage des processus enfants ; il faut donc protéger l'entrée de votre script avec if __name__ == "__main__":.
  • La communication inter-processus est sérialisée (avec conversion pickle), donc le transfert de gros objets devient coûteux.
  • Puisque multiprocessing crée des processus, il est courant de choisir le nombre de processus selon multiprocessing.cpu_count().
  • Créer un autre Pool dans un worker devient complexe, il faut donc éviter d’imbriquer autant que possible les instances de Pool.
  • Puisque les exceptions survenant dans les processus enfants sont difficiles à détecter depuis le processus principal, il est nécessaire d’implémenter explicitement la journalisation et la gestion des erreurs.
  • Ajustez le nombre de processus en fonction du CPU, et envisagez d'utiliser des threads pour les tâches I/O.

Conseils pratiques de conception

Vous trouverez ci-dessous quelques concepts et modèles utiles pour la conception du traitement parallèle.

  • Il est efficace de séparer les processus selon les rôles : lecture des entrées (I/O), prétraitement (multi-CPU), agrégation (série), en utilisant le ‘pipelining’.
  • Pour faciliter le débogage, vérifiez d'abord le fonctionnement en mode séquentiel avant de paralléliser.
  • Pour les journaux, séparez les sorties de logs par processus (par exemple, en incluant le PID dans les noms de fichiers) pour faciliter la localisation des erreurs.
  • Prévoyez des mécanismes de reprise et de timeout afin de pouvoir récupérer en toute sécurité même si un processus se bloque.

Résumé (Points clés à retenir immédiatement)

Le traitement parallèle est puissant, mais il est important de bien évaluer la nature des tâches, la taille des données et le coût de la communication entre processus. multiprocessing est efficace pour les traitements CPU-bound, mais une mauvaise conception ou des erreurs de synchronisation peuvent réduire les performances. Si vous suivez les règles et modèles de base, vous pourrez créer des programmes parallèles sûrs et efficaces.

Vous pouvez suivre l'article ci-dessus avec Visual Studio Code sur notre chaîne YouTube. Veuillez également consulter la chaîne YouTube.

YouTube Video