Publish/Subscribe Messaging

Lab 06 — Messaging via MQTT

Deadlines:

  • End of lab session (GitHub checkpoint): commit & push your progress to your team repository.
  • Before next lab (eClass submission): upload (1) a .zip with your code and (2) a PDF export of labs/lab06/README.md.

Submission contents:

  • (1) a .zip with your code, and
  • (2) a PDF export of labs/lab06/README.md.

Intro to what you need to do

Up to now your producer and consumer have lived inside the same Python script, communicating through a shared in-memory queue. That works on a single machine, but it means the producer and consumer are stuck together. They start and stop together, they share the same process, and they cannot run on different devices. Of course this is not realistic or a good strategy in general for edge devices, but it was much simpler to implement and “play” with them.

In this lab you will break them apart. The producer becomes a standalone script that reads the PIR sensor and publishes motion events to an MQTT broker. The consumer becomes a separate script that subscribes to the broker and receives those events. The broker sits in between, decoupling the two completely. The producer does not know who is listening, the consumer does not know who is producing, and they do not even need to be running at the same time.

This is the pub/sub pattern from the lecture. Instead of threads sharing a queue, you have independent processes sharing a message broker.

You will work in three main stages:

  1. Set up Mosquitto — install the broker and the command-line clients, and get comfortable with publishing and subscribing from the terminal
  2. Explore MQTT from the terminal — experiment with topics, wildcards, QoS levels, and retained messages so you understand how the broker works before writing any code
  3. Split the pipeline — rewrite your producer and consumer as two separate Python scripts that communicate through MQTT instead of a shared queue

By the end, your system looks like this:

┌──────────────┐        MQTT         ┌──────────────┐
│   producer   │ ──── publish ────▶ │  Mosquitto    │
│              │                     │   broker     │
│  PIR sensor  │                     │              │
│  sampler     │                     │              │
│  interpreter │                     └──────┬───────┘
└──────────────┘                            │
                                        subscribe
                                     ┌──────▼───────┐
                                     │   consumer   │
                                     │              │
                                     │  receives    │
                                     │  enriches    │
                                     │  writes JSONL│
                                     └──────────────┘

Create the following structure:

/
├── README.md
├── labs/
│   ├── lab01/
│   ├── ...
│   └── lab06/
│       ├── README.md
│       ├── requirements.txt
│       ├── producer.py
│       ├── consumer.py
│       └── pirlib/
│           ├── __init__.py
│           ├── sampler.py
│           └── interpreter.py

Copy your pirlib/ from the previous lab. The run_pipeline.py file will not exist in this lab as it gets replaced by producer.py and consumer.py.

Part 1 — Install and start Mosquitto

Install the Mosquitto broker and the command-line clients on your Raspberry Pi:

sudo apt-get update
sudo apt-get install -y mosquitto mosquitto-clients

Check that the broker is running:

systemctl status mosquitto

You should see something like Active: active (running). The broker is now listening on port 1883 (the default MQTT port) and ready to accept connections.

If it is not running, start it:

sudo systemctl start mosquitto
sudo systemctl enable mosquitto

The enable command makes it start automatically on boot, which in general you will want on an edge device.

Part 2 — Explore MQTT from the terminal

Before writing any Python, spend some time with the command-line tools. This will build your intuition for how topics, subscriptions, and message delivery work.

Basic publish and subscribe

Open two terminals. In the first, start a subscriber:

mosquitto_sub -h localhost -t "test/hello"

This connects to the broker on localhost and subscribes to the topic test/hello. It will sit there waiting for messages.

In the second terminal, publish a message:

mosquitto_pub -h localhost -t "test/hello" -m "world"

You should see world appear in the subscriber terminal. Try publishing a few more messages and watch them arrive.

Topic hierarchy

MQTT topics are hierarchical, separated by /. Try this:

# Terminal 1: subscribe to a specific topic
mosquitto_sub -h localhost -t "smartbin/pir-01/motion"
# Terminal 2: publish to that topic
mosquitto_pub -h localhost -t "smartbin/pir-01/motion" -m '{"state": "detected"}'

Notice you can publish JSON as the message payload — MQTT does not care about the format, it just delivers bytes.

Wildcards

Now try wildcard subscriptions. With the subscriber still running, try these in separate terminals:

# Subscribe to ALL topics under smartbin/pir-01/
mosquitto_sub -h localhost -t "smartbin/pir-01/#"
# Subscribe to motion events from ANY device
mosquitto_sub -h localhost -t "smartbin/+/motion"

In another terminal, publish to different topics and see which subscribers receive what:

mosquitto_pub -h localhost -t "smartbin/pir-01/motion" -m "detected"
mosquitto_pub -h localhost -t "smartbin/pir-01/status" -m "online"
mosquitto_pub -h localhost -t "smartbin/pir-02/motion" -m "detected"
mosquitto_pub -h localhost -t "smartbin/ultrasonic-01/fill" -m "72"

Observe: which messages did smartbin/pir-01/# receive? Which did smartbin/+/motion receive? This is the wildcard filtering. # matches any number of levels, + matches exactly one level.

QoS levels

Try publishing with different Quality of Service levels:

# QoS 0: at most once (fire and forget)
mosquitto_pub -h localhost -t "test/qos" -m "qos0 message" -q 0

# QoS 1: at least once (acknowledged)
mosquitto_pub -h localhost -t "test/qos" -m "qos1 message" -q 1

# QoS 2: exactly once (four-step handshake)
mosquitto_pub -h localhost -t "test/qos" -m "qos2 message" -q 2

With a subscriber listening on test/qos, all three should arrive. The difference is in what happens when the network is unreliable, for now, on localhost, they all (should) behave the same. Think about when you would choose each level.

Retained messages

Normally, if you publish a message and nobody is subscribed, the message is lost. Retained messages change this:

# Publish a retained message
mosquitto_pub -h localhost -t "smartbin/pir-01/status" -m "online" -r

Now start a new subscriber after the publish:

mosquitto_sub -h localhost -t "smartbin/pir-01/status"

The subscriber immediately receives online even though the message was published before it connected. The broker stored the last retained message for that topic. This is useful for status information, a new subscriber can immediately learn the current state without waiting for the next update.

To clear a retained message, publish an empty message with the retain flag:

mosquitto_pub -h localhost -t "smartbin/pir-01/status" -r -n

