🛰️ Lab 7 – CoAP with Lamport & Vector Clocks
Duration: ~1 hour
Environment: Windows, Linux, or macOS
This lab introduces two fundamental concepts in distributed systems—Lamport clocks and Vector clocks—through a simple CoAP server and client written in Python.
You will build:
- A tiny CoAP server exposing endpoints
/lamportand/vector - A tiny CoAP client that sends updates using Lamport and vector timestamps
- Console output that shows ordering, acceptance, and ignored updates
Everything is intentionally minimal to support teaching and experimentation.
🎯 Learning Goals
By the end of this lab, you will:
- Understand how CoAP (Constrained Application Protocol) handles simple PUT requests
- Implement a Lamport logical clock between client and server
- Implement a 2-node vector clock
- Observe how distributed ordering decisions are made in practice
🧩 Required Software
Make sure you have installed:
- Python 3.8+
aiocoaplibrary
Install using:
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
pip install aiocoap
🛠 Step 1 — Create the CoAP Server
Create a file named server.py.
This server implements two resources:
/lamport— demonstrates Lamport clocks\/vector— demonstrates vector-clock ordering (2 nodes only)
📡 server.py
#!/usr/bin/env python3
import asyncio
import json
import aiocoap.resource as resource
import aiocoap
# ---------- Lamport clock resource ----------
class LamportResource(resource.Resource):
def __init__(self):
super().__init__()
self.server_L = 0 # server's Lamport clock
async def render_put(self, request):
try:
data = json.loads(request.payload.decode("utf-8"))
except Exception:
return aiocoap.Message(code=aiocoap.BAD_REQUEST,
payload=b"Invalid JSON")
node = data["node"]
node_L = int(data["lamport"])
value = data["value"]
# Lamport rule: L_s = max(L_s, L_recv) + 1
old = self.server_L
self.server_L = max(self.server_L, node_L) + 1
print(f"[LAMPORT] From {node}: value={value}, node_L={node_L}, "
f"server_L {old} -> {self.server_L}")
resp = {"server_lamport": self.server_L}
return aiocoap.Message(code=aiocoap.CHANGED,
payload=json.dumps(resp).encode("utf-8"))
# ---------- Vector clock resource (2 nodes: A=0, B=1) ----------
class VectorResource(resource.Resource):
def __init__(self):
super().__init__()
self.current_clock = [0, 0]
self.current_value = None
async def render_put(self, request):
try:
data = json.loads(request.payload.decode("utf-8"))
except Exception:
return aiocoap.Message(code=aiocoap.BAD_REQUEST,
payload=b"Invalid JSON")
node = data["node"]
clock = list(map(int, data["clock"]))
value = data["value"]
# Accept if message clock >= current clock (component-wise)
if all(m >= c for m, c in zip(clock, self.current_clock)):
status = "accepted"
old_clock = self.current_clock
old_value = self.current_value
self.current_clock = clock
self.current_value = value
print(f"[VECTOR] From {node}: value {old_value} -> {value}, "
f"clock {old_clock} -> {clock}")
else:
status = "older-ignored"
print(f"[VECTOR] From {node}: IGNORE value={value}, "
f"clock={clock}, current_clock={self.current_clock}")
resp = {
"status": status,
"server_clock": self.current_clock,
"server_value": self.current_value,
}
return aiocoap.Message(code=aiocoap.CHANGED,
payload=json.dumps(resp).encode("utf-8"))
# ---------- Server main ----------
async def main():
root = resource.Site()
root.add_resource(["lamport"], LamportResource())
root.add_resource(["vector"], VectorResource())
print("CoAP server listening on coap://localhost:5683")
await aiocoap.Context.create_server_context(root, bind=("localhost", 5683))
await asyncio.get_running_loop().create_future()
if __name__ == "__main__":
asyncio.run(main())
🧪 Step 2 — Create the CoAP Client
Create a file named client.py.
This client:
- Uses Lamport clocks to update
/lamport - Uses vector clocks to update
/vector - Shows accepted and rejected updates
📡 client.py
#!/usr/bin/env python3
import asyncio
import json
from aiocoap import Context, Message, PUT
# ---------- Lamport demo (single node A) ----------
async def lamport_demo():
ctx = await Context.create_client_context()
node = "A"
L = 0
async def send(value):
nonlocal L
L += 1
payload = {"node": node, "lamport": L, "value": value}
print(f"\n[LAMPORT CLIENT] Sending: {payload}")
req = Message(code=PUT,
uri="coap://localhost/lamport",
payload=json.dumps(payload).encode("utf-8"))
resp = await ctx.request(req).response
print("[LAMPORT CLIENT] Response:", resp.payload.decode("utf-8"))
await send(10)
await send(20)
await send(15)
# ---------- Vector demo (2 nodes: A=0, B=1) ----------
async def vector_demo():
ctx = await Context.create_client_context()
V_A = [0, 0]
V_B = [0, 0]
async def send_from(node_name, clock, value):
payload = {"node": node_name, "clock": clock, "value": value}
print(f"\n[VECTOR CLIENT] From {node_name} sending: {payload}")
req = Message(code=PUT,
uri="coap://localhost/vector",
payload=json.dumps(payload).encode("utf-8"))
resp = await ctx.request(req).response
print("[VECTOR CLIENT] Response:", resp.payload.decode("utf-8"))
# A sends: [1,0]
V_A[0] += 1
await send_from("A", V_A.copy(), 10)
# B sends after seeing A: [1,1]
V_B[1] += 1
V_B[0] = V_A[0]
await send_from("B", V_B.copy(), 20)
# A sends an old clock: [1,0] again
await send_from("A (old)", [1, 0], 999)
# ---------- Main ----------
async def main():
print("=== Lamport example ===")
await lamport_demo()
print("\n=== Vector clock example ===")
await vector_demo()
if __name__ == "__main__":
asyncio.run(main())
▶️ Step 3 — Run the Lab
Start the server (Terminal 1)
python server.py
Run the client (Terminal 2)
python client.py
You will observe clock updates, accepted messages, and ignored outdated events.
PART 2 CoAp and Consistency Models
In this lab, you will explore how different consistency models can be illustrated using CoAP (Constrained Application Protocol) and simple Python code.
We will use a single CoAP server exposing four endpoints, each approximating a different consistency model:
/eventual→ Eventual Consistency/causal→ Causal Consistency (vector clocks)/sequential→ Sequential Consistency (global order with Lamport clock)/strong→ Strong Consistency (simulated primary–backup)
A Python client will send updates as if two nodes (A and B) exist, and you will observe how each endpoint behaves differently.
🎯 Learning Goals
By the end, you will be able to:
- Explain the difference between Eventual, Causal, Sequential, and Strong consistency.
- Understand how logical clocks (Lamport and vector clocks) help reason about order.
- Use CoAP+JSON as a teaching vehicle for distributed system concepts.
- Read logs and interpret which updates are accepted, ignored, or ordered differently.
🧩 Required Software
- Python 3.8+
aiocoaplibrary
Install with:
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
pip install aiocoap
🛠 Step 1 – Create the CoAP Server
Create a file named server.py.
This file defines one server with four resources, one per consistency model.
📡 server.py
#!/usr/bin/env python3
import asyncio
import json
import aiocoap
import aiocoap.resource as resource
# ============================================================
# Eventual Consistency Resource
# ============================================================
# Simple "last writer wins" using a timestamp.
# No causality, no global order, just overwrite if timestamp is newer.
#
# PUT JSON:
# { "node": "A", "value": 10, "timestamp": 1001 }
#
# Server stores:
# current_value, current_timestamp
# ============================================================
class EventualResource(resource.Resource):
def __init__(self):
super().__init__()
self.current_value = None
self.current_ts = -1 # logical timestamp (int)
async def render_put(self, request):
try:
data = json.loads(request.payload.decode("utf-8"))
except Exception:
return aiocoap.Message(code=aiocoap.BAD_REQUEST,
payload=b"Invalid JSON")
node = data["node"]
value = data["value"]
ts = int(data["timestamp"])
if ts >= self.current_ts:
old_value = self.current_value
old_ts = self.current_ts
self.current_value = value
self.current_ts = ts
status = "accepted"
print(f"[EVENTUAL] From {node}: value {old_value} -> {value}, "
f"ts {old_ts} -> {ts}")
else:
status = "ignored-older"
print(f"[EVENTUAL] From {node}: IGNORE value={value}, ts={ts}, "
f"current_ts={self.current_ts}")
resp = {
"model": "eventual",
"status": status,
"server_value": self.current_value,
"server_timestamp": self.current_ts,
}
return aiocoap.Message(
code=aiocoap.CHANGED,
payload=json.dumps(resp).encode("utf-8")
)
# ============================================================
# Causal Consistency Resource
# ============================================================
# 2-node vector clock: A=0, B=1
#
# PUT JSON:
# { "node": "A", "value": 10, "vc": [1,0] }
#
# Rules:
# - If incoming vc strictly dominates current vc (component-wise),
# accept update.
# - If incoming vc is strictly smaller -> ignore as "older".
# - Else -> concurrent (here we mark as "conflict" but keep existing).
# ============================================================
def vc_lt(a, b):
'''Return True if vector a < b (component-wise <= and at least one <).'''
strictly_smaller = False
for x, y in zip(a, b):
if x > y:
return False
if x < y:
strictly_smaller = True
return strictly_smaller
class CausalResource(resource.Resource):
def __init__(self):
super().__init__()
self.current_vc = [0, 0] # [A, B]
self.current_value = None
async def render_put(self, request):
try:
data = json.loads(request.payload.decode("utf-8"))
except Exception:
return aiocoap.Message(code=aiocoap.BAD_REQUEST,
payload=b"Invalid JSON")
node = data["node"]
value = data["value"]
vc = [int(x) for x in data["vc"]]
cur = self.current_vc
cur_val = self.current_value
if vc_lt(cur, vc):
# new vc dominates -> accept
status = "accepted"
self.current_vc = vc
self.current_value = value
print(f"[CAUSAL] From {node}: value {cur_val} -> {value}, "
f"vc {cur} -> {vc}")
elif vc_lt(vc, cur):
# new vc is older
status = "ignored-older"
print(f"[CAUSAL] From {node}: IGNORE value={value}, vc={vc}, "
f"current_vc={cur}")
else:
# concurrent
status = "conflict"
print(f"[CAUSAL] From {node}: CONCURRENT value={value}, vc={vc}, "
f"current_vc={cur}")
# For simplicity, we KEEP existing value.
resp = {
"model": "causal",
"status": status,
"server_value": self.current_value,
"server_vc": self.current_vc,
}
return aiocoap.Message(
code=aiocoap.CHANGED,
payload=json.dumps(resp).encode("utf-8")
)
# ============================================================
# Sequential Consistency Resource
# ============================================================
# Here we simulate a global order using a Lamport clock.
#
# PUT JSON:
# { "node": "A", "value": 10, "lamport": 5 }
#
# Server keeps:
# - server_L: Lamport clock
# - last_applied_L: last Lamport timestamp applied to state
#
# We accept an update only if its lamport >= last_applied_L.
# This means all replicas conceptually agree on the same global order.
# ============================================================
class SequentialResource(resource.Resource):
def __init__(self):
super().__init__()
self.server_L = 0
self.last_applied_L = -1
self.current_value = None
async def render_put(self, request):
try:
data = json.loads(request.payload.decode("utf-8"))
except Exception:
return aiocoap.Message(code=aiocoap.BAD_REQUEST,
payload=b"Invalid JSON")
node = data["node"]
value = data["value"]
lamport_from_node = int(data["lamport"])
old_server_L = self.server_L
# Lamport clock update on receive
self.server_L = max(self.server_L, lamport_from_node) + 1
if lamport_from_node >= self.last_applied_L:
status = "accepted"
old_val = self.current_value
old_L = self.last_applied_L
self.current_value = value
self.last_applied_L = lamport_from_node
print(f"[SEQUENTIAL] From {node}: value {old_val} -> {value}, "
f"L {old_L} -> {lamport_from_node}, server_L {old_server_L} -> {self.server_L}")
else:
status = "ignored-out-of-order"
print(f"[SEQUENTIAL] From {node}: IGNORE value={value}, "
f"L={lamport_from_node}, last_applied_L={self.last_applied_L}")
resp = {
"model": "sequential",
"status": status,
"server_value": self.current_value,
"server_lamport": self.server_L,
"last_applied_lamport": self.last_applied_L,
}
return aiocoap.Message(
code=aiocoap.CHANGED,
payload=json.dumps(resp).encode("utf-8")
)
# ============================================================
# Strong Consistency Resource (Simulated Primary–Backup)
# ============================================================
# For teaching purposes, we simulate a "primary-backup" setup
# INSIDE a single process:
#
# - primary_value: main copy
# - backup_value: backup copy
#
# On PUT, we:
# 1) Write primary_value
# 2) "Synchronously replicate" to backup_value
# 3) Only then respond to client
#
# So any read after a successful write would see the latest value.
#
# PUT JSON:
# { "node": "A", "value": 10, "op_id": 1 }
# ============================================================
class StrongResource(resource.Resource):
def __init__(self):
super().__init__()
self.primary_value = None
self.backup_value = None
self.last_op_id = -1
async def render_put(self, request):
try:
data = json.loads(request.payload.decode("utf-8"))
except Exception:
return aiocoap.Message(code=aiocoap.BAD_REQUEST,
payload=b"Invalid JSON")
node = data["node"]
value = data["value"]
op_id = int(data.get("op_id", 0))
# Simple check to avoid re-applying older ops
if op_id <= self.last_op_id:
status = "ignored-duplicate-or-older"
print(f"[STRONG] From {node}: IGNORE value={value}, op_id={op_id}, last_op_id={self.last_op_id}")
else:
# 1. Write to primary
self.primary_value = value
# 2. "Synchronous" replication to backup
self.backup_value = value
# 3. Commit op id
old_op = self.last_op_id
self.last_op_id = op_id
status = "committed"
print(f"[STRONG] From {node}: value replicated primary+backup, "
f"op_id {old_op} -> {op_id}, value={value}")
resp = {
"model": "strong",
"status": status,
"primary_value": self.primary_value,
"backup_value": self.backup_value,
"last_op_id": self.last_op_id,
}
return aiocoap.Message(
code=aiocoap.CHANGED,
payload=json.dumps(resp).encode("utf-8")
)
# ============================================================
# Server main
# ============================================================
async def main():
root = resource.Site()
root.add_resource(["eventual"], EventualResource())
root.add_resource(["causal"], CausalResource())
root.add_resource(["sequential"], SequentialResource())
root.add_resource(["strong"], StrongResource())
print("CoAP server listening on coap://localhost:5683")
await aiocoap.Context.create_server_context(root, bind=("localhost", 5683))
# Run forever
await asyncio.get_running_loop().create_future()
if __name__ == "__main__":
asyncio.run(main())
🧪 Step 2 – Create the Client
Create a file named client.py.
This client plays the roles of Node A and Node B and sends updates to each of the four endpoints.
📡 client.py
#!/usr/bin/env python3
import asyncio
import json
from aiocoap import Context, Message, PUT
# ============================================================
# Helpers
# ============================================================
async def send_put(ctx, uri, payload_obj, label):
print(f"\n[{label}] Sending to {uri}: {payload_obj}")
req = Message(code=PUT,
uri=uri,
payload=json.dumps(payload_obj).encode("utf-8"))
resp = await ctx.request(req).response
print(f"[{label}] Response: {resp.payload.decode('utf-8')}")
# ============================================================
# Eventual Consistency Demo
# ============================================================
# Node A and B send with plain timestamps.
# You can manually choose timestamps to simulate reordering.
# ============================================================
async def eventual_demo(ctx):
print("\n=== EVENTUAL CONSISTENCY DEMO ===")
uri = "coap://localhost/eventual"
# Node A writes with ts=1
await send_put(ctx, uri,
{"node": "A", "value": "A-1", "timestamp": 1},
"EVENTUAL")
# Node B writes later but with smaller timestamp (simulate clock skew)
await send_put(ctx, uri,
{"node": "B", "value": "B-0", "timestamp": 0},
"EVENTUAL")
# Node B writes with larger timestamp
await send_put(ctx, uri,
{"node": "B", "value": "B-2", "timestamp": 2},
"EVENTUAL")
# ============================================================
# Causal Consistency Demo
# ============================================================
# Node A and B maintain vector clocks:
# A: [a0, a1]
# B: [b0, b1]
# ============================================================
async def causal_demo(ctx):
print("\n=== CAUSAL CONSISTENCY DEMO ===")
uri = "coap://localhost/causal"
V_A = [0, 0]
V_B = [0, 0]
# A does a local update: [1,0]
V_A[0] += 1
await send_put(ctx, uri,
{"node": "A", "value": "A-1", "vc": V_A},
"CAUSAL")
# B sees A's update, merges, and does its own: [1,1]
V_B[0] = V_A[0]
V_B[1] += 1
await send_put(ctx, uri,
{"node": "B", "value": "B-2", "vc": V_B},
"CAUSAL")
# A sends an older vector clock to simulate an outdated message: [1,0]
await send_put(ctx, uri,
{"node": "A(OLD)", "value": "A-old", "vc": [1, 0]},
"CAUSAL")
# ============================================================
# Sequential Consistency Demo
# ============================================================
# Node A and B use Lamport clocks and the server totally orders ops
# by Lamport timestamps (approximation).
# ============================================================
async def sequential_demo(ctx):
print("\n=== SEQUENTIAL CONSISTENCY DEMO ===")
uri = "coap://localhost/sequential"
# Lamport clocks at nodes
L_A = 0
L_B = 0
# A sends first (L_A=1)
L_A += 1
await send_put(ctx, uri,
{"node": "A", "value": "A-1", "lamport": L_A},
"SEQUENTIAL")
# B sends, having seen A's L but we simulate local only (L_B=1)
L_B += 1
await send_put(ctx, uri,
{"node": "B", "value": "B-1", "lamport": L_B},
"SEQUENTIAL")
# A sends again with smaller lamport number (simulate out-of-order)
await send_put(ctx, uri,
{"node": "A(OLD)", "value": "A-old", "lamport": 0},
"SEQUENTIAL")
# ============================================================
# Strong Consistency Demo
# ============================================================
# We simulate a primary-backup system with "op_id" acting as a
# monotonically increasing operation id.
# ============================================================
async def strong_demo(ctx):
print("\n=== STRONG CONSISTENCY DEMO ===")
uri = "coap://localhost/strong"
op_id = 0
# A does operation 1
op_id += 1
await send_put(ctx, uri,
{"node": "A", "value": "X", "op_id": op_id},
"STRONG")
# B does operation 2
op_id += 1
await send_put(ctx, uri,
{"node": "B", "value": "Y", "op_id": op_id},
"STRONG")
# A accidentally retries operation 1 (duplicate / older)
await send_put(ctx, uri,
{"node": "A", "value": "X-again", "op_id": 1},
"STRONG")
# ============================================================
# Main
# ============================================================
async def main():
ctx = await Context.create_client_context()
await eventual_demo(ctx)
await causal_demo(ctx)
await sequential_demo(ctx)
await strong_demo(ctx)
if __name__ == "__main__":
asyncio.run(main())
▶️ Step 3 – Run the Lab
Open two terminals in the same directory.
1️⃣ Start the CoAP server
python server.py
You should see:
CoAP server listening on coap://localhost:5683
2️⃣ Run the client
In another terminal:
python client.py
Watch the server output and client output carefully.
🔍 What to Observe
Eventual Consistency (/eventual)
- The update with the largest timestamp eventually wins.
- Older timestamps can still overwrite temporarily if clocks are skewed (in real systems).
- Good for simple replication but allows stale reads.
Causal Consistency (/causal)
- Vector clocks
[a0, a1]ensure that causally newer updates are accepted. - Outdated updates are ignored; concurrent ones are marked as conflicts.
- Preserves happens-before but not a single total order.
Sequential Consistency (/sequential)
- Lamport timestamps simulate a single global order.
- Server ignores updates that appear out-of-order (smaller Lamport than last applied).
- All nodes agree on the same sequence, though not tied to real-time.
Strong Consistency (/strong)
- Every operation is applied to primary and backup before acknowledging.
- Duplicate or older operation IDs are ignored.
- Intuition: any read after a committed write would see the latest value.