MPI Programming

🧪 Lab 4 — Introduction to MPI (Message Passing Interface)

Duration: ~1 hour
Languages: C (MPI), Python (mpi4py)
Environment: Linux / WSL / macOS (Windows via WSL is recommended)


🎯 Learning Objectives

  • Understand ranks, communicators, and SPMD (Single Program, Multiple Data).
  • Use point-to-point (send/recv) and collectives (broadcast, scatter/gather, reduce).
  • Measure time with MPI_Wtime() / MPI.Wtime() and reason about scaling.

⚙️ Setup

C (Open MPI or MPICH)

  • On Ubuntu/WSL:
    sudo apt-get update
    sudo apt-get install -y build-essential openmpi-bin libopenmpi-dev
    
  • On macOS (Homebrew):
    brew install open-mpi
    

Python (mpi4py)

python -m venv .venv
# Windows PowerShell:  .venv\Scripts\activate
# Linux/macOS/WSL:     source .venv/bin/activate
pip install mpi4py

Run all programs with mpiexec -n <NUM_PROCS> ...; on some systems the launcher is called mpirun instead.


🧩 Part 0 — Hello, MPI (Ranks & Communicator)

API Primer (used below)

  • C

    • MPI_Init(&argc, &argv): initialize the MPI runtime.
    • MPI_Comm_size(MPI_COMM_WORLD, &size): number of processes in the global communicator.
    • MPI_Comm_rank(MPI_COMM_WORLD, &rank): this process’s rank (0..size-1).
    • MPI_Get_processor_name(buf, &len): name of the node.
    • MPI_Finalize(): clean shutdown.
  • Python (mpi4py)

    • comm = MPI.COMM_WORLD (communicator)
    • comm.Get_size(), comm.Get_rank()
    • MPI.Get_processor_name()

C — mpi_hello.c

#include <mpi.h>
#include <stdio.h>

int main(int argc, char** argv) {
    // Initialize MPI environment
    MPI_Init(&argc, &argv);

    int world_size = 0, world_rank = -1, name_len = 0;
    char processor_name[MPI_MAX_PROCESSOR_NAME];

    // Total processes and my rank
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
    MPI_Get_processor_name(processor_name, &name_len);

    // Print a message from each process (order may vary)
    printf("Hello from rank %d of %d on %s\n", world_rank, world_size, processor_name);

    // Finalize
    MPI_Finalize();
    return 0;
}

Build & Run

mpicc mpi_hello.c -o mpi_hello
mpiexec -n 4 ./mpi_hello

Sample Output (4 ranks, order may vary)

Hello from rank 0 of 4 on nodeA
Hello from rank 2 of 4 on nodeA
Hello from rank 1 of 4 on nodeA
Hello from rank 3 of 4 on nodeA

Python — mpi_hello.py

from mpi4py import MPI

# World communicator and process identity
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
name = MPI.Get_processor_name()

print(f"Hello from rank {rank} of {size} on {name}")

Run

mpiexec -n 4 python mpi_hello.py

Sample Output (4 ranks, order may vary)

Hello from rank 3 of 4 on nodeA
Hello from rank 1 of 4 on nodeA
Hello from rank 2 of 4 on nodeA
Hello from rank 0 of 4 on nodeA
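
The interleaved output above is expected: all ranks run concurrently, and the launcher forwards their stdout in whatever order it arrives. If roughly ordered output helps while debugging, one common trick is to take turns around a barrier. This is a sketch, not a guarantee: MPI does not promise any ordering of forwarded stdout.

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# Each rank prints only on its own "turn"; the barrier keeps the turns in step.
for turn in range(size):
    if rank == turn:
        print(f"Hello (in rank order) from rank {rank} of {size}", flush=True)
    comm.Barrier()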

🧩 Part 1 — Collective Computation: Parallel Vector Sum

Idea: The root creates a vector and scatters equal-size chunks to every rank; each process sums its chunk; the root then combines the partial sums with a sum reduction. For simplicity, N is assumed divisible by the number of processes (see the Scatterv sketch at the end of this part).

API Primer

  • C Collectives

    • MPI_Scatter(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm)
      Distribute equal-size blocks from root to all ranks.
    • MPI_Reduce(sendbuf, recvbuf, count, datatype, op, root, comm)
      Combine values from all ranks using an operation (e.g., MPI_SUM).
    • MPI_Wtime(): wall-clock timer (seconds).
  • Python (mpi4py)

    • comm.Scatter([A, MPI.DOUBLE], [local, MPI.DOUBLE], root=0)
    • comm.reduce(local_sum, op=MPI.SUM, root=0)
    • MPI.Wtime()

C — mpi_vecsum.c

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char** argv) {
    MPI_Init(&argc, &argv);
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // Total elements (argument or default)
    const int N = (argc > 1) ? atoi(argv[1]) : 1000000;
    int chunk = N / size;  // assume divisible for brevity

    // Root allocates and initializes the full array
    double *A = NULL;
    if (rank == 0) {
        A = (double*)malloc(N * sizeof(double));
        for (int i = 0; i < N; i++) A[i] = 1.0;  // sum should be N
    }

    // Each process allocates space for its chunk
    double *local = (double*)malloc(chunk * sizeof(double));

    double t0 = MPI_Wtime();

    // Scatter equal-size chunks from root to all processes
    MPI_Scatter(A, chunk, MPI_DOUBLE, local, chunk, MPI_DOUBLE, 0, MPI_COMM_WORLD);

    // Compute local partial sum
    double local_sum = 0.0;
    for (int i = 0; i < chunk; i++) local_sum += local[i];

    // Reduce all partial sums to root (sum)
    double global_sum = 0.0;
    MPI_Reduce(&local_sum, &global_sum, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);

    double t1 = MPI_Wtime();

    if (rank == 0) {
        printf("Vector sum = %.0f (expected %d), time = %.6f s, procs = %d\n",
               global_sum, N, t1 - t0, size);
        free(A);
    }
    free(local);
    MPI_Finalize();
    return 0;
}

Build & Run

mpicc mpi_vecsum.c -o mpi_vecsum
mpiexec -n 4 ./mpi_vecsum 2000000

Sample Output (4 ranks)

Vector sum = 2000000 (expected 2000000), time = 0.012345 s, procs = 4

Python — mpi_vecsum.py

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

N = 2_000_000
chunk = N // size

# Only root allocates full array
A = np.ones(N, dtype=np.float64) if rank == 0 else None
local = np.empty(chunk, dtype=np.float64)

t0 = MPI.Wtime()
# Scatter the data
comm.Scatter([A, MPI.DOUBLE], [local, MPI.DOUBLE], root=0)

# Local computation
local_sum = local.sum()

# Reduce to root
global_sum = comm.reduce(local_sum, op=MPI.SUM, root=0)
t1 = MPI.Wtime()

if rank == 0:
    print(f"Vector sum = {global_sum:.0f} (expected {N}), time = {t1 - t0:.6f} s, procs = {size}")

Run

mpiexec -n 4 python mpi_vecsum.py

Sample Output (4 ranks)

Vector sum = 2000000 (expected 2000000), time = 0.010321 s, procs = 4
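
Both versions assume N is divisible by the number of processes. When it is not, the variable-count collective MPI_Scatterv / comm.Scatterv lets each rank receive a different number of elements. Below is a minimal mpi4py sketch; the odd N and the counts/displs names are only illustrative.

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

N = 1_000_003  # deliberately not divisible by a typical process count
base, rem = divmod(N, size)
counts = [base + 1 if r < rem else base for r in range(size)]  # elements per rank
displs = [sum(counts[:r]) for r in range(size)]                # starting offsets

A = np.ones(N, dtype=np.float64) if rank == 0 else None
local = np.empty(counts[rank], dtype=np.float64)

# Variable-size scatter: rank r gets counts[r] elements starting at displs[r]
if rank == 0:
    comm.Scatterv([A, counts, displs, MPI.DOUBLE], local, root=0)
else:
    comm.Scatterv(None, local, root=0)

total = comm.reduce(local.sum(), op=MPI.SUM, root=0)
if rank == 0:
    print(f"Scatterv sum = {total:.0f} (expected {N})")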

🧩 Part 2 — Improved Ring Communication (Circular Token Passing)

Idea: Rank 0 injects a token that travels through all ranks in order, forming a full ring. Every other rank receives the token, increments it, and forwards it to the next rank; rank 0 receives it back at the end.

API Primer

  • MPI_Send: Send data to a target process.
  • MPI_Recv: Receive data from a source process.

C — mpi_ring.c

#include <mpi.h>
#include <stdio.h>

int main(int argc, char** argv) {
    MPI_Init(&argc, &argv);

    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    int token;

    if (rank == 0) {
        token = 42;
        printf("Rank 0 starting token = %d\n", token);
        MPI_Send(&token, 1, MPI_INT, 1, 0, MPI_COMM_WORLD);
        MPI_Recv(&token, 1, MPI_INT, size - 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        printf("Rank 0 received token back = %d\n", token);
    } else {
        MPI_Recv(&token, 1, MPI_INT, rank - 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        printf("Rank %d received token %d from rank %d\n", rank, token, rank - 1);
        token += 1;
        MPI_Send(&token, 1, MPI_INT, (rank + 1) % size, 0, MPI_COMM_WORLD);
    }

    MPI_Finalize();
    return 0;
}

Run

mpicc mpi_ring.c -o mpi_ring
mpiexec -n 4 ./mpi_ring

Sample Output

Rank 0 starting token = 42
Rank 1 received token 42 from rank 0
Rank 2 received token 43 from rank 1
Rank 3 received token 44 from rank 2
Rank 0 received token back = 45

Python — mpi_ring.py

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank == 0:
    token = 42
    print(f"Rank 0 starting token = {token}")
    comm.send(token, dest=1, tag=0)
    token = comm.recv(source=size - 1, tag=0)
    print(f"Rank 0 received token back = {token}")
else:
    token = comm.recv(source=rank - 1, tag=0)
    print(f"Rank {rank} received token {token} from rank {rank - 1}")
    token += 1
    comm.send(token, dest=(rank + 1) % size, tag=0)

Run

mpiexec -n 4 python mpi_ring.py

Sample Output

Rank 0 starting token = 42
Rank 1 received token 42 from rank 0
Rank 2 received token 43 from rank 1
Rank 3 received token 44 from rank 2
Rank 0 received token back = 45
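
A single trip of the token finishes far too quickly to time meaningfully. As a hedged extension (the LAPS constant and the latency estimate are illustrative, not part of the lab), the same ring can be looped many times and timed with MPI.Wtime() to approximate per-hop message latency:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if size < 2:
    raise SystemExit("Run with at least 2 ranks")

LAPS = 1000  # more laps give a steadier average

comm.Barrier()  # start all ranks together
t0 = MPI.Wtime()
for _ in range(LAPS):
    if rank == 0:
        comm.send(0, dest=1, tag=0)
        comm.recv(source=size - 1, tag=0)
    else:
        token = comm.recv(source=rank - 1, tag=0)
        comm.send(token, dest=(rank + 1) % size, tag=0)
t1 = MPI.Wtime()

if rank == 0:
    hops = LAPS * size  # each lap crosses 'size' links
    print(f"Average per-hop time: {(t1 - t0) / hops * 1e6:.2f} microseconds")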

🧩 Part 3 — Master–Worker Task Distribution

Idea: Rank 0 (master) sends work items x to ranks 1..P-1 (workers). Workers compute f(x)=x*x and send results back. Master continues issuing tasks until all are done.

API Primer

  • C

    • MPI_Send(buf, count, datatype, dest, tag, comm)
    • MPI_Recv(buf, count, datatype, source, tag, comm, &status)
    • Tags allow you to differentiate message types (e.g., TASK vs STOP).
  • Python (mpi4py)

    • comm.send(obj, dest=..., tag=...) and comm.recv(source=..., tag=..., status=...)
    • Use a Status() object to inspect status.tag and status.source.

C — mpi_master_worker.c

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

enum { TAG_TASK = 1, TAG_STOP = 2 };

static int f(int x) { return x * x; }

int main(int argc, char** argv) {
    MPI_Init(&argc, &argv);
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (size < 2) {
        if (rank == 0) fprintf(stderr, "Need at least 2 ranks\n");
        MPI_Finalize();
        return 1;
    }

    const int N = (argc > 1) ? atoi(argv[1]) : 50;

    if (rank == 0) {
        // --- Master ---
        int next_task = 0;
        int active = size - 1; // number of workers still running

        // Prime workers with one task each (or STOP if no work left)
        for (int w = 1; w < size; w++) {
            if (next_task < N) {
                MPI_Send(&next_task, 1, MPI_INT, w, TAG_TASK, MPI_COMM_WORLD);
                next_task++;
            } else {
                int dummy = -1;
                MPI_Send(&dummy, 1, MPI_INT, w, TAG_STOP, MPI_COMM_WORLD);
                active--;
            }
        }

        // Receive results and keep feeding tasks
        while (active > 0) {
            MPI_Status st;
            int result[2]; // {x, f(x)}
            MPI_Recv(result, 2, MPI_INT, MPI_ANY_SOURCE, 0, MPI_COMM_WORLD, &st);
            printf("Master got: x=%d f=%d from worker %d\n", result[0], result[1], st.MPI_SOURCE);

            if (next_task < N) {
                MPI_Send(&next_task, 1, MPI_INT, st.MPI_SOURCE, TAG_TASK, MPI_COMM_WORLD);
                next_task++;
            } else {
                int dummy = -1;
                MPI_Send(&dummy, 1, MPI_INT, st.MPI_SOURCE, TAG_STOP, MPI_COMM_WORLD);
                active--;
            }
        }
    } else {
        // --- Worker ---
        while (1) {
            MPI_Status st;
            int x;
            MPI_Recv(&x, 1, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &st);
            if (st.MPI_TAG == TAG_STOP) break;
            int y = f(x);
            int pair[2] = { x, y };
            MPI_Send(pair, 2, MPI_INT, 0, 0, MPI_COMM_WORLD);
        }
    }

    MPI_Finalize();
    return 0;
}

Build & Run

mpicc mpi_master_worker.c -o mpi_master_worker
mpiexec -n 4 ./mpi_master_worker 8

Sample Output (4 ranks)

Master got: x=0 f=0 from worker 1
Master got: x=1 f=1 from worker 2
Master got: x=2 f=4 from worker 3
Master got: x=3 f=9 from worker 1
Master got: x=4 f=16 from worker 2
Master got: x=5 f=25 from worker 3
Master got: x=6 f=36 from worker 1
Master got: x=7 f=49 from worker 2

Python — mpi_master_worker.py

from mpi4py import MPI

TAG_TASK = 1
TAG_STOP = 2

def f(x: int) -> int:
    return x * x

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if size < 2:
    if rank == 0:
        print("Need at least 2 ranks")
    raise SystemExit(1)

N = 8  # total tasks

if rank == 0:
    # --- Master ---
    next_task = 0
    active = size - 1

    # Send initial tasks
    for w in range(1, size):
        if next_task < N:
            comm.send(next_task, dest=w, tag=TAG_TASK)
            next_task += 1
        else:
            comm.send(-1, dest=w, tag=TAG_STOP)
            active -= 1

    # Receive results and feed more tasks
    while active > 0:
        status = MPI.Status()
        x_y = comm.recv(source=MPI.ANY_SOURCE, tag=0, status=status)
        src = status.source
        x, y = x_y
        print(f"Master got: x={x} f={y} from worker {src}")
        if next_task < N:
            comm.send(next_task, dest=src, tag=TAG_TASK)
            next_task += 1
        else:
            comm.send(-1, dest=src, tag=TAG_STOP)
            active -= 1

else:
    # --- Worker ---
    while True:
        status = MPI.Status()
        x = comm.recv(source=0, tag=MPI.ANY_TAG, status=status)
        if status.tag == TAG_STOP:
            break
        y = f(x)
        comm.send((x, y), dest=0, tag=0)

Run

mpiexec -n 4 python mpi_master_worker.py

Sample Output (4 ranks)

Master got: x=0 f=0 from worker 1
Master got: x=1 f=1 from worker 2
Master got: x=2 f=4 from worker 3
Master got: x=3 f=9 from worker 1
Master got: x=4 f=16 from worker 2
Master got: x=5 f=25 from worker 3
Master got: x=6 f=36 from worker 1
Master got: x=7 f=49 from worker 2

🧠 Wrap-Up Discussion

  • SPMD model: every process runs the same program, but its rank determines what it does.
  • Collectives vs point-to-point: collectives simplify global operations; point-to-point gives finer control over who communicates with whom.
  • Load balancing: the master–worker pattern handles uneven workloads well because idle workers are handed new tasks as soon as they finish.
  • Common bugs: mismatched sends/receives or a missing MPI_Finalize() can cause hangs (see the deadlock sketch below).

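To make the last point concrete, here is a hedged two-rank sketch of the classic mismatched-receive deadlock and one common fix, mpi4py's sendrecv. The commented-out lines show the pattern that hangs.

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if comm.Get_size() != 2:
    raise SystemExit("Run with exactly 2 ranks")

other = 1 - rank

# BROKEN (would hang): both ranks block in recv, so neither ever sends.
# msg = comm.recv(source=other, tag=0)
# comm.send(rank, dest=other, tag=0)

# SAFE: sendrecv posts the send and the receive together.
msg = comm.sendrecv(rank, dest=other, sendtag=0, source=other, recvtag=0)
print(f"Rank {rank} exchanged data with rank {other}: got {msg}")
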
✅ Key Takeaways

  • MPI scales programs across processes, not threads.
  • MPI_Comm_rank and MPI_Comm_size define your role in the communicator.
  • MPI_Scatter, MPI_Gather, and MPI_Reduce simplify distributed computation.
  • Python’s mpi4py provides nearly one-to-one API mappings with C MPI.
  • Always match send/recv counts, datatypes, and tags; a receive that is never matched will hang.