O módulo `multiprocessing` no Python

O módulo `multiprocessing` no Python

Este artigo explica o módulo multiprocessing no Python.

Este artigo apresenta dicas práticas para escrever código de processamento paralelo seguro e eficiente usando o módulo multiprocessing.

YouTube Video

O módulo multiprocessing no Python

Básico: Por que usar o multiprocessing?

O multiprocessing possibilita a paralelização com base em processos, então você pode paralelizar tarefas que exigem muito da CPU sem ser restringido pelo GIL (Global Interpreter Lock) do Python. Para tarefas limitadas por I/O, threading ou asyncio podem ser mais simples e adequados.

Uso simples do Process

Primeiramente, aqui está um exemplo básico de como executar uma função em um processo separado usando Process. Isso demonstra como iniciar um processo, esperar por sua conclusão e passar argumentos.

 1# Explanation:
 2# This example starts a separate process to run `worker` which prints messages.
 3# It demonstrates starting, joining, and passing arguments.
 4
 5from multiprocessing import Process
 6import time
 7
 8def worker(name, delay):
 9    # English comment in code per user's preference
10    for i in range(3):
11        print(f"Worker {name}: iteration {i}")
12        time.sleep(delay)
13
14if __name__ == "__main__":
15    p = Process(target=worker, args=("A", 0.5))
16    p.start()
17    print("Main: waiting for worker to finish")
18    p.join()
19    print("Main: worker finished")
  • Este código mostra o fluxo em que o processo principal inicia um processo filho worker e aguarda sua conclusão usando join(). Você pode passar argumentos usando args.

Paralelização simples com Pool (API de alto nível)

O Pool.map é útil quando você deseja aplicar a mesma função a várias tarefas independentes. Ele gerencia os processos de trabalho internamente para você.

 1# Explanation:
 2# Use Pool.map to parallelize a CPU-bound function across available processes.
 3# Good for "embarrassingly parallel" workloads.
 4
 5from multiprocessing import Pool, cpu_count
 6import math
 7import time
 8
 9def is_prime(n):
10    # Check primality (inefficient but CPU-heavy for demo)
11    if n < 2:
12        return False
13    for i in range(2, int(math.sqrt(n)) + 1):
14        if n % i == 0:
15            return False
16    return True
17
18if __name__ == "__main__":
19    nums = [10_000_000 + i for i in range(50)]
20    start = time.time()
21    with Pool(processes=cpu_count()) as pool:
22        results = pool.map(is_prime, nums)
23    end = time.time()
24    print(f"Found primes: {sum(results)} / {len(nums)} in {end-start:.2f}s")
  • O Pool pode controlar automaticamente o número de trabalhadores, e o map retorna os resultados na ordem original.

Comunicação entre processos: padrão Produtor/Consumidor usando Queue

Queue é uma fila Primeiro a Entrar, Primeiro a Sair (FIFO) que transfere objetos com segurança entre processos. Abaixo estão alguns padrões típicos.

 1# Explanation:
 2# Demonstrates a producer putting items into a Queue
 3# and consumer reading them.
 4# This is useful for task pipelines between processes.
 5
 6from multiprocessing import Process, Queue
 7import time
 8import random
 9
10def producer(q, n):
11    for i in range(n):
12        item = f"item-{i}"
13        print("Producer: putting", item)
14        q.put(item)
15        time.sleep(random.random() * 0.5)
16    q.put(None)  # sentinel to signal consumer to stop
17
18def consumer(q):
19    while True:
20        item = q.get()
21        if item is None:
22            break
23        print("Consumer: got", item)
24        time.sleep(0.2)
25
26if __name__ == "__main__":
27    q = Queue()
28    p = Process(target=producer, args=(q, 5))
29    c = Process(target=consumer, args=(q,))
30    p.start()
31    c.start()
32    p.join()
33    c.join()
34    print("Main: done")
  • O Queue permite que você transmita dados com segurança entre processos. É comum usar um valor especial como None para sinalizar a terminação.

