Lab 06 — Messaging via MQTT
Deadlines:
- End of lab session (GitHub checkpoint): commit & push your progress to your team repository.
- Before next lab (eClass submission): upload (1) a
.zipwith your code and (2) a PDF export oflabs/lab06/README.md.
Submission contents:
- (1) a
.zipwith your code, and - (2) a PDF export of
labs/lab06/README.md.
Intro to what you need to do
Up to now your producer and consumer have lived inside the same Python script, communicating through a shared in-memory queue. That works on a single machine, but it means the producer and consumer are stuck together. They start and stop together, they share the same process, and they cannot run on different devices. Of course this is not realistic or a good strategy in general for edge devices, but it was much simpler to implement and “play” with them.
In this lab you will break them apart. The producer becomes a standalone script that reads the PIR sensor and publishes motion events to an MQTT broker. The consumer becomes a separate script that subscribes to the broker and receives those events. The broker sits in between, decoupling the two completely. The producer does not know who is listening, the consumer does not know who is producing, and they do not even need to be running at the same time.
This is the pub/sub pattern from the lecture. Instead of threads sharing a queue, you have independent processes sharing a message broker.
You will work in three main stages:
- Set up Mosquitto — install the broker and the command-line clients, and get comfortable with publishing and subscribing from the terminal
- Explore MQTT from the terminal — experiment with topics, wildcards, QoS levels, and retained messages so you understand how the broker works before writing any code
- Split the pipeline — rewrite your producer and consumer as two separate Python scripts that communicate through MQTT instead of a shared queue
By the end, your system looks like this:
┌──────────────┐ MQTT ┌──────────────┐
│ producer │ ──── publish ────▶ │ Mosquitto │
│ │ │ broker │
│ PIR sensor │ │ │
│ sampler │ │ │
│ interpreter │ └──────┬───────┘
└──────────────┘ │
subscribe
│
┌──────▼───────┐
│ consumer │
│ │
│ receives │
│ enriches │
│ writes JSONL│
└──────────────┘
Create the following structure:
/
├── README.md
├── labs/
│ ├── lab01/
│ ├── ...
│ └── lab06/
│ ├── README.md
│ ├── requirements.txt
│ ├── producer.py
│ ├── consumer.py
│ └── pirlib/
│ ├── __init__.py
│ ├── sampler.py
│ └── interpreter.py
Copy your pirlib/ from the previous lab. The run_pipeline.py file will not exist in this lab as it gets replaced by producer.py and consumer.py.
Part 1 — Install and start Mosquitto
Install the Mosquitto broker and the command-line clients on your Raspberry Pi:
sudo apt-get update
sudo apt-get install -y mosquitto mosquitto-clients
Check that the broker is running:
systemctl status mosquitto
You should see something like Active: active (running). The broker is now listening on port 1883 (the default MQTT port) and ready to accept connections.
If it is not running, start it:
sudo systemctl start mosquitto
sudo systemctl enable mosquitto
The enable command makes it start automatically on boot, which in general you will want on an edge device.
Part 2 — Explore MQTT from the terminal
Before writing any Python, spend some time with the command-line tools. This will build your intuition for how topics, subscriptions, and message delivery work.
Basic publish and subscribe
Open two terminals. In the first, start a subscriber:
mosquitto_sub -h localhost -t "test/hello"
This connects to the broker on localhost and subscribes to the topic test/hello. It will sit there waiting for messages.
In the second terminal, publish a message:
mosquitto_pub -h localhost -t "test/hello" -m "world"
You should see world appear in the subscriber terminal. Try publishing a few more messages and watch them arrive.
Topic hierarchy
MQTT topics are hierarchical, separated by /. Try this:
# Terminal 1: subscribe to a specific topic
mosquitto_sub -h localhost -t "smartbin/pir-01/motion"
# Terminal 2: publish to that topic
mosquitto_pub -h localhost -t "smartbin/pir-01/motion" -m '{"state": "detected"}'
Notice you can publish JSON as the message payload — MQTT does not care about the format, it just delivers bytes.
Wildcards
Now try wildcard subscriptions. With the subscriber still running, try these in separate terminals:
# Subscribe to ALL topics under smartbin/pir-01/
mosquitto_sub -h localhost -t "smartbin/pir-01/#"
# Subscribe to motion events from ANY device
mosquitto_sub -h localhost -t "smartbin/+/motion"
In another terminal, publish to different topics and see which subscribers receive what:
mosquitto_pub -h localhost -t "smartbin/pir-01/motion" -m "detected"
mosquitto_pub -h localhost -t "smartbin/pir-01/status" -m "online"
mosquitto_pub -h localhost -t "smartbin/pir-02/motion" -m "detected"
mosquitto_pub -h localhost -t "smartbin/ultrasonic-01/fill" -m "72"
Observe: which messages did smartbin/pir-01/# receive? Which did smartbin/+/motion receive? This is the wildcard filtering. # matches any number of levels, + matches exactly one level.
QoS levels
Try publishing with different Quality of Service levels:
# QoS 0: at most once (fire and forget)
mosquitto_pub -h localhost -t "test/qos" -m "qos0 message" -q 0
# QoS 1: at least once (acknowledged)
mosquitto_pub -h localhost -t "test/qos" -m "qos1 message" -q 1
# QoS 2: exactly once (four-step handshake)
mosquitto_pub -h localhost -t "test/qos" -m "qos2 message" -q 2
With a subscriber listening on test/qos, all three should arrive. The difference is in what happens when the network is unreliable, for now, on localhost, they all (should) behave the same. Think about when you would choose each level.
Retained messages
Normally, if you publish a message and nobody is subscribed, the message is lost. Retained messages change this:
# Publish a retained message
mosquitto_pub -h localhost -t "smartbin/pir-01/status" -m "online" -r
Now start a new subscriber after the publish:
mosquitto_sub -h localhost -t "smartbin/pir-01/status"
The subscriber immediately receives online even though the message was published before it connected. The broker stored the last retained message for that topic. This is useful for status information, a new subscriber can immediately learn the current state without waiting for the next update.
To clear a retained message, publish an empty message with the retain flag:
mosquitto_pub -h localhost -t "smartbin/pir-01/status" -r -n
Experiment on your own
Spend a few minutes trying things out. Some ideas:
- Subscribe to
#(all topics) and publish to various topics — see everything that flows through the broker - Open three or four subscribers on different topics and see how one publish reaches some but not others
- Try publishing a multi-line JSON payload
- Try subscribing to a topic, then killing the subscriber, publishing a message, and restarting the subscriber — does the message arrive? (Hint: what QoS are you using? Is the session clean?)
The more you explore now, the fewer surprises you will have when writing the Python code.
Part 3 — Split the pipeline into publisher and subscriber
Now you will take your Lab pipeline and split it into two independent programs that communicate through MQTT instead of a shared queue.
Install the Python MQTT library
Add paho-mqtt to your requirements.txt:
paho-mqtt
Install it:
pip install paho-mqtt
Design your topic structure
Before writing code, define a topic structure for your system.
Consider how your topics should be organized to support different subscription patterns. For example, think about how you might:
- Access events from a specific sensor
- Access all events from a specific device
- Access the same type of event across multiple devices
Reflect on how hierarchical topic design and the use of wildcards (+, #) can support these use cases.
Document your proposed topic structure in your report and briefly justify your design choices.
Write the producer
Create producer.py. This script does what the producer thread used to do : reads the PIR sensor, interprets the signal, and produces structured events. But instead of putting records into a Python queue, it publishes them to the MQTT broker.
The structure can look like this (pseudocode: you need to implement it):
Import paho.mqtt.client
Import your sampler and interpreter from pirlib
Create an MQTT client
Connect to the broker (localhost, port 1883)
Start the MQTT network loop (client.loop_start())
Set up the sampler and interpreter as before
Set run_id, seq counter
While not stopped:
Read a sample from the sampler
Update the interpreter
For each event the interpreter returns:
Increment seq
Build the event record (same fields as before, with JSON-LD context)
Convert the record to a JSON string
Publish the JSON string to your chosen MQTT topic
Print a status line if verbose
Sleep for the sample interval
On shutdown:
Publish a retained status message ("offline") to a status topic
Disconnect from the broker
A few things to think about:
QoS: what level should you publish with? QoS 0 is fastest but messages can be lost. QoS 1 guarantees delivery but can duplicate. For motion events, which trade-off makes sense? You can make this a command-line argument.
Retained messages: consider publishing the sensor status (online/offline) as a retained message on a separate topic like smartbin/bin-01/pir-01/status. That way, any new subscriber immediately knows whether the sensor is alive.
The event payload: this should be the same JSON structure you had before (with the JSON-LD context from Lab 05 if you implemented it). MQTT carries the bytes, you serialize to JSON before publishing and deserialize after receiving.
Write the consumer
Create consumer.py. This script does what the consumer thread used to do: receives events, adds ingest_time and pipeline_latency_ms, and writes them to a JSONL file. But instead of reading from a Python queue, it subscribes to the MQTT broker and receives events through a callback.
The structure looks like this (pseudocode):
Import paho.mqtt.client
Import json, datetime
Define the on_message callback:
Parse the message payload as JSON
Add ingest_time (UTC now)
Calculate pipeline_latency_ms (ingest_time - event_time)
Add both fields to the record
Write the record as a JSON line to the output file
Flush the file
Update and print metrics (total received, average latency, etc.)
Create an MQTT client
Assign the on_message callback to the client
Connect to the broker (localhost, port 1883)
Subscribe to your chosen topic (with your chosen QoS)
Run the MQTT network loop (client.loop_forever())
The key difference from the threaded version is the callback pattern. In paho-mqtt, you do not poll a queue but you define an on_message function and the library calls it every time a message arrives. This is event-driven programming.
client.loop_forever() is a blocking call that handles the network connection, processes incoming messages, and calls your callbacks. It runs until you call client.disconnect() or the script is interrupted.
Handle KeyboardInterrupt (Ctrl-C) to shut down cleanly:
try:
client.loop_forever()
except KeyboardInterrupt:
print("\n[consumer] shutting down...")
client.disconnect()
Command-line arguments
Both scripts should accept command-line arguments. (Beyond what you had before) At minimum:
producer.py:
python producer.py \
--broker localhost \
--port 1883 \
--topic smartbin/bin-01/pir-01/events \
--device-id pir-01 \
--pin 18 \
--sample-interval 0.1 \
--cooldown 5 \
--min-high 0.2 \
--qos 1 \
consumer.py:
python consumer.py \
--broker localhost \
--port 1883 \
--topic "smartbin/bin-01/pir-01/events" \
--qos 1 \
--out motion_events.jsonl \
Run it
- Make sure Mosquitto is running
- Start the consumer first (so it is subscribed before events arrive):
python consumer.py --broker localhost --topic "smartbin/bin-01/pir-01/events" --out output/events.jsonl --verbose - In a second terminal, start the producer:
python producer.py --broker localhost --topic smartbin/bin-01/pir-01/events --pin 18 --verbose - Trigger motion events and watch the consumer receive and log them
You should see the producer publishing messages and the consumer receiving them, adding ingest_time and pipeline_latency_ms, and writing JSON output, the same as before, but now through a broker instead of shared memory.
Verify it really is decoupled
Try these experiments to see that the pub/sub decoupling actually works:
-
Stop the consumer, keep the producer running. The producer keeps publishing since it does not care that nobody is listening. Start the consumer again. Does it pick up new events? (What about the ones published while it was down? Think about QoS and clean sessions.)
-
Run the consumer with a wildcard topic. Use (e.g
smartbin/bin-01/#) and see what it receives. If you published retained status messages from the producer, the consumer should get those too. -
Open a second consumer subscribing to the same topic. Both should receive every message, the broker delivers to all subscribers.
-
Use mosquitto_sub alongside your Python consumer. Both should receive the same messages. This shows that the broker does not care who or what is subscribing and any MQTT client works.
Part 4 — Containerize with Docker Compose (optional but recommended)
Update your Docker Compose setup from the previous labs to include the Mosquitto broker as a service:
Probably the producer and consumer will need to connect to the broker using the service name (e.g broker) as the hostname. Docker Compose networking handles this automatically. You would change --broker localhost to --broker broker in the container commands.
Why this lab is structured this way
In Lab 03, your producer and consumer were threads inside the same process. They communicated through a Python Queue object in shared memory. This is fast and simple, but it locks the two components together. Also in general usually you would only have production on the edge device not consumption as well.
By moving to MQTT, you decouple them completely. The producer and consumer are now separate programs. They can run on different machines. They can start and stop independently. You can have multiple consumers subscribing to the same events. You can add a new consumer that sends data to the cloud without changing the producer at all.
This is the middleware layer from the lecture. The producer does not open a socket to the consumer. It publishes to a topic. Whoever is subscribed receives the data. This pattern scales in ways that direct connections do not.
Understanding topics, wildcards, QoS, and retained messages is essential for designing a system that behaves correctly. When you build the full Smart Wastebin, you will need to decide: what topic structure? What QoS for sensor data vs. commands? Should device status be retained? These are design decisions that affect reliability and operability.
Report questions
Answer the following in your labs/lab06/README.md after the implementation and experiments are complete.
MQTT fundamentals
RQ1: What is the role of the MQTT broker? Why don’t we just let the producer and consumer communicate directly (e.g via sockets)?
RQ2: What topic structure did you choose and why? How does it support future extensibility (more sensors, more bins)?
RQ3: Explain the difference between QoS 0, 1, and 2 in your own words. Which did you use for your motion events and why?
RQ4: What is a retained message? Give one concrete example of when it would be useful for your system.
Terminal exploration
RQ5: When you subscribed to smartbin/+/motion, which messages did you receive and which did you not? Explain why, based on how + works.
RQ6: What happened when you subscribed to #? Why is this useful for debugging but dangerous in production?
RQ7: You published a message while no subscriber was connected (without the retain flag). Then you started a subscriber. Did it receive the message? Why or why not?
Pipeline changes
RQ8: What are the main differences between your run_pipeline.py (threaded queue) and the new producer.py + consumer.py (MQTT)?
RQ9: In the threaded version, what happened when the queue was full? In the MQTT version, what happens if the consumer is slow or offline?
RQ10: How does the callback pattern in paho-mqtt (on_message) differ from the polling pattern you used in the threaded consumer (queue.get(timeout=0.5))?
RQ11: Show one example JSON record from the MQTT-based consumer. Is the structure the same as in previous labs? What about pipeline_latency_ms, is it higher or lower? Why?
Decoupling experiments
RQ12: You stopped the consumer and kept the producer running. What happened to the messages published during that time? Were they delivered when the consumer restarted?
RQ13: You ran two consumers on the same topic. Did both receive every message? Why does this matter for building scalable systems?
RQ14: Could you run the producer on one Raspberry Pi and the consumer on a different machine (e.g., your laptop)? What would you need to change?
Reflection
RQ15: In your own words, what does “decoupling” mean in the context of pub/sub? What are the practical benefits?
RQ16: If the Mosquitto broker itself crashes, what happens to your system? How could you mitigate this?
Project hint: Smart Wastebin
MQTT is going to be the communication backbone of your project. Now that sensor events flow through a broker, you can build other components that subscribe to the same topics independently:
Some potential examples:
- A fill-level monitor that subscribes to
smartbin/bin-01/ultrasonic-01/events - A dashboard that subscribes to
smartbin/#and shows all events across all bins - An alerting service that subscribes to fill-level events and sends a notification when a bin is over 80% full
- A data logger that subscribes to everything and writes a permanent archive
None of these components need to know about each other. They all talk to the broker. This is the power of pub/sub, you can grow the system by adding subscribers without the need to add direct connections etc. .
Start thinking about your topic hierarchy for the full project. A well-designed topic structure is like a well-designed database schema, it is much easier to get it right at the start than to change it later.
What should be finished before you leave the lab
Before the end of the session you should have: installed Mosquitto and the command-line clients, explored pub/sub from the terminal (topics, wildcards, QoS, retained messages), written producer.py that publishes motion events to MQTT, written consumer.py that subscribes and writes JSONL output, run both together and verified end-to-end event delivery, tested decoupling scenarios (consumer offline, multiple consumers, wildcard subscriptions), updated labs/lab06/README.md with code and report answers, and pushed to GitHub.
Final checklist (Lab 06)
- Mosquitto broker installed and running
- Terminal pub/sub experiments completed (topics, wildcards, QoS, retained)
-
producer.pypublishes PIR motion events to MQTT -
consumer.pysubscribes and writes JSONL withingest_timeandpipeline_latency_ms - Producer and consumer run as separate processes
- End-to-end pipeline works through the broker
- Decoupling verified (consumer offline, multiple consumers, wildcard sub)
- Command-line arguments supported (broker, port, topic, QoS, etc.)
-
requirements.txtincludespaho-mqtt -
labs/lab06/README.mdcontains code, run steps, and report answers - Commit and push completed
Deliverables and submission
What must exist in the repository (by end of lab)
/
├── README.md
├── labs/
│ ├── lab01/
│ ├── ...
│ └── lab06/
│ ├── README.md
│ ├── requirements.txt
│ ├── producer.py
│ ├── consumer.py
│ └── pirlib/
│ ├── __init__.py
│ ├── sampler.py
│ └── interpreter.py
Do not include:
venv/__pycache__/*.pycoutput/or*.jsonl- large temporary files unless explicitly requested
What labs/lab06/README.md must contain
Two clearly separated parts:
- Code / runbook — include your topic structure, example commands, and how to run the producer and consumer
- Answers to report questions
Same style as previous labs.
End of lab session — GitHub checkpoint
Before leaving:
- commit your progress
- push to your team GitHub repository
Minimum expectation:
- all deliverables tracked by Git
- latest commit pushed
- commit message is clear
Before next lab — eClass submission
Submit both:
- Code archive (
.zip) - PDF export of
labs/lab06/README.md
Required PDF filename format:
lab06_REPORT_<team>.pdf
What follows is a greek version of the same lab
Προθεσμίες:
- Τέλος εργαστηριακής συνεδρίας (GitHub checkpoint): κάντε commit & push την πρόοδό σας στο repository της ομάδας σας.
- Πριν το επόμενο εργαστήριο (υποβολή στο eClass): ανεβάστε (1) ένα
.zipμε τον κώδικά σας και (2) ένα PDF export τουlabs/lab06/README.md.
Περιεχόμενο υποβολής:
- (1) ένα
.zipμε τον κώδικά σας, και - (2) ένα PDF export του
labs/lab06/README.md.
Εισαγωγή στο τι πρέπει να κάνετε
Μέχρι τώρα ο producer και ο consumer σας ζούσαν μέσα στο ίδιο Python script, επικοινωνώντας μέσω ενός κοινόχρηστου in-memory queue. Αυτό λειτουργεί σε ένα μόνο μηχάνημα, αλλά σημαίνει ότι ο producer και ο consumer είναι συνδεδεμένοι μεταξύ τους. Ξεκινούν και σταματούν μαζί, μοιράζονται την ίδια διεργασία, και δεν μπορούν να τρέξουν σε διαφορετικές συσκευές. Φυσικά αυτό δεν είναι ρεαλιστικό ή καλή στρατηγική γενικά για edge devices, αλλά ήταν πολύ απλούστερο να υλοποιηθεί και να «παιχτεί» μαζί τους.
Σε αυτό το εργαστήριο θα τους διαχωρίσετε. Ο producer γίνεται ένα αυτόνομο script που διαβάζει τον PIR sensor και publishes motion events σε έναν MQTT broker. Ο consumer γίνεται ένα ξεχωριστό script που κάνει subscribe στον broker και λαμβάνει αυτά τα events. Ο broker βρίσκεται ανάμεσά τους, αποσυνδέοντάς τους πλήρως. Ο producer δεν ξέρει ποιος ακούει, ο consumer δεν ξέρει ποιος παράγει, και δεν χρειάζεται καν να τρέχουν ταυτόχρονα.
Αυτό είναι το pub/sub pattern από τη διάλεξη. Αντί για threads που μοιράζονται ένα queue, έχετε ανεξάρτητες διεργασίες που μοιράζονται έναν message broker.
Θα δουλέψετε σε τρία κύρια στάδια:
- Εγκατάσταση του Mosquitto — εγκαταστήστε τον broker και τους command-line clients, και εξοικειωθείτε με το publishing και subscribing από το terminal
- Εξερεύνηση του MQTT από το terminal — πειραματιστείτε με topics, wildcards, QoS levels, και retained messages για να καταλάβετε πώς λειτουργεί ο broker πριν γράψετε κώδικα
- Διαχωρισμός του pipeline — ξαναγράψτε τον producer και τον consumer σας ως δύο ξεχωριστά Python scripts που επικοινωνούν μέσω MQTT αντί για κοινόχρηστο queue
Στο τέλος, το σύστημά σας μοιάζει ως εξής:
┌──────────────┐ MQTT ┌──────────────┐
│ producer │ ──── publish ────▶ │ Mosquitto │
│ │ │ broker │
│ PIR sensor │ │ │
│ sampler │ │ │
│ interpreter │ └──────┬───────┘
└──────────────┘ │
subscribe
│
┌──────▼───────┐
│ consumer │
│ │
│ receives │
│ enriches │
│ writes JSONL│
└──────────────┘
Δημιουργήστε την παρακάτω δομή:
/
├── README.md
├── labs/
│ ├── lab01/
│ ├── ...
│ └── lab06/
│ ├── README.md
│ ├── requirements.txt
│ ├── producer.py
│ ├── consumer.py
│ └── pirlib/
│ ├── __init__.py
│ ├── sampler.py
│ └── interpreter.py
Αντιγράψτε το pirlib/ από το προηγούμενο εργαστήριο. Το αρχείο run_pipeline.py δεν θα υπάρχει σε αυτό το εργαστήριο καθώς αντικαθίσταται από τα producer.py και consumer.py.
Μέρος 1 — Εγκατάσταση και εκκίνηση του Mosquitto
Εγκαταστήστε τον Mosquitto broker και τους command-line clients στο Raspberry Pi σας:
sudo apt-get update
sudo apt-get install -y mosquitto mosquitto-clients
Ελέγξτε ότι ο broker τρέχει:
systemctl status mosquitto
Θα πρέπει να δείτε κάτι σαν Active: active (running). Ο broker ακούει τώρα στη port 1883 (προεπιλεγμένο MQTT port) και είναι έτοιμος να δεχτεί συνδέσεις.
Αν δεν τρέχει, ξεκινήστε τον:
sudo systemctl start mosquitto
sudo systemctl enable mosquitto
Η εντολή enable τον κάνει να ξεκινά αυτόματα κατά το boot, κάτι που γενικά θέλετε σε ένα edge device.
Μέρος 2 — Εξερεύνηση του MQTT από το terminal
Πριν γράψετε Python, αφιερώστε χρόνο με τα command-line εργαλεία. Αυτό θα χτίσει τη διαίσθησή σας για το πώς λειτουργούν τα topics, τα subscriptions, και η παράδοση μηνυμάτων.
Βασικό publish και subscribe
Ανοίξτε δύο terminals. Στο πρώτο, ξεκινήστε έναν subscriber:
mosquitto_sub -h localhost -t "test/hello"
Αυτό συνδέεται στον broker στο localhost και κάνει subscribe στο topic test/hello. Θα περιμένει εκεί για μηνύματα.
Στο δεύτερο terminal, κάντε publish ένα μήνυμα:
mosquitto_pub -h localhost -t "test/hello" -m "world"
Θα πρέπει να δείτε το world να εμφανίζεται στο terminal του subscriber. Δοκιμάστε να κάνετε publish μερικά ακόμα μηνύματα και παρακολουθήστε τα να φτάνουν.
Topic hierarchy
Τα MQTT topics είναι ιεραρχικά, διαχωρισμένα με /. Δοκιμάστε αυτό:
# Terminal 1: subscribe σε συγκεκριμένο topic
mosquitto_sub -h localhost -t "smartbin/pir-01/motion"
# Terminal 2: publish σε αυτό το topic
mosquitto_pub -h localhost -t "smartbin/pir-01/motion" -m '{"state": "detected"}'
Παρατηρήστε ότι μπορείτε να κάνετε publish JSON ως message payload — το MQTT δεν ενδιαφέρεται για τη μορφή, απλώς παραδίδει bytes.
Wildcards
Τώρα δοκιμάστε wildcard subscriptions. Με τον subscriber ακόμα να τρέχει, δοκιμάστε αυτά σε ξεχωριστά terminals:
# Subscribe σε ΟΛΑ τα topics κάτω από το smartbin/pir-01/
mosquitto_sub -h localhost -t "smartbin/pir-01/#"
# Subscribe σε motion events από ΟΠΟΙΑΔΗΠΟΤΕ συσκευή
mosquitto_sub -h localhost -t "smartbin/+/motion"
Σε άλλο terminal, κάντε publish σε διαφορετικά topics και δείτε ποιοι subscribers λαμβάνουν τι:
mosquitto_pub -h localhost -t "smartbin/pir-01/motion" -m "detected"
mosquitto_pub -h localhost -t "smartbin/pir-01/status" -m "online"
mosquitto_pub -h localhost -t "smartbin/pir-02/motion" -m "detected"
mosquitto_pub -h localhost -t "smartbin/ultrasonic-01/fill" -m "72"
Παρατηρήστε: ποια μηνύματα έλαβε το smartbin/pir-01/#; Ποια έλαβε το smartbin/+/motion; Αυτό είναι το wildcard filtering. Το # ταιριάζει με οποιοδήποτε αριθμό επιπέδων, το + ταιριάζει ακριβώς ένα επίπεδο.
QoS levels
Δοκιμάστε publishing με διαφορετικά Quality of Service levels:
# QoS 0: at most once (fire and forget)
mosquitto_pub -h localhost -t "test/qos" -m "qos0 message" -q 0
# QoS 1: at least once (acknowledged)
mosquitto_pub -h localhost -t "test/qos" -m "qos1 message" -q 1
# QoS 2: exactly once (four-step handshake)
mosquitto_pub -h localhost -t "test/qos" -m "qos2 message" -q 2
Με έναν subscriber που ακούει στο test/qos, και οι τρεις θα πρέπει να φτάσουν. Η διαφορά έγκειται στο τι συμβαίνει όταν το δίκτυο είναι αναξιόπιστο — προς το παρόν, στο localhost, όλα (θα πρέπει να) συμπεριφέρονται το ίδιο. Σκεφτείτε πότε θα επιλέγατε κάθε επίπεδο.
Retained messages
Κανονικά, αν κάνετε publish ένα μήνυμα και κανείς δεν είναι subscribed, το μήνυμα χάνεται. Τα retained messages αλλάζουν αυτό:
# Publish ένα retained message
mosquitto_pub -h localhost -t "smartbin/pir-01/status" -m "online" -r
Τώρα ξεκινήστε έναν νέο subscriber μετά το publish:
mosquitto_sub -h localhost -t "smartbin/pir-01/status"
Ο subscriber λαμβάνει αμέσως το online παρόλο που το μήνυμα δημοσιεύτηκε πριν συνδεθεί. Ο broker αποθήκευσε το τελευταίο retained message για εκείνο το topic. Αυτό είναι χρήσιμο για πληροφορίες κατάστασης — ένας νέος subscriber μπορεί αμέσως να μάθει την τρέχουσα κατάσταση χωρίς να περιμένει την επόμενη ενημέρωση.
Για να διαγράψετε ένα retained message, κάντε publish ένα κενό μήνυμα με το retain flag:
mosquitto_pub -h localhost -t "smartbin/pir-01/status" -r -n
Πειραματιστείτε μόνοι σας
Αφιερώστε μερικά λεπτά για να δοκιμάσετε πράγματα. Μερικές ιδέες:
- Κάντε subscribe στο
#(όλα τα topics) και publish σε διάφορα topics — δείτε όλα όσα ρέουν μέσα από τον broker - Ανοίξτε τρεις ή τέσσερις subscribers σε διαφορετικά topics και δείτε πώς ένα publish φτάνει σε μερικούς αλλά όχι σε άλλους
- Δοκιμάστε publishing ενός multi-line JSON payload
- Δοκιμάστε να κάνετε subscribe σε ένα topic, να σκοτώσετε τον subscriber, να κάνετε publish ένα μήνυμα, και να επανεκκινήσετε τον subscriber — φτάνει το μήνυμα; (Υπόδειξη: τι QoS χρησιμοποιείτε; Είναι το session clean;)
Όσο περισσότερο εξερευνάτε τώρα, τόσο λιγότερες εκπλήξεις θα έχετε όταν γράφετε τον Python κώδικα.
Μέρος 3 — Διαχωρισμός του pipeline σε publisher και subscriber
Τώρα θα πάρετε το Lab pipeline σας και θα το χωρίσετε σε δύο ανεξάρτητα προγράμματα που επικοινωνούν μέσω MQTT αντί για κοινόχρηστο queue.
Εγκατάσταση της Python MQTT βιβλιοθήκης
Προσθέστε το paho-mqtt στο requirements.txt σας:
paho-mqtt
Εγκαταστήστε το:
pip install paho-mqtt
Σχεδιασμός της topic structure σας
Πριν γράψετε κώδικα, ορίστε μια topic structure για το σύστημά σας.
Σκεφτείτε πώς πρέπει να οργανωθούν τα topics σας για να υποστηρίξουν διαφορετικά subscription patterns. Για παράδειγμα, σκεφτείτε πώς θα μπορούσατε να:
- Αποκτήσετε πρόσβαση σε events από έναν συγκεκριμένο sensor
- Αποκτήσετε πρόσβαση σε όλα τα events από μια συγκεκριμένη συσκευή
- Αποκτήσετε πρόσβαση στον ίδιο τύπο event σε πολλαπλές συσκευές
Σκεφτείτε πώς ο ιεραρχικός σχεδιασμός topic και η χρήση wildcards (+, #) μπορούν να υποστηρίξουν αυτές τις περιπτώσεις χρήσης.
Τεκμηριώστε την προτεινόμενη topic structure σας στην αναφορά σας και δικαιολογήστε εν συντομία τις επιλογές σχεδιασμού σας.
Γράψτε τον producer
Δημιουργήστε το producer.py. Αυτό το script κάνει ό,τι έκανε το producer thread παλιά: διαβάζει τον PIR sensor, ερμηνεύει το σήμα, και παράγει structured events. Αλλά αντί να βάζει records σε ένα Python queue, τα publishes στον MQTT broker.
Η δομή μπορεί να μοιάζει ως εξής (pseudocode: πρέπει να το υλοποιήσετε εσείς):
Import paho.mqtt.client
Import τον sampler και interpreter σας από pirlib
Δημιουργήστε έναν MQTT client
Συνδεθείτε στον broker (localhost, port 1883)
Ξεκινήστε το MQTT network loop (client.loop_start())
Ρυθμίστε τον sampler και interpreter όπως πριν
Ορίστε run_id, μετρητή seq
While not stopped:
Διαβάστε ένα sample από τον sampler
Ενημερώστε τον interpreter
Για κάθε event που επιστρέφει ο interpreter:
Αυξήστε τον seq
Δημιουργήστε το event record (ίδια fields με πριν, με JSON-LD context)
Μετατρέψτε το record σε JSON string
Κάντε publish το JSON string στο MQTT topic που επιλέξατε
Εκτυπώστε γραμμή κατάστασης αν verbose
Sleep για το sample interval
Κατά τον τερματισμό:
Κάντε publish ένα retained status message ("offline") σε ένα status topic
Αποσυνδεθείτε από τον broker
Μερικά πράγματα για να σκεφτείτε:
QoS: με ποιο επίπεδο θα κάνετε publish; Το QoS 0 είναι το πιο γρήγορο αλλά μηνύματα μπορεί να χαθούν. Το QoS 1 εγγυάται παράδοση αλλά μπορεί να προκαλέσει διπλότυπα. Για motion events, ποιος συμβιβασμός έχει νόημα; Μπορείτε να το κάνετε command-line argument.
Retained messages: σκεφτείτε το publishing της κατάστασης του sensor (online/offline) ως retained message σε ένα ξεχωριστό topic όπως smartbin/bin-01/pir-01/status. Έτσι, οποιοσδήποτε νέος subscriber θα ξέρει αμέσως αν ο sensor είναι ζωντανός.
Το event payload: αυτό πρέπει να είναι η ίδια JSON δομή που είχατε πριν (με το JSON-LD context από το Lab 05 αν το υλοποιήσατε). Το MQTT μεταφέρει τα bytes — σειριοποιείτε σε JSON πριν το publishing και αποσειριοποιείτε μετά τη λήψη.
Γράψτε τον consumer
Δημιουργήστε το consumer.py. Αυτό το script κάνει ό,τι έκανε το consumer thread παλιά: λαμβάνει events, προσθέτει ingest_time και pipeline_latency_ms, και τα γράφει σε ένα JSONL αρχείο. Αλλά αντί να διαβάζει από ένα Python queue, κάνει subscribe στον MQTT broker και λαμβάνει events μέσω callback.
Η δομή μοιάζει ως εξής (pseudocode):
Import paho.mqtt.client
Import json, datetime
Ορίστε το on_message callback:
Κάντε parse το message payload ως JSON
Προσθέστε ingest_time (UTC now)
Υπολογίστε pipeline_latency_ms (ingest_time - event_time)
Προσθέστε και τα δύο fields στο record
Γράψτε το record ως JSON line στο output file
Κάντε flush το αρχείο
Ενημερώστε και εκτυπώστε metrics (σύνολο ληφθέντων, μέση latency, κ.λπ.)
Δημιουργήστε έναν MQTT client
Αναθέστε το on_message callback στον client
Συνδεθείτε στον broker (localhost, port 1883)
Κάντε subscribe στο topic που επιλέξατε (με το QoS που επιλέξατε)
Τρέξτε το MQTT network loop (client.loop_forever())
Η βασική διαφορά από την threaded έκδοση είναι το callback pattern. Στο paho-mqtt, δεν κάνετε poll ένα queue — ορίζετε μια συνάρτηση on_message και η βιβλιοθήκη την καλεί κάθε φορά που φτάνει ένα μήνυμα. Αυτός είναι event-driven προγραμματισμός.
Το client.loop_forever() είναι blocking call που χειρίζεται τη σύνδεση δικτύου, επεξεργάζεται εισερχόμενα μηνύματα, και καλεί τα callbacks σας. Τρέχει μέχρι να καλέσετε client.disconnect() ή να διακοπεί το script.
Χειριστείτε το KeyboardInterrupt (Ctrl-C) για να τερματίσετε καθαρά:
try:
client.loop_forever()
except KeyboardInterrupt:
print("\n[consumer] shutting down...")
client.disconnect()
Command-line arguments
Και τα δύο scripts πρέπει να δέχονται command-line arguments. Κατ’ ελάχιστον:
producer.py:
python producer.py \
--broker localhost \
--port 1883 \
--topic smartbin/bin-01/pir-01/events \
--device-id pir-01 \
--pin 18 \
--sample-interval 0.1 \
--cooldown 5 \
--min-high 0.2 \
--qos 1 \
consumer.py:
python consumer.py \
--broker localhost \
--port 1883 \
--topic "smartbin/bin-01/pir-01/events" \
--qos 1 \
--out motion_events.jsonl \
Εκτέλεση
- Βεβαιωθείτε ότι το Mosquitto τρέχει
- Ξεκινήστε πρώτα τον consumer (ώστε να είναι subscribed πριν φτάσουν τα events):
python consumer.py --broker localhost --topic "smartbin/bin-01/pir-01/events" --out output/events.jsonl --verbose - Σε ένα δεύτερο terminal, ξεκινήστε τον producer:
python producer.py --broker localhost --topic smartbin/bin-01/pir-01/events --pin 18 --verbose - Προκαλέστε motion events και παρακολουθήστε τον consumer να τα λαμβάνει και να τα καταγράφει
Θα πρέπει να δείτε τον producer να κάνει publish μηνύματα και τον consumer να τα λαμβάνει, προσθέτοντας ingest_time και pipeline_latency_ms, και γράφοντας JSON output — το ίδιο με πριν, αλλά τώρα μέσω broker αντί για shared memory.
Επαληθεύστε ότι είναι πραγματικά αποσυνδεδεμένα
Δοκιμάστε αυτά τα πειράματα για να δείτε ότι η pub/sub αποσύζευξη λειτουργεί πραγματικά:
-
Σταματήστε τον consumer, κρατήστε τον producer να τρέχει. Ο producer συνεχίζει να κάνει publish αφού δεν ενδιαφέρεται αν κάποιος ακούει. Ξεκινήστε ξανά τον consumer. Παραλαμβάνει νέα events; (Τι γίνεται με αυτά που δημοσιεύτηκαν ενόσω ήταν εκτός; Σκεφτείτε το QoS και τα clean sessions.)
-
Τρέξτε τον consumer με wildcard topic. Χρησιμοποιήστε (π.χ.
smartbin/bin-01/#) και δείτε τι λαμβάνει. Αν δημοσιεύσατε retained status messages από τον producer, ο consumer θα πρέπει να τα λάβει και αυτά. -
Ανοίξτε έναν δεύτερο consumer subscribed στο ίδιο topic. Και οι δύο θα πρέπει να λαμβάνουν κάθε μήνυμα — ο broker παραδίδει σε όλους τους subscribers.
-
Χρησιμοποιήστε mosquitto_sub παράλληλα με τον Python consumer σας. Και οι δύο θα πρέπει να λαμβάνουν τα ίδια μηνύματα. Αυτό δείχνει ότι ο broker δεν ενδιαφέρεται ποιος ή τι κάνει subscribe — οποιοσδήποτε MQTT client λειτουργεί.
Μέρος 4 — Containerization με Docker Compose (προαιρετικό αλλά συνιστάται)
Ενημερώστε το Docker Compose setup σας από τα προηγούμενα εργαστήρια ώστε να συμπεριλάβει τον Mosquitto broker ως service:
Πιθανότατα ο producer και ο consumer θα χρειαστεί να συνδεθούν στον broker χρησιμοποιώντας το service name (π.χ. broker) ως hostname. Το Docker Compose networking το χειρίζεται αυτόματα. Θα αλλάξετε --broker localhost σε --broker broker στις εντολές container.
Γιατί αυτό το εργαστήριο είναι δομημένο έτσι
Στο Lab 03, ο producer και ο consumer σας ήταν threads μέσα στην ίδια διεργασία. Επικοινωνούσαν μέσω ενός Python Queue object στη shared memory. Αυτό είναι γρήγορο και απλό, αλλά δεσμεύει τα δύο components μαζί. Επίσης γενικά συνήθως θα είχατε μόνο production στο edge device, όχι και consumption.
Μεταβαίνοντας στο MQTT, τα αποσυνδέετε πλήρως. Ο producer και ο consumer είναι τώρα ξεχωριστά προγράμματα. Μπορούν να τρέξουν σε διαφορετικά μηχανήματα. Μπορούν να ξεκινούν και να σταματούν ανεξάρτητα. Μπορείτε να έχετε πολλαπλούς consumers που κάνουν subscribe στα ίδια events. Μπορείτε να προσθέσετε έναν νέο consumer που στέλνει δεδομένα στο cloud χωρίς να αλλάξετε καθόλου τον producer.
Αυτό είναι το middleware layer από τη διάλεξη. Ο producer δεν ανοίγει socket στον consumer. Κάνει publish σε ένα topic. Όποιος είναι subscribed λαμβάνει τα δεδομένα. Αυτό το pattern κλιμακώνεται με τρόπους που οι άμεσες συνδέσεις δεν μπορούν.
Η κατανόηση topics, wildcards, QoS, και retained messages είναι απαραίτητη για το σχεδιασμό ενός συστήματος που συμπεριφέρεται σωστά. Όταν φτιάχνετε το πλήρες Smart Wastebin, θα χρειαστεί να αποφασίσετε: ποια topic structure; Τι QoS για sensor data έναντι commands; Πρέπει η κατάσταση συσκευής να είναι retained; Αυτές είναι σχεδιαστικές αποφάσεις που επηρεάζουν την αξιοπιστία και τη λειτουργικότητα.
Ερωτήσεις αναφοράς
Απαντήστε τα παρακάτω στο labs/lab06/README.md σας αφού ολοκληρωθεί η υλοποίηση και τα πειράματα.
Βασικές έννοιες MQTT
RQ1: Ποιος είναι ο ρόλος του MQTT broker; Γιατί δεν αφήνουμε τον producer και τον consumer να επικοινωνούν απευθείας (π.χ. μέσω sockets);
RQ2: Ποια topic structure επιλέξατε και γιατί; Πώς υποστηρίζει τη μελλοντική επεκτασιμότητα (περισσότεροι sensors, περισσότερα bins);
RQ3: Εξηγήστε τη διαφορά μεταξύ QoS 0, 1, και 2 με δικά σας λόγια. Ποιο χρησιμοποιήσατε για τα motion events σας και γιατί;
RQ4: Τι είναι ένα retained message; Δώστε ένα συγκεκριμένο παράδειγμα πότε θα ήταν χρήσιμο για το σύστημά σας.
Εξερεύνηση από το terminal
RQ5: Όταν κάνατε subscribe στο smartbin/+/motion, ποια μηνύματα λάβατε και ποια όχι; Εξηγήστε γιατί, βάσει του πώς λειτουργεί το +.
RQ6: Τι συνέβη όταν κάνατε subscribe στο #; Γιατί αυτό είναι χρήσιμο για debugging αλλά επικίνδυνο στο production;
RQ7: Κάνατε publish ένα μήνυμα ενώ δεν ήταν συνδεδεμένος κανένας subscriber (χωρίς το retain flag). Μετά ξεκινήσατε έναν subscriber. Έλαβε το μήνυμα; Γιατί ή γιατί όχι;
Αλλαγές στο pipeline
RQ8: Ποιες είναι οι κύριες διαφορές μεταξύ του run_pipeline.py σας (threaded queue) και του νέου producer.py + consumer.py (MQTT);
RQ9: Στην threaded έκδοση, τι συνέβαινε όταν το queue ήταν γεμάτο; Στην MQTT έκδοση, τι συμβαίνει αν ο consumer είναι αργός ή offline;
RQ10: Πώς διαφέρει το callback pattern στο paho-mqtt (on_message) από το polling pattern που χρησιμοποιούσατε στον threaded consumer (queue.get(timeout=0.5));
RQ11: Δείξτε ένα παράδειγμα JSON record από τον MQTT-based consumer. Είναι η δομή ίδια με τα προηγούμενα εργαστήρια; Τι γίνεται με το pipeline_latency_ms — είναι υψηλότερο ή χαμηλότερο; Γιατί;
Πειράματα αποσύζευξης
RQ12: Σταματήσατε τον consumer και κρατήσατε τον producer να τρέχει. Τι συνέβη στα μηνύματα που δημοσιεύτηκαν εκείνη την ώρα; Παραδόθηκαν όταν επανεκκίνησε ο consumer;
RQ13: Τρέξατε δύο consumers στο ίδιο topic. Έλαβαν και οι δύο κάθε μήνυμα; Γιατί αυτό έχει σημασία για τη δημιουργία κλιμακούμενων συστημάτων;
RQ14: Θα μπορούσατε να τρέξετε τον producer σε ένα Raspberry Pi και τον consumer σε διαφορετικό μηχάνημα (π.χ. τον laptop σας); Τι θα χρειαζόταν να αλλάξετε;
Αναστοχασμός
RQ15: Με δικά σας λόγια, τι σημαίνει «αποσύζευξη» στο context του pub/sub; Ποια είναι τα πρακτικά οφέλη;
RQ16: Αν ο ίδιος ο Mosquitto broker πέσει, τι συμβαίνει στο σύστημά σας; Πώς θα μπορούσατε να το μετριάσετε αυτό;
Υπόδειξη project: Smart Wastebin
Το MQTT θα αποτελέσει τη βασική επικοινωνιακή υποδομή του project σας. Τώρα που τα sensor events ρέουν μέσω broker, μπορείτε να φτιάξετε άλλα components που κάνουν subscribe στα ίδια topics ανεξάρτητα:
Μερικά πιθανά παραδείγματα:
- Ένα fill-level monitor που κάνει subscribe στο
smartbin/bin-01/ultrasonic-01/events - Ένα dashboard που κάνει subscribe στο
smartbin/#και εμφανίζει όλα τα events σε όλα τα bins - Ένα alerting service που κάνει subscribe σε fill-level events και στέλνει ειδοποίηση όταν ένα bin είναι πάνω από 80% γεμάτο
- Ένα data logger που κάνει subscribe σε τα πάντα και γράφει μόνιμο αρχείο
Κανένα από αυτά τα components δεν χρειάζεται να ξέρει για τα άλλα. Όλα επικοινωνούν με τον broker. Αυτή είναι η δύναμη του pub/sub — μπορείτε να αναπτύξετε το σύστημα προσθέτοντας subscribers χωρίς να χρειάζεστε άμεσες συνδέσεις κ.λπ.
Αρχίστε να σκέφτεστε την topic hierarchy για το πλήρες project. Μια καλά σχεδιασμένη topic structure είναι σαν ένα καλά σχεδιασμένο database schema — είναι πολύ πιο εύκολο να το κάνετε σωστά από την αρχή παρά να το αλλάξετε αργότερα.
Τι πρέπει να έχει ολοκληρωθεί πριν φύγετε από το εργαστήριο
Πριν το τέλος της συνεδρίας θα πρέπει να έχετε: εγκαταστήσει το Mosquitto και τους command-line clients, εξερευνήσει το pub/sub από το terminal (topics, wildcards, QoS, retained messages), γράψει producer.py που κάνει publish motion events στο MQTT, γράψει consumer.py που κάνει subscribe και γράφει JSONL output, τρέξει και τα δύο μαζί και επαληθεύσει την end-to-end παράδοση events, δοκιμάσει σενάρια αποσύζευξης (consumer offline, πολλαπλοί consumers, wildcard subscriptions), ενημερώσει το labs/lab06/README.md με κώδικα και απαντήσεις αναφοράς, και κάνει push στο GitHub.
Τελικό checklist (Lab 06)
- Mosquitto broker εγκατεστημένο και σε λειτουργία
- Terminal pub/sub πειράματα ολοκληρωμένα (topics, wildcards, QoS, retained)
-
producer.pyκάνει publish PIR motion events στο MQTT -
consumer.pyκάνει subscribe και γράφει JSONL μεingest_timeκαιpipeline_latency_ms - Producer και consumer τρέχουν ως ξεχωριστές διεργασίες
- End-to-end pipeline λειτουργεί μέσω broker
- Αποσύζευξη επαληθευμένη (consumer offline, πολλαπλοί consumers, wildcard sub)
- Command-line arguments υποστηρίζονται (broker, port, topic, QoS, κ.λπ.)
-
requirements.txtπεριλαμβάνειpaho-mqtt -
labs/lab06/README.mdπεριέχει κώδικα, βήματα εκτέλεσης, και απαντήσεις αναφοράς - Commit και push ολοκληρωμένα
Παραδοτέα και υποβολή
Τι πρέπει να υπάρχει στο repository (έως το τέλος του εργαστηρίου)
/
├── README.md
├── labs/
│ ├── lab01/
│ ├── ...
│ └── lab06/
│ ├── README.md
│ ├── requirements.txt
│ ├── producer.py
│ ├── consumer.py
│ └── pirlib/
│ ├── __init__.py
│ ├── sampler.py
│ └── interpreter.py
Μην συμπεριλάβετε:
venv/__pycache__/*.pycoutput/ή*.jsonl- μεγάλα προσωρινά αρχεία εκτός αν ζητηθεί ρητά
Τι πρέπει να περιέχει το labs/lab06/README.md
Δύο σαφώς διαχωρισμένα μέρη:
- Κώδικας / runbook — συμπεριλάβετε την topic structure σας, παραδείγματα εντολών, και πώς να τρέξετε τον producer και τον consumer
- Απαντήσεις στις ερωτήσεις αναφοράς
Ίδιο στυλ με τα προηγούμενα εργαστήρια.
Τέλος εργαστηριακής συνεδρίας — GitHub checkpoint
Πριν φύγετε:
- κάντε commit την πρόοδό σας
- κάντε push στο GitHub repository της ομάδας σας
Ελάχιστη προσδοκία:
- όλα τα παραδοτέα παρακολουθούνται από το Git
- το τελευταίο commit έχει γίνει push
- το commit message είναι σαφές
Πριν το επόμενο εργαστήριο — υποβολή στο eClass
Υποβάλετε και τα δύο:
- Αρχείο κώδικα (
.zip) - PDF export του
labs/lab06/README.md
Απαιτούμενη μορφή ονόματος PDF αρχείου:
lab06_REPORT_<team>.pdf