CoAP APIs

🛰️ Lab 7 – CoAP with Lamport & Vector Clocks

Duration: ~1 hour
Environment: Windows, Linux, or macOS

This lab introduces two fundamental concepts in distributed systems, Lamport clocks and vector clocks, through a simple CoAP server and client written in Python.

You will build:

  • A tiny CoAP server exposing endpoints /lamport and /vector
  • A tiny CoAP client that sends updates using Lamport and vector timestamps
  • Console output that shows ordering, acceptance, and ignored updates

Everything is intentionally minimal to support teaching and experimentation.


🎯 Learning Goals

By the end of this lab, you will:

  • Understand how CoAP (Constrained Application Protocol) handles simple PUT requests
  • Implement a Lamport logical clock between client and server
  • Implement a 2-node vector clock
  • Observe how distributed ordering decisions are made in practice

🧩 Required Software

Make sure you have installed:

  • Python 3.8+
  • aiocoap library

Install using:

python -m venv venv
source venv/bin/activate        # Windows: venv\Scripts\activate
pip install aiocoap
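
Before moving on, it can help to confirm that the package landed in the active virtual environment. The check below is a minimal sketch that relies only on the standard library's importlib.metadata (available from Python 3.8); the helper name installed_version is made up for this illustration.

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version string of `package`, or None if it is missing."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    version = installed_version("aiocoap")
    if version is None:
        print("aiocoap is not installed in this environment")
    else:
        print(f"aiocoap {version} is installed")
```

If the first message appears, the virtual environment is probably not activated in the current terminal.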

🛠 Step 1 — Create the CoAP Server

Create a file named server.py.

This server implements two resources:

  • /lamport — demonstrates Lamport clocks
  • /vector — demonstrates vector-clock ordering (2 nodes only)

📡 server.py

#!/usr/bin/env python3
import asyncio
import json

import aiocoap.resource as resource
import aiocoap


# ---------- Lamport clock resource ----------

class LamportResource(resource.Resource):
    def __init__(self):
        super().__init__()
        self.server_L = 0   # server's Lamport clock

    async def render_put(self, request):
        try:
            data = json.loads(request.payload.decode("utf-8"))
        except Exception:
            return aiocoap.Message(code=aiocoap.BAD_REQUEST,
                                   payload=b"Invalid JSON")

        node = data["node"]
        node_L = int(data["lamport"])
        value = data["value"]

        # Lamport rule: L_s = max(L_s, L_recv) + 1
        old = self.server_L
        self.server_L = max(self.server_L, node_L) + 1

        print(f"[LAMPORT] From {node}: value={value}, node_L={node_L}, "
              f"server_L {old} -> {self.server_L}")

        resp = {"server_lamport": self.server_L}
        return aiocoap.Message(code=aiocoap.CHANGED,
                               payload=json.dumps(resp).encode("utf-8"))


# ---------- Vector clock resource (2 nodes: A=0, B=1) ----------

class VectorResource(resource.Resource):
    def __init__(self):
        super().__init__()
        self.current_clock = [0, 0]
        self.current_value = None

    async def render_put(self, request):
        try:
            data = json.loads(request.payload.decode("utf-8"))
        except Exception:
            return aiocoap.Message(code=aiocoap.BAD_REQUEST,
                                   payload=b"Invalid JSON")

        node = data["node"]
        clock = list(map(int, data["clock"]))
        value = data["value"]

        # Accept if message clock >= current clock (component-wise)
        if all(m >= c for m, c in zip(clock, self.current_clock)):
            status = "accepted"
            old_clock = self.current_clock
            old_value = self.current_value
            self.current_clock = clock
            self.current_value = value
            print(f"[VECTOR] From {node}: value {old_value} -> {value}, "
                  f"clock {old_clock} -> {clock}")
        else:
            status = "older-ignored"
            print(f"[VECTOR] From {node}: IGNORE value={value}, "
                  f"clock={clock}, current_clock={self.current_clock}")

        resp = {
            "status": status,
            "server_clock": self.current_clock,
            "server_value": self.current_value,
        }
        return aiocoap.Message(code=aiocoap.CHANGED,
                               payload=json.dumps(resp).encode("utf-8"))


# ---------- Server main ----------

async def main():
    root = resource.Site()
    root.add_resource(["lamport"], LamportResource())
    root.add_resource(["vector"], VectorResource())

    print("CoAP server listening on coap://localhost:5683")
    await aiocoap.Context.create_server_context(root, bind=("localhost", 5683))

    await asyncio.get_running_loop().create_future()


if __name__ == "__main__":
    asyncio.run(main())
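
The Lamport rule used by LamportResource can be tried without any networking. The following is a small sketch, not part of the lab files; the function names lamport_receive and replay_lamport are illustrative only. It replays the timestamps that the client in Step 2 sends (1, 2, 3) against a server clock that starts at 0.

```python
def lamport_receive(local_clock, received_clock):
    """Lamport receive rule: move past both the local and the received time."""
    return max(local_clock, received_clock) + 1


def replay_lamport(received_timestamps, start=0):
    """Apply the receive rule to each timestamp; return the clock after each one."""
    clock = start
    history = []
    for ts in received_timestamps:
        clock = lamport_receive(clock, ts)
        history.append(clock)
    return history


if __name__ == "__main__":
    # Timestamps sent by the Step 2 client for the values 10, 20, 15.
    print(replay_lamport([1, 2, 3]))   # [2, 3, 4]
    # A stale timestamp still moves the server clock forward.
    print(replay_lamport([5, 1]))      # [6, 7]
```

The second call shows why a Lamport clock never goes backwards: even a message stamped 1 arriving after one stamped 5 pushes the clock from 6 to 7.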

🧪 Step 2 — Create the CoAP Client

Create a file named client.py.

This client:

  • Uses Lamport clocks to update /lamport
  • Uses vector clocks to update /vector
  • Shows accepted and rejected updates

📡 client.py

#!/usr/bin/env python3
import asyncio
import json

from aiocoap import Context, Message, PUT


# ---------- Lamport demo (single node A) ----------

async def lamport_demo():
    ctx = await Context.create_client_context()
    node = "A"
    L = 0

    async def send(value):
        nonlocal L
        L += 1
        payload = {"node": node, "lamport": L, "value": value}
        print(f"\n[LAMPORT CLIENT] Sending: {payload}")
        req = Message(code=PUT,
                      uri="coap://localhost/lamport",
                      payload=json.dumps(payload).encode("utf-8"))
        resp = await ctx.request(req).response
        print("[LAMPORT CLIENT] Response:", resp.payload.decode("utf-8"))

    await send(10)
    await send(20)
    await send(15)


# ---------- Vector demo (2 nodes: A=0, B=1) ----------

async def vector_demo():
    ctx = await Context.create_client_context()

    V_A = [0, 0]
    V_B = [0, 0]

    async def send_from(node_name, clock, value):
        payload = {"node": node_name, "clock": clock, "value": value}
        print(f"\n[VECTOR CLIENT] From {node_name} sending: {payload}")
        req = Message(code=PUT,
                      uri="coap://localhost/vector",
                      payload=json.dumps(payload).encode("utf-8"))
        resp = await ctx.request(req).response
        print("[VECTOR CLIENT] Response:", resp.payload.decode("utf-8"))

    # A sends: [1,0]
    V_A[0] += 1
    await send_from("A", V_A.copy(), 10)

    # B sends after seeing A: [1,1]
    V_B[1] += 1
    V_B[0] = V_A[0]
    await send_from("B", V_B.copy(), 20)

    # A sends an old clock: [1,0] again
    await send_from("A (old)", [1, 0], 999)


# ---------- Main ----------

async def main():
    print("=== Lamport example ===")
    await lamport_demo()

    print("\n=== Vector clock example ===")
    await vector_demo()

if __name__ == "__main__":
    asyncio.run(main())
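
Note that lamport_demo() increments L before each send but never reads server_lamport from the reply. In a full Lamport exchange the reply is a message too, so the client would apply the receive rule to it. The sketch below simulates that round trip in memory; the class LamportClock and the function round_trip are hypothetical names, and the server is assumed to reply with its clock value right after the receive rule, as the lab server does.

```python
class LamportClock:
    """Minimal Lamport clock: tick before sending, merge on receiving."""

    def __init__(self):
        self.time = 0

    def send(self):
        # Local event: advance the clock and stamp the outgoing message.
        self.time += 1
        return self.time

    def receive(self, other_time):
        # Receive rule: jump past the sender's timestamp.
        self.time = max(self.time, other_time) + 1
        return self.time


def round_trip(client, server):
    """One request/reply exchange; returns (request_ts, reply_ts)."""
    request_ts = client.send()
    reply_ts = server.receive(request_ts)   # server answers with its updated clock
    client.receive(reply_ts)                # client merges the reply
    return request_ts, reply_ts


if __name__ == "__main__":
    client, server = LamportClock(), LamportClock()
    print([round_trip(client, server) for _ in range(3)])
    # [(1, 2), (4, 5), (7, 8)]
```

Compared with the lab client, which sends 1, 2, 3, the merged client sends 1, 4, 7 because each reply pulls its clock ahead of the server's.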

▶️ Step 3 — Run the Lab

Start the server (Terminal 1)

python server.py

Run the client (Terminal 2)

python client.py

You will observe clock updates, accepted messages, and ignored outdated events.
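
To predict the /vector statuses before running the lab, the acceptance test from VectorResource can be replayed offline. This is a sketch under the assumption that the server starts at [0, 0]; part1_status and replay_vector are illustrative names, not part of the lab files.

```python
def part1_status(message_clock, current_clock):
    """Same test as VectorResource: accept only if every component is >= the stored one."""
    if all(m >= c for m, c in zip(message_clock, current_clock)):
        return "accepted"
    return "older-ignored"


def replay_vector(clocks):
    """Feed clocks to the rule in order; return the status of each one."""
    current = [0, 0]
    statuses = []
    for clock in clocks:
        status = part1_status(clock, current)
        if status == "accepted":
            current = list(clock)
        statuses.append(status)
    return statuses


if __name__ == "__main__":
    # The three messages sent by vector_demo() in client.py.
    print(replay_vector([[1, 0], [1, 1], [1, 0]]))
    # ['accepted', 'accepted', 'older-ignored']

    # [0, 1] is concurrent with [1, 0], not older, yet this rule ignores it.
    print(replay_vector([[1, 0], [0, 1]]))
    # ['accepted', 'older-ignored']
```

The second call highlights a limitation of the two-way rule: it cannot tell an outdated update from a concurrent one, so both are reported as older-ignored.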

PART 2 – CoAP and Consistency Models

In this lab, you will explore how different consistency models can be illustrated using CoAP (Constrained Application Protocol) and simple Python code.

We will use a single CoAP server exposing four endpoints, each approximating a different consistency model:

  • /eventual → Eventual Consistency
  • /causal → Causal Consistency (vector clocks)
  • /sequential → Sequential Consistency (global order with Lamport clock)
  • /strong → Strong Consistency (simulated primary–backup)

A Python client will send updates as if two nodes (A and B) exist, and you will observe how each endpoint behaves differently.


🎯 Learning Goals

By the end, you will be able to:

  • Explain the difference between Eventual, Causal, Sequential, and Strong consistency.
  • Understand how logical clocks (Lamport and vector clocks) help reason about order.
  • Use CoAP+JSON as a teaching vehicle for distributed system concepts.
  • Read logs and interpret which updates are accepted, ignored, or ordered differently.

🧩 Required Software

  • Python 3.8+
  • aiocoap library

Install with:

python -m venv venv
source venv/bin/activate        # Windows: venv\Scripts\activate
pip install aiocoap

🛠 Step 1 – Create the CoAP Server

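Create a file named server.py. If you completed the first part of this lab, work in a new directory so the earlier server.py and client.py are not overwritten.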

This file defines one server with four resources, one per consistency model.


📡 server.py

#!/usr/bin/env python3
import asyncio
import json

import aiocoap
import aiocoap.resource as resource


# ============================================================
# Eventual Consistency Resource
# ============================================================
# Simple "last writer wins" using a timestamp.
# No causality, no global order, just overwrite if timestamp is newer.
#
# PUT JSON:
#   { "node": "A", "value": 10, "timestamp": 1001 }
#
# Server stores:
#   current_value, current_timestamp
# ============================================================

class EventualResource(resource.Resource):
    def __init__(self):
        super().__init__()
        self.current_value = None
        self.current_ts = -1  # logical timestamp (int)

    async def render_put(self, request):
        try:
            data = json.loads(request.payload.decode("utf-8"))
        except Exception:
            return aiocoap.Message(code=aiocoap.BAD_REQUEST,
                                   payload=b"Invalid JSON")

        node = data["node"]
        value = data["value"]
        ts = int(data["timestamp"])

        if ts >= self.current_ts:
            old_value = self.current_value
            old_ts = self.current_ts
            self.current_value = value
            self.current_ts = ts
            status = "accepted"
            print(f"[EVENTUAL] From {node}: value {old_value} -> {value}, "
                  f"ts {old_ts} -> {ts}")
        else:
            status = "ignored-older"
            print(f"[EVENTUAL] From {node}: IGNORE value={value}, ts={ts}, "
                  f"current_ts={self.current_ts}")

        resp = {
            "model": "eventual",
            "status": status,
            "server_value": self.current_value,
            "server_timestamp": self.current_ts,
        }
        return aiocoap.Message(
            code=aiocoap.CHANGED,
            payload=json.dumps(resp).encode("utf-8")
        )


# ============================================================
# Causal Consistency Resource
# ============================================================
# 2-node vector clock: A=0, B=1
#
# PUT JSON:
#   { "node": "A", "value": 10, "vc": [1,0] }
#
# Rules:
#   - If incoming vc strictly dominates current vc (component-wise),
#       accept update.
#   - If incoming vc is strictly smaller -> ignore as "older".
#   - Else -> concurrent (here we mark as "conflict" but keep existing).
# ============================================================

def vc_lt(a, b):
    '''Return True if vector a < b (component-wise <= and at least one <).'''
    strictly_smaller = False
    for x, y in zip(a, b):
        if x > y:
            return False
        if x < y:
            strictly_smaller = True
    return strictly_smaller


class CausalResource(resource.Resource):
    def __init__(self):
        super().__init__()
        self.current_vc = [0, 0]  # [A, B]
        self.current_value = None

    async def render_put(self, request):
        try:
            data = json.loads(request.payload.decode("utf-8"))
        except Exception:
            return aiocoap.Message(code=aiocoap.BAD_REQUEST,
                                   payload=b"Invalid JSON")

        node = data["node"]
        value = data["value"]
        vc = [int(x) for x in data["vc"]]

        cur = self.current_vc
        cur_val = self.current_value

        if vc_lt(cur, vc):
            # new vc dominates -> accept
            status = "accepted"
            self.current_vc = vc
            self.current_value = value
            print(f"[CAUSAL] From {node}: value {cur_val} -> {value}, "
                  f"vc {cur} -> {vc}")
        elif vc_lt(vc, cur):
            # new vc is older
            status = "ignored-older"
            print(f"[CAUSAL] From {node}: IGNORE value={value}, vc={vc}, "
                  f"current_vc={cur}")
        else:
            # concurrent
            status = "conflict"
            print(f"[CAUSAL] From {node}: CONCURRENT value={value}, vc={vc}, "
                  f"current_vc={cur}")
            # For simplicity, we KEEP existing value.

        resp = {
            "model": "causal",
            "status": status,
            "server_value": self.current_value,
            "server_vc": self.current_vc,
        }
        return aiocoap.Message(
            code=aiocoap.CHANGED,
            payload=json.dumps(resp).encode("utf-8")
        )


# ============================================================
# Sequential Consistency Resource
# ============================================================
# Here we simulate a global order using a Lamport clock.
#
# PUT JSON:
#   { "node": "A", "value": 10, "lamport": 5 }
#
# Server keeps:
#   - server_L: Lamport clock
#   - last_applied_L: last Lamport timestamp applied to state
#
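# We accept an update only if its lamport >= last_applied_L.
# Replicas applying this rule conceptually agree on one global order.
# Equal timestamps are applied in arrival order here; a real system
# would break such ties deterministically (for example by node id).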
# ============================================================

class SequentialResource(resource.Resource):
    def __init__(self):
        super().__init__()
        self.server_L = 0
        self.last_applied_L = -1
        self.current_value = None

    async def render_put(self, request):
        try:
            data = json.loads(request.payload.decode("utf-8"))
        except Exception:
            return aiocoap.Message(code=aiocoap.BAD_REQUEST,
                                   payload=b"Invalid JSON")

        node = data["node"]
        value = data["value"]
        lamport_from_node = int(data["lamport"])

        old_server_L = self.server_L
        # Lamport clock update on receive
        self.server_L = max(self.server_L, lamport_from_node) + 1

        if lamport_from_node >= self.last_applied_L:
            status = "accepted"
            old_val = self.current_value
            old_L = self.last_applied_L
            self.current_value = value
            self.last_applied_L = lamport_from_node
            print(f"[SEQUENTIAL] From {node}: value {old_val} -> {value}, "
                  f"L {old_L} -> {lamport_from_node}, server_L {old_server_L} -> {self.server_L}")
        else:
            status = "ignored-out-of-order"
            print(f"[SEQUENTIAL] From {node}: IGNORE value={value}, "
                  f"L={lamport_from_node}, last_applied_L={self.last_applied_L}")

        resp = {
            "model": "sequential",
            "status": status,
            "server_value": self.current_value,
            "server_lamport": self.server_L,
            "last_applied_lamport": self.last_applied_L,
        }
        return aiocoap.Message(
            code=aiocoap.CHANGED,
            payload=json.dumps(resp).encode("utf-8")
        )


# ============================================================
# Strong Consistency Resource (Simulated Primary–Backup)
# ============================================================
# For teaching purposes, we simulate a "primary-backup" setup
# INSIDE a single process:
#
# - primary_value: main copy
# - backup_value: backup copy
#
# On PUT, we:
#   1) Write primary_value
#   2) "Synchronously replicate" to backup_value
#   3) Only then respond to client
#
# So any read after a successful write would see the latest value.
#
# PUT JSON:
#   { "node": "A", "value": 10, "op_id": 1 }
# ============================================================

class StrongResource(resource.Resource):
    def __init__(self):
        super().__init__()
        self.primary_value = None
        self.backup_value = None
        self.last_op_id = -1

    async def render_put(self, request):
        try:
            data = json.loads(request.payload.decode("utf-8"))
        except Exception:
            return aiocoap.Message(code=aiocoap.BAD_REQUEST,
                                   payload=b"Invalid JSON")

        node = data["node"]
        value = data["value"]
        op_id = int(data.get("op_id", 0))

        # Simple check to avoid re-applying older ops
        if op_id <= self.last_op_id:
            status = "ignored-duplicate-or-older"
            print(f"[STRONG] From {node}: IGNORE value={value}, op_id={op_id}, last_op_id={self.last_op_id}")
        else:
            # 1. Write to primary
            self.primary_value = value

            # 2. "Synchronous" replication to backup
            self.backup_value = value

            # 3. Commit op id
            old_op = self.last_op_id
            self.last_op_id = op_id
            status = "committed"
            print(f"[STRONG] From {node}: value replicated primary+backup, "
                  f"op_id {old_op} -> {op_id}, value={value}")

        resp = {
            "model": "strong",
            "status": status,
            "primary_value": self.primary_value,
            "backup_value": self.backup_value,
            "last_op_id": self.last_op_id,
        }
        return aiocoap.Message(
            code=aiocoap.CHANGED,
            payload=json.dumps(resp).encode("utf-8")
        )


# ============================================================
# Server main
# ============================================================

async def main():
    root = resource.Site()
    root.add_resource(["eventual"], EventualResource())
    root.add_resource(["causal"], CausalResource())
    root.add_resource(["sequential"], SequentialResource())
    root.add_resource(["strong"], StrongResource())

    print("CoAP server listening on coap://localhost:5683")
    await aiocoap.Context.create_server_context(root, bind=("localhost", 5683))

    # Run forever
    await asyncio.get_running_loop().create_future()


if __name__ == "__main__":
    asyncio.run(main())

🧪 Step 2 – Create the Client

Create a file named client.py.

This client plays the roles of Node A and Node B and sends updates to each of the four endpoints.


📡 client.py

#!/usr/bin/env python3
import asyncio
import json

from aiocoap import Context, Message, PUT


# ============================================================
# Helpers
# ============================================================

async def send_put(ctx, uri, payload_obj, label):
    print(f"\n[{label}] Sending to {uri}: {payload_obj}")
    req = Message(code=PUT,
                  uri=uri,
                  payload=json.dumps(payload_obj).encode("utf-8"))
    resp = await ctx.request(req).response
    print(f"[{label}] Response: {resp.payload.decode('utf-8')}")


# ============================================================
# Eventual Consistency Demo
# ============================================================
# Node A and B send with plain timestamps.
# You can manually choose timestamps to simulate reordering.
# ============================================================

async def eventual_demo(ctx):
    print("\n=== EVENTUAL CONSISTENCY DEMO ===")
    uri = "coap://localhost/eventual"

    # Node A writes with ts=1
    await send_put(ctx, uri,
                   {"node": "A", "value": "A-1", "timestamp": 1},
                   "EVENTUAL")

    # Node B writes later but with smaller timestamp (simulate clock skew)
    await send_put(ctx, uri,
                   {"node": "B", "value": "B-0", "timestamp": 0},
                   "EVENTUAL")

    # Node B writes with larger timestamp
    await send_put(ctx, uri,
                   {"node": "B", "value": "B-2", "timestamp": 2},
                   "EVENTUAL")


# ============================================================
# Causal Consistency Demo
# ============================================================
# Node A and B maintain vector clocks:
#   A: [a0, a1]
#   B: [b0, b1]
# ============================================================

async def causal_demo(ctx):
    print("\n=== CAUSAL CONSISTENCY DEMO ===")
    uri = "coap://localhost/causal"

    V_A = [0, 0]
    V_B = [0, 0]

    # A does a local update: [1,0]
    V_A[0] += 1
    await send_put(ctx, uri,
                   {"node": "A", "value": "A-1", "vc": V_A},
                   "CAUSAL")

    # B sees A's update, merges, and does its own: [1,1]
    V_B[0] = V_A[0]
    V_B[1] += 1
    await send_put(ctx, uri,
                   {"node": "B", "value": "B-2", "vc": V_B},
                   "CAUSAL")

    # A sends an older vector clock to simulate an outdated message: [1,0]
    await send_put(ctx, uri,
                   {"node": "A(OLD)", "value": "A-old", "vc": [1, 0]},
                   "CAUSAL")


# ============================================================
# Sequential Consistency Demo
# ============================================================
# Node A and B use Lamport clocks and the server totally orders ops
# by Lamport timestamps (approximation).
# ============================================================

async def sequential_demo(ctx):
    print("\n=== SEQUENTIAL CONSISTENCY DEMO ===")
    uri = "coap://localhost/sequential"

    # Lamport clocks at nodes
    L_A = 0
    L_B = 0

    # A sends first (L_A=1)
    L_A += 1
    await send_put(ctx, uri,
                   {"node": "A", "value": "A-1", "lamport": L_A},
                   "SEQUENTIAL")

    # B sends using only its local clock (L_B=1); it has not merged A's timestamp
    L_B += 1
    await send_put(ctx, uri,
                   {"node": "B", "value": "B-1", "lamport": L_B},
                   "SEQUENTIAL")

    # A sends again with smaller lamport number (simulate out-of-order)
    await send_put(ctx, uri,
                   {"node": "A(OLD)", "value": "A-old", "lamport": 0},
                   "SEQUENTIAL")


# ============================================================
# Strong Consistency Demo
# ============================================================
# We simulate a primary-backup system with "op_id" acting as a
# monotonically increasing operation id.
# ============================================================

async def strong_demo(ctx):
    print("\n=== STRONG CONSISTENCY DEMO ===")
    uri = "coap://localhost/strong"

    op_id = 0

    # A does operation 1
    op_id += 1
    await send_put(ctx, uri,
                   {"node": "A", "value": "X", "op_id": op_id},
                   "STRONG")

    # B does operation 2
    op_id += 1
    await send_put(ctx, uri,
                   {"node": "B", "value": "Y", "op_id": op_id},
                   "STRONG")

    # A accidentally retries operation 1 (duplicate / older)
    await send_put(ctx, uri,
                   {"node": "A", "value": "X-again", "op_id": 1},
                   "STRONG")


# ============================================================
# Main
# ============================================================

async def main():
    ctx = await Context.create_client_context()

    await eventual_demo(ctx)
    await causal_demo(ctx)
    await sequential_demo(ctx)
    await strong_demo(ctx)


if __name__ == "__main__":
    asyncio.run(main())

▶️ Step 3 – Run the Lab

Open two terminals in the same directory.

1️⃣ Start the CoAP server

python server.py

You should see:

CoAP server listening on coap://localhost:5683

2️⃣ Run the client

In another terminal:

python client.py

Watch the server output and client output carefully.


🔍 What to Observe

Eventual Consistency (/eventual)

  • The update with the largest timestamp eventually wins.
  • A write that arrives later but carries a smaller timestamp is ignored (B-0 in the demo); with skewed clocks in a real system, this can discard a write that actually happened later.
  • Good for simple replication but allows stale reads.
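
The "largest timestamp wins" behaviour can be checked offline by replaying the demo writes in every possible arrival order. This is a minimal sketch of the /eventual rule only; apply_lww and DEMO_UPDATES are names chosen for this illustration.

```python
from itertools import permutations


def apply_lww(updates):
    """Apply (timestamp, value) updates with the /eventual rule: overwrite if ts >= current."""
    current_ts, current_value = -1, None
    for ts, value in updates:
        if ts >= current_ts:
            current_ts, current_value = ts, value
    return current_value


# The three writes sent by eventual_demo(), as (timestamp, value) pairs.
DEMO_UPDATES = [(1, "A-1"), (0, "B-0"), (2, "B-2")]

if __name__ == "__main__":
    finals = {apply_lww(order) for order in permutations(DEMO_UPDATES)}
    print(finals)   # {'B-2'}
```

Every arrival order ends on B-2 because the timestamps are distinct. With two writes carrying the same timestamp, the >= comparison lets arrival order decide, so replicas that receive them in different orders could end up with different values.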

Causal Consistency (/causal)

  • Vector clocks [a0, a1] ensure that causally newer updates are accepted.
  • Outdated updates are ignored; concurrent ones are marked as conflicts.
  • Preserves happens-before but not a single total order.
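
The vector clock bookkeeping behind these observations can be sketched without CoAP. The helpers below are illustrative (compare and merge_and_tick are not part of the lab files): one classifies two clocks, the other shows how node B arrives at [1, 1] after seeing A's [1, 0].

```python
def compare(a, b):
    """Classify vector clock a relative to b: 'before', 'after', 'equal', or 'concurrent'."""
    a_le_b = all(x <= y for x, y in zip(a, b))
    b_le_a = all(y <= x for x, y in zip(a, b))
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"
    if b_le_a:
        return "after"
    return "concurrent"


def merge_and_tick(local, seen, index):
    """Merge a seen clock into the local one (element-wise max), then count a local event."""
    merged = [max(x, y) for x, y in zip(local, seen)]
    merged[index] += 1
    return merged


if __name__ == "__main__":
    v_a = [1, 0]                            # A's first update
    v_b = merge_and_tick([0, 0], v_a, 1)    # B sees it, then updates
    print(v_b)                              # [1, 1]
    print(compare(v_b, v_a))                # after
    print(compare([1, 0], v_b))             # before
    print(compare([2, 0], v_b))             # concurrent
```

One detail worth noticing when reading the logs: CausalResource has no separate case for an equal clock, so a message that repeats the server's current vc falls into the conflict branch.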

Sequential Consistency (/sequential)

  • Lamport timestamps simulate a single global order.
  • Server ignores updates that appear out-of-order (smaller Lamport than last applied).
  • All nodes agree on the same sequence, though not tied to real-time.
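
In the demo, A and B both send Lamport timestamp 1, so the timestamp alone does not order them. A common way to get a single sequence is to sort by the pair (timestamp, node id). The sketch below assumes that tie-break; it is not what SequentialResource does, and total_order and DEMO_OPS are illustrative names modeled on the demo messages.

```python
from itertools import permutations


def total_order(operations):
    """Sort by (lamport, node); the node id breaks ties between equal timestamps."""
    return sorted(operations, key=lambda op: (op["lamport"], op["node"]))


# Modeled on the messages sent by sequential_demo().
DEMO_OPS = [
    {"node": "A", "value": "A-1", "lamport": 1},
    {"node": "B", "value": "B-1", "lamport": 1},
    {"node": "A", "value": "A-old", "lamport": 0},
]

if __name__ == "__main__":
    # Whatever order the messages arrive in, sorting yields the same sequence.
    orders = {
        tuple(op["value"] for op in total_order(arrival))
        for arrival in permutations(DEMO_OPS)
    }
    print(orders)   # {('A-old', 'A-1', 'B-1')}
```

Because the sort key is deterministic, every replica that eventually holds the same set of operations derives the same sequence, independent of arrival order.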

Strong Consistency (/strong)

  • Every operation is applied to primary and backup before acknowledging.
  • Duplicate or older operation IDs are ignored.
  • Intuition: any read after a committed write would see the latest value.
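
The lab server only handles PUT, so the "read after a committed write" intuition is easiest to see in a small in-memory model. The class below is a toy sketch that mirrors the checks in StrongResource; PrimaryBackupStore and its read method are assumptions added for illustration.

```python
class PrimaryBackupStore:
    """Toy primary-backup pair in one process, mirroring the checks in StrongResource."""

    def __init__(self):
        self.primary = None
        self.backup = None
        self.last_op_id = -1

    def write(self, op_id, value):
        # Reject duplicates and older operations, as the lab server does.
        if op_id <= self.last_op_id:
            return "ignored-duplicate-or-older"
        self.primary = value
        self.backup = value          # "replicate" before acknowledging
        self.last_op_id = op_id
        return "committed"

    def read(self, replica="primary"):
        # Either copy can serve the read because both are updated together.
        return self.primary if replica == "primary" else self.backup


if __name__ == "__main__":
    store = PrimaryBackupStore()
    print(store.write(1, "X"))          # committed
    print(store.write(2, "Y"))          # committed
    print(store.write(1, "X-again"))    # ignored-duplicate-or-older
    print(store.read("primary"), store.read("backup"))   # Y Y
```

In a real deployment the backup lives in another process, so step 2 involves a network round trip and the acknowledgement is delayed until the backup confirms.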