Memória compartilhada: Value e Array

Você pode usar Value e Array quando quiser compartilhar pequenos números ou arrays entre processos. Você precisa usar locks para evitar conflitos.

 1# Explanation:
 2# Use Value to share a single integer counter
 3# and Array for a small numeric array.
 4# Show how to use a Lock to avoid race conditions.
 5
 6from multiprocessing import Process, Value, Array, Lock
 7import time
 8
 9def increment(counter, lock, times):
10    for _ in range(times):
11        with lock:
12            counter.value += 1
13
14def update_array(arr):
15    for i in range(len(arr)):
16        arr[i] = arr[i] + 1
17
18if __name__ == "__main__":
19    lock = Lock()
20    counter = Value('i', 0)  # 'i' = signed int
21    shared_arr = Array('i', [0, 0, 0])
22
23    p1 = Process(target=increment, args=(counter, lock, 1000))
24    p2 = Process(target=increment, args=(counter, lock, 1000))
25    a = Process(target=update_array, args=(shared_arr,))
26
27    p1.start(); p2.start(); a.start()
28    p1.join(); p2.join(); a.join()
29
30    print("Counter:", counter.value)
31    print("Array:", list(shared_arr))
  • Value e Array compartilham dados entre processos usando mecanismos de baixo nível (memória compartilhada no nível da linguagem C), e não o próprio Python. Portanto, é adequado para ler e escrever pequenas quantidades de dados rapidamente, mas não é adequado para lidar com grandes volumes de dados..

Compartilhamento avançado: Objetos compartilhados (dicts, listas) com Manager

Se você deseja usar objetos compartilhados mais flexíveis como listas ou dicionários, utilize Manager().

 1# Explanation:
 2# Manager provides proxy objects like dict/list
 3# that can be shared across processes.
 4# Good for moderate-size shared state and easier programming model.
 5
 6from multiprocessing import Process, Manager
 7import time
 8
 9def worker(shared_dict, key, value):
10    shared_dict[key] = value
11
12if __name__ == "__main__":
13    with Manager() as manager:
14        d = manager.dict()
15        processes = []
16        for i in range(5):
17            p = Process(target=worker, args=(d, f"k{i}", i*i))
18            p.start()
19            processes.append(p)
20        for p in processes:
21            p.join()
22        print("Shared dict:", dict(d))
  • Manager é conveniente para compartilhar dicionários e listas, mas cada acesso envia dados entre processos e exige conversão com pickle. Portanto, atualizar grandes quantidades de dados com frequência irá desacelerar o processamento.

Mecanismos de sincronização: como usar Lock e Semaphore

Use Lock ou Semaphore para controlar o acesso concorrente a recursos compartilhados. Você pode usá-los de forma sucinta com a instrução with.

 1# Explanation:
 2# Demonstrates using Lock to prevent simultaneous access to a critical section.
 3# Locks are necessary when shared resources are not atomic.
 4
 5from multiprocessing import Process, Lock, Value
 6
 7def safe_add(counter, lock):
 8    for _ in range(10000):
 9        with lock:
10            counter.value += 1
11
12if __name__ == "__main__":
13    lock = Lock()
14    counter = Value('i', 0)
15    p1 = Process(target=safe_add, args=(counter, lock))
16    p2 = Process(target=safe_add, args=(counter, lock))
17    p1.start(); p2.start()
18    p1.join(); p2.join()
19    print("Counter:", counter.value)
  • Os locks previnem condições de corrida, mas se a região bloqueada for muito grande, o desempenho do processamento paralelo diminuirá. Apenas as partes necessárias devem ser protegidas como seção crítica.

Diferenças entre fork no UNIX e o comportamento no Windows

Em sistemas UNIX, os processos são duplicados usando fork por padrão, tornando a cópia-sob-escrita da memória eficiente. O Windows inicia os processos usando spawn (que reimporta os módulos), então é preciso tomar cuidado com a proteção do ponto de entrada e a inicialização global.

 1# Explanation: Check start method (fork/spawn) and set it if needed.
 2# Useful for debugging platform-dependent behavior.
 3
 4from multiprocessing import get_start_method, set_start_method
 5
 6if __name__ == "__main__":
 7    print("Start method:", get_start_method())
 8
 9    # uncomment to force spawn on Unix for testing
