Lab 08 — Building a REST API for the Smart Wastebin
Deadlines:
- End of lab session (GitHub checkpoint): commit & push your progress to your team repository.
- Before next lab (eClass submission): upload (1) a
.zipwith your code and (2) a PDF export oflabs/lab08/README.md.
Submission contents:
- (1) a
.zipwith your code, and - (2) a PDF export of
labs/lab08/README.md.
Intro to what you need to do
In this lab you will build a REST API that sits alongside your existing pipeline and serves data from it. You will also build MQTT endpoints into the API, so that a client can publish messages to MQTT topics through a simple HTTP request. Finally, you will document your MQTT interface with AsyncAPI, the event-driven equivalent of OpenAPI. Your REST endpoints get auto-documented by Flask-RESTx. Your MQTT topics get documented by an AsyncAPI spec. Together, they describe your entire system.
You will use Flask as the web framework and Flask-RESTx to define your endpoints and auto-generate Swagger UI documentation.
The system now has three communication layers:
┌──────────────────────────────────────────────────────────────┐
│ Smart Wastebin System │
│ │
│ PIR sensor ──▶ producer ──▶ MQTT broker ──▶ consumer │
│ ▲ │ │ │
│ │ ▼ ▼ │
│ REST API HA JSONL file │
│ (publish (live │ │
│ + query) dashboard) │ │
│ ▲ │ │
│ │ ▼ │
│ Swagger UI ◀────── read events │
│ (docs + test) │
│ │
│ AsyncAPI spec ──▶ describes the MQTT interface │
│ OpenAPI spec ──▶ describes the REST interface (auto) │
└──────────────────────────────────────────────────────────────┘
Create the following structure:
/
├── README.md
├── labs/
│ ├── lab01/
│ ├── ...
│ └── lab08/
│ ├── README.md
│ ├── requirements.txt
│ ├── api.py
│ ├── asyncapi.yaml
│ ├── producer.py
│ ├── consumer.py
│ └── pirlib/
│ ├── __init__.py
│ ├── sampler.py
│ └── interpreter.py
Copy your producer.py, consumer.py, and pirlib/ from Lab 06/07. The new files are api.py (the REST API server) and asyncapi.yaml (the MQTT interface documentation).
Part 1 — Set up Flask and Flask-RESTx
Install the required packages. Add them to your requirements.txt:
flask
flask-restx
...
Install:
pip install flask flask-restx
Create api.py with a minimal Flask-RESTx application to make sure everything works:
from flask import Flask
from flask_restx import Api, Resource
app = Flask(__name__)
api = Api(
app,
version="1.0",
title="Smart Wastebin API",
description="REST API for querying Smart Wastebin sensor data and bin status",
)
ns = api.namespace("bins", description="Wastebin operations")
@ns.route("/")
class BinList(Resource):
def get(self):
"""List all registered bins."""
return {"bins": []}, 200
if __name__ == "__main__":
app.run(debug=True, host="0.0.0.0", port=5000)
Run it:
python api.py
Open your browser and go to http://<your-pi-ip>:5000. You should see the Swagger UI, an interactive documentation page showing your API with one endpoint. Click on it, click “Try it out”, and click “Execute.” You should get back {"bins": []} with a 200 status code.
This is your starting point. Every endpoint you add will automatically appear in the Swagger UI, documented and testable. You never need to write documentation separately (for the rest interface at least) it comes from the code.
Part 2 — Design your API
Before writing more code, think about what your API should expose. The lecture emphasized designing the API first i.e deciding on the resources, URIs, and verbs before implementing anything.
Your resources are:
- Bins — the wastebins themselves (status, location, metadata)
- Sensors — the sensors mounted on the bins
- Events — the motion events your pipeline produces (stored in the JSONL file)
- MQTT — an interface for publishing to and reading from MQTT topics through HTTP
Design your endpoints following REST conventions. Here is a starting point — you should expand this:
| Method | URI | Description |
|---|---|---|
| GET | /bins |
List all bins |
| GET | /bins/<bin_id> |
Get details for a specific bin |
| GET | /bins/<bin_id>/sensors |
List sensors on a specific bin |
| GET | /bins/<bin_id>/events |
Get motion events for a specific bin |
| GET | /sensors |
List all sensors |
| GET | /sensors/<sensor_id> |
Get details for a specific sensor |
| POST | /bins/<bin_id>/emptied |
Record that a bin was emptied |
| POST | /mqtt/publish |
Publish a message to an MQTT topic |
| GET | /mqtt/topics |
List known MQTT topics and their last retained value |
Think about:
- What parameters should the events endpoint accept? At minimum, a
limitto control how many records come back. Maybe a date range (start,end)? - What should the response look like? What fields does each resource need?
- What status codes should each endpoint return? 200 for success, 404 if a bin does not exist, 400 if the query parameters are invalid?
- For the MQTT publish endpoint, what does the client need to send (topic, payload, QoS, retain)?
Write down your API design in your report before implementing it.
Part 3 — Implement the data endpoints
Now build the data-serving side of the API. You will read bin and sensor information from your JSON-LD models (from Lab 05), and event data from your JSONL output files.
Load your data
Your API needs to read from two sources:
- The JSON-LD model files (
models/sensor.jsonld,models/wastebin.jsonld,models/environment.jsonld) for bin and sensor metadata - The JSONL event log written by your consumer for motion events
For the model files, load them at startup:
IMPORT json module
IMPORT os module
SET DATA_DIR TO the path of the current file's directory joined with "data"
SET EVENTS_FILE TO the path of DATA_DIR joined with "motion_events.jsonl"
FUNCTION load_json(filepath):
OPEN the file at filepath in read mode
READ and parse the file contents as JSON
RETURN the parsed JSON data
FUNCTION load_events(filepath, limit = none, sensor_id = none):
CREATE an empty list called events
IF the file at filepath does not exist:
RETURN events
OPEN the file at filepath in read mode
FOR each line in the file:
REMOVE leading and trailing whitespace from the line
IF the line is empty:
SKIP to the next line
TRY:
PARSE the line as JSON and store it as record
IF sensor_id was provided AND record's "madeBySensor" value is not equal to sensor_id:
SKIP to the next line
ADD record to events
IF the line cannot be parsed as JSON:
SKIP to the next line
REVERSE events so the most recent records come first
IF limit was provided:
KEEP only the first limit records in events
RETURN events
Define your models for Swagger
Flask-RESTx lets you define response models so Swagger UI shows the expected response structure:
bin_model = api.model("Bin", {
"id": fields.String(required=True, description="Bin unique identifier"),
"name": fields.String(description="Human-readable name"),
"location": fields.String(description="Deployment location"),
"status": fields.String(description="Current status"),
})
event_model = api.model("Event", {
"resultTime": fields.String(description="ISO timestamp of the event"),
"madeBySensor": fields.String(description="Sensor ID that produced this event"),
"hasSimpleResult": fields.String(description="Motion state (detected/clear)"),
"pipeline_latency_ms": fields.Float(description="Pipeline latency in ms"),
})
These models do not enforce anything at runtime as they are documentation. They tell anyone reading the Swagger UI exactly what the response looks like.
Implement the endpoints
Here is an example of a more complete bins endpoint. You need to expand and adapt this to your data:
from flask_restx import reqparse
# Parser for query parameters
events_parser = reqparse.RequestParser()
events_parser.add_argument("limit", type=int, default=50, help="Max events to return")
events_parser.add_argument("start", type=str, help="Start datetime (ISO format)")
events_parser.add_argument("end", type=str, help="End datetime (ISO format)")
@ns.route("/")
class BinList(Resource):
@ns.marshal_list_with(bin_model)
def get(self):
"""List all registered bins."""
# Load from your model files or a registry
return bins_registry
@ns.route("/<string:bin_id>")
@ns.param("bin_id", "The bin identifier")
@ns.response(404, "Bin not found")
class Bin(Resource):
@ns.marshal_with(bin_model)
def get(self, bin_id):
"""Get details for a specific bin."""
bin_data = find_bin(bin_id)
if not bin_data:
api.abort(404, f"Bin {bin_id} not found")
return bin_data
@ns.route("/<string:bin_id>/events")
@ns.param("bin_id", "The bin identifier")
class BinEvents(Resource):
@ns.expect(events_parser)
@ns.marshal_list_with(event_model)
def get(self, bin_id):
"""Get motion events for a specific bin."""
args = events_parser.parse_args()
events = load_events(
EVENTS_FILE,
limit=args["limit"],
sensor_id=get_sensor_for_bin(bin_id),
)
return events
A few things to notice:
@ns.marshal_with(bin_model) tells Flask-RESTx to format the response according to the model and show it in Swagger UI. The documentation stays in sync with the code and there is no separate doc file to maintain.
@ns.expect(events_parser) documents the query parameters in Swagger. When someone opens the UI, they see that /bins/bin-01/events accepts limit, start, and end parameters and can fill them in before hitting “Execute.”
api.abort(404, ...) returns a 404 response with a message. This is how you tell the client “that resource does not exist”, using the same status codes from the lecture.
Add the POST endpoint
Add an endpoint for recording that a bin was emptied (in general I have provided you with pseudocode examples, feel free to modify or add as you see fit):
DEFINE emptied_model AS an API model named "EmptiedRecord" WITH:
FIELD "bin_id" AS string WITH description "Bin identifier"
FIELD "emptied_at" AS string WITH description "ISO timestamp of when the bin was emptied"
FIELD "emptied_by" AS string WITH description "Who emptied the bin"
DEFINE API ROUTE "/<bin_id>/emptied"
DOCUMENT route parameter "bin_id" AS "The bin identifier"
CLASS BinEmptied EXTENDS Resource:
EXPECT request body to match emptied_model
DOCUMENT response 201 AS "Bin marked as emptied"
DOCUMENT response 404 AS "Bin not found"
FUNCTION post(bin_id):
RECORD that a bin was emptied
SET bin_data TO the result of find_bin(bin_id)
IF bin_data does not exist:
STOP the request WITH status 404 AND message "Bin <bin_id> not found"
SET data TO the incoming API request payload
CREATE record WITH:
"bin_id" SET TO bin_id
"emptied_at" SET TO data["emptied_at"] IF provided,
OTHERWISE SET TO the current UTC timestamp in ISO format ending with "Z"
"emptied_by" SET TO data["emptied_by"] IF provided,
OTHERWISE SET TO "unknown"
SAVE record
RETURN record WITH status code 201
This returns 201 (Created) on success. POST is not idempotent, calling it twice creates two emptied records, which is the correct behavior (the bin was emptied twice).
Add a sensors namespace
Create a second namespace for sensor endpoints:
DEFINE sensors_ns AS an API namespace named "sensors"
SET its description TO "Sensor operations"
DEFINE sensor_model AS an API model named "Sensor" WITH:
FIELD "id" AS string, required, WITH description "Sensor unique identifier"
FIELD "type" AS string WITH description "Sensor type (PIR, ultrasonic, etc.)"
FIELD "model" AS string WITH description "Hardware model"
FIELD "mounted_on" AS string WITH description "Bin this sensor is mounted on"
FIELD "status" AS string WITH description "Current sensor status"
DEFINE API ROUTE "/sensors/"
CLASS SensorList EXTENDS Resource:
FORMAT the response as a list of sensor_model objects
FUNCTION get():
LIST all registered sensors
RETURN sensors_registry
DEFINE API ROUTE "/sensors/<sensor_id>"
DOCUMENT route parameter "sensor_id" AS "The sensor identifier"
DOCUMENT response 404 AS "Sensor not found"
CLASS Sensor EXTENDS Resource:
FORMAT the response as a sensor_model object
FUNCTION get(sensor_id):
GET details for a specific sensor
SET sensor TO the result of find_sensor(sensor_id)
IF sensor does not exist:
STOP the request WITH status 404 AND message "Sensor <sensor_id> not found"
RETURN sensor
Part 4 — Add MQTT endpoints
Your REST API serves data from files. Now you will give it the ability to talk to your MQTT broker (mostly as an exercise), so a client can publish messages and read topic state through plain HTTP requests.
Connect the API to the broker
Set up an MQTT client inside your Flask app. This client stays connected to the broker for the lifetime of the API server:
IMPORT MQTT client library
IMPORT threading library
CREATE an MQTT client named "wastebin-api"
SET clean_session TO false so the session can persist
CREATE an empty dictionary called topic_store
CREATE a thread lock called topic_lock
FUNCTION on_message(client, userdata, msg):
STORE every message received by the API client
LOCK topic_lock so only one thread can update topic_store at a time
SET topic_store[msg.topic] TO a record WITH:
"topic" SET TO msg.topic
"payload" SET TO msg.payload decoded as UTF-8,
replacing invalid characters if needed
"qos" SET TO msg.qos
"retain" SET TO msg.retain
"timestamp" SET TO the current UTC timestamp in ISO format ending with "Z"
UNLOCK topic_lock
SET mqtt_client's message handler TO on_message
CONNECT mqtt_client TO the MQTT broker at "localhost" on port 1883
SET keepalive TO 60 seconds
SUBSCRIBE mqtt_client TO all topics under "smartbin/"
SET subscription QoS TO 1
START mqtt_client's background network loop
The API subscribes to smartbin/# so it can track the latest message on every topic. This builds up a live view of the MQTT state that the API can serve on demand.
Publish endpoint
Create a namespace for MQTT operations:
DEFINE mqtt_ns AS an API namespace named "mqtt"
SET its description TO "MQTT broker interaction"
DEFINE publish_model AS an API model named "MQTTPublish" WITH:
FIELD "topic" AS string, required, WITH description "MQTT topic to publish to"
FIELD "payload" AS string, required, WITH description "Message payload"
FIELD "qos" AS integer WITH description "Quality of Service (0, 1, or 2)" AND default value 1
FIELD "retain" AS boolean WITH description "Retain this message on the broker" AND default value false
DEFINE API ROUTE "/mqtt/publish"
CLASS MQTTPublish EXTENDS Resource:
EXPECT request body to match publish_model
DOCUMENT response 200 AS "Message published"
DOCUMENT response 400 AS "Invalid request"
FUNCTION post():
PUBLISH a message to an MQTT topic
SET data TO the incoming API request payload
SET topic TO data["topic"]
SET payload TO data["payload"]
SET qos TO data["qos"] IF provided, OTHERWISE 1
SET retain TO data["retain"] IF provided, OTHERWISE false
IF topic is missing OR payload is missing:
STOP the request WITH status 400 AND message "Both 'topic' and 'payload' are required"
IF qos is not 0, 1, or 2:
STOP the request WITH status 400 AND message "QoS must be 0, 1, or 2"
PUBLISH payload TO topic using mqtt_client
WITH qos set to qos
WITH retain set to retain
STORE the publish result as result
RETURN a response WITH:
"status" SET TO "published"
"topic" SET TO topic
"payload" SET TO payload
"qos" SET TO qos
"retain" SET TO retain
"mqtt_rc" SET TO result.rc
RETURN status code 200
Try it. Open Swagger UI, go to the mqtt namespace, and publish a test message:
{
"topic": "smartbin/bin-01/pir-01/motion",
"payload": "detected",
"qos": 1,
"retain": false
}
You should see the motion sensor entity update. You just triggered an MQTT event from a web browser.
Topics endpoint
Add an endpoint that shows the current state of all known topics:
DEFINE API ROUTE "/mqtt/topics"
CLASS MQTTTopics EXTENDS Resource:
FUNCTION get():
LIST all known MQTT topics and their last received message
LOCK topic_lock so topic_store can be read safely
CREATE response WITH:
"topic_count" SET TO the number of entries in topic_store
"topics" SET TO a list of all stored topic message records
RETURN response WITH status code 200
UNLOCK topic_lock
DEFINE API ROUTE "/mqtt/topics/<topic>"
DOCUMENT route parameter "topic" AS "MQTT topic path, for example smartbin/bin-01/pir-01/motion"
CLASS MQTTTopicDetail EXTENDS Resource:
DOCUMENT response 404 AS "Topic not found or no message received yet"
FUNCTION get(topic):
GET the last received message for a specific MQTT topic
LOCK topic_lock so topic_store can be read safely
IF topic does not exist in topic_store:
STOP the request WITH status 404
INCLUDE message "No message received on topic '<topic>'"
RETURN topic_store[topic] WITH status code 200
UNLOCK topic_lock
Now you can check the entire MQTT state from your browser without needing mosquitto_sub:
curl http://localhost:5000/mqtt/topics
curl http://localhost:5000/mqtt/topics/smartbin/bin-01/pir-01/motion
Combining REST actions with MQTT
The “mark bin as emptied” endpoint can also publish a status update to MQTT, so Home Assistant and other subscribers hear about it:
DEFINE API ROUTE "/<bin_id>/emptied"
DOCUMENT route parameter "bin_id" AS "The bin identifier"
CLASS BinEmptied EXTENDS Resource:
EXPECT request body to match emptied_model
DOCUMENT response 201 AS "Bin marked as emptied"
DOCUMENT response 404 AS "Bin not found"
FUNCTION post(bin_id):
RECORD that a bin was emptied
PUBLISH a status update through MQTT
SET bin_data TO the result of find_bin(bin_id)
IF bin_data does not exist:
STOP the request WITH status 404 AND message "Bin <bin_id> not found"
SET data TO the incoming API request payload
CREATE record WITH:
"bin_id" SET TO bin_id
"emptied_at" SET TO data["emptied_at"] IF provided,
OTHERWISE SET TO the current UTC timestamp in ISO format ending with "Z"
"emptied_by" SET TO data["emptied_by"] IF provided,
OTHERWISE SET TO "unknown"
SAVE record to file
PUBLISH an MQTT message:
TOPIC: "smartbin/<bin_id>/status"
PAYLOAD: JSON object WITH:
"state" SET TO "emptied"
"emptied_at" SET TO record["emptied_at"]
QOS: 1
RETAIN: true
RETURN record WITH status code 201
A single API call both records the event and notifies the MQTT ecosystem. The API acts as a gateway between the HTTP and MQTT worlds.
Part 5 — Document your MQTT interface with AsyncAPI
Your REST endpoints are self-documenting thanks to Flask-RESTx and Swagger UI. But what about your MQTT interface? Your producer publishes to topics like smartbin/bin-01/pir-01/events, your consumer subscribes, Home Assistant listens, but none of this is formally documented anywhere. Someone joining your project has to read your code to figure out what topics exist, what the message format is, and what QoS to use.
AsyncAPI solves this. It is the event-driven equivalent of OpenAPI. Where OpenAPI describes REST endpoints (request/response), AsyncAPI describes message channels (publish/subscribe). You write a specification in YAML, and tools generate interactive documentation, just like Swagger UI does automatically for your REST API. Swagger Editor now supports AsyncAPI specifications, so you can use the same tool for both.
Write the spec
Create asyncapi.yaml in your labs/lab08/ directory. This file describes your MQTT interface: what topics exist, who publishes and who subscribes, what the messages look like, and what broker they flow through.
Here is a starting point, you need to expand it to cover all your topics:
asyncapi: 3.0.0
info:
title: Smart Wastebin MQTT Interface
version: 1.0.0
description: |
Event-driven messaging interface for the Smart Wastebin system.
All communication goes through an MQTT broker (Mosquitto).
servers:
mosquitto:
host: localhost:1883
protocol: mqtt
description: Local Mosquitto broker on the Raspberry Pi
channels:
motionEvents:
address: smartbin/{bin_id}/{sensor_id}/events
description: Full motion event records published by the producer
parameters:
bin_id:
description: Wastebin identifier (e.g., bin-01)
sensor_id:
description: Sensor identifier (e.g., pir-01)
messages:
motionEvent:
$ref: '#/components/messages/MotionEvent'
motionState:
address: smartbin/{bin_id}/{sensor_id}/motion
description: Simple motion state for Home Assistant (detected/clear)
parameters:
bin_id:
description: Wastebin identifier
sensor_id:
description: Sensor identifier
messages:
motionState:
$ref: '#/components/messages/MotionState'
binStatus:
address: smartbin/{bin_id}/status
description: Retained message with current bin status and metadata
parameters:
bin_id:
description: Wastebin identifier
messages:
binStatus:
$ref: '#/components/messages/BinStatus'
operations:
publishMotionEvent:
action: send
channel:
$ref: '#/channels/motionEvents'
summary: Producer publishes a full motion event record
description: |
Published by the producer whenever the PIR sensor detects motion.
Contains the full observation record with timestamps, sensor reference,
and JSON-LD context.
consumeMotionEvent:
action: receive
channel:
$ref: '#/channels/motionEvents'
summary: Consumer receives motion events and writes them to JSONL
description: |
The consumer subscribes to this channel, enriches each event with
ingest_time and pipeline_latency_ms, and writes the result to a
JSONL file.
publishMotionState:
action: send
channel:
$ref: '#/channels/motionState'
summary: Producer publishes simple motion state for Home Assistant
publishBinStatus:
action: send
channel:
$ref: '#/channels/binStatus'
summary: Status update published as a retained message
components:
messages:
MotionEvent:
name: MotionEvent
contentType: application/json
payload:
type: object
properties:
"@context":
type: string
description: JSON-LD context reference
"@type":
type: string
description: Observation type
example: sosa:Observation
madeBySensor:
type: string
description: URI of the sensor that produced this event
example: "urn:dev:pir-01"
observedIn:
type: string
description: URI of the deployment environment
example: "urn:env:lab-room-101"
resultTime:
type: string
format: date-time
description: ISO 8601 timestamp of when the event occurred
hasSimpleResult:
type: string
description: Motion state
enum: [detected, clear]
seq:
type: integer
description: Sequence number within this pipeline run
run_id:
type: string
description: UUID identifying this pipeline run
MotionState:
name: MotionState
contentType: text/plain
payload:
type: string
enum: [detected, clear]
description: Simple motion state string for Home Assistant
BinStatus:
name: BinStatus
contentType: application/json
payload:
type: object
properties:
state:
type: string
description: Current bin state
enum: [active, emptied, full, maintenance]
location:
type: string
description: Human-readable location
last_motion:
type: string
format: date-time
description: Timestamp of most recent motion event
Walk through what this describes:
servers — the Mosquitto broker, its address, and the protocol. Equivalent to the base URL in OpenAPI.
channels — your MQTT topics, with parameters for the variable parts ({bin_id}, {sensor_id}). Each channel references the message schema it carries.
operations — who does what. publishMotionEvent is a send action (the producer publishes), consumeMotionEvent is a receive action (the consumer subscribes). This makes it explicit which components are publishers and which are subscribers.
components/messages — the message schemas. Just like the api.model definitions in Flask-RESTx, these describe the shape of each message. The MotionEvent message has all the fields from your JSON-LD observations. The MotionState message is just a simple string.
Expand it
The example above covers three channels. Think about what else your system uses:
- Any other topics you have added
Add channels, operations, and message schemas for each one.
View it
Open Swagger Editor in your browser. Click File → Import File and load your asyncapi.yaml. The editor will parse it and show interactive documentation, your channels, message schemas, server info, and operations, all rendered in a browsable interface.
Take a screenshot of your rendered AsyncAPI documentation for the report.
Why both OpenAPI and AsyncAPI?
Your Smart Wastebin system has two interfaces:
| REST API (OpenAPI) | MQTT Interface (AsyncAPI) | |
|---|---|---|
| Pattern | Request/Response | Publish/Subscribe |
| Protocol | HTTP | MQTT |
| Use case | On-demand queries, commands | Real-time events, state updates |
| Example | GET /bins/bin-01/events?limit=10 |
Subscribe to smartbin/bin-01/pir-01/events |
| Spec | OpenAPI (auto-generated by Flask-RESTx) | AsyncAPI (written by hand) |
Both specs serve the same purpose: they formally describe your interface so that someone who has never seen your code can understand how to interact with your system. OpenAPI covers the pull side. AsyncAPI covers the push side.
Part 6 — Test with Swagger UI and curl
With the API running, open http://<your-pi-ip>:5000 in your browser. You should see all your endpoints organized by namespace (bins, sensors, mqtt).
Test each endpoint:
- GET /bins — should return your list of bins
- GET /bins/bin-01 — should return details for bin-01. What happens if you request a bin that does not exist?
- GET /bins/bin-01/events — should return motion events. Try with different
limitvalues - POST /bins/bin-01/emptied — fill in the JSON body and submit. Check that a 201 response comes back and that Home Assistant updates
- GET /sensors — should list your sensors
- GET /sensors/pir-01 — should return sensor details
- POST /mqtt/publish — publish a motion event. Check that Home Assistant and your consumer both receive it
- GET /mqtt/topics — see all the topics the API is tracking
- GET /mqtt/topics/smartbin/bin-01/pir-01/motion — check the last message on a specific topic
Take screenshots of the Swagger UI showing your endpoints and at least one successful response.
Test with curl
# List all bins
curl http://localhost:5000/bins/
# Get a specific bin
curl http://localhost:5000/bins/bin-01
# Get events with a limit
curl "http://localhost:5000/bins/bin-01/events?limit=10"
# Mark a bin as emptied (also publishes to MQTT)
curl -X POST http://localhost:5000/bins/bin-01/emptied \
-H "Content-Type: application/json" \
-d '{"emptied_by": "maintenance-team-A"}'
# Publish to MQTT through the API
curl -X POST http://localhost:5000/mqtt/publish \
-H "Content-Type: application/json" \
-d '{"topic": "smartbin/bin-01/pir-01/motion", "payload": "detected", "qos": 1}'
# See all MQTT topics
curl http://localhost:5000/mqtt/topics
# Request a bin that does not exist
curl -i http://localhost:5000/bins/bin-99
Check that you get 200 for successful requests, 201 for the POST, and 404 for nonexistent resources.
Why this lab is structured this way
Until now, the only way to get data out of your system was to either watch MQTT messages in real time or open a JSONL file. Both require you to know the internals, which topic to subscribe to, where the file is, what the format looks like. An API puts a clean front door on the system. A mobile app, a web dashboard, a route-planning service, or a maintenance tool can all query the same API without knowing anything about MQTT topics or JSONL files. They just send HTTP requests and get JSON responses.
The MQTT endpoints take this further. By wrapping MQTT publish and topic state in REST endpoints, your API becomes a universal gateway. A facilities manager can mark a bin as emptied from a web form, and the MQTT ecosystem (Home Assistant, your consumer, other subscribers) all hear about it instantly. Your REST interface is documented by OpenAPI (auto-generated). Your MQTT interface is documented by AsyncAPI (written by hand). Together, they describe your entire system in standard, machine-readable formats that any developer can read and any tool can process.
This combination of MQTT and REST is a pattern you see in real IoT platforms. AWS IoT has MQTT for device communication and REST APIs for management. Azure IoT Hub does the same. Home Assistant itself exposes a REST API alongside its MQTT integration. Your Smart Wastebin now follows the same architecture.
Report questions
Answer the following in your labs/lab08/README.md after the implementation and experiments are complete.
API design
RQ1: Write down your complete API design, every endpoint, its HTTP method, the URI, what parameters it accepts, and what it returns. Present this as a table.
RQ2: Why do the event-listing endpoints use GET and not POST?
RQ3: Why does the “mark as emptied” endpoint use POST and not PUT? Think about idempotency.
RQ4: How did you handle the case where a client requests a bin or sensor that does not exist? What status code do you return and why?
Implementation
RQ5: Where does your API read its data from? Trace the path of event data from the PIR sensor all the way to an API response.
RQ6: What query parameters does your events endpoint support? Show an example request and response.
RQ7: How do the Flask-RESTx models (api.model) relate to the Swagger UI documentation? What happens in the UI when you add a new field to a model?
RQ8: Show a screenshot of your Swagger UI with endpoints visible.
MQTT endpoints
RQ9: Explain how the POST /mqtt/publish endpoint works. What does the API do when it receives a publish request?
RQ10: You published a motion event through the API using POST /mqtt/publish. Describe the full path that message takes, from the HTTP request to the consumer’s JSONL file.
RQ11: What does GET /mqtt/topics return? Why does the API need to subscribe to smartbin/# for this to work?
RQ12: You call POST /bins/bin-01/emptied. This both saves a record and publishes to MQTT. What is the advantage of combining both actions in one endpoint?
AsyncAPI
RQ13: What is AsyncAPI and how does it relate to OpenAPI? Why do you need both for the Smart Wastebin?
RQ14: How many channels did you document in your AsyncAPI spec? For each, state who is the publisher and who is the subscriber.
RQ15: Show a screenshot of your AsyncAPI spec rendered in Swagger Editor or AsyncAPI Studio.
RQ16: Compare the MotionEvent message schema in your AsyncAPI spec with the event_model in your Flask-RESTx code. They describe the same data, what is different about the context in which each is used?
Testing
RQ17: Show the curl command and response for: (a) listing all bins, (b) getting events with a limit, (c) publishing an MQTT message, (d) requesting a nonexistent bin.
RQ18: What is the difference between testing with Swagger UI and testing with curl? When would you use each?
Reflection
RQ19: A new team member joins your project. They need to build a mobile app that shows bin status and lets users report full bins. What do you hand them? How do the Swagger UI and AsyncAPI spec help?
RQ20: In your own words, explain why the Smart Wastebin needs both a push-based system (MQTT) and a pull-based system (REST API). What would be missing if you only had one?
Project hint: Smart Wastebin
The API you built is the interface that other applications will use to interact with your system. As you add more sensors and functionality, both the REST API and the AsyncAPI spec grow with it: Think about your final project demo. You can show the Swagger UI with all your REST endpoints, the AsyncAPI documentation with all your MQTT channels, and the Home Assistant dashboard with live state, three views of one system, each serving a different audience (app developers, end users etc.).
What should be finished before you leave the lab
Before the end of the session you should have: designed your API endpoints, implemented the API with Flask and Flask-RESTx, created namespaces for bins, sensors, and MQTT, included GET endpoints for listing and detail views, included POST endpoints for recording events and publishing MQTT messages, added query parameter support for events, handled error cases with appropriate status codes, connected the API to the MQTT broker, written an AsyncAPI spec covering all your MQTT topics, tested all endpoints through Swagger UI and curl, run the full system together, updated labs/lab08/README.md, and pushed to GitHub.
Final checklist (Lab 08)
-
api.pycreated with Flask and Flask-RESTx - Swagger UI accessible at port 5000
-
binsnamespace with list, detail, and events endpoints -
sensorsnamespace with list and detail endpoints - Events endpoint with
limitquery parameter -
POST /bins/<bin_id>/emptiedendpoint working -
mqttnamespace with publish and topics endpoints - API connected to Mosquitto broker
-
POST /mqtt/publishcan trigger MQTT messages (verified in Home Assistant or consumer) -
GET /mqtt/topicsshows tracked topic state - Response models defined for Swagger documentation
- 404 returned for nonexistent resources
- API reads event data from consumer’s JSONL output
-
asyncapi.yamlcreated with channels, operations, and message schemas - AsyncAPI spec covers all MQTT topics used by the system
- AsyncAPI spec viewable in Swagger Editor or AsyncAPI Studio
- Tested via Swagger UI (screenshots in report)
- Tested via
curl -
labs/lab08/README.mdcontains API design, code, screenshots, AsyncAPI spec, and report answers - Commit and push completed
Deliverables and submission
What must exist in the repository (by end of lab)
/
├── README.md
├── labs/
│ ├── lab01/
│ ├── ...
│ └── lab08/
│ ├── README.md
│ ├── requirements.txt
│ ├── api.py
│ ├── asyncapi.yaml
│ ├── producer.py
│ ├── consumer.py
│ └── pirlib/
│ ├── __init__.py
│ ├── sampler.py
│ └── interpreter.py
Do not include:
venv/__pycache__/*.pycoutput/or*.jsonl- large temporary files unless explicitly requested
What labs/lab08/README.md must contain
Two clearly separated parts:
- Code / runbook — include your API design table, endpoint implementations (or relevant excerpts), Swagger UI screenshots, AsyncAPI spec (or relevant excerpts), rendered AsyncAPI screenshot, and how to run the full system
- Answers to report questions
Same style as previous labs.
End of lab session — GitHub checkpoint
Before leaving:
- commit your progress
- push to your team GitHub repository
Minimum expectation:
- all deliverables tracked by Git
- latest commit pushed
- commit message is clear
Before next lab — eClass submission
Submit both:
- Code archive (
.zip) - PDF export of
labs/lab08/README.md
Required PDF filename format:
lab08_REPORT_<team>.pdf
What follows is a greek version of the same lab
Lab 08 — Δημιουργία REST API για το Smart Wastebin
Προθεσμίες:
- Τέλος εργαστηριακής συνεδρίας (GitHub checkpoint): κάντε commit & push την πρόοδό σας στο αποθετήριο της ομάδας σας.
- Πριν το επόμενο εργαστήριο (υποβολή στο eClass): ανεβάστε (1) ένα
.zipμε τον κώδικά σας και (2) ένα PDF export τουlabs/lab08/README.md.
Περιεχόμενο υποβολής:
- (1) ένα
.zipμε τον κώδικά σας, και - (2) ένα PDF export του
labs/lab08/README.md.
Εισαγωγή στο τι πρέπει να κάνετε
Σε αυτό το εργαστήριο θα φτιάξετε ένα REST API που συνυπάρχει με το υπάρχον pipeline σας και σερβίρει δεδομένα από αυτό. Θα φτιάξετε επίσης MQTT endpoints μέσα στο API, ώστε ένας client να μπορεί να κάνει publish μηνύματα σε MQTT topics μέσω απλού HTTP request. Τέλος, θα τεκμηριώσετε το MQTT interface σας με AsyncAPI, το event-driven ισοδύναμο του OpenAPI. Τα REST endpoints σας τεκμηριώνονται αυτόματα από το Flask-RESTx. Τα MQTT topics σας τεκμηριώνονται από ένα AsyncAPI spec. Μαζί, περιγράφουν ολόκληρο το σύστημά σας.
Θα χρησιμοποιήσετε Flask ως web framework και Flask-RESTx για να ορίσετε τα endpoints σας και να δημιουργήσετε αυτόματα Swagger UI documentation.
Το σύστημα έχει τώρα τρία communication layers:
┌──────────────────────────────────────────────────────────────┐
│ Smart Wastebin System │
│ │
│ PIR sensor ──▶ producer ──▶ MQTT broker ──▶ consumer │
│ ▲ │ │ │
│ │ ▼ ▼ │
│ REST API HA JSONL file │
│ (publish (live │ │
│ + query) dashboard) │ │
│ ▲ │ │
│ │ ▼ │
│ Swagger UI ◀────── read events │
│ (docs + test) │
│ │
│ AsyncAPI spec ──▶ περιγράφει το MQTT interface │
│ OpenAPI spec ──▶ περιγράφει το REST interface (auto) │
└──────────────────────────────────────────────────────────────┘
Δημιουργήστε την παρακάτω δομή:
/
├── README.md
├── labs/
│ ├── lab01/
│ ├── ...
│ └── lab08/
│ ├── README.md
│ ├── requirements.txt
│ ├── api.py
│ ├── asyncapi.yaml
│ ├── producer.py
│ ├── consumer.py
│ └── pirlib/
│ ├── __init__.py
│ ├── sampler.py
│ └── interpreter.py
Αντιγράψτε τα producer.py, consumer.py, και pirlib/ από το Lab 06/07. Τα νέα αρχεία είναι το api.py (ο REST API server) και το asyncapi.yaml (η τεκμηρίωση του MQTT interface).
Μέρος 1 — Ρύθμιση Flask και Flask-RESTx
Εγκαταστήστε τα απαιτούμενα πακέτα. Προσθέστε τα στο requirements.txt σας:
flask
flask-restx
...
Εγκατάσταση:
pip install flask flask-restx
Δημιουργήστε το api.py με μια minimal Flask-RESTx εφαρμογή για να βεβαιωθείτε ότι όλα λειτουργούν:
from flask import Flask
from flask_restx import Api, Resource
app = Flask(__name__)
api = Api(
app,
version="1.0",
title="Smart Wastebin API",
description="REST API for querying Smart Wastebin sensor data and bin status",
)
ns = api.namespace("bins", description="Wastebin operations")
@ns.route("/")
class BinList(Resource):
def get(self):
"""List all registered bins."""
return {"bins": []}, 200
if __name__ == "__main__":
app.run(debug=True, host="0.0.0.0", port=5000)
Τρέξτε το:
python api.py
Ανοίξτε browser και πηγαίνετε στο http://<your-pi-ip>:5000. Θα πρέπει να δείτε το Swagger UI, μια interactive σελίδα documentation που εμφανίζει το API σας με ένα endpoint. Κάντε κλικ σε αυτό, κάντε κλικ “Try it out”, και κάντε κλικ “Execute.” Θα πρέπει να λάβετε {"bins": []} με status code 200.
Αυτό είναι το σημείο εκκίνησής σας. Κάθε endpoint που προσθέτετε θα εμφανίζεται αυτόματα στο Swagger UI, τεκμηριωμένο και δοκιμάσιμο. Δεν χρειάζεται ποτέ να γράψετε documentation ξεχωριστά (για το REST interface τουλάχιστον) — προέρχεται από τον κώδικα.
Μέρος 2 — Σχεδιασμός του API σας
Πριν γράψετε περισσότερο κώδικα, σκεφτείτε τι πρέπει να εκθέτει το API σας. Η διάλεξη τόνισε τον σχεδιασμό του API πρώτα, δηλαδή την απόφαση για τα resources, τα URIs, και τα verbs πριν υλοποιήσετε οτιδήποτε.
Τα resources σας είναι:
- Bins — οι ίδιοι οι κάδοι απορριμμάτων (status, τοποθεσία, metadata)
- Sensors — οι sensors που είναι τοποθετημένοι στους κάδους
- Events — τα motion events που παράγει το pipeline σας (αποθηκευμένα στο JSONL αρχείο)
- MQTT — ένα interface για publishing και ανάγνωση από MQTT topics μέσω HTTP
Σχεδιάστε τα endpoints σας ακολουθώντας REST conventions. Ορίστε ένα σημείο εκκίνησης — πρέπει να το επεκτείνετε:
| Method | URI | Περιγραφή |
|---|---|---|
| GET | /bins |
Λίστα όλων των bins |
| GET | /bins/<bin_id> |
Λεπτομέρειες για συγκεκριμένο bin |
| GET | /bins/<bin_id>/sensors |
Λίστα sensors σε συγκεκριμένο bin |
| GET | /bins/<bin_id>/events |
Motion events για συγκεκριμένο bin |
| GET | /sensors |
Λίστα όλων των sensors |
| GET | /sensors/<sensor_id> |
Λεπτομέρειες για συγκεκριμένο sensor |
| POST | /bins/<bin_id>/emptied |
Καταγραφή αδειάσματος bin |
| POST | /mqtt/publish |
Publish μηνύματος σε MQTT topic |
| GET | /mqtt/topics |
Λίστα γνωστών MQTT topics και τελευταία retained τιμή |
Σκεφτείτε:
- Ποιες παραμέτρους πρέπει να δέχεται το events endpoint; Τουλάχιστον ένα
limitγια τον έλεγχο του αριθμού των records που επιστρέφονται. Ίσως και ένα εύρος ημερομηνιών (start,end); - Πώς πρέπει να μοιάζει η response; Ποια fields χρειάζεται κάθε resource;
- Ποιους status codes πρέπει να επιστρέφει κάθε endpoint; 200 για επιτυχία, 404 αν ένα bin δεν υπάρχει, 400 αν οι query parameters είναι άκυροι;
- Για το MQTT publish endpoint, τι χρειάζεται να στείλει ο client (topic, payload, QoS, retain);
Καταγράψτε τον σχεδιασμό του API σας στην αναφορά σας πριν το υλοποιήσετε.
Μέρος 3 — Υλοποίηση των data endpoints
Τώρα φτιάξτε την πλευρά του API που σερβίρει δεδομένα. Θα διαβάζετε πληροφορίες bin και sensor από τα JSON-LD models σας (από το Lab 05), και event data από τα JSONL output αρχεία σας.
Φορτώστε τα δεδομένα σας
Το API σας χρειάζεται να διαβάζει από δύο πηγές:
- Τα JSON-LD model αρχεία (
models/sensor.jsonld,models/wastebin.jsonld,models/environment.jsonld) για metadata bin και sensor - Το JSONL event log που γράφει ο consumer για motion events
Για τα model αρχεία, φορτώστε τα κατά την εκκίνηση:
IMPORT json module
IMPORT os module
SET DATA_DIR ΩΣ τη διαδρομή του τρέχοντος αρχείου συνδυασμένη με "data"
SET EVENTS_FILE ΩΣ τη διαδρομή του DATA_DIR συνδυασμένη με "motion_events.jsonl"
FUNCTION load_json(filepath):
OPEN το αρχείο στο filepath σε read mode
READ και parse τα περιεχόμενα του αρχείου ως JSON
RETURN τα parsed JSON δεδομένα
FUNCTION load_events(filepath, limit = none, sensor_id = none):
CREATE κενή λίστα events
IF το αρχείο στο filepath δεν υπάρχει:
RETURN events
OPEN το αρχείο στο filepath σε read mode
FOR κάθε γραμμή στο αρχείο:
REMOVE leading και trailing whitespace από τη γραμμή
IF η γραμμή είναι κενή:
SKIP στην επόμενη γραμμή
TRY:
PARSE τη γραμμή ως JSON και αποθήκευσέ τη ως record
IF δόθηκε sensor_id ΚΑΙ η τιμή "madeBySensor" του record δεν ισούται με sensor_id:
SKIP στην επόμενη γραμμή
ADD record στα events
IF η γραμμή δεν μπορεί να parsed ως JSON:
SKIP στην επόμενη γραμμή
REVERSE events ώστε τα πιο πρόσφατα records να έρθουν πρώτα
IF δόθηκε limit:
KEEP μόνο τα πρώτα limit records στα events
RETURN events
Ορίστε τα models για το Swagger
Το Flask-RESTx σας επιτρέπει να ορίζετε response models ώστε το Swagger UI να εμφανίζει την αναμενόμενη δομή response:
bin_model = api.model("Bin", {
"id": fields.String(required=True, description="Bin unique identifier"),
"name": fields.String(description="Human-readable name"),
"location": fields.String(description="Deployment location"),
"status": fields.String(description="Current status"),
})
event_model = api.model("Event", {
"resultTime": fields.String(description="ISO timestamp of the event"),
"madeBySensor": fields.String(description="Sensor ID that produced this event"),
"hasSimpleResult": fields.String(description="Motion state (detected/clear)"),
"pipeline_latency_ms": fields.Float(description="Pipeline latency in ms"),
})
Αυτά τα models δεν επιβάλλουν τίποτα κατά την εκτέλεση — είναι documentation. Λένε σε όποιον διαβάζει το Swagger UI ακριβώς πώς μοιάζει η response.
Υλοποιήστε τα endpoints
Ορίστε ένα παράδειγμα πιο πλήρους bins endpoint. Πρέπει να το επεκτείνετε και να το προσαρμόσετε στα δεδομένα σας:
from flask_restx import reqparse
# Parser για query parameters
events_parser = reqparse.RequestParser()
events_parser.add_argument("limit", type=int, default=50, help="Max events to return")
events_parser.add_argument("start", type=str, help="Start datetime (ISO format)")
events_parser.add_argument("end", type=str, help="End datetime (ISO format)")
@ns.route("/")
class BinList(Resource):
@ns.marshal_list_with(bin_model)
def get(self):
"""List all registered bins."""
# Φόρτωση από τα model αρχεία σας ή ένα registry
return bins_registry
@ns.route("/<string:bin_id>")
@ns.param("bin_id", "The bin identifier")
@ns.response(404, "Bin not found")
class Bin(Resource):
@ns.marshal_with(bin_model)
def get(self, bin_id):
"""Get details for a specific bin."""
bin_data = find_bin(bin_id)
if not bin_data:
api.abort(404, f"Bin {bin_id} not found")
return bin_data
@ns.route("/<string:bin_id>/events")
@ns.param("bin_id", "The bin identifier")
class BinEvents(Resource):
@ns.expect(events_parser)
@ns.marshal_list_with(event_model)
def get(self, bin_id):
"""Get motion events for a specific bin."""
args = events_parser.parse_args()
events = load_events(
EVENTS_FILE,
limit=args["limit"],
sensor_id=get_sensor_for_bin(bin_id),
)
return events
Μερικά πράγματα να παρατηρήσετε:
Το @ns.marshal_with(bin_model) λέει στο Flask-RESTx να μορφοποιεί την response σύμφωνα με το model και να την εμφανίζει στο Swagger UI. Η documentation παραμένει συγχρονισμένη με τον κώδικα και δεν υπάρχει ξεχωριστό doc αρχείο για συντήρηση.
Το @ns.expect(events_parser) τεκμηριώνει τα query parameters στο Swagger. Όταν κάποιος ανοίξει το UI, βλέπει ότι το /bins/bin-01/events δέχεται παραμέτρους limit, start, και end και μπορεί να τις συμπληρώσει πριν πατήσει “Execute.”
Το api.abort(404, ...) επιστρέφει response 404 με μήνυμα. Έτσι λέτε στον client “αυτό το resource δεν υπάρχει”, χρησιμοποιώντας τους ίδιους status codes από τη διάλεξη.
Προσθέστε το POST endpoint
Προσθέστε endpoint για καταγραφή αδειάσματος bin (γενικά έχω δώσει pseudocode παραδείγματα — τροποποιήστε ή προσθέστε ό,τι κρίνετε κατάλληλο):
DEFINE emptied_model ΩΣ API model με όνομα "EmptiedRecord" ΜΕ:
FIELD "bin_id" ΩΣ string ΜΕ description "Bin identifier"
FIELD "emptied_at" ΩΣ string ΜΕ description "ISO timestamp αδειάσματος"
FIELD "emptied_by" ΩΣ string ΜΕ description "Ποιος άδειασε τον κάδο"
DEFINE API ROUTE "/<bin_id>/emptied"
DOCUMENT route parameter "bin_id" ΩΣ "The bin identifier"
CLASS BinEmptied EXTENDS Resource:
EXPECT request body να ταιριάζει με emptied_model
DOCUMENT response 201 ΩΣ "Bin marked as emptied"
DOCUMENT response 404 ΩΣ "Bin not found"
FUNCTION post(bin_id):
RECORD ότι ένας κάδος αδειάστηκε
SET bin_data ΩΣ αποτέλεσμα find_bin(bin_id)
IF bin_data δεν υπάρχει:
STOP το request ΜΕ status 404 ΚΑΙ μήνυμα "Bin <bin_id> not found"
SET data ΩΣ το εισερχόμενο API request payload
CREATE record ΜΕ:
"bin_id" SET TO bin_id
"emptied_at" SET TO data["emptied_at"] ΑΝ δόθηκε,
ΑΛΛΙΩΣ SET TO τρέχον UTC timestamp σε ISO format που τελειώνει με "Z"
"emptied_by" SET TO data["emptied_by"] ΑΝ δόθηκε,
ΑΛΛΙΩΣ SET TO "unknown"
SAVE record
RETURN record ΜΕ status code 201
Αυτό επιστρέφει 201 (Created) σε επιτυχία. Το POST δεν είναι idempotent — καλώντας το δύο φορές δημιουργούνται δύο emptied records, κάτι που είναι σωστή συμπεριφορά (ο κάδος αδειάστηκε δύο φορές).
Προσθέστε sensors namespace
Δημιουργήστε ένα δεύτερο namespace για sensor endpoints:
DEFINE sensors_ns ΩΣ API namespace με όνομα "sensors"
SET description ΩΣ "Sensor operations"
DEFINE sensor_model ΩΣ API model με όνομα "Sensor" ΜΕ:
FIELD "id" ΩΣ string, required, ΜΕ description "Sensor unique identifier"
FIELD "type" ΩΣ string ΜΕ description "Τύπος sensor (PIR, ultrasonic, κ.λπ.)"
FIELD "model" ΩΣ string ΜΕ description "Hardware model"
FIELD "mounted_on" ΩΣ string ΜΕ description "Bin στον οποίο είναι τοποθετημένος ο sensor"
FIELD "status" ΩΣ string ΜΕ description "Τρέχουσα κατάσταση sensor"
DEFINE API ROUTE "/sensors/"
CLASS SensorList EXTENDS Resource:
FORMAT response ΩΣ λίστα sensor_model objects
FUNCTION get():
LIST όλους τους καταχωρημένους sensors
RETURN sensors_registry
DEFINE API ROUTE "/sensors/<sensor_id>"
DOCUMENT route parameter "sensor_id" ΩΣ "The sensor identifier"
DOCUMENT response 404 ΩΣ "Sensor not found"
CLASS Sensor EXTENDS Resource:
FORMAT response ΩΣ sensor_model object
FUNCTION get(sensor_id):
GET λεπτομέρειες για συγκεκριμένο sensor
SET sensor ΩΣ αποτέλεσμα find_sensor(sensor_id)
IF sensor δεν υπάρχει:
STOP το request ΜΕ status 404 ΚΑΙ μήνυμα "Sensor <sensor_id> not found"
RETURN sensor
Μέρος 4 — Προσθήκη MQTT endpoints
Το REST API σας σερβίρει δεδομένα από αρχεία. Τώρα θα του δώσετε τη δυνατότητα να επικοινωνεί με τον MQTT broker σας (κυρίως ως άσκηση), ώστε ένας client να μπορεί να κάνει publish μηνύματα και να διαβάζει topic state μέσω απλών HTTP requests.
Συνδέστε το API στον broker
Ρυθμίστε έναν MQTT client μέσα στην Flask εφαρμογή σας. Αυτός ο client παραμένει συνδεδεμένος στον broker για όλη τη διάρκεια ζωής του API server:
IMPORT MQTT client library
IMPORT threading library
CREATE MQTT client με όνομα "wastebin-api"
SET clean_session ΣΕ false ώστε το session να μπορεί να διατηρηθεί
CREATE κενό dictionary topic_store
CREATE thread lock topic_lock
FUNCTION on_message(client, userdata, msg):
STORE κάθε μήνυμα που λαμβάνει ο API client
LOCK topic_lock ώστε μόνο ένα thread να μπορεί να ενημερώνει το topic_store τη φορά
SET topic_store[msg.topic] ΩΣ record ΜΕ:
"topic" SET TO msg.topic
"payload" SET TO msg.payload decoded ως UTF-8,
με αντικατάσταση άκυρων χαρακτήρων αν χρειάζεται
"qos" SET TO msg.qos
"retain" SET TO msg.retain
"timestamp" SET TO τρέχον UTC timestamp σε ISO format που τελειώνει με "Z"
UNLOCK topic_lock
SET mqtt_client's message handler ΩΣ on_message
CONNECT mqtt_client ΣΤΟ MQTT broker στο "localhost" στη port 1883
SET keepalive ΩΣ 60 seconds
SUBSCRIBE mqtt_client ΣΕ όλα τα topics κάτω από "smartbin/"
SET subscription QoS ΩΣ 1
START mqtt_client's background network loop
Το API κάνει subscribe στο smartbin/# ώστε να παρακολουθεί το τελευταίο μήνυμα σε κάθε topic. Αυτό δημιουργεί μια live εικόνα του MQTT state που το API μπορεί να σερβίρει on demand.
Publish endpoint
Δημιουργήστε ένα namespace για MQTT operations:
DEFINE mqtt_ns ΩΣ API namespace με όνομα "mqtt"
SET description ΩΣ "MQTT broker interaction"
DEFINE publish_model ΩΣ API model με όνομα "MQTTPublish" ΜΕ:
FIELD "topic" ΩΣ string, required, ΜΕ description "MQTT topic για publish"
FIELD "payload" ΩΣ string, required, ΜΕ description "Message payload"
FIELD "qos" ΩΣ integer ΜΕ description "Quality of Service (0, 1, ή 2)" ΚΑΙ default τιμή 1
FIELD "retain" ΩΣ boolean ΜΕ description "Retain αυτό το μήνυμα στον broker" ΚΑΙ default τιμή false
DEFINE API ROUTE "/mqtt/publish"
CLASS MQTTPublish EXTENDS Resource:
EXPECT request body να ταιριάζει με publish_model
DOCUMENT response 200 ΩΣ "Message published"
DOCUMENT response 400 ΩΣ "Invalid request"
FUNCTION post():
PUBLISH μήνυμα σε MQTT topic
SET data ΩΣ το εισερχόμενο API request payload
SET topic ΩΣ data["topic"]
SET payload ΩΣ data["payload"]
SET qos ΩΣ data["qos"] ΑΝ δόθηκε, ΑΛΛΙΩΣ 1
SET retain ΩΣ data["retain"] ΑΝ δόθηκε, ΑΛΛΙΩΣ false
IF topic λείπει Ή payload λείπει:
STOP το request ΜΕ status 400 ΚΑΙ μήνυμα "Both 'topic' and 'payload' are required"
IF qos δεν είναι 0, 1, ή 2:
STOP το request ΜΕ status 400 ΚΑΙ μήνυμα "QoS must be 0, 1, or 2"
PUBLISH payload ΣΤΟ topic χρησιμοποιώντας mqtt_client
ΜΕ qos set to qos
ΜΕ retain set to retain
STORE αποτέλεσμα publish ΩΣ result
RETURN response ΜΕ:
"status" SET TO "published"
"topic" SET TO topic
"payload" SET TO payload
"qos" SET TO qos
"retain" SET TO retain
"mqtt_rc" SET TO result.rc
RETURN status code 200
Δοκιμάστε το. Ανοίξτε Swagger UI, πηγαίνετε στο mqtt namespace, και κάντε publish ένα test μήνυμα:
{
"topic": "smartbin/bin-01/pir-01/motion",
"payload": "detected",
"qos": 1,
"retain": false
}
Θα πρέπει να δείτε το motion sensor entity να ενημερώνεται. Μόλις ενεργοποιήσατε ένα MQTT event από web browser.
Topics endpoint
Προσθέστε endpoint που εμφανίζει την τρέχουσα κατάσταση όλων των γνωστών topics:
DEFINE API ROUTE "/mqtt/topics"
CLASS MQTTTopics EXTENDS Resource:
FUNCTION get():
LIST όλα τα γνωστά MQTT topics και το τελευταίο μήνυμα που ελήφθη
LOCK topic_lock ώστε το topic_store να διαβαστεί με ασφάλεια
CREATE response ΜΕ:
"topic_count" SET TO αριθμό εγγραφών στο topic_store
"topics" SET TO λίστα όλων των αποθηκευμένων topic message records
RETURN response ΜΕ status code 200
UNLOCK topic_lock
DEFINE API ROUTE "/mqtt/topics/<topic>"
DOCUMENT route parameter "topic" ΩΣ "MQTT topic path, π.χ. smartbin/bin-01/pir-01/motion"
CLASS MQTTTopicDetail EXTENDS Resource:
DOCUMENT response 404 ΩΣ "Topic not found or no message received yet"
FUNCTION get(topic):
GET το τελευταίο ληφθέν μήνυμα για συγκεκριμένο MQTT topic
LOCK topic_lock ώστε το topic_store να διαβαστεί με ασφάλεια
IF topic δεν υπάρχει στο topic_store:
STOP το request ΜΕ status 404
INCLUDE μήνυμα "No message received on topic '<topic>'"
RETURN topic_store[topic] ΜΕ status code 200
UNLOCK topic_lock
Τώρα μπορείτε να ελέγξετε ολόκληρο το MQTT state από τον browser σας χωρίς να χρειάζεστε mosquitto_sub:
curl http://localhost:5000/mqtt/topics
curl http://localhost:5000/mqtt/topics/smartbin/bin-01/pir-01/motion
Συνδυασμός REST actions με MQTT
Το endpoint “mark bin as emptied” μπορεί επίσης να κάνει publish ένα status update στο MQTT, ώστε το Home Assistant και άλλοι subscribers να ενημερωθούν:
DEFINE API ROUTE "/<bin_id>/emptied"
DOCUMENT route parameter "bin_id" ΩΣ "The bin identifier"
CLASS BinEmptied EXTENDS Resource:
EXPECT request body να ταιριάζει με emptied_model
DOCUMENT response 201 ΩΣ "Bin marked as emptied"
DOCUMENT response 404 ΩΣ "Bin not found"
FUNCTION post(bin_id):
RECORD ότι ένας κάδος αδειάστηκε
PUBLISH status update μέσω MQTT
SET bin_data ΩΣ αποτέλεσμα find_bin(bin_id)
IF bin_data δεν υπάρχει:
STOP το request ΜΕ status 404 ΚΑΙ μήνυμα "Bin <bin_id> not found"
SET data ΩΣ το εισερχόμενο API request payload
CREATE record ΜΕ:
"bin_id" SET TO bin_id
"emptied_at" SET TO data["emptied_at"] ΑΝ δόθηκε,
ΑΛΛΙΩΣ SET TO τρέχον UTC timestamp σε ISO format που τελειώνει με "Z"
"emptied_by" SET TO data["emptied_by"] ΑΝ δόθηκε,
ΑΛΛΙΩΣ SET TO "unknown"
SAVE record σε αρχείο
PUBLISH MQTT μήνυμα:
TOPIC: "smartbin/<bin_id>/status"
PAYLOAD: JSON object ΜΕ:
"state" SET TO "emptied"
"emptied_at" SET TO record["emptied_at"]
QOS: 1
RETAIN: true
RETURN record ΜΕ status code 201
Μια μόνο API κλήση καταγράφει το event και ειδοποιεί το MQTT ecosystem. Το API λειτουργεί ως gateway μεταξύ των κόσμων HTTP και MQTT.
Μέρος 5 — Τεκμηρίωση του MQTT interface με AsyncAPI
Τα REST endpoints σας τεκμηριώνονται αυτόματα χάρη στο Flask-RESTx και το Swagger UI. Αλλά τι γίνεται με το MQTT interface σας; Ο producer σας κάνει publish σε topics όπως smartbin/bin-01/pir-01/events, ο consumer κάνει subscribe, το Home Assistant ακούει — αλλά τίποτα από αυτά δεν τεκμηριώνεται επίσημα πουθενά. Κάποιος που μπαίνει στο project σας πρέπει να διαβάσει τον κώδικα για να καταλάβει ποια topics υπάρχουν, ποια είναι η μορφή μηνύματος, και τι QoS να χρησιμοποιήσει.
Το AsyncAPI λύνει αυτό. Είναι το event-driven ισοδύναμο του OpenAPI. Όπου το OpenAPI περιγράφει REST endpoints (request/response), το AsyncAPI περιγράφει message channels (publish/subscribe). Γράφετε ένα spec σε YAML, και τα εργαλεία δημιουργούν interactive documentation, ακριβώς όπως το Swagger UI κάνει αυτόματα για το REST API σας. Το Swagger Editor υποστηρίζει πλέον AsyncAPI specifications, οπότε μπορείτε να χρησιμοποιήσετε το ίδιο εργαλείο και για τα δύο.
Γράψτε το spec
Δημιουργήστε το asyncapi.yaml στον κατάλογο labs/lab08/. Αυτό το αρχείο περιγράφει το MQTT interface σας: ποια topics υπάρχουν, ποιος κάνει publish και ποιος subscribe, πώς μοιάζουν τα μηνύματα, και σε ποιον broker ρέουν.
Ορίστε ένα σημείο εκκίνησης — πρέπει να το επεκτείνετε ώστε να καλύπτει όλα τα topics σας:
asyncapi: 3.0.0
info:
title: Smart Wastebin MQTT Interface
version: 1.0.0
description: |
Event-driven messaging interface for the Smart Wastebin system.
All communication goes through an MQTT broker (Mosquitto).
servers:
mosquitto:
host: localhost:1883
protocol: mqtt
description: Local Mosquitto broker on the Raspberry Pi
channels:
motionEvents:
address: smartbin/{bin_id}/{sensor_id}/events
description: Full motion event records published by the producer
parameters:
bin_id:
description: Wastebin identifier (e.g., bin-01)
sensor_id:
description: Sensor identifier (e.g., pir-01)
messages:
motionEvent:
$ref: '#/components/messages/MotionEvent'
motionState:
address: smartbin/{bin_id}/{sensor_id}/motion
description: Simple motion state for Home Assistant (detected/clear)
parameters:
bin_id:
description: Wastebin identifier
sensor_id:
description: Sensor identifier
messages:
motionState:
$ref: '#/components/messages/MotionState'
binStatus:
address: smartbin/{bin_id}/status
description: Retained message with current bin status and metadata
parameters:
bin_id:
description: Wastebin identifier
messages:
binStatus:
$ref: '#/components/messages/BinStatus'
operations:
publishMotionEvent:
action: send
channel:
$ref: '#/channels/motionEvents'
summary: Producer publishes a full motion event record
description: |
Published by the producer whenever the PIR sensor detects motion.
Contains the full observation record with timestamps, sensor reference,
and JSON-LD context.
consumeMotionEvent:
action: receive
channel:
$ref: '#/channels/motionEvents'
summary: Consumer receives motion events and writes them to JSONL
description: |
The consumer subscribes to this channel, enriches each event with
ingest_time and pipeline_latency_ms, and writes the result to a
JSONL file.
publishMotionState:
action: send
channel:
$ref: '#/channels/motionState'
summary: Producer publishes simple motion state for Home Assistant
publishBinStatus:
action: send
channel:
$ref: '#/channels/binStatus'
summary: Status update published as a retained message
components:
messages:
MotionEvent:
name: MotionEvent
contentType: application/json
payload:
type: object
properties:
"@context":
type: string
description: JSON-LD context reference
"@type":
type: string
description: Observation type
example: sosa:Observation
madeBySensor:
type: string
description: URI of the sensor that produced this event
example: "urn:dev:pir-01"
observedIn:
type: string
description: URI of the deployment environment
example: "urn:env:lab-room-101"
resultTime:
type: string
format: date-time
description: ISO 8601 timestamp of when the event occurred
hasSimpleResult:
type: string
description: Motion state
enum: [detected, clear]
seq:
type: integer
description: Sequence number within this pipeline run
run_id:
type: string
description: UUID identifying this pipeline run
MotionState:
name: MotionState
contentType: text/plain
payload:
type: string
enum: [detected, clear]
description: Simple motion state string for Home Assistant
BinStatus:
name: BinStatus
contentType: application/json
payload:
type: object
properties:
state:
type: string
description: Current bin state
enum: [active, emptied, full, maintenance]
location:
type: string
description: Human-readable location
last_motion:
type: string
format: date-time
description: Timestamp of most recent motion event
Ας δούμε τι περιγράφει αυτό:
servers — ο Mosquitto broker, η διεύθυνσή του, και το πρωτόκολλο. Ισοδύναμο του base URL στο OpenAPI.
channels — τα MQTT topics σας, με παραμέτρους για τα μεταβλητά τμήματα ({bin_id}, {sensor_id}). Κάθε channel αναφέρεται στο message schema που μεταφέρει.
operations — ποιος κάνει τι. Το publishMotionEvent είναι send action (ο producer κάνει publish), το consumeMotionEvent είναι receive action (ο consumer κάνει subscribe). Αυτό κάνει ρητό ποια components είναι publishers και ποια subscribers.
components/messages — τα message schemas. Ακριβώς όπως οι ορισμοί api.model στο Flask-RESTx, αυτά περιγράφουν τη μορφή κάθε μηνύματος. Το MotionEvent μήνυμα έχει όλα τα fields από τις JSON-LD observations σας. Το MotionState μήνυμα είναι απλώς ένα string.
Επεκτείνετέ το
Το παραπάνω παράδειγμα καλύπτει τρία channels. Σκεφτείτε τι άλλο χρησιμοποιεί το σύστημά σας:
- Οποιαδήποτε άλλα topics έχετε προσθέσει
Προσθέστε channels, operations, και message schemas για κάθε ένα.
Δείτε το
Ανοίξτε το Swagger Editor στο browser σας. Κάντε κλικ File → Import File και φορτώστε το asyncapi.yaml. Ο editor θα το αναλύσει και θα εμφανίσει interactive documentation — τα channels σας, τα message schemas, τις πληροφορίες server, και τα operations — όλα σε browsable interface.
Τραβήξτε screenshot της rendered AsyncAPI documentation σας για την αναφορά.
Γιατί και OpenAPI και AsyncAPI;
Το Smart Wastebin σύστημά σας έχει δύο interfaces:
| REST API (OpenAPI) | MQTT Interface (AsyncAPI) | |
|---|---|---|
| Pattern | Request/Response | Publish/Subscribe |
| Protocol | HTTP | MQTT |
| Χρήση | On-demand queries, commands | Real-time events, state updates |
| Παράδειγμα | GET /bins/bin-01/events?limit=10 |
Subscribe στο smartbin/bin-01/pir-01/events |
| Spec | OpenAPI (auto-generated από Flask-RESTx) | AsyncAPI (γραμμένο χειροκίνητα) |
Και τα δύο specs εξυπηρετούν τον ίδιο σκοπό: περιγράφουν επίσημα το interface σας ώστε κάποιος που δεν έχει δει ποτέ τον κώδικά σας να μπορεί να καταλάβει πώς να αλληλεπιδράσει με το σύστημά σας. Το OpenAPI καλύπτει την pull πλευρά. Το AsyncAPI καλύπτει την push πλευρά.
Μέρος 6 — Δοκιμή με Swagger UI και curl
Με το API σε λειτουργία, ανοίξτε http://<your-pi-ip>:5000 στο browser σας. Θα πρέπει να βλέπετε όλα τα endpoints οργανωμένα ανά namespace (bins, sensors, mqtt).
Δοκιμάστε κάθε endpoint:
- GET /bins — πρέπει να επιστρέψει τη λίστα με τα bins σας
- GET /bins/bin-01 — πρέπει να επιστρέψει λεπτομέρειες για το bin-01. Τι συμβαίνει αν ζητήσετε bin που δεν υπάρχει;
- GET /bins/bin-01/events — πρέπει να επιστρέψει motion events. Δοκιμάστε με διαφορετικές τιμές
limit - POST /bins/bin-01/emptied — συμπληρώστε το JSON body και υποβάλετε. Ελέγξτε ότι έρχεται response 201 και ότι το Home Assistant ενημερώνεται
- GET /sensors — πρέπει να εμφανίσει τους sensors σας
- GET /sensors/pir-01 — πρέπει να επιστρέψει λεπτομέρειες sensor
- POST /mqtt/publish — κάντε publish ένα motion event. Ελέγξτε ότι το Home Assistant και ο consumer σας το λαμβάνουν
- GET /mqtt/topics — δείτε όλα τα topics που παρακολουθεί το API
- GET /mqtt/topics/smartbin/bin-01/pir-01/motion — ελέγξτε το τελευταίο μήνυμα σε συγκεκριμένο topic
Τραβήξτε screenshots του Swagger UI που εμφανίζουν τα endpoints σας και τουλάχιστον μία επιτυχή response.
Δοκιμή με curl
# Λίστα όλων των bins
curl http://localhost:5000/bins/
# Ανάκτηση συγκεκριμένου bin
curl http://localhost:5000/bins/bin-01
# Ανάκτηση events με limit
curl "http://localhost:5000/bins/bin-01/events?limit=10"
# Σήμανση bin ως αδειασμένο (κάνει επίσης publish στο MQTT)
curl -X POST http://localhost:5000/bins/bin-01/emptied \
-H "Content-Type: application/json" \
-d '{"emptied_by": "maintenance-team-A"}'
# Publish στο MQTT μέσω API
curl -X POST http://localhost:5000/mqtt/publish \
-H "Content-Type: application/json" \
-d '{"topic": "smartbin/bin-01/pir-01/motion", "payload": "detected", "qos": 1}'
# Εμφάνιση όλων των MQTT topics
curl http://localhost:5000/mqtt/topics
# Αίτημα για bin που δεν υπάρχει
curl -i http://localhost:5000/bins/bin-99
Ελέγξτε ότι παίρνετε 200 για επιτυχημένα αιτήματα, 201 για το POST, και 404 για resources που δεν υπάρχουν.
Γιατί αυτό το εργαστήριο είναι δομημένο έτσι
Μέχρι τώρα, ο μόνος τρόπος για να βγάλετε δεδομένα από το σύστημά σας ήταν να παρακολουθείτε MQTT μηνύματα σε πραγματικό χρόνο ή να ανοίγετε ένα JSONL αρχείο. Και τα δύο απαιτούν να γνωρίζετε τα εσωτερικά του συστήματος — σε ποιο topic να κάνετε subscribe, πού βρίσκεται το αρχείο, πώς μοιάζει η μορφή. Ένα API βάζει μια καθαρή “μπροστινή πόρτα” στο σύστημα. Μια mobile εφαρμογή, ένα web dashboard, μια υπηρεσία route-planning, ή ένα εργαλείο συντήρησης μπορούν όλα να κάνουν query στο ίδιο API χωρίς να ξέρουν τίποτα για MQTT topics ή JSONL αρχεία. Απλώς στέλνουν HTTP requests και λαμβάνουν JSON responses.
Τα MQTT endpoints πηγαίνουν αυτό παρακάτω. Τυλίγοντας το MQTT publish και το topic state σε REST endpoints, το API σας γίνεται universal gateway. Ένας υπεύθυνος εγκαταστάσεων μπορεί να σημειώσει ότι ένας κάδος αδειάστηκε από μια web φόρμα, και το MQTT ecosystem (Home Assistant, ο consumer σας, άλλοι subscribers) ακούνε για αυτό αμέσως. Το REST interface σας τεκμηριώνεται από OpenAPI (auto-generated). Το MQTT interface σας τεκμηριώνεται από AsyncAPI (γραμμένο χειροκίνητα). Μαζί, περιγράφουν ολόκληρο το σύστημά σας σε τυπικές, machine-readable μορφές που οποιοσδήποτε developer μπορεί να διαβάσει και οποιοδήποτε εργαλείο να επεξεργαστεί.
Αυτός ο συνδυασμός MQTT και REST είναι ένα pattern που βλέπετε σε πραγματικές IoT πλατφόρμες. Το AWS IoT έχει MQTT για επικοινωνία συσκευών και REST APIs για διαχείριση. Το Azure IoT Hub κάνει το ίδιο. Το ίδιο το Home Assistant εκθέτει REST API παράλληλα με το MQTT integration. Το Smart Wastebin σας ακολουθεί τώρα την ίδια αρχιτεκτονική.
Ερωτήσεις αναφοράς
Απαντήστε τα παρακάτω στο labs/lab08/README.md σας αφού ολοκληρωθεί η υλοποίηση και τα πειράματα.
Σχεδιασμός API
RQ1: Καταγράψτε τον πλήρη σχεδιασμό του API σας — κάθε endpoint, η HTTP method του, το URI, ποιες παραμέτρους δέχεται, και τι επιστρέφει. Παρουσιάστε αυτό ως πίνακα.
RQ2: Γιατί τα endpoints λίστας events χρησιμοποιούν GET και όχι POST;
RQ3: Γιατί το endpoint “mark as emptied” χρησιμοποιεί POST και όχι PUT; Σκεφτείτε το idempotency.
RQ4: Πώς χειριστήκατε την περίπτωση όπου ένας client ζητά bin ή sensor που δεν υπάρχει; Ποιον status code επιστρέφετε και γιατί;
Υλοποίηση
RQ5: Από πού διαβάζει το API τα δεδομένα του; Ιχνηλατήστε τη διαδρομή των event data από τον PIR sensor μέχρι μια API response.
RQ6: Ποιες query parameters υποστηρίζει το events endpoint σας; Δείξτε παράδειγμα request και response.
RQ7: Πώς σχετίζονται τα Flask-RESTx models (api.model) με την τεκμηρίωση Swagger UI; Τι συμβαίνει στο UI όταν προσθέτετε νέο field σε ένα model;
RQ8: Δείξτε screenshot του Swagger UI σας με ορατά endpoints.
MQTT endpoints
RQ9: Εξηγήστε πώς λειτουργεί το POST /mqtt/publish endpoint. Τι κάνει το API όταν λαμβάνει αίτημα publish;
RQ10: Κάνατε publish ένα motion event μέσω του API χρησιμοποιώντας POST /mqtt/publish. Περιγράψτε τη διαδρομή που ακολουθεί αυτό το μήνυμα, από το HTTP request μέχρι το JSONL αρχείο του consumer.
RQ11: Τι επιστρέφει το GET /mqtt/topics; Γιατί πρέπει το API να κάνει subscribe στο smartbin/# για να λειτουργεί αυτό;
RQ12: Καλείτε POST /bins/bin-01/emptied. Αυτό τόσο αποθηκεύει ένα record όσο και κάνει publish στο MQTT. Ποιο είναι το πλεονέκτημα του συνδυασμού και των δύο actions σε ένα endpoint;
AsyncAPI
RQ13: Τι είναι το AsyncAPI και πώς σχετίζεται με το OpenAPI; Γιατί χρειάζεστε και τα δύο για το Smart Wastebin;
RQ14: Πόσα channels τεκμηριώσατε στο AsyncAPI spec σας; Για κάθε ένα, αναφέρετε ποιος είναι ο publisher και ποιος ο subscriber.
RQ15: Δείξτε screenshot του AsyncAPI spec σας rendered σε Swagger Editor ή AsyncAPI Studio.
RQ16: Συγκρίνετε το MotionEvent message schema στο AsyncAPI spec σας με το event_model στον Flask-RESTx κώδικά σας. Περιγράφουν τα ίδια δεδομένα — τι διαφέρει στο context χρήσης του καθενός;
Δοκιμή
RQ17: Δείξτε την εντολή curl και response για: (α) λίστα όλων των bins, (β) ανάκτηση events με limit, (γ) publish MQTT μηνύματος, (δ) αίτημα για bin που δεν υπάρχει.
RQ18: Ποια είναι η διαφορά μεταξύ δοκιμής με Swagger UI και με curl; Πότε θα χρησιμοποιούσατε το καθένα;
Αναστοχασμός
RQ19: Ένα νέο μέλος ομάδας μπαίνει στο project σας. Πρέπει να φτιάξει mobile εφαρμογή που εμφανίζει κατάσταση bin και επιτρέπει σε χρήστες να αναφέρουν γεμάτους κάδους. Τι του δίνετε; Πώς βοηθούν το Swagger UI και το AsyncAPI spec;
RQ20: Με δικά σας λόγια, εξηγήστε γιατί το Smart Wastebin χρειάζεται τόσο push-based σύστημα (MQTT) όσο και pull-based σύστημα (REST API). Τι θα έλειπε αν είχατε μόνο το ένα;
Υπόδειξη project: Smart Wastebin
Το API που φτιάξατε είναι το interface που θα χρησιμοποιούν άλλες εφαρμογές για να αλληλεπιδρούν με το σύστημά σας. Καθώς προσθέτετε περισσότερους sensors και λειτουργικότητα, και το REST API και το AsyncAPI spec μεγαλώνουν μαζί.
Σκεφτείτε το τελικό project demo σας. Μπορείτε να εμφανίσετε το Swagger UI με όλα τα REST endpoints, την AsyncAPI documentation με όλα τα MQTT channels, και το Home Assistant dashboard με live state — τρεις απόψεις ενός συστήματος, καθεμία εξυπηρετεί διαφορετικό κοινό (app developers, τελικοί χρήστες κ.λπ.).
Τι πρέπει να έχει ολοκληρωθεί πριν φύγετε από το εργαστήριο
Πριν το τέλος της συνεδρίας θα πρέπει να έχετε: σχεδιάσει τα API endpoints σας, υλοποιήσει το API με Flask και Flask-RESTx, δημιουργήσει namespaces για bins, sensors, και MQTT, συμπεριλάβει GET endpoints για λίστες και detail views, συμπεριλάβει POST endpoints για καταγραφή events και publishing MQTT μηνυμάτων, προσθέσει υποστήριξη query parameters για events, χειριστεί error cases με κατάλληλους status codes, συνδέσει το API στον MQTT broker, γράψει AsyncAPI spec που καλύπτει όλα τα MQTT topics σας, δοκιμάσει όλα τα endpoints μέσω Swagger UI και curl, τρέξει το πλήρες σύστημα μαζί, ενημερώσει το labs/lab08/README.md, και κάνει push στο GitHub.
Τελικό checklist (Lab 08)
-
api.pyδημιουργημένο με Flask και Flask-RESTx - Swagger UI προσβάσιμο στη port 5000
-
binsnamespace με list, detail, και events endpoints -
sensorsnamespace με list και detail endpoints - Events endpoint με query parameter
limit - Endpoint
POST /bins/<bin_id>/emptiedλειτουργικό -
mqttnamespace με publish και topics endpoints - API συνδεδεμένο στον Mosquitto broker
-
POST /mqtt/publishμπορεί να ενεργοποιήσει MQTT μηνύματα (επαληθευμένο στο Home Assistant ή consumer) -
GET /mqtt/topicsεμφανίζει tracked topic state - Response models ορισμένα για Swagger documentation
- 404 επιστρέφεται για resources που δεν υπάρχουν
- API διαβάζει event data από JSONL output του consumer
-
asyncapi.yamlδημιουργημένο με channels, operations, και message schemas - AsyncAPI spec καλύπτει όλα τα MQTT topics που χρησιμοποιεί το σύστημα
- AsyncAPI spec προβλέψιμο σε Swagger Editor ή AsyncAPI Studio
- Δοκιμασμένο μέσω Swagger UI (screenshots στην αναφορά)
- Δοκιμασμένο μέσω
curl -
labs/lab08/README.mdπεριέχει API design, κώδικα, screenshots, AsyncAPI spec, και απαντήσεις αναφοράς - Commit και push ολοκληρωμένα
Παραδοτέα και υποβολή
Τι πρέπει να υπάρχει στο αποθετήριο (έως το τέλος του εργαστηρίου)
/
├── README.md
├── labs/
│ ├── lab01/
│ ├── ...
│ └── lab08/
│ ├── README.md
│ ├── requirements.txt
│ ├── api.py
│ ├── asyncapi.yaml
│ ├── producer.py
│ ├── consumer.py
│ └── pirlib/
│ ├── __init__.py
│ ├── sampler.py
│ └── interpreter.py
Μην συμπεριλάβετε:
venv/__pycache__/*.pycoutput/ή*.jsonl- μεγάλα προσωρινά αρχεία εκτός αν ζητηθεί ρητά
Τι πρέπει να περιέχει το labs/lab08/README.md
Δύο σαφώς διαχωρισμένα μέρη:
- Κώδικας / runbook — συμπεριλάβετε τον πίνακα σχεδιασμού API, υλοποιήσεις endpoints (ή σχετικά αποσπάσματα), screenshots Swagger UI, AsyncAPI spec (ή σχετικά αποσπάσματα), screenshot rendered AsyncAPI, και πώς να τρέξετε το πλήρες σύστημα
- Απαντήσεις στις ερωτήσεις αναφοράς
Ίδιο στυλ με τα προηγούμενα εργαστήρια.
Τέλος εργαστηριακής συνεδρίας — GitHub checkpoint
Πριν φύγετε:
- κάντε commit την πρόοδό σας
- κάντε push στο αποθετήριο GitHub της ομάδας σας
Ελάχιστη προσδοκία:
- όλα τα παραδοτέα παρακολουθούνται από το Git
- το τελευταίο commit έχει γίνει push
- το commit message είναι σαφές
Πριν το επόμενο εργαστήριο — υποβολή στο eClass
Υποβάλετε και τα δύο:
- Αρχείο κώδικα (
.zip) - PDF export του
labs/lab08/README.md
Απαιτούμενη μορφή ονόματος PDF αρχείου:
lab08_REPORT_<team>.pdf