Experiment on your own

Spend a few minutes trying things out. Some ideas:

  • Subscribe to # (all topics) and publish to various topics — see everything that flows through the broker
  • Open three or four subscribers on different topics and see how one publish reaches some but not others
  • Try publishing a multi-line JSON payload
  • Try subscribing to a topic, then killing the subscriber, publishing a message, and restarting the subscriber — does the message arrive? (Hint: what QoS are you using? Is the session clean?)

The more you explore now, the fewer surprises you will have when writing the Python code.

Part 3 — Split the pipeline into publisher and subscriber

Now you will take your Lab pipeline and split it into two independent programs that communicate through MQTT instead of a shared queue.

Install the Python MQTT library

Add paho-mqtt to your requirements.txt:

paho-mqtt

Install it:

pip install paho-mqtt

Design your topic structure

Before writing code, define a topic structure for your system.

Consider how your topics should be organized to support different subscription patterns. For example, think about how you might:

  • Access events from a specific sensor
  • Access all events from a specific device
  • Access the same type of event across multiple devices

Reflect on how hierarchical topic design and the use of wildcards (+, #) can support these use cases.

Document your proposed topic structure in your report and briefly justify your design choices.

Write the producer

Create producer.py. This script does what the producer thread used to do : reads the PIR sensor, interprets the signal, and produces structured events. But instead of putting records into a Python queue, it publishes them to the MQTT broker.

The structure can look like this (pseudocode: you need to implement it):

Import paho.mqtt.client
Import your sampler and interpreter from pirlib

Create an MQTT client
Connect to the broker (localhost, port 1883)
Start the MQTT network loop (client.loop_start())

Set up the sampler and interpreter as before
Set run_id, seq counter

While not stopped:
    Read a sample from the sampler
    Update the interpreter

    For each event the interpreter returns:
        Increment seq
        Build the event record (same fields as before, with JSON-LD context)
        Convert the record to a JSON string
        Publish the JSON string to your chosen MQTT topic
        Print a status line if verbose

    Sleep for the sample interval

On shutdown:
    Publish a retained status message ("offline") to a status topic
    Disconnect from the broker

A few things to think about:

QoS: what level should you publish with? QoS 0 is fastest but messages can be lost. QoS 1 guarantees delivery but can duplicate. For motion events, which trade-off makes sense? You can make this a command-line argument.

Retained messages: consider publishing the sensor status (online/offline) as a retained message on a separate topic like smartbin/bin-01/pir-01/status. That way, any new subscriber immediately knows whether the sensor is alive.

The event payload: this should be the same JSON structure you had before (with the JSON-LD context from Lab 05 if you implemented it). MQTT carries the bytes, you serialize to JSON before publishing and deserialize after receiving.

Write the consumer

Create consumer.py. This script does what the consumer thread used to do: receives events, adds ingest_time and pipeline_latency_ms, and writes them to a JSONL file. But instead of reading from a Python queue, it subscribes to the MQTT broker and receives events through a callback.

The structure looks like this (pseudocode):

Import paho.mqtt.client
Import json, datetime

Define the on_message callback:
    Parse the message payload as JSON
    Add ingest_time (UTC now)
    Calculate pipeline_latency_ms (ingest_time - event_time)
    Add both fields to the record
    Write the record as a JSON line to the output file
    Flush the file
    Update and print metrics (total received, average latency, etc.)

Create an MQTT client
Assign the on_message callback to the client
Connect to the broker (localhost, port 1883)
Subscribe to your chosen topic (with your chosen QoS)

Run the MQTT network loop (client.loop_forever())

The key difference from the threaded version is the callback pattern. In paho-mqtt, you do not poll a queue but you define an on_message function and the library calls it every time a message arrives. This is event-driven programming.

client.loop_forever() is a blocking call that handles the network connection, processes incoming messages, and calls your callbacks. It runs until you call client.disconnect() or the script is interrupted.

Handle KeyboardInterrupt (Ctrl-C) to shut down cleanly:

try:
    client.loop_forever()
except KeyboardInterrupt:
    print("\n[consumer] shutting down...")
    client.disconnect()

Command-line arguments

Both scripts should accept command-line arguments. (Beyond what you had before) At minimum:

producer.py:

python producer.py \
  --broker localhost \
  --port 1883 \
  --topic smartbin/bin-01/pir-01/events \
  --device-id pir-01 \
  --pin 18 \
  --sample-interval 0.1 \
  --cooldown 5 \
  --min-high 0.2 \
  --qos 1 \

consumer.py:

python consumer.py \
  --broker localhost \
  --port 1883 \
  --topic "smartbin/bin-01/pir-01/events" \
  --qos 1 \
  --out motion_events.jsonl \

Run it

  1. Make sure Mosquitto is running
  2. Start the consumer first (so it is subscribed before events arrive):
    python consumer.py --broker localhost --topic "smartbin/bin-01/pir-01/events" --out output/events.jsonl --verbose
    
  3. In a second terminal, start the producer:
    python producer.py --broker localhost --topic smartbin/bin-01/pir-01/events --pin 18 --verbose
    
  4. Trigger motion events and watch the consumer receive and log them

You should see the producer publishing messages and the consumer receiving them, adding ingest_time and pipeline_latency_ms, and writing JSON output, the same as before, but now through a broker instead of shared memory.

Verify it really is decoupled

Try these experiments to see that the pub/sub decoupling actually works:

  1. Stop the consumer, keep the producer running. The producer keeps publishing since it does not care that nobody is listening. Start the consumer again. Does it pick up new events? (What about the ones published while it was down? Think about QoS and clean sessions.)

  2. Run the consumer with a wildcard topic. Use (e.gsmartbin/bin-01/#) and see what it receives. If you published retained status messages from the producer, the consumer should get those too.

  3. Open a second consumer subscribing to the same topic. Both should receive every message, the broker delivers to all subscribers.

  4. Use mosquitto_sub alongside your Python consumer. Both should receive the same messages. This shows that the broker does not care who or what is subscribing and any MQTT client works.

Update your Docker Compose setup from the previous labs to include the Mosquitto broker as a service:

Probably the producer and consumer will need to connect to the broker using the service name (e.g broker) as the hostname. Docker Compose networking handles this automatically. You would change --broker localhost to --broker broker in the container commands.

Why this lab is structured this way

In Lab 03, your producer and consumer were threads inside the same process. They communicated through a Python Queue object in shared memory. This is fast and simple, but it locks the two components together. Also in general usually you would only have production on the edge device not consumption as well. By moving to MQTT, you decouple them completely. The producer and consumer are now separate programs. They can run on different machines. They can start and stop independently. You can have multiple consumers subscribing to the same events. You can add a new consumer that sends data to the cloud without changing the producer at all. This is the middleware layer from the lecture. The producer does not open a socket to the consumer. It publishes to a topic. Whoever is subscribed receives the data. This pattern scales in ways that direct connections do not. Understanding topics, wildcards, QoS, and retained messages is essential for designing a system that behaves correctly. When you build the full Smart Wastebin, you will need to decide: what topic structure? What QoS for sensor data vs. commands? Should device status be retained? These are design decisions that affect reliability and operability.

Report questions

Answer the following in your labs/lab06/README.md after the implementation and experiments are complete.

MQTT fundamentals

RQ1: What is the role of the MQTT broker? Why don’t we just let the producer and consumer communicate directly (e.g via sockets)?
RQ2: What topic structure did you choose and why? How does it support future extensibility (more sensors, more bins)?
RQ3: Explain the difference between QoS 0, 1, and 2 in your own words. Which did you use for your motion events and why?
RQ4: What is a retained message? Give one concrete example of when it would be useful for your system.

Terminal exploration

RQ5: When you subscribed to smartbin/+/motion, which messages did you receive and which did you not? Explain why, based on how + works.
RQ6: What happened when you subscribed to #? Why is this useful for debugging but dangerous in production?
RQ7: You published a message while no subscriber was connected (without the retain flag). Then you started a subscriber. Did it receive the message? Why or why not?

Pipeline changes

RQ8: What are the main differences between your run_pipeline.py (threaded queue) and the new producer.py + consumer.py (MQTT)?
RQ9: In the threaded version, what happened when the queue was full? In the MQTT version, what happens if the consumer is slow or offline?
RQ10: How does the callback pattern in paho-mqtt (on_message) differ from the polling pattern you used in the threaded consumer (queue.get(timeout=0.5))?
RQ11: Show one example JSON record from the MQTT-based consumer. Is the structure the same as in previous labs? What about pipeline_latency_ms, is it higher or lower? Why?

Decoupling experiments

RQ12: You stopped the consumer and kept the producer running. What happened to the messages published during that time? Were they delivered when the consumer restarted?
RQ13: You ran two consumers on the same topic. Did both receive every message? Why does this matter for building scalable systems?
RQ14: Could you run the producer on one Raspberry Pi and the consumer on a different machine (e.g., your laptop)? What would you need to change?

Reflection

RQ15: In your own words, what does “decoupling” mean in the context of pub/sub? What are the practical benefits?
RQ16: If the Mosquitto broker itself crashes, what happens to your system? How could you mitigate this?

Project hint: Smart Wastebin

MQTT is going to be the communication backbone of your project. Now that sensor events flow through a broker, you can build other components that subscribe to the same topics independently:

Some potential examples:

  • A fill-level monitor that subscribes to smartbin/bin-01/ultrasonic-01/events
  • A dashboard that subscribes to smartbin/# and shows all events across all bins
  • An alerting service that subscribes to fill-level events and sends a notification when a bin is over 80% full
  • A data logger that subscribes to everything and writes a permanent archive

None of these components need to know about each other. They all talk to the broker. This is the power of pub/sub, you can grow the system by adding subscribers without the need to add direct connections etc. .

Start thinking about your topic hierarchy for the full project. A well-designed topic structure is like a well-designed database schema, it is much easier to get it right at the start than to change it later.


What should be finished before you leave the lab

Before the end of the session you should have: installed Mosquitto and the command-line clients, explored pub/sub from the terminal (topics, wildcards, QoS, retained messages), written producer.py that publishes motion events to MQTT, written consumer.py that subscribes and writes JSONL output, run both together and verified end-to-end event delivery, tested decoupling scenarios (consumer offline, multiple consumers, wildcard subscriptions), updated labs/lab06/README.md with code and report answers, and pushed to GitHub.

Final checklist (Lab 06)

  • Mosquitto broker installed and running
  • Terminal pub/sub experiments completed (topics, wildcards, QoS, retained)
  • producer.py publishes PIR motion events to MQTT
  • consumer.py subscribes and writes JSONL with ingest_time and pipeline_latency_ms
  • Producer and consumer run as separate processes
  • End-to-end pipeline works through the broker
  • Decoupling verified (consumer offline, multiple consumers, wildcard sub)
  • Command-line arguments supported (broker, port, topic, QoS, etc.)
  • requirements.txt includes paho-mqtt
  • labs/lab06/README.md contains code, run steps, and report answers
  • Commit and push completed

Deliverables and submission

What must exist in the repository (by end of lab)

/
├── README.md
├── labs/
│   ├── lab01/
│   ├── ...
│   └── lab06/
│       ├── README.md
│       ├── requirements.txt
│       ├── producer.py
│       ├── consumer.py
│       └── pirlib/
│           ├── __init__.py
│           ├── sampler.py
│           └── interpreter.py

Do not include:

  • venv/
  • __pycache__/
  • *.pyc
  • output/ or *.jsonl
  • large temporary files unless explicitly requested

What labs/lab06/README.md must contain

Two clearly separated parts:

  1. Code / runbook — include your topic structure, example commands, and how to run the producer and consumer
  2. Answers to report questions

Same style as previous labs.


End of lab session — GitHub checkpoint

Before leaving:

  • commit your progress
  • push to your team GitHub repository

Minimum expectation:

  • all deliverables tracked by Git
  • latest commit pushed
  • commit message is clear

Before next lab — eClass submission

Submit both:

  1. Code archive (.zip)
  2. PDF export of labs/lab06/README.md

Required PDF filename format:

  • lab06_REPORT_<team>.pdf

What follows is a greek version of the same lab

Προθεσμίες:

  • Τέλος εργαστηριακής συνεδρίας (GitHub checkpoint): κάντε commit & push την πρόοδό σας στο repository της ομάδας σας.
  • Πριν το επόμενο εργαστήριο (υποβολή στο eClass): ανεβάστε (1) ένα .zip με τον κώδικά σας και (2) ένα PDF export του labs/lab06/README.md.

Περιεχόμενο υποβολής:

  • (1) ένα .zip με τον κώδικά σας, και
  • (2) ένα PDF export του labs/lab06/README.md.

Εισαγωγή στο τι πρέπει να κάνετε

Μέχρι τώρα ο producer και ο consumer σας ζούσαν μέσα στο ίδιο Python script, επικοινωνώντας μέσω ενός κοινόχρηστου in-memory queue. Αυτό λειτουργεί σε ένα μόνο μηχάνημα, αλλά σημαίνει ότι ο producer και ο consumer είναι συνδεδεμένοι μεταξύ τους. Ξεκινούν και σταματούν μαζί, μοιράζονται την ίδια διεργασία, και δεν μπορούν να τρέξουν σε διαφορετικές συσκευές. Φυσικά αυτό δεν είναι ρεαλιστικό ή καλή στρατηγική γενικά για edge devices, αλλά ήταν πολύ απλούστερο να υλοποιηθεί και να «παιχτεί» μαζί τους.

Σε αυτό το εργαστήριο θα τους διαχωρίσετε. Ο producer γίνεται ένα αυτόνομο script που διαβάζει τον PIR sensor και publishes motion events σε έναν MQTT broker. Ο consumer γίνεται ένα ξεχωριστό script που κάνει subscribe στον broker και λαμβάνει αυτά τα events. Ο broker βρίσκεται ανάμεσά τους, αποσυνδέοντάς τους πλήρως. Ο producer δεν ξέρει ποιος ακούει, ο consumer δεν ξέρει ποιος παράγει, και δεν χρειάζεται καν να τρέχουν ταυτόχρονα.

Αυτό είναι το pub/sub pattern από τη διάλεξη. Αντί για threads που μοιράζονται ένα queue, έχετε ανεξάρτητες διεργασίες που μοιράζονται έναν message broker.

Θα δουλέψετε σε τρία κύρια στάδια:

  1. Εγκατάσταση του Mosquitto — εγκαταστήστε τον broker και τους command-line clients, και εξοικειωθείτε με το publishing και subscribing από το terminal
  2. Εξερεύνηση του MQTT από το terminal — πειραματιστείτε με topics, wildcards, QoS levels, και retained messages για να καταλάβετε πώς λειτουργεί ο broker πριν γράψετε κώδικα
  3. Διαχωρισμός του pipeline — ξαναγράψτε τον producer και τον consumer σας ως δύο ξεχωριστά Python scripts που επικοινωνούν μέσω MQTT αντί για κοινόχρηστο queue

Στο τέλος, το σύστημά σας μοιάζει ως εξής:

┌──────────────┐        MQTT         ┌──────────────┐
│   producer   │ ──── publish ────▶ │  Mosquitto    │
│              │                     │   broker     │
│  PIR sensor  │                     │              │
│  sampler     │                     │              │
│  interpreter │                     └──────┬───────┘
└──────────────┘                            │
                                        subscribe
                                     ┌──────▼───────┐
                                     │   consumer   │
                                     │              │
                                     │  receives    │
                                     │  enriches    │
                                     │  writes JSONL│
                                     └──────────────┘

Δημιουργήστε την παρακάτω δομή:

/
├── README.md
├── labs/
│   ├── lab01/
│   ├── ...
│   └── lab06/
│       ├── README.md
│       ├── requirements.txt
│       ├── producer.py
│       ├── consumer.py
│       └── pirlib/
│           ├── __init__.py
│           ├── sampler.py
│           └── interpreter.py

Αντιγράψτε το pirlib/ από το προηγούμενο εργαστήριο. Το αρχείο run_pipeline.py δεν θα υπάρχει σε αυτό το εργαστήριο καθώς αντικαθίσταται από τα producer.py και consumer.py.


Μέρος 1 — Εγκατάσταση και εκκίνηση του Mosquitto

Εγκαταστήστε τον Mosquitto broker και τους command-line clients στο Raspberry Pi σας:

sudo apt-get update
sudo apt-get install -y mosquitto mosquitto-clients

Ελέγξτε ότι ο broker τρέχει:

systemctl status mosquitto

Θα πρέπει να δείτε κάτι σαν Active: active (running). Ο broker ακούει τώρα στη port 1883 (προεπιλεγμένο MQTT port) και είναι έτοιμος να δεχτεί συνδέσεις.

Αν δεν τρέχει, ξεκινήστε τον:

sudo systemctl start mosquitto
sudo systemctl enable mosquitto

Η εντολή enable τον κάνει να ξεκινά αυτόματα κατά το boot, κάτι που γενικά θέλετε σε ένα edge device.


Μέρος 2 — Εξερεύνηση του MQTT από το terminal

Πριν γράψετε Python, αφιερώστε χρόνο με τα command-line εργαλεία. Αυτό θα χτίσει τη διαίσθησή σας για το πώς λειτουργούν τα topics, τα subscriptions, και η παράδοση μηνυμάτων.

Βασικό publish και subscribe

Ανοίξτε δύο terminals. Στο πρώτο, ξεκινήστε έναν subscriber:

mosquitto_sub -h localhost -t "test/hello"

Αυτό συνδέεται στον broker στο localhost και κάνει subscribe στο topic test/hello. Θα περιμένει εκεί για μηνύματα.

Στο δεύτερο terminal, κάντε publish ένα μήνυμα:

mosquitto_pub -h localhost -t "test/hello" -m "world"

Θα πρέπει να δείτε το world να εμφανίζεται στο terminal του subscriber. Δοκιμάστε να κάνετε publish μερικά ακόμα μηνύματα και παρακολουθήστε τα να φτάνουν.

Topic hierarchy

Τα MQTT topics είναι ιεραρχικά, διαχωρισμένα με /. Δοκιμάστε αυτό:

# Terminal 1: subscribe σε συγκεκριμένο topic
mosquitto_sub -h localhost -t "smartbin/pir-01/motion"
# Terminal 2: publish σε αυτό το topic
mosquitto_pub -h localhost -t "smartbin/pir-01/motion" -m '{"state": "detected"}'

Παρατηρήστε ότι μπορείτε να κάνετε publish JSON ως message payload — το MQTT δεν ενδιαφέρεται για τη μορφή, απλώς παραδίδει bytes.

Wildcards

Τώρα δοκιμάστε wildcard subscriptions. Με τον subscriber ακόμα να τρέχει, δοκιμάστε αυτά σε ξεχωριστά terminals:

# Subscribe σε ΟΛΑ τα topics κάτω από το smartbin/pir-01/
mosquitto_sub -h localhost -t "smartbin/pir-01/#"
# Subscribe σε motion events από ΟΠΟΙΑΔΗΠΟΤΕ συσκευή
mosquitto_sub -h localhost -t "smartbin/+/motion"

Σε άλλο terminal, κάντε publish σε διαφορετικά topics και δείτε ποιοι subscribers λαμβάνουν τι:

mosquitto_pub -h localhost -t "smartbin/pir-01/motion" -m "detected"
mosquitto_pub -h localhost -t "smartbin/pir-01/status" -m "online"
mosquitto_pub -h localhost -t "smartbin/pir-02/motion" -m "detected"
mosquitto_pub -h localhost -t "smartbin/ultrasonic-01/fill" -m "72"

Παρατηρήστε: ποια μηνύματα έλαβε το smartbin/pir-01/#; Ποια έλαβε το smartbin/+/motion; Αυτό είναι το wildcard filtering. Το # ταιριάζει με οποιοδήποτε αριθμό επιπέδων, το + ταιριάζει ακριβώς ένα επίπεδο.

QoS levels

Δοκιμάστε publishing με διαφορετικά Quality of Service levels:

# QoS 0: at most once (fire and forget)
mosquitto_pub -h localhost -t "test/qos" -m "qos0 message" -q 0

# QoS 1: at least once (acknowledged)
mosquitto_pub -h localhost -t "test/qos" -m "qos1 message" -q 1

# QoS 2: exactly once (four-step handshake)
mosquitto_pub -h localhost -t "test/qos" -m "qos2 message" -q 2

Με έναν subscriber που ακούει στο test/qos, και οι τρεις θα πρέπει να φτάσουν. Η διαφορά έγκειται στο τι συμβαίνει όταν το δίκτυο είναι αναξιόπιστο — προς το παρόν, στο localhost, όλα (θα πρέπει να) συμπεριφέρονται το ίδιο. Σκεφτείτε πότε θα επιλέγατε κάθε επίπεδο.

Retained messages

Κανονικά, αν κάνετε publish ένα μήνυμα και κανείς δεν είναι subscribed, το μήνυμα χάνεται. Τα retained messages αλλάζουν αυτό:

# Publish ένα retained message
mosquitto_pub -h localhost -t "smartbin/pir-01/status" -m "online" -r

Τώρα ξεκινήστε έναν νέο subscriber μετά το publish:

mosquitto_sub -h localhost -t "smartbin/pir-01/status"

Ο subscriber λαμβάνει αμέσως το online παρόλο που το μήνυμα δημοσιεύτηκε πριν συνδεθεί. Ο broker αποθήκευσε το τελευταίο retained message για εκείνο το topic. Αυτό είναι χρήσιμο για πληροφορίες κατάστασης — ένας νέος subscriber μπορεί αμέσως να μάθει την τρέχουσα κατάσταση χωρίς να περιμένει την επόμενη ενημέρωση.

Για να διαγράψετε ένα retained message, κάντε publish ένα κενό μήνυμα με το retain flag:

mosquitto_pub -h localhost -t "smartbin/pir-01/status" -r -n

Πειραματιστείτε μόνοι σας

Αφιερώστε μερικά λεπτά για να δοκιμάσετε πράγματα. Μερικές ιδέες:

  • Κάντε subscribe στο # (όλα τα topics) και publish σε διάφορα topics — δείτε όλα όσα ρέουν μέσα από τον broker
  • Ανοίξτε τρεις ή τέσσερις subscribers σε διαφορετικά topics και δείτε πώς ένα publish φτάνει σε μερικούς αλλά όχι σε άλλους
  • Δοκιμάστε publishing ενός multi-line JSON payload
  • Δοκιμάστε να κάνετε subscribe σε ένα topic, να σκοτώσετε τον subscriber, να κάνετε publish ένα μήνυμα, και να επανεκκινήσετε τον subscriber — φτάνει το μήνυμα; (Υπόδειξη: τι QoS χρησιμοποιείτε; Είναι το session clean;)

Όσο περισσότερο εξερευνάτε τώρα, τόσο λιγότερες εκπλήξεις θα έχετε όταν γράφετε τον Python κώδικα.


Μέρος 3 — Διαχωρισμός του pipeline σε publisher και subscriber

Τώρα θα πάρετε το Lab pipeline σας και θα το χωρίσετε σε δύο ανεξάρτητα προγράμματα που επικοινωνούν μέσω MQTT αντί για κοινόχρηστο queue.

Εγκατάσταση της Python MQTT βιβλιοθήκης

Προσθέστε το paho-mqtt στο requirements.txt σας:

paho-mqtt

Εγκαταστήστε το:

pip install paho-mqtt

Σχεδιασμός της topic structure σας

Πριν γράψετε κώδικα, ορίστε μια topic structure για το σύστημά σας.

Σκεφτείτε πώς πρέπει να οργανωθούν τα topics σας για να υποστηρίξουν διαφορετικά subscription patterns. Για παράδειγμα, σκεφτείτε πώς θα μπορούσατε να:

  • Αποκτήσετε πρόσβαση σε events από έναν συγκεκριμένο sensor
  • Αποκτήσετε πρόσβαση σε όλα τα events από μια συγκεκριμένη συσκευή
  • Αποκτήσετε πρόσβαση στον ίδιο τύπο event σε πολλαπλές συσκευές

Σκεφτείτε πώς ο ιεραρχικός σχεδιασμός topic και η χρήση wildcards (+, #) μπορούν να υποστηρίξουν αυτές τις περιπτώσεις χρήσης.

Τεκμηριώστε την προτεινόμενη topic structure σας στην αναφορά σας και δικαιολογήστε εν συντομία τις επιλογές σχεδιασμού σας.

Γράψτε τον producer

Δημιουργήστε το producer.py. Αυτό το script κάνει ό,τι έκανε το producer thread παλιά: διαβάζει τον PIR sensor, ερμηνεύει το σήμα, και παράγει structured events. Αλλά αντί να βάζει records σε ένα Python queue, τα publishes στον MQTT broker.

Η δομή μπορεί να μοιάζει ως εξής (pseudocode: πρέπει να το υλοποιήσετε εσείς):

Import paho.mqtt.client
Import τον sampler και interpreter σας από pirlib

Δημιουργήστε έναν MQTT client
Συνδεθείτε στον broker (localhost, port 1883)
Ξεκινήστε το MQTT network loop (client.loop_start())

Ρυθμίστε τον sampler και interpreter όπως πριν
Ορίστε run_id, μετρητή seq

While not stopped:
    Διαβάστε ένα sample από τον sampler
    Ενημερώστε τον interpreter

    Για κάθε event που επιστρέφει ο interpreter:
        Αυξήστε τον seq
        Δημιουργήστε το event record (ίδια fields με πριν, με JSON-LD context)
        Μετατρέψτε το record σε JSON string
        Κάντε publish το JSON string στο MQTT topic που επιλέξατε
        Εκτυπώστε γραμμή κατάστασης αν verbose

    Sleep για το sample interval

Κατά τον τερματισμό:
    Κάντε publish ένα retained status message ("offline") σε ένα status topic
    Αποσυνδεθείτε από τον broker

Μερικά πράγματα για να σκεφτείτε:

QoS: με ποιο επίπεδο θα κάνετε publish; Το QoS 0 είναι το πιο γρήγορο αλλά μηνύματα μπορεί να χαθούν. Το QoS 1 εγγυάται παράδοση αλλά μπορεί να προκαλέσει διπλότυπα. Για motion events, ποιος συμβιβασμός έχει νόημα; Μπορείτε να το κάνετε command-line argument.

Retained messages: σκεφτείτε το publishing της κατάστασης του sensor (online/offline) ως retained message σε ένα ξεχωριστό topic όπως smartbin/bin-01/pir-01/status. Έτσι, οποιοσδήποτε νέος subscriber θα ξέρει αμέσως αν ο sensor είναι ζωντανός.

Το event payload: αυτό πρέπει να είναι η ίδια JSON δομή που είχατε πριν (με το JSON-LD context από το Lab 05 αν το υλοποιήσατε). Το MQTT μεταφέρει τα bytes — σειριοποιείτε σε JSON πριν το publishing και αποσειριοποιείτε μετά τη λήψη.

Γράψτε τον consumer

Δημιουργήστε το consumer.py. Αυτό το script κάνει ό,τι έκανε το consumer thread παλιά: λαμβάνει events, προσθέτει ingest_time και pipeline_latency_ms, και τα γράφει σε ένα JSONL αρχείο. Αλλά αντί να διαβάζει από ένα Python queue, κάνει subscribe στον MQTT broker και λαμβάνει events μέσω callback.

Η δομή μοιάζει ως εξής (pseudocode):

Import paho.mqtt.client
Import json, datetime

Ορίστε το on_message callback:
    Κάντε parse το message payload ως JSON
    Προσθέστε ingest_time (UTC now)
    Υπολογίστε pipeline_latency_ms (ingest_time - event_time)
    Προσθέστε και τα δύο fields στο record
    Γράψτε το record ως JSON line στο output file
    Κάντε flush το αρχείο
    Ενημερώστε και εκτυπώστε metrics (σύνολο ληφθέντων, μέση latency, κ.λπ.)

Δημιουργήστε έναν MQTT client
Αναθέστε το on_message callback στον client
Συνδεθείτε στον broker (localhost, port 1883)
Κάντε subscribe στο topic που επιλέξατε (με το QoS που επιλέξατε)

Τρέξτε το MQTT network loop (client.loop_forever())

Η βασική διαφορά από την threaded έκδοση είναι το callback pattern. Στο paho-mqtt, δεν κάνετε poll ένα queue — ορίζετε μια συνάρτηση on_message και η βιβλιοθήκη την καλεί κάθε φορά που φτάνει ένα μήνυμα. Αυτός είναι event-driven προγραμματισμός.

Το client.loop_forever() είναι blocking call που χειρίζεται τη σύνδεση δικτύου, επεξεργάζεται εισερχόμενα μηνύματα, και καλεί τα callbacks σας. Τρέχει μέχρι να καλέσετε client.disconnect() ή να διακοπεί το script.

Χειριστείτε το KeyboardInterrupt (Ctrl-C) για να τερματίσετε καθαρά:

try:
    client.loop_forever()
except KeyboardInterrupt:
    print("\n[consumer] shutting down...")
    client.disconnect()

Command-line arguments

Και τα δύο scripts πρέπει να δέχονται command-line arguments. Κατ’ ελάχιστον:

producer.py:

python producer.py \
  --broker localhost \
  --port 1883 \
  --topic smartbin/bin-01/pir-01/events \
  --device-id pir-01 \
  --pin 18 \
  --sample-interval 0.1 \
  --cooldown 5 \
  --min-high 0.2 \
  --qos 1 \

consumer.py:

python consumer.py \
  --broker localhost \
  --port 1883 \
  --topic "smartbin/bin-01/pir-01/events" \
  --qos 1 \
  --out motion_events.jsonl \

Εκτέλεση

  1. Βεβαιωθείτε ότι το Mosquitto τρέχει
  2. Ξεκινήστε πρώτα τον consumer (ώστε να είναι subscribed πριν φτάσουν τα events):
    python consumer.py --broker localhost --topic "smartbin/bin-01/pir-01/events" --out output/events.jsonl --verbose
    
  3. Σε ένα δεύτερο terminal, ξεκινήστε τον producer:
    python producer.py --broker localhost --topic smartbin/bin-01/pir-01/events --pin 18 --verbose
    
  4. Προκαλέστε motion events και παρακολουθήστε τον consumer να τα λαμβάνει και να τα καταγράφει

Θα πρέπει να δείτε τον producer να κάνει publish μηνύματα και τον consumer να τα λαμβάνει, προσθέτοντας ingest_time και pipeline_latency_ms, και γράφοντας JSON output — το ίδιο με πριν, αλλά τώρα μέσω broker αντί για shared memory.

Επαληθεύστε ότι είναι πραγματικά αποσυνδεδεμένα

Δοκιμάστε αυτά τα πειράματα για να δείτε ότι η pub/sub αποσύζευξη λειτουργεί πραγματικά:

  1. Σταματήστε τον consumer, κρατήστε τον producer να τρέχει. Ο producer συνεχίζει να κάνει publish αφού δεν ενδιαφέρεται αν κάποιος ακούει. Ξεκινήστε ξανά τον consumer. Παραλαμβάνει νέα events; (Τι γίνεται με αυτά που δημοσιεύτηκαν ενόσω ήταν εκτός; Σκεφτείτε το QoS και τα clean sessions.)

  2. Τρέξτε τον consumer με wildcard topic. Χρησιμοποιήστε (π.χ. smartbin/bin-01/#) και δείτε τι λαμβάνει. Αν δημοσιεύσατε retained status messages από τον producer, ο consumer θα πρέπει να τα λάβει και αυτά.

  3. Ανοίξτε έναν δεύτερο consumer subscribed στο ίδιο topic. Και οι δύο θα πρέπει να λαμβάνουν κάθε μήνυμα — ο broker παραδίδει σε όλους τους subscribers.

  4. Χρησιμοποιήστε mosquitto_sub παράλληλα με τον Python consumer σας. Και οι δύο θα πρέπει να λαμβάνουν τα ίδια μηνύματα. Αυτό δείχνει ότι ο broker δεν ενδιαφέρεται ποιος ή τι κάνει subscribe — οποιοσδήποτε MQTT client λειτουργεί.


Μέρος 4 — Containerization με Docker Compose (προαιρετικό αλλά συνιστάται)

Ενημερώστε το Docker Compose setup σας από τα προηγούμενα εργαστήρια ώστε να συμπεριλάβει τον Mosquitto broker ως service:

Πιθανότατα ο producer και ο consumer θα χρειαστεί να συνδεθούν στον broker χρησιμοποιώντας το service name (π.χ. broker) ως hostname. Το Docker Compose networking το χειρίζεται αυτόματα. Θα αλλάξετε --broker localhost σε --broker broker στις εντολές container.


Γιατί αυτό το εργαστήριο είναι δομημένο έτσι

Στο Lab 03, ο producer και ο consumer σας ήταν threads μέσα στην ίδια διεργασία. Επικοινωνούσαν μέσω ενός Python Queue object στη shared memory. Αυτό είναι γρήγορο και απλό, αλλά δεσμεύει τα δύο components μαζί. Επίσης γενικά συνήθως θα είχατε μόνο production στο edge device, όχι και consumption.

Μεταβαίνοντας στο MQTT, τα αποσυνδέετε πλήρως. Ο producer και ο consumer είναι τώρα ξεχωριστά προγράμματα. Μπορούν να τρέξουν σε διαφορετικά μηχανήματα. Μπορούν να ξεκινούν και να σταματούν ανεξάρτητα. Μπορείτε να έχετε πολλαπλούς consumers που κάνουν subscribe στα ίδια events. Μπορείτε να προσθέσετε έναν νέο consumer που στέλνει δεδομένα στο cloud χωρίς να αλλάξετε καθόλου τον producer.

Αυτό είναι το middleware layer από τη διάλεξη. Ο producer δεν ανοίγει socket στον consumer. Κάνει publish σε ένα topic. Όποιος είναι subscribed λαμβάνει τα δεδομένα. Αυτό το pattern κλιμακώνεται με τρόπους που οι άμεσες συνδέσεις δεν μπορούν.

Η κατανόηση topics, wildcards, QoS, και retained messages είναι απαραίτητη για το σχεδιασμό ενός συστήματος που συμπεριφέρεται σωστά. Όταν φτιάχνετε το πλήρες Smart Wastebin, θα χρειαστεί να αποφασίσετε: ποια topic structure; Τι QoS για sensor data έναντι commands; Πρέπει η κατάσταση συσκευής να είναι retained; Αυτές είναι σχεδιαστικές αποφάσεις που επηρεάζουν την αξιοπιστία και τη λειτουργικότητα.


Ερωτήσεις αναφοράς

Απαντήστε τα παρακάτω στο labs/lab06/README.md σας αφού ολοκληρωθεί η υλοποίηση και τα πειράματα.

Βασικές έννοιες MQTT

RQ1: Ποιος είναι ο ρόλος του MQTT broker; Γιατί δεν αφήνουμε τον producer και τον consumer να επικοινωνούν απευθείας (π.χ. μέσω sockets);
RQ2: Ποια topic structure επιλέξατε και γιατί; Πώς υποστηρίζει τη μελλοντική επεκτασιμότητα (περισσότεροι sensors, περισσότερα bins);
RQ3: Εξηγήστε τη διαφορά μεταξύ QoS 0, 1, και 2 με δικά σας λόγια. Ποιο χρησιμοποιήσατε για τα motion events σας και γιατί;
RQ4: Τι είναι ένα retained message; Δώστε ένα συγκεκριμένο παράδειγμα πότε θα ήταν χρήσιμο για το σύστημά σας.

Εξερεύνηση από το terminal

RQ5: Όταν κάνατε subscribe στο smartbin/+/motion, ποια μηνύματα λάβατε και ποια όχι; Εξηγήστε γιατί, βάσει του πώς λειτουργεί το +.
RQ6: Τι συνέβη όταν κάνατε subscribe στο #; Γιατί αυτό είναι χρήσιμο για debugging αλλά επικίνδυνο στο production;
RQ7: Κάνατε publish ένα μήνυμα ενώ δεν ήταν συνδεδεμένος κανένας subscriber (χωρίς το retain flag). Μετά ξεκινήσατε έναν subscriber. Έλαβε το μήνυμα; Γιατί ή γιατί όχι;

Αλλαγές στο pipeline

RQ8: Ποιες είναι οι κύριες διαφορές μεταξύ του run_pipeline.py σας (threaded queue) και του νέου producer.py + consumer.py (MQTT);
RQ9: Στην threaded έκδοση, τι συνέβαινε όταν το queue ήταν γεμάτο; Στην MQTT έκδοση, τι συμβαίνει αν ο consumer είναι αργός ή offline;
RQ10: Πώς διαφέρει το callback pattern στο paho-mqtt (on_message) από το polling pattern που χρησιμοποιούσατε στον threaded consumer (queue.get(timeout=0.5));
RQ11: Δείξτε ένα παράδειγμα JSON record από τον MQTT-based consumer. Είναι η δομή ίδια με τα προηγούμενα εργαστήρια; Τι γίνεται με το pipeline_latency_ms — είναι υψηλότερο ή χαμηλότερο; Γιατί;

Πειράματα αποσύζευξης

RQ12: Σταματήσατε τον consumer και κρατήσατε τον producer να τρέχει. Τι συνέβη στα μηνύματα που δημοσιεύτηκαν εκείνη την ώρα; Παραδόθηκαν όταν επανεκκίνησε ο consumer;
RQ13: Τρέξατε δύο consumers στο ίδιο topic. Έλαβαν και οι δύο κάθε μήνυμα; Γιατί αυτό έχει σημασία για τη δημιουργία κλιμακούμενων συστημάτων;
RQ14: Θα μπορούσατε να τρέξετε τον producer σε ένα Raspberry Pi και τον consumer σε διαφορετικό μηχάνημα (π.χ. τον laptop σας); Τι θα χρειαζόταν να αλλάξετε;

Αναστοχασμός

RQ15: Με δικά σας λόγια, τι σημαίνει «αποσύζευξη» στο context του pub/sub; Ποια είναι τα πρακτικά οφέλη;
RQ16: Αν ο ίδιος ο Mosquitto broker πέσει, τι συμβαίνει στο σύστημά σας; Πώς θα μπορούσατε να το μετριάσετε αυτό;


Υπόδειξη project: Smart Wastebin

Το MQTT θα αποτελέσει τη βασική επικοινωνιακή υποδομή του project σας. Τώρα που τα sensor events ρέουν μέσω broker, μπορείτε να φτιάξετε άλλα components που κάνουν subscribe στα ίδια topics ανεξάρτητα:

Μερικά πιθανά παραδείγματα:

  • Ένα fill-level monitor που κάνει subscribe στο smartbin/bin-01/ultrasonic-01/events
  • Ένα dashboard που κάνει subscribe στο smartbin/# και εμφανίζει όλα τα events σε όλα τα bins
  • Ένα alerting service που κάνει subscribe σε fill-level events και στέλνει ειδοποίηση όταν ένα bin είναι πάνω από 80% γεμάτο
  • Ένα data logger που κάνει subscribe σε τα πάντα και γράφει μόνιμο αρχείο

Κανένα από αυτά τα components δεν χρειάζεται να ξέρει για τα άλλα. Όλα επικοινωνούν με τον broker. Αυτή είναι η δύναμη του pub/sub — μπορείτε να αναπτύξετε το σύστημα προσθέτοντας subscribers χωρίς να χρειάζεστε άμεσες συνδέσεις κ.λπ.

Αρχίστε να σκέφτεστε την topic hierarchy για το πλήρες project. Μια καλά σχεδιασμένη topic structure είναι σαν ένα καλά σχεδιασμένο database schema — είναι πολύ πιο εύκολο να το κάνετε σωστά από την αρχή παρά να το αλλάξετε αργότερα.


Τι πρέπει να έχει ολοκληρωθεί πριν φύγετε από το εργαστήριο

Πριν το τέλος της συνεδρίας θα πρέπει να έχετε: εγκαταστήσει το Mosquitto και τους command-line clients, εξερευνήσει το pub/sub από το terminal (topics, wildcards, QoS, retained messages), γράψει producer.py που κάνει publish motion events στο MQTT, γράψει consumer.py που κάνει subscribe και γράφει JSONL output, τρέξει και τα δύο μαζί και επαληθεύσει την end-to-end παράδοση events, δοκιμάσει σενάρια αποσύζευξης (consumer offline, πολλαπλοί consumers, wildcard subscriptions), ενημερώσει το labs/lab06/README.md με κώδικα και απαντήσεις αναφοράς, και κάνει push στο GitHub.


Τελικό checklist (Lab 06)

  • Mosquitto broker εγκατεστημένο και σε λειτουργία
  • Terminal pub/sub πειράματα ολοκληρωμένα (topics, wildcards, QoS, retained)
  • producer.py κάνει publish PIR motion events στο MQTT
  • consumer.py κάνει subscribe και γράφει JSONL με ingest_time και pipeline_latency_ms
  • Producer και consumer τρέχουν ως ξεχωριστές διεργασίες
  • End-to-end pipeline λειτουργεί μέσω broker
  • Αποσύζευξη επαληθευμένη (consumer offline, πολλαπλοί consumers, wildcard sub)
  • Command-line arguments υποστηρίζονται (broker, port, topic, QoS, κ.λπ.)
  • requirements.txt περιλαμβάνει paho-mqtt
  • labs/lab06/README.md περιέχει κώδικα, βήματα εκτέλεσης, και απαντήσεις αναφοράς
  • Commit και push ολοκληρωμένα

Παραδοτέα και υποβολή

Τι πρέπει να υπάρχει στο repository (έως το τέλος του εργαστηρίου)

/
├── README.md
├── labs/
│   ├── lab01/
│   ├── ...
│   └── lab06/
│       ├── README.md
│       ├── requirements.txt
│       ├── producer.py
│       ├── consumer.py
│       └── pirlib/
│           ├── __init__.py
│           ├── sampler.py
│           └── interpreter.py

Μην συμπεριλάβετε:

  • venv/
  • __pycache__/
  • *.pyc
  • output/ ή *.jsonl
  • μεγάλα προσωρινά αρχεία εκτός αν ζητηθεί ρητά

Τι πρέπει να περιέχει το labs/lab06/README.md

Δύο σαφώς διαχωρισμένα μέρη:

  1. Κώδικας / runbook — συμπεριλάβετε την topic structure σας, παραδείγματα εντολών, και πώς να τρέξετε τον producer και τον consumer
  2. Απαντήσεις στις ερωτήσεις αναφοράς

Ίδιο στυλ με τα προηγούμενα εργαστήρια.


Τέλος εργαστηριακής συνεδρίας — GitHub checkpoint

Πριν φύγετε:

  • κάντε commit την πρόοδό σας
  • κάντε push στο GitHub repository της ομάδας σας

Ελάχιστη προσδοκία:

  • όλα τα παραδοτέα παρακολουθούνται από το Git
  • το τελευταίο commit έχει γίνει push
  • το commit message είναι σαφές

Πριν το επόμενο εργαστήριο — υποβολή στο eClass

Υποβάλετε και τα δύο:

  1. Αρχείο κώδικα (.zip)
  2. PDF export του labs/lab06/README.md

Απαιτούμενη μορφή ονόματος PDF αρχείου:

  • lab06_REPORT_<team>.pdf
Previous
Next