10    # set_start_method('spawn')
  • set_start_method só pode ser chamado uma vez no início do seu programa. É mais seguro não alterar isso arbitrariamente em bibliotecas.

Exemplo prático: Benchmark de cargas de trabalho limitadas pela CPU (comparação)

Abaixo está um script que simplesmente compara o quanto o processamento pode ser mais rápido com a paralelização usando multiprocessing. Aqui, usamos Pool.

 1# Explanation:
 2# Compare sequential vs parallel execution times for CPU-bound task.
 3# Helps understand speedup and overhead.
 4
 5import time
 6from multiprocessing import Pool, cpu_count
 7import math
 8
 9def heavy_task(n):
10    s = 0
11    for i in range(1, n):
12        s += math.sqrt(i)
13    return s
14
15def run_sequential(nums):
16    return [heavy_task(n) for n in nums]
17
18def run_parallel(nums):
19    with Pool(processes=cpu_count()) as p:
20        return p.map(heavy_task, nums)
21
22if __name__ == "__main__":
23    nums = [2000000] * 8  # heavy tasks
24    t0 = time.time()
25    run_sequential(nums)
26    seq = time.time() - t0
27    t1 = time.time()
28    run_parallel(nums)
29    par = time.time() - t1
30    print(f"Sequential: {seq:.2f}s, Parallel: {par:.2f}s")
  • Este exemplo mostra que, dependendo da carga das tarefas e do número de processos, a paralelização pode, às vezes, ser ineficaz devido ao overhead. Quanto maiores e mais independentes as tarefas, maior o benefício.

Regras básicas importantes

Abaixo estão os pontos básicos para usar o multiprocessing de forma segura e eficiente.

  • No Windows, os módulos são reimportados quando os processos filhos iniciam, então é necessário proteger o ponto de entrada do seu script com if __name__ == "__main__":.
  • A comunicação entre processos é serializada (com conversão pickle), portanto a transferência de objetos grandes se torna custosa.
  • Como o multiprocessing cria processos, é comum decidir o número de processos com base em multiprocessing.cpu_count().
  • Criar outro Pool dentro de um worker se torna complexo, portanto você deve evitar aninhar instâncias de Pool tanto quanto possível.
  • Como exceções que ocorrem em processos filhos são difíceis de detectar a partir do processo principal, é necessário implementar explicitamente registros e tratamento de erros.
  • Defina o número de processos conforme o CPU e considere usar threads para tarefas limitadas por I/O.

Dicas práticas de design

Abaixo estão alguns conceitos e padrões úteis para projetar processamento paralelo.

  • É eficiente separar os processos em funções como leitura de entrada (I/O), pré-processamento (multi-CPU) e agregação (serial) por meio de 'pipelining'.
  • Para simplificar a depuração, primeiro verifique o funcionamento em processo único antes de paralelizar.
  • Para logging, separe as saídas de log por processo (por exemplo, incluindo o PID nos nomes dos arquivos) para facilitar o isolamento de problemas.
  • Prepare mecanismos de retry e timeout para que seja possível recuperar a aplicação com segurança caso um processo trave.

Resumo (Pontos-chave que você pode usar imediatamente)

O processamento paralelo é poderoso, mas é importante avaliar corretamente a natureza das tarefas, o tamanho dos dados e o custo de comunicação entre processos. O multiprocessing é eficaz para processamento limitado pela CPU, mas um design ruim ou erros de sincronização podem reduzir o desempenho. Se você seguir as regras e padrões básicos, poderá construir programas paralelos seguros e eficientes.

Você pode acompanhar o artigo acima usando o Visual Studio Code em nosso canal do YouTube. Por favor, confira também o canal do YouTube.

YouTube Video