Parallel Processing in Python#

  • KEYWORDS: multiprocessing, threading, concurrent.futures, joblib, parallel computing

Introduction#

Many computational tasks in science and engineering are embarrassingly parallel - they can be broken into independent pieces that run simultaneously. Examples include:

  • Parameter sweeps: solving an ODE with 1000 different rate constants

  • Monte Carlo simulations: running 10,000 random trials for uncertainty quantification

  • Batch processing: analyzing hundreds of experimental data files

  • Cross-validation: training models on different data subsets

Modern computers have multiple CPU cores, but Python code runs on a single core by default. This chapter shows you how to use all your cores effectively.

import numpy as np
import matplotlib.pyplot as plt
import time
import os

# How many CPU cores do we have?
print(f"Available CPU cores: {os.cpu_count()}")
Available CPU cores: 4

CPU-bound vs I/O-bound Tasks#

Before diving into parallel processing, we need to understand two types of tasks:

CPU-bound tasks spend most of their time doing computation:

  • Numerical integration

  • Matrix operations

  • Solving differential equations

  • Image processing algorithms

I/O-bound tasks spend most of their time waiting for input/output:

  • Reading/writing files

  • Network requests (downloading data, API calls)

  • Database queries

This distinction is critical because Python’s Global Interpreter Lock (GIL) affects how we parallelize each type.

The Global Interpreter Lock (GIL)#

Python has a mechanism called the Global Interpreter Lock (GIL) that allows only one thread to execute Python bytecode at a time. This was a design decision to simplify memory management in CPython.

Implications:

  • Threads do NOT speed up CPU-bound Python code (they actually run one at a time!)

  • Threads DO speed up I/O-bound code (while one thread waits for I/O, another can run)

  • Processes bypass the GIL entirely (each process has its own Python interpreter)

This is why we use:

  • threading for I/O-bound tasks (file operations, network requests)

  • multiprocessing for CPU-bound tasks (numerical computations)

Let’s demonstrate this with a simple example.

def cpu_intensive_task(n):
    """A CPU-bound task: compute sum of squares."""
    total = 0
    for i in range(n):
        total += i ** 2
    return total

# Time a single call
n = 1_000_000
start = time.perf_counter()
result = cpu_intensive_task(n)
elapsed = time.perf_counter() - start
print(f"Single call: {elapsed:.3f} seconds")
Single call: 0.059 seconds

Threading: For I/O-bound Tasks#

The threading module creates lightweight threads that share memory. They’re ideal for I/O-bound tasks where threads spend time waiting.

Example: Simulating I/O with sleep#

Let’s simulate reading multiple data files (using time.sleep to represent I/O wait time).

import threading

def simulate_file_read(file_id, results):
    """Simulate reading a file (I/O-bound task)."""
    time.sleep(0.5)  # Simulate I/O wait
    results[file_id] = f"Data from file {file_id}"

# Sequential approach
n_files = 5
results_seq = {}

start = time.perf_counter()
for i in range(n_files):
    simulate_file_read(i, results_seq)
sequential_time = time.perf_counter() - start
print(f"Sequential: {sequential_time:.2f} seconds")

# Threaded approach
results_threaded = {}
threads = []

start = time.perf_counter()
for i in range(n_files):
    t = threading.Thread(target=simulate_file_read, args=(i, results_threaded))
    threads.append(t)
    t.start()

# Wait for all threads to complete
for t in threads:
    t.join()
    
threaded_time = time.perf_counter() - start
print(f"Threaded: {threaded_time:.2f} seconds")
print(f"Speedup: {sequential_time / threaded_time:.1f}x")
Sequential: 2.50 seconds
Threaded: 0.50 seconds
Speedup: 5.0x

The threaded version is ~5x faster because all threads can wait for I/O simultaneously.

Why threads DON’T help with CPU-bound tasks#

Let’s try the same approach with our CPU-intensive function:

def cpu_task_wrapper(n, results, idx):
    results[idx] = cpu_intensive_task(n)

n = 500_000
n_tasks = 4

# Sequential
results_seq = {}
start = time.perf_counter()
for i in range(n_tasks):
    results_seq[i] = cpu_intensive_task(n)
sequential_time = time.perf_counter() - start
print(f"Sequential: {sequential_time:.3f} seconds")

# Threaded (won't help due to GIL!)
results_threaded = {}
threads = []

start = time.perf_counter()
for i in range(n_tasks):
    t = threading.Thread(target=cpu_task_wrapper, args=(n, results_threaded, i))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

threaded_time = time.perf_counter() - start
print(f"Threaded: {threaded_time:.3f} seconds")
print(f"Speedup: {sequential_time / threaded_time:.2f}x (no improvement due to GIL!)")
Sequential: 0.116 seconds
Threaded: 0.116 seconds
Speedup: 1.01x (no improvement due to GIL!)

Notice that threading provides no speedup (and may even be slower due to overhead). The GIL ensures only one thread runs Python code at a time.

Multiprocessing: For CPU-bound Tasks#

The multiprocessing module creates separate Python processes, each with its own GIL. This allows true parallel execution on multiple cores.

Example: Parameter Sweep for Reaction Kinetics#

Consider a first-order reaction \(A \rightarrow B\) with rate \(r = k C_A\). We want to study how the final conversion depends on the rate constant \(k\). This requires solving the ODE:

\[\frac{dC_A}{dt} = -k C_A\]

for many different values of \(k\).

from scipy.integrate import solve_ivp

def solve_reaction(k, C_A0=1.0, t_final=10.0):
    """Solve first-order reaction kinetics for a given rate constant k.
    
    Returns the final conversion X = (C_A0 - C_A) / C_A0
    """
    def ode(t, C):
        C_A = C[0]
        return [-k * C_A]
    
    sol = solve_ivp(ode, [0, t_final], [C_A0], dense_output=True)
    C_A_final = sol.y[0, -1]
    conversion = (C_A0 - C_A_final) / C_A0
    return k, conversion

# Test it
k_test, conv_test = solve_reaction(0.5)
print(f"k = {k_test}, conversion = {conv_test:.4f}")
k = 0.5, conversion = 0.9932
import multiprocessing as mp

# Generate 200 rate constants to test
k_values = np.linspace(0.01, 2.0, 200)

# Sequential approach
start = time.perf_counter()
results_seq = [solve_reaction(k) for k in k_values]
sequential_time = time.perf_counter() - start
print(f"Sequential: {sequential_time:.3f} seconds")

# Parallel approach using multiprocessing Pool
start = time.perf_counter()
with mp.Pool(processes=mp.cpu_count()) as pool:
    results_parallel = pool.map(solve_reaction, k_values)
parallel_time = time.perf_counter() - start
print(f"Parallel ({mp.cpu_count()} cores): {parallel_time:.3f} seconds")
print(f"Speedup: {sequential_time / parallel_time:.2f}x")
Sequential: 0.115 seconds
Parallel (4 cores): 0.125 seconds
Speedup: 0.92x

Note that the parallel run is no faster here: each ODE solve finishes in well under a millisecond, so the cost of starting worker processes and shipping arguments outweighs the computation itself. The notes below and the chunking guideline later in the chapter address this.
# Visualize the results
k_vals = [r[0] for r in results_parallel]
conversions = [r[1] for r in results_parallel]

plt.figure(figsize=(8, 5))
plt.plot(k_vals, conversions, 'b-', linewidth=2)
plt.xlabel('Rate constant k (1/s)', fontsize=12)
plt.ylabel('Conversion X', fontsize=12)
plt.title('First-order Reaction: Conversion vs Rate Constant', fontsize=14)
plt.grid(True, alpha=0.3)
plt.ylim(0, 1.05);
[Figure: First-order Reaction: Conversion vs Rate Constant]

Important Notes about Multiprocessing#

  1. Pickling: Data passed between processes must be picklable (serializable). Most numpy arrays and simple objects work fine, but lambda functions and some objects don’t.

  2. Overhead: Creating processes has overhead. For very fast tasks, the overhead may exceed the benefit.

  3. Memory: Each process has its own memory space. Large data is copied to each process.

  4. Top-level functions: Functions used with Pool.map() must be defined at the module level (not inside another function). A short sketch below illustrates points 1 and 4.
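
A minimal sketch of points 1 and 4, written as it would appear in a standalone script (the square function and the commented-out lambda are illustrative, not from the examples above): a module-level function pickles and runs fine, while a lambda cannot be pickled and fails.

import multiprocessing as mp

def square(x):
    """Module-level function: picklable, safe to use with Pool.map."""
    return x ** 2

if __name__ == "__main__":
    with mp.Pool(processes=2) as pool:
        print(pool.map(square, range(4)))       # [0, 1, 4, 9]
        # pool.map(lambda x: x ** 2, range(4))  # would raise a PicklingError: lambdas can't be pickled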

concurrent.futures: The Modern Interface#

The concurrent.futures module provides a clean, unified API for both threading and multiprocessing. It’s the recommended approach for most parallel tasks.

  • ThreadPoolExecutor: For I/O-bound tasks

  • ProcessPoolExecutor: For CPU-bound tasks

Both have the same interface, making it easy to switch between them.

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed

# Using ProcessPoolExecutor for our reaction kinetics
start = time.perf_counter()
with ProcessPoolExecutor(max_workers=mp.cpu_count()) as executor:
    # map() returns results in order
    results = list(executor.map(solve_reaction, k_values))
elapsed = time.perf_counter() - start
print(f"ProcessPoolExecutor: {elapsed:.3f} seconds")
ProcessPoolExecutor: 0.117 seconds
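
Because both executors expose the same interface, switching an I/O-bound job to threads only means swapping the class. Here is a minimal sketch reusing the imports above (fake_download is a made-up stand-in for a network request, not one of the chapter's examples):

def fake_download(i):
    """Stand-in for an I/O-bound task such as an HTTP request."""
    time.sleep(0.2)
    return f"payload {i}"

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=8) as executor:
    payloads = list(executor.map(fake_download, range(8)))
print(f"ThreadPoolExecutor: {time.perf_counter() - start:.2f} seconds for {len(payloads)} downloads")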

Example: Monte Carlo Uncertainty Quantification#

A powerful application of parallel processing is Monte Carlo simulation. Suppose we’re measuring the heat transfer coefficient \(h\) for a cooling fin, and we want to estimate the uncertainty in the steady-state temperature.

The fin equation gives the temperature profile:

\[T(x) = T_\infty + (T_0 - T_\infty) \frac{\cosh(m(L-x))}{\cosh(mL)}\]

where \(m = \sqrt{\frac{hP}{kA}}\).

If \(h\) has measurement uncertainty (normally distributed), what’s the distribution of the tip temperature \(T(L)\)?

def fin_tip_temperature(h, T0=100, T_inf=25, L=0.1, k=200, P=0.04, A=1e-4):
    """
    Calculate the tip temperature of a cooling fin.
    
    Parameters:
    -----------
    h : float
        Heat transfer coefficient (W/m^2-K)
    T0 : float
        Base temperature (°C)
    T_inf : float
        Ambient temperature (°C)
    L : float
        Fin length (m)
    k : float
        Thermal conductivity (W/m-K)
    P : float
        Perimeter (m)
    A : float
        Cross-sectional area (m^2)
    """
    m = np.sqrt(h * P / (k * A))
    T_tip = T_inf + (T0 - T_inf) / np.cosh(m * L)
    return T_tip

# Nominal value
h_nominal = 50  # W/m^2-K
T_nominal = fin_tip_temperature(h_nominal)
print(f"Nominal tip temperature: {T_nominal:.2f} °C")
Nominal tip temperature: 73.60 °C
# Monte Carlo simulation with uncertainty in h
np.random.seed(42)
n_samples = 10000
h_std = 5  # Standard deviation in h measurement

# Generate random samples of h
h_samples = np.random.normal(h_nominal, h_std, n_samples)
h_samples = h_samples[h_samples > 0]  # h must be positive

# Sequential Monte Carlo
start = time.perf_counter()
T_results_seq = [fin_tip_temperature(h) for h in h_samples]
sequential_time = time.perf_counter() - start
print(f"Sequential: {sequential_time:.3f} seconds")

# Parallel Monte Carlo
start = time.perf_counter()
with ProcessPoolExecutor() as executor:
    T_results_parallel = list(executor.map(fin_tip_temperature, h_samples))
parallel_time = time.perf_counter() - start
print(f"Parallel: {parallel_time:.3f} seconds")
Sequential: 0.008 seconds
Parallel: 1.947 seconds

Here the parallel run is dramatically slower: fin_tip_temperature amounts to a handful of floating-point operations, so per-task dispatch and inter-process communication dominate completely. Grouping many samples into each task (see the chunking guideline later in the chapter) would be needed before parallelism could pay off.
# Analyze and visualize results
T_array = np.array(T_results_parallel)

fig, axes = plt.subplots(1, 2, figsize=(12, 4))

# Histogram of tip temperatures
axes[0].hist(T_array, bins=50, density=True, alpha=0.7, color='steelblue', edgecolor='black')
axes[0].axvline(T_nominal, color='red', linestyle='--', linewidth=2, label=f'Nominal = {T_nominal:.1f}°C')
axes[0].axvline(T_array.mean(), color='green', linestyle='-', linewidth=2, label=f'Mean = {T_array.mean():.1f}°C')
axes[0].set_xlabel('Tip Temperature (°C)', fontsize=12)
axes[0].set_ylabel('Probability Density', fontsize=12)
axes[0].set_title('Monte Carlo: Fin Tip Temperature Distribution', fontsize=12)
axes[0].legend()

# Scatter plot: h vs T_tip
axes[1].scatter(h_samples[:1000], T_array[:1000], alpha=0.5, s=10)
axes[1].set_xlabel('Heat Transfer Coefficient h (W/m²K)', fontsize=12)
axes[1].set_ylabel('Tip Temperature (°C)', fontsize=12)
axes[1].set_title('Sensitivity: T_tip vs h', fontsize=12)

plt.tight_layout()

print(f"\nMonte Carlo Results ({len(T_array)} samples):")
print(f"  Mean tip temperature: {T_array.mean():.2f} °C")
print(f"  Standard deviation: {T_array.std():.2f} °C")
print(f"  95% confidence interval: [{np.percentile(T_array, 2.5):.2f}, {np.percentile(T_array, 97.5):.2f}] °C")
Monte Carlo Results (10000 samples):
  Mean tip temperature: 73.66 °C
  Standard deviation: 1.86 °C
  95% confidence interval: [70.16, 77.48] °C
[Figure: Monte Carlo fin tip temperature distribution (left) and sensitivity of T_tip to h (right)]

Using submit() and as_completed()#

Sometimes you want to process results as they become available, rather than waiting for all tasks to complete. The submit() method returns Future objects, and as_completed() yields them as they finish.

def slow_computation(x):
    """A computation that takes variable time."""
    sleep_time = np.random.uniform(0.1, 0.5)
    time.sleep(sleep_time)
    return x ** 2, sleep_time

# Process results as they complete
values = list(range(8))

with ProcessPoolExecutor(max_workers=4) as executor:
    # Submit all tasks
    future_to_value = {executor.submit(slow_computation, v): v for v in values}
    
    # Process as they complete (not in submission order)
    for future in as_completed(future_to_value):
        original_value = future_to_value[future]
        result, elapsed = future.result()
        print(f"  {original_value}² = {result} (took {elapsed:.2f}s)")
  2² = 4 (took 0.23s)
  0² = 0 (took 0.23s)
  1² = 1 (took 0.23s)
  3² = 9 (took 0.23s)
  4² = 16 (took 0.14s)
  5² = 25 (took 0.14s)
  6² = 36 (took 0.14s)
  7² = 49 (took 0.14s)

Error Handling in Parallel Code#

When running many parallel tasks, some may fail. It’s important to handle errors gracefully.

def risky_computation(x):
    """A computation that might fail."""
    if x == 5:
        raise ValueError(f"Cannot process x={x}")
    return x ** 2

values = list(range(10))
results = []
errors = []

with ProcessPoolExecutor(max_workers=4) as executor:
    future_to_value = {executor.submit(risky_computation, v): v for v in values}
    
    for future in as_completed(future_to_value):
        value = future_to_value[future]
        try:
            result = future.result()
            results.append((value, result))
        except Exception as e:
            errors.append((value, str(e)))

print(f"Successful: {len(results)} tasks")
print(f"Failed: {len(errors)} tasks")
if errors:
    print(f"Errors: {errors}")
Successful: 9 tasks
Failed: 1 tasks
Errors: [(5, 'Cannot process x=5')]

Joblib: Parallel Computing Made Easy#

Joblib is a popular library in the scientific Python ecosystem. It’s used internally by scikit-learn and provides a simple interface for parallel computing.

Key features:

  • Simple Parallel and delayed syntax

  • Automatic backend selection (processes or threads)

  • Built-in progress reporting

  • Memory mapping for large numpy arrays

  • Result caching with Memory

from joblib import Parallel, delayed

# Basic syntax: Parallel(n_jobs=...)(delayed(func)(args) for args in iterable)

# Sequential
start = time.perf_counter()
results_seq = [solve_reaction(k) for k in k_values]
print(f"Sequential: {time.perf_counter() - start:.3f}s")

# Parallel with joblib
start = time.perf_counter()
results_joblib = Parallel(n_jobs=-1)(  # n_jobs=-1 uses all cores
    delayed(solve_reaction)(k) for k in k_values
)
print(f"Joblib parallel: {time.perf_counter() - start:.3f}s")
Sequential: 0.117s
Joblib parallel: 0.801s
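
One item from the feature list above deserves a quick illustration: with the default loky backend, joblib automatically memory-maps numpy arrays larger than about 1 MB (the max_nbytes threshold) that are passed to workers, so they are shared through a temporary file rather than pickled into every process. A minimal sketch (the array size and the repeated np.mean calls are illustrative only):

big_array = np.random.randn(2_000_000)  # ~16 MB, above joblib's default max_nbytes threshold of "1M"

# Each worker receives a memory-mapped view of big_array instead of a full copy
means = Parallel(n_jobs=2)(delayed(np.mean)(big_array) for _ in range(4))
print(means)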

Example: Batch Processing Experimental Data#

Suppose we have experimental data from multiple runs of a reaction, and we want to fit a kinetic model to each dataset. This is a perfect use case for parallel processing.

from scipy.optimize import curve_fit

def first_order_model(t, k, C0):
    """First-order decay: C(t) = C0 * exp(-k*t)"""
    return C0 * np.exp(-k * t)

def generate_synthetic_data(true_k, true_C0, noise_level=0.05):
    """Generate synthetic experimental data with noise."""
    t = np.linspace(0, 10, 20)
    C_true = first_order_model(t, true_k, true_C0)
    C_noisy = C_true + np.random.normal(0, noise_level * true_C0, len(t))
    C_noisy = np.maximum(C_noisy, 0)  # Concentration can't be negative
    return t, C_noisy, true_k, true_C0

def fit_kinetic_model(data):
    """Fit first-order model to experimental data."""
    t, C, true_k, true_C0 = data
    try:
        popt, pcov = curve_fit(first_order_model, t, C, p0=[0.5, 1.0], bounds=(0, [10, 10]))
        k_fit, C0_fit = popt
        k_err = np.sqrt(pcov[0, 0])
        return {
            'true_k': true_k, 
            'fitted_k': k_fit, 
            'k_error': k_err,
            'true_C0': true_C0,
            'fitted_C0': C0_fit
        }
    except Exception as e:
        return {'error': str(e)}

# Generate 100 synthetic experiments with different true parameters
np.random.seed(123)
n_experiments = 100
true_k_values = np.random.uniform(0.1, 2.0, n_experiments)
true_C0_values = np.random.uniform(0.5, 2.0, n_experiments)

datasets = [
    generate_synthetic_data(k, C0) 
    for k, C0 in zip(true_k_values, true_C0_values)
]
# Fit all experiments in parallel with progress reporting
start = time.perf_counter()
fit_results = Parallel(n_jobs=-1, verbose=1)(
    delayed(fit_kinetic_model)(data) for data in datasets
)
elapsed = time.perf_counter() - start
print(f"\nFitted {n_experiments} experiments in {elapsed:.2f} seconds")
Fitted 100 experiments in 0.14 seconds
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 4 concurrent workers.
[Parallel(n_jobs=-1)]: Done 100 out of 100 | elapsed:    0.1s finished
# Analyze the fitting results
successful_fits = [r for r in fit_results if 'error' not in r]

true_k = np.array([r['true_k'] for r in successful_fits])
fitted_k = np.array([r['fitted_k'] for r in successful_fits])

fig, axes = plt.subplots(1, 2, figsize=(12, 5))

# Parity plot
axes[0].scatter(true_k, fitted_k, alpha=0.6, edgecolor='black', linewidth=0.5)
axes[0].plot([0, 2], [0, 2], 'r--', linewidth=2, label='Perfect fit')
axes[0].set_xlabel('True k (1/s)', fontsize=12)
axes[0].set_ylabel('Fitted k (1/s)', fontsize=12)
axes[0].set_title('Parity Plot: True vs Fitted Rate Constants', fontsize=12)
axes[0].legend()
axes[0].set_aspect('equal')

# Error distribution
relative_error = (fitted_k - true_k) / true_k * 100
axes[1].hist(relative_error, bins=30, alpha=0.7, color='steelblue', edgecolor='black')
axes[1].axvline(0, color='red', linestyle='--', linewidth=2)
axes[1].set_xlabel('Relative Error (%)', fontsize=12)
axes[1].set_ylabel('Count', fontsize=12)
axes[1].set_title(f'Fitting Error Distribution (n={len(successful_fits)})', fontsize=12)

plt.tight_layout()

print(f"\nFitting Statistics:")
print(f"  Mean absolute error: {np.mean(np.abs(fitted_k - true_k)):.4f} 1/s")
print(f"  Mean relative error: {np.mean(relative_error):.2f}%")
print(f"  Std of relative error: {np.std(relative_error):.2f}%")
Fitting Statistics:
  Mean absolute error: 0.0799 1/s
  Mean relative error: -0.47%
  Std of relative error: 8.89%
[Figure: parity plot of true vs fitted rate constants (left) and fitting error distribution (right)]

Joblib Backends and Threading#

Joblib can use different backends:

  • loky (default): Process-based, robust to crashes

  • multiprocessing: Standard multiprocessing

  • threading: Thread-based (for I/O-bound or numpy-heavy code)

# Using threading backend (useful when functions release the GIL, like numpy)
def numpy_heavy_computation(size):
    """Numpy operations release the GIL, so threading can help."""
    A = np.random.randn(size, size)
    return np.linalg.svd(A, compute_uv=False).sum()

sizes = [200] * 20

# Process-based (default)
start = time.perf_counter()
results = Parallel(n_jobs=-1, backend='loky')(
    delayed(numpy_heavy_computation)(s) for s in sizes
)
print(f"Loky (processes): {time.perf_counter() - start:.3f}s")

# Thread-based 
start = time.perf_counter()
results = Parallel(n_jobs=-1, backend='threading')(
    delayed(numpy_heavy_computation)(s) for s in sizes
)
print(f"Threading: {time.perf_counter() - start:.3f}s")
Loky (processes): 0.044s
Threading: 0.145s

With matrices this small, each SVD finishes in a few milliseconds and neither backend has much to gain. For larger arrays, where numpy releases the GIL during the factorization and no data has to be copied between processes, the threading backend often becomes the better choice.

Caching Results with joblib.Memory#

When you run expensive computations repeatedly with the same inputs, caching can save time. Joblib’s Memory class provides transparent disk caching.

from joblib import Memory
import tempfile

# Create a cache directory
cachedir = tempfile.mkdtemp()
memory = Memory(cachedir, verbose=0)

@memory.cache
def expensive_simulation(param):
    """Simulate an expensive computation."""
    time.sleep(0.5)  # Pretend this takes a while
    return param ** 2 + np.random.randn() * 0.01

# First run: computes and caches
start = time.perf_counter()
result1 = expensive_simulation(42)
print(f"First call: {time.perf_counter() - start:.3f}s, result = {result1:.4f}")

# Second run: retrieves from cache
start = time.perf_counter()
result2 = expensive_simulation(42)
print(f"Cached call: {time.perf_counter() - start:.3f}s, result = {result2:.4f}")

# Clean up
memory.clear(warn=False)
import shutil
shutil.rmtree(cachedir)
First call: 0.502s, result = 1764.0226
Cached call: 0.001s, result = 1764.0226

Scaling Up: When to Reach for Bigger Tools#

The tools we’ve covered (threading, multiprocessing, concurrent.futures, joblib) work well for parallelizing tasks on a single machine. But sometimes you need more:

Signs You’ve Outgrown Single-Machine Parallelism#

  1. Data doesn’t fit in memory: Your dataset is larger than your RAM

  2. Need more cores: Even with all cores busy, computation takes too long

  3. Complex task graphs: Tasks have dependencies that simple map() can’t express

  4. Distributed computing: You want to use multiple machines

Dask: Scalable Parallel Computing#

Dask is a flexible parallel computing library that scales from laptops to clusters. It provides:

  • Dask Arrays: Parallel numpy arrays that can be larger than memory

  • Dask DataFrames: Parallel pandas DataFrames

  • Dask Delayed: Parallelize custom code with task graphs

  • Distributed scheduler: Scale to clusters

Dask is designed to feel familiar if you know numpy and pandas.

# Example: Dask delayed for custom parallel code
# (Only run if dask is installed)
try:
    from dask import delayed, compute
    import dask
    
    @delayed
    def delayed_solve_reaction(k):
        return solve_reaction(k)
    
    # Build a task graph (no computation yet)
    tasks = [delayed_solve_reaction(k) for k in k_values[:50]]
    
    # Execute in parallel
    start = time.perf_counter()
    results = compute(*tasks)
    print(f"Dask parallel: {time.perf_counter() - start:.3f}s")
    print(f"Computed {len(results)} results")
    
except ImportError:
    print("Dask not installed. Install with: pip install dask")
    print("Dask is useful when you need to:")
    print("  - Process data larger than memory")
    print("  - Scale to multiple machines")
    print("  - Build complex task dependency graphs")
Dask not installed. Install with: pip install dask
Dask is useful when you need to:
  - Process data larger than memory
  - Scale to multiple machines
  - Build complex task dependency graphs
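
The dask.delayed example above covers custom task graphs; the array side is just as brief. A minimal sketch (again only if dask is installed; the array sizes are illustrative) of a Dask array whose chunks are evaluated in parallel:

try:
    import dask.array as da

    # A 4000 x 4000 array (~128 MB) built lazily from 16 chunks of 1000 x 1000
    x = da.random.random((4_000, 4_000), chunks=(1_000, 1_000))
    print(f"Mean = {x.mean().compute():.4f}")  # chunks are computed in parallel, then combined
except ImportError:
    print("Dask not installed; skipping the dask.array example.")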

When to Use Each Tool#

  • I/O-bound tasks (file/network): threading or ThreadPoolExecutor

  • CPU-bound, simple parallelism: ProcessPoolExecutor or joblib

  • Scientific computing, sklearn: joblib

  • Need progress bars, easy syntax: joblib with verbose

  • Data larger than RAM: dask

  • Multiple machines / cluster: dask.distributed or ray

  • Heavy numerical arrays: consider numba for JIT compilation

Other Tools Worth Knowing#

  • Ray: Distributed computing framework, especially for ML workloads

  • Numba: JIT compiler that can parallelize numerical loops (see the sketch after this list)

  • mpi4py: MPI for Python, for HPC clusters

  • PySpark: For big data processing
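
As one concrete example from this list, a minimal Numba sketch (assuming numba is installed; the function itself is illustrative): prange splits a numerical loop across cores, and the compiled code is not limited by the GIL because it does not execute Python bytecode.

try:
    from numba import njit, prange

    @njit(parallel=True)
    def parallel_sum_of_squares(arr):
        """Compiled loop; prange distributes iterations across CPU cores."""
        total = 0.0
        for i in prange(arr.size):
            total += arr[i] ** 2
        return total

    x = np.arange(1_000_000, dtype=np.float64)
    print(f"Sum of squares: {parallel_sum_of_squares(x):.3e}")
except ImportError:
    print("Numba not installed. Install with: pip install numba")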

Practical Guidelines#

1. Measure Before Optimizing#

Always profile your code first. Parallelization has overhead, and the speedup depends on:

  • Task duration (longer tasks benefit more)

  • Number of tasks

  • Data transfer costs

def benchmark_parallel(func, args_list, n_jobs_list):
    """Benchmark parallel execution with different numbers of workers."""
    results = {}
    
    # Sequential baseline
    start = time.perf_counter()
    _ = [func(arg) for arg in args_list]
    results[1] = time.perf_counter() - start
    
    for n_jobs in n_jobs_list:
        start = time.perf_counter()
        _ = Parallel(n_jobs=n_jobs)(delayed(func)(arg) for arg in args_list)
        results[n_jobs] = time.perf_counter() - start
    
    return results

# Benchmark with our reaction solver
k_test = np.linspace(0.1, 2.0, 100)
n_jobs_options = [2, 4, -1]  # -1 means all cores

times = benchmark_parallel(solve_reaction, k_test, n_jobs_options)

print("Execution times:")
for n_jobs, t in times.items():
    speedup = times[1] / t
    label = "sequential" if n_jobs == 1 else f"{n_jobs} workers" if n_jobs > 0 else f"{mp.cpu_count()} workers (all)"
    print(f"  {label}: {t:.3f}s (speedup: {speedup:.2f}x)")
Execution times:
  sequential: 0.061s (speedup: 1.00x)
  2 workers: 0.608s (speedup: 0.10x)
  4 workers: 0.730s (speedup: 0.08x)
  4 workers (all): 0.053s (speedup: 1.16x)

Notice that the 2- and 4-worker runs are much slower than sequential: each one pays the one-time cost of starting new worker processes for tasks that finish in well under a millisecond. The final run (n_jobs=-1, which also resolves to 4 workers here) can reuse workers that are already running and barely beats the baseline. This is exactly why you should measure before parallelizing.

2. Chunk Your Work Appropriately#

If you have many tiny tasks, the overhead of dispatching each one dominates. Group them into larger chunks.

def process_chunk(k_chunk):
    """Process a chunk of k values."""
    return [solve_reaction(k) for k in k_chunk]

# Split into chunks
k_values_large = np.linspace(0.1, 2.0, 400)
chunk_size = 50
chunks = [k_values_large[i:i+chunk_size] for i in range(0, len(k_values_large), chunk_size)]

# Process chunks in parallel
start = time.perf_counter()
chunk_results = Parallel(n_jobs=-1)(delayed(process_chunk)(chunk) for chunk in chunks)
# Flatten results
all_results = [item for sublist in chunk_results for item in sublist]
print(f"Chunked parallel: {time.perf_counter() - start:.3f}s for {len(all_results)} tasks")
Chunked parallel: 0.145s for 400 tasks

3. Be Careful with Shared State#

Parallel processes don’t share memory by default. Avoid patterns that require synchronization:

  • Don’t modify global variables from parallel tasks

  • Return results instead of writing to shared data structures (see the sketch below)

  • Use Manager objects if you really need shared state (but prefer avoiding it)
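
A minimal sketch of that pattern (good_task and the commented-out anti-pattern are illustrative): mutating a module-level list from worker processes silently does nothing useful, because each process modifies its own copy, while returning values and collecting them in the parent always works.

# Anti-pattern (sketch): each worker appends to its own copy of the list,
# so the parent's shared_results stays empty.
# shared_results = []
# def bad_task(x):
#     shared_results.append(x ** 2)

# Preferred pattern: each task returns its result; the parent collects them.
def good_task(x):
    return x ** 2

collected = Parallel(n_jobs=-1)(delayed(good_task)(x) for x in range(8))
print(collected)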

4. Handle Random Seeds Carefully#

When running Monte Carlo simulations in parallel, ensure each worker has a different random seed to avoid correlated results.

def monte_carlo_with_seed(seed):
    """Monte Carlo simulation with explicit seed."""
    rng = np.random.RandomState(seed)
    samples = rng.normal(0, 1, 1000)
    return np.mean(samples)

# Generate unique seeds for each worker
base_seed = 42
seeds = [base_seed + i for i in range(100)]

results = Parallel(n_jobs=-1)(delayed(monte_carlo_with_seed)(s) for s in seeds)
print(f"Monte Carlo results: mean = {np.mean(results):.4f}, std = {np.std(results):.4f}")
Monte Carlo results: mean = 0.0046, std = 0.0338

Summary#

This chapter covered the main approaches to parallel processing in Python:

  1. Threading (threading, ThreadPoolExecutor): Best for I/O-bound tasks. The GIL prevents speedup for CPU-bound work.

  2. Multiprocessing (multiprocessing, ProcessPoolExecutor): Best for CPU-bound tasks. Each process has its own Python interpreter and GIL.

  3. concurrent.futures: Modern, clean API that unifies threading and multiprocessing with the same interface.

  4. Joblib: The go-to tool for scientific Python. Simple syntax, automatic backend selection, and useful features like caching.

  5. Dask and beyond: When you outgrow single-machine parallelism, tools like Dask, Ray, and Spark can scale to larger data and distributed clusters.

Key takeaways:

  • Understand the GIL: threads for I/O, processes for CPU

  • Use concurrent.futures or joblib for most tasks

  • Measure before parallelizing—overhead matters

  • Keep tasks independent when possible

  • Handle random seeds explicitly in Monte Carlo simulations