OpenAPI/Swagger

Lab 08 — Building a REST API for the Smart Wastebin

Deadlines:

  • End of lab session (GitHub checkpoint): commit & push your progress to your team repository.
  • Before next lab (eClass submission): upload (1) a .zip with your code and (2) a PDF export of labs/lab08/README.md.

Submission contents:

  • (1) a .zip with your code, and
  • (2) a PDF export of labs/lab08/README.md.

Intro to what you need to do

In this lab you will build a REST API that sits alongside your existing pipeline and serves data from it. You will also build MQTT endpoints into the API, so that a client can publish messages to MQTT topics through a simple HTTP request. Finally, you will document your MQTT interface with AsyncAPI, the event-driven equivalent of OpenAPI. Your REST endpoints get auto-documented by Flask-RESTx. Your MQTT topics get documented by an AsyncAPI spec. Together, they describe your entire system.

You will use Flask as the web framework and Flask-RESTx to define your endpoints and auto-generate Swagger UI documentation.

The system now has three communication layers:

┌──────────────────────────────────────────────────────────────┐
│                    Smart Wastebin System                     │
│                                                              │
│  PIR sensor ──▶ producer ──▶ MQTT broker ──▶ consumer      │
│                              ▲    │             │            │
│                              │    ▼             ▼            │
│                         REST API  HA        JSONL file       │
│                         (publish  (live        │             │
│                          + query) dashboard)   │             │
│                              ▲                 │             │
│                              │                 ▼             │
│                          Swagger UI ◀────── read events      │
│                       (docs + test)                          │
│                                                              │
│  AsyncAPI spec ──▶ describes the MQTT interface              │
│  OpenAPI spec  ──▶ describes the REST interface (auto)       │
└──────────────────────────────────────────────────────────────┘

Create the following structure:

/
├── README.md
├── labs/
│   ├── lab01/
│   ├── ...
│   └── lab08/
│       ├── README.md
│       ├── requirements.txt
│       ├── api.py
│       ├── asyncapi.yaml
│       ├── producer.py
│       ├── consumer.py
│       └── pirlib/
│           ├── __init__.py
│           ├── sampler.py
│           └── interpreter.py

Copy your producer.py, consumer.py, and pirlib/ from Lab 06/07. The new files are api.py (the REST API server) and asyncapi.yaml (the MQTT interface documentation).

Part 1 — Set up Flask and Flask-RESTx

Install the required packages. Add them to your requirements.txt:

flask
flask-restx
...

Install:

pip install flask flask-restx

Create api.py with a minimal Flask-RESTx application to make sure everything works:

from flask import Flask
from flask_restx import Api, Resource

app = Flask(__name__)
api = Api(
    app,
    version="1.0",
    title="Smart Wastebin API",
    description="REST API for querying Smart Wastebin sensor data and bin status",
)

ns = api.namespace("bins", description="Wastebin operations")


@ns.route("/")
class BinList(Resource):
    def get(self):
        """List all registered bins."""
        return {"bins": []}, 200


if __name__ == "__main__":
    app.run(debug=True, host="0.0.0.0", port=5000)

Run it:

python api.py

Open your browser and go to http://<your-pi-ip>:5000. You should see the Swagger UI, an interactive documentation page showing your API with one endpoint. Click on it, click “Try it out”, and click “Execute.” You should get back {"bins": []} with a 200 status code.

This is your starting point. Every endpoint you add will automatically appear in the Swagger UI, documented and testable. You never need to write documentation separately (for the rest interface at least) it comes from the code.

Part 2 — Design your API

Before writing more code, think about what your API should expose. The lecture emphasized designing the API first i.e deciding on the resources, URIs, and verbs before implementing anything.

Your resources are:

  • Bins — the wastebins themselves (status, location, metadata)
  • Sensors — the sensors mounted on the bins
  • Events — the motion events your pipeline produces (stored in the JSONL file)
  • MQTT — an interface for publishing to and reading from MQTT topics through HTTP

Design your endpoints following REST conventions. Here is a starting point — you should expand this:

Method URI Description
GET /bins List all bins
GET /bins/<bin_id> Get details for a specific bin
GET /bins/<bin_id>/sensors List sensors on a specific bin
GET /bins/<bin_id>/events Get motion events for a specific bin
GET /sensors List all sensors
GET /sensors/<sensor_id> Get details for a specific sensor
POST /bins/<bin_id>/emptied Record that a bin was emptied
POST /mqtt/publish Publish a message to an MQTT topic
GET /mqtt/topics List known MQTT topics and their last retained value

Think about:

  • What parameters should the events endpoint accept? At minimum, a limit to control how many records come back. Maybe a date range (start, end)?
  • What should the response look like? What fields does each resource need?
  • What status codes should each endpoint return? 200 for success, 404 if a bin does not exist, 400 if the query parameters are invalid?
  • For the MQTT publish endpoint, what does the client need to send (topic, payload, QoS, retain)?

Write down your API design in your report before implementing it.

Part 3 — Implement the data endpoints

Now build the data-serving side of the API. You will read bin and sensor information from your JSON-LD models (from Lab 05), and event data from your JSONL output files.

Load your data

Your API needs to read from two sources:

  1. The JSON-LD model files (models/sensor.jsonld, models/wastebin.jsonld, models/environment.jsonld) for bin and sensor metadata
  2. The JSONL event log written by your consumer for motion events

For the model files, load them at startup:

IMPORT json module
IMPORT os module

SET DATA_DIR TO the path of the current file's directory joined with "data"
SET EVENTS_FILE TO the path of DATA_DIR joined with "motion_events.jsonl"


FUNCTION load_json(filepath):
    OPEN the file at filepath in read mode
    READ and parse the file contents as JSON
    RETURN the parsed JSON data


FUNCTION load_events(filepath, limit = none, sensor_id = none):
    CREATE an empty list called events

    IF the file at filepath does not exist:
        RETURN events

    OPEN the file at filepath in read mode

    FOR each line in the file:
        REMOVE leading and trailing whitespace from the line

        IF the line is empty:
            SKIP to the next line

        TRY:
            PARSE the line as JSON and store it as record

            IF sensor_id was provided AND record's "madeBySensor" value is not equal to sensor_id:
                SKIP to the next line

            ADD record to events

        IF the line cannot be parsed as JSON:
            SKIP to the next line

    REVERSE events so the most recent records come first

    IF limit was provided:
        KEEP only the first limit records in events

    RETURN events

Define your models for Swagger

Flask-RESTx lets you define response models so Swagger UI shows the expected response structure:

bin_model = api.model("Bin", {
    "id": fields.String(required=True, description="Bin unique identifier"),
    "name": fields.String(description="Human-readable name"),
    "location": fields.String(description="Deployment location"),
    "status": fields.String(description="Current status"),
})

event_model = api.model("Event", {
    "resultTime": fields.String(description="ISO timestamp of the event"),
    "madeBySensor": fields.String(description="Sensor ID that produced this event"),
    "hasSimpleResult": fields.String(description="Motion state (detected/clear)"),
    "pipeline_latency_ms": fields.Float(description="Pipeline latency in ms"),
})

These models do not enforce anything at runtime as they are documentation. They tell anyone reading the Swagger UI exactly what the response looks like.

Implement the endpoints

Here is an example of a more complete bins endpoint. You need to expand and adapt this to your data:

from flask_restx import reqparse

# Parser for query parameters
events_parser = reqparse.RequestParser()
events_parser.add_argument("limit", type=int, default=50, help="Max events to return")
events_parser.add_argument("start", type=str, help="Start datetime (ISO format)")
events_parser.add_argument("end", type=str, help="End datetime (ISO format)")


@ns.route("/")
class BinList(Resource):
    @ns.marshal_list_with(bin_model)
    def get(self):
        """List all registered bins."""
        # Load from your model files or a registry
        return bins_registry


@ns.route("/<string:bin_id>")
@ns.param("bin_id", "The bin identifier")
@ns.response(404, "Bin not found")
class Bin(Resource):
    @ns.marshal_with(bin_model)
    def get(self, bin_id):
        """Get details for a specific bin."""
        bin_data = find_bin(bin_id)
        if not bin_data:
            api.abort(404, f"Bin {bin_id} not found")
        return bin_data


@ns.route("/<string:bin_id>/events")
@ns.param("bin_id", "The bin identifier")
class BinEvents(Resource):
    @ns.expect(events_parser)
    @ns.marshal_list_with(event_model)
    def get(self, bin_id):
        """Get motion events for a specific bin."""
        args = events_parser.parse_args()
        events = load_events(
            EVENTS_FILE,
            limit=args["limit"],
            sensor_id=get_sensor_for_bin(bin_id),
        )
        return events

A few things to notice:

@ns.marshal_with(bin_model) tells Flask-RESTx to format the response according to the model and show it in Swagger UI. The documentation stays in sync with the code and there is no separate doc file to maintain.

@ns.expect(events_parser) documents the query parameters in Swagger. When someone opens the UI, they see that /bins/bin-01/events accepts limit, start, and end parameters and can fill them in before hitting “Execute.”

api.abort(404, ...) returns a 404 response with a message. This is how you tell the client “that resource does not exist”, using the same status codes from the lecture.

Add the POST endpoint

Add an endpoint for recording that a bin was emptied (in general I have provided you with pseudocode examples, feel free to modify or add as you see fit):

DEFINE emptied_model AS an API model named "EmptiedRecord" WITH:
    FIELD "bin_id" AS string WITH description "Bin identifier"
    FIELD "emptied_at" AS string WITH description "ISO timestamp of when the bin was emptied"
    FIELD "emptied_by" AS string WITH description "Who emptied the bin"


DEFINE API ROUTE "/<bin_id>/emptied"
DOCUMENT route parameter "bin_id" AS "The bin identifier"

CLASS BinEmptied EXTENDS Resource:

    EXPECT request body to match emptied_model
    DOCUMENT response 201 AS "Bin marked as emptied"
    DOCUMENT response 404 AS "Bin not found"

    FUNCTION post(bin_id):
        RECORD that a bin was emptied

        SET bin_data TO the result of find_bin(bin_id)

        IF bin_data does not exist:
            STOP the request WITH status 404 AND message "Bin <bin_id> not found"

        SET data TO the incoming API request payload

        CREATE record WITH:
            "bin_id" SET TO bin_id
            "emptied_at" SET TO data["emptied_at"] IF provided,
                OTHERWISE SET TO the current UTC timestamp in ISO format ending with "Z"
            "emptied_by" SET TO data["emptied_by"] IF provided,
                OTHERWISE SET TO "unknown"

        SAVE record

        RETURN record WITH status code 201

This returns 201 (Created) on success. POST is not idempotent, calling it twice creates two emptied records, which is the correct behavior (the bin was emptied twice).

Add a sensors namespace

Create a second namespace for sensor endpoints:

DEFINE sensors_ns AS an API namespace named "sensors"
SET its description TO "Sensor operations"


DEFINE sensor_model AS an API model named "Sensor" WITH:
    FIELD "id" AS string, required, WITH description "Sensor unique identifier"
    FIELD "type" AS string WITH description "Sensor type (PIR, ultrasonic, etc.)"
    FIELD "model" AS string WITH description "Hardware model"
    FIELD "mounted_on" AS string WITH description "Bin this sensor is mounted on"
    FIELD "status" AS string WITH description "Current sensor status"


DEFINE API ROUTE "/sensors/"

CLASS SensorList EXTENDS Resource:

    FORMAT the response as a list of sensor_model objects

    FUNCTION get():
        LIST all registered sensors

        RETURN sensors_registry


DEFINE API ROUTE "/sensors/<sensor_id>"
DOCUMENT route parameter "sensor_id" AS "The sensor identifier"
DOCUMENT response 404 AS "Sensor not found"

CLASS Sensor EXTENDS Resource:

    FORMAT the response as a sensor_model object

    FUNCTION get(sensor_id):
        GET details for a specific sensor

        SET sensor TO the result of find_sensor(sensor_id)

        IF sensor does not exist:
            STOP the request WITH status 404 AND message "Sensor <sensor_id> not found"

        RETURN sensor

Part 4 — Add MQTT endpoints

Your REST API serves data from files. Now you will give it the ability to talk to your MQTT broker (mostly as an exercise), so a client can publish messages and read topic state through plain HTTP requests.

Connect the API to the broker

Set up an MQTT client inside your Flask app. This client stays connected to the broker for the lifetime of the API server:

IMPORT MQTT client library
IMPORT threading library

CREATE an MQTT client named "wastebin-api"
SET clean_session TO false so the session can persist

CREATE an empty dictionary called topic_store
CREATE a thread lock called topic_lock


FUNCTION on_message(client, userdata, msg):
    STORE every message received by the API client

    LOCK topic_lock so only one thread can update topic_store at a time

    SET topic_store[msg.topic] TO a record WITH:
        "topic" SET TO msg.topic
        "payload" SET TO msg.payload decoded as UTF-8,
            replacing invalid characters if needed
        "qos" SET TO msg.qos
        "retain" SET TO msg.retain
        "timestamp" SET TO the current UTC timestamp in ISO format ending with "Z"

    UNLOCK topic_lock


SET mqtt_client's message handler TO on_message

CONNECT mqtt_client TO the MQTT broker at "localhost" on port 1883
SET keepalive TO 60 seconds

SUBSCRIBE mqtt_client TO all topics under "smartbin/"
SET subscription QoS TO 1

START mqtt_client's background network loop

The API subscribes to smartbin/# so it can track the latest message on every topic. This builds up a live view of the MQTT state that the API can serve on demand.

Publish endpoint

Create a namespace for MQTT operations:

DEFINE mqtt_ns AS an API namespace named "mqtt"
SET its description TO "MQTT broker interaction"


DEFINE publish_model AS an API model named "MQTTPublish" WITH:
    FIELD "topic" AS string, required, WITH description "MQTT topic to publish to"
    FIELD "payload" AS string, required, WITH description "Message payload"
    FIELD "qos" AS integer WITH description "Quality of Service (0, 1, or 2)" AND default value 1
    FIELD "retain" AS boolean WITH description "Retain this message on the broker" AND default value false


DEFINE API ROUTE "/mqtt/publish"

CLASS MQTTPublish EXTENDS Resource:

    EXPECT request body to match publish_model
    DOCUMENT response 200 AS "Message published"
    DOCUMENT response 400 AS "Invalid request"

    FUNCTION post():
        PUBLISH a message to an MQTT topic

        SET data TO the incoming API request payload

        SET topic TO data["topic"]
        SET payload TO data["payload"]
        SET qos TO data["qos"] IF provided, OTHERWISE 1
        SET retain TO data["retain"] IF provided, OTHERWISE false

        IF topic is missing OR payload is missing:
            STOP the request WITH status 400 AND message "Both 'topic' and 'payload' are required"

        IF qos is not 0, 1, or 2:
            STOP the request WITH status 400 AND message "QoS must be 0, 1, or 2"

        PUBLISH payload TO topic using mqtt_client
            WITH qos set to qos
            WITH retain set to retain

        STORE the publish result as result

        RETURN a response WITH:
            "status" SET TO "published"
            "topic" SET TO topic
            "payload" SET TO payload
            "qos" SET TO qos
            "retain" SET TO retain
            "mqtt_rc" SET TO result.rc

        RETURN status code 200

Try it. Open Swagger UI, go to the mqtt namespace, and publish a test message:

{
  "topic": "smartbin/bin-01/pir-01/motion",
  "payload": "detected",
  "qos": 1,
  "retain": false
}

You should see the motion sensor entity update. You just triggered an MQTT event from a web browser.

Topics endpoint

Add an endpoint that shows the current state of all known topics:

DEFINE API ROUTE "/mqtt/topics"

CLASS MQTTTopics EXTENDS Resource:

    FUNCTION get():
        LIST all known MQTT topics and their last received message

        LOCK topic_lock so topic_store can be read safely

        CREATE response WITH:
            "topic_count" SET TO the number of entries in topic_store
            "topics" SET TO a list of all stored topic message records

        RETURN response WITH status code 200

        UNLOCK topic_lock


DEFINE API ROUTE "/mqtt/topics/<topic>"
DOCUMENT route parameter "topic" AS "MQTT topic path, for example smartbin/bin-01/pir-01/motion"

CLASS MQTTTopicDetail EXTENDS Resource:

    DOCUMENT response 404 AS "Topic not found or no message received yet"

    FUNCTION get(topic):
        GET the last received message for a specific MQTT topic

        LOCK topic_lock so topic_store can be read safely

        IF topic does not exist in topic_store:
            STOP the request WITH status 404
            INCLUDE message "No message received on topic '<topic>'"

        RETURN topic_store[topic] WITH status code 200

        UNLOCK topic_lock

Now you can check the entire MQTT state from your browser without needing mosquitto_sub:

curl http://localhost:5000/mqtt/topics
curl http://localhost:5000/mqtt/topics/smartbin/bin-01/pir-01/motion

Combining REST actions with MQTT

The “mark bin as emptied” endpoint can also publish a status update to MQTT, so Home Assistant and other subscribers hear about it:

DEFINE API ROUTE "/<bin_id>/emptied"
DOCUMENT route parameter "bin_id" AS "The bin identifier"

CLASS BinEmptied EXTENDS Resource:

    EXPECT request body to match emptied_model
    DOCUMENT response 201 AS "Bin marked as emptied"
    DOCUMENT response 404 AS "Bin not found"

    FUNCTION post(bin_id):
        RECORD that a bin was emptied
        PUBLISH a status update through MQTT

        SET bin_data TO the result of find_bin(bin_id)

        IF bin_data does not exist:
            STOP the request WITH status 404 AND message "Bin <bin_id> not found"

        SET data TO the incoming API request payload

        CREATE record WITH:
            "bin_id" SET TO bin_id
            "emptied_at" SET TO data["emptied_at"] IF provided,
                OTHERWISE SET TO the current UTC timestamp in ISO format ending with "Z"
            "emptied_by" SET TO data["emptied_by"] IF provided,
                OTHERWISE SET TO "unknown"

        SAVE record to file

        PUBLISH an MQTT message:
            TOPIC: "smartbin/<bin_id>/status"
            PAYLOAD: JSON object WITH:
                "state" SET TO "emptied"
                "emptied_at" SET TO record["emptied_at"]
            QOS: 1
            RETAIN: true

        RETURN record WITH status code 201

A single API call both records the event and notifies the MQTT ecosystem. The API acts as a gateway between the HTTP and MQTT worlds.

Part 5 — Document your MQTT interface with AsyncAPI

Your REST endpoints are self-documenting thanks to Flask-RESTx and Swagger UI. But what about your MQTT interface? Your producer publishes to topics like smartbin/bin-01/pir-01/events, your consumer subscribes, Home Assistant listens, but none of this is formally documented anywhere. Someone joining your project has to read your code to figure out what topics exist, what the message format is, and what QoS to use.

AsyncAPI solves this. It is the event-driven equivalent of OpenAPI. Where OpenAPI describes REST endpoints (request/response), AsyncAPI describes message channels (publish/subscribe). You write a specification in YAML, and tools generate interactive documentation, just like Swagger UI does automatically for your REST API. Swagger Editor now supports AsyncAPI specifications, so you can use the same tool for both.

Write the spec

Create asyncapi.yaml in your labs/lab08/ directory. This file describes your MQTT interface: what topics exist, who publishes and who subscribes, what the messages look like, and what broker they flow through.

Here is a starting point, you need to expand it to cover all your topics:

asyncapi: 3.0.0
info:
  title: Smart Wastebin MQTT Interface
  version: 1.0.0
  description: |
    Event-driven messaging interface for the Smart Wastebin system.
    All communication goes through an MQTT broker (Mosquitto).    

servers:
  mosquitto:
    host: localhost:1883
    protocol: mqtt
    description: Local Mosquitto broker on the Raspberry Pi

channels:
  motionEvents:
    address: smartbin/{bin_id}/{sensor_id}/events
    description: Full motion event records published by the producer
    parameters:
      bin_id:
        description: Wastebin identifier (e.g., bin-01)
      sensor_id:
        description: Sensor identifier (e.g., pir-01)
    messages:
      motionEvent:
        $ref: '#/components/messages/MotionEvent'

  motionState:
    address: smartbin/{bin_id}/{sensor_id}/motion
    description: Simple motion state for Home Assistant (detected/clear)
    parameters:
      bin_id:
        description: Wastebin identifier
      sensor_id:
        description: Sensor identifier
    messages:
      motionState:
        $ref: '#/components/messages/MotionState'

  binStatus:
    address: smartbin/{bin_id}/status
    description: Retained message with current bin status and metadata
    parameters:
      bin_id:
        description: Wastebin identifier
    messages:
      binStatus:
        $ref: '#/components/messages/BinStatus'

operations:
  publishMotionEvent:
    action: send
    channel:
      $ref: '#/channels/motionEvents'
    summary: Producer publishes a full motion event record
    description: |
      Published by the producer whenever the PIR sensor detects motion.
      Contains the full observation record with timestamps, sensor reference,
      and JSON-LD context.      

  consumeMotionEvent:
    action: receive
    channel:
      $ref: '#/channels/motionEvents'
    summary: Consumer receives motion events and writes them to JSONL
    description: |
      The consumer subscribes to this channel, enriches each event with
      ingest_time and pipeline_latency_ms, and writes the result to a
      JSONL file.      

  publishMotionState:
    action: send
    channel:
      $ref: '#/channels/motionState'
    summary: Producer publishes simple motion state for Home Assistant

  publishBinStatus:
    action: send
    channel:
      $ref: '#/channels/binStatus'
    summary: Status update published as a retained message

components:
  messages:
    MotionEvent:
      name: MotionEvent
      contentType: application/json
      payload:
        type: object
        properties:
          "@context":
            type: string
            description: JSON-LD context reference
          "@type":
            type: string
            description: Observation type
            example: sosa:Observation
          madeBySensor:
            type: string
            description: URI of the sensor that produced this event
            example: "urn:dev:pir-01"
          observedIn:
            type: string
            description: URI of the deployment environment
            example: "urn:env:lab-room-101"
          resultTime:
            type: string
            format: date-time
            description: ISO 8601 timestamp of when the event occurred
          hasSimpleResult:
            type: string
            description: Motion state
            enum: [detected, clear]
          seq:
            type: integer
            description: Sequence number within this pipeline run
          run_id:
            type: string
            description: UUID identifying this pipeline run

    MotionState:
      name: MotionState
      contentType: text/plain
      payload:
        type: string
        enum: [detected, clear]
        description: Simple motion state string for Home Assistant

    BinStatus:
      name: BinStatus
      contentType: application/json
      payload:
        type: object
        properties:
          state:
            type: string
            description: Current bin state
            enum: [active, emptied, full, maintenance]
          location:
            type: string
            description: Human-readable location
          last_motion:
            type: string
            format: date-time
            description: Timestamp of most recent motion event

Walk through what this describes:

servers — the Mosquitto broker, its address, and the protocol. Equivalent to the base URL in OpenAPI.

channels — your MQTT topics, with parameters for the variable parts ({bin_id}, {sensor_id}). Each channel references the message schema it carries.

operations — who does what. publishMotionEvent is a send action (the producer publishes), consumeMotionEvent is a receive action (the consumer subscribes). This makes it explicit which components are publishers and which are subscribers.

components/messages — the message schemas. Just like the api.model definitions in Flask-RESTx, these describe the shape of each message. The MotionEvent message has all the fields from your JSON-LD observations. The MotionState message is just a simple string.

Expand it

The example above covers three channels. Think about what else your system uses:

  • Any other topics you have added

Add channels, operations, and message schemas for each one.

View it

Open Swagger Editor in your browser. Click FileImport File and load your asyncapi.yaml. The editor will parse it and show interactive documentation, your channels, message schemas, server info, and operations, all rendered in a browsable interface.

Take a screenshot of your rendered AsyncAPI documentation for the report.

Why both OpenAPI and AsyncAPI?

Your Smart Wastebin system has two interfaces:

REST API (OpenAPI) MQTT Interface (AsyncAPI)
Pattern Request/Response Publish/Subscribe
Protocol HTTP MQTT
Use case On-demand queries, commands Real-time events, state updates
Example GET /bins/bin-01/events?limit=10 Subscribe to smartbin/bin-01/pir-01/events
Spec OpenAPI (auto-generated by Flask-RESTx) AsyncAPI (written by hand)

Both specs serve the same purpose: they formally describe your interface so that someone who has never seen your code can understand how to interact with your system. OpenAPI covers the pull side. AsyncAPI covers the push side.

Part 6 — Test with Swagger UI and curl

With the API running, open http://<your-pi-ip>:5000 in your browser. You should see all your endpoints organized by namespace (bins, sensors, mqtt).

Test each endpoint:

  1. GET /bins — should return your list of bins
  2. GET /bins/bin-01 — should return details for bin-01. What happens if you request a bin that does not exist?
  3. GET /bins/bin-01/events — should return motion events. Try with different limit values
  4. POST /bins/bin-01/emptied — fill in the JSON body and submit. Check that a 201 response comes back and that Home Assistant updates
  5. GET /sensors — should list your sensors
  6. GET /sensors/pir-01 — should return sensor details
  7. POST /mqtt/publish — publish a motion event. Check that Home Assistant and your consumer both receive it
  8. GET /mqtt/topics — see all the topics the API is tracking
  9. GET /mqtt/topics/smartbin/bin-01/pir-01/motion — check the last message on a specific topic

Take screenshots of the Swagger UI showing your endpoints and at least one successful response.

Test with curl

# List all bins
curl http://localhost:5000/bins/

# Get a specific bin
curl http://localhost:5000/bins/bin-01

# Get events with a limit
curl "http://localhost:5000/bins/bin-01/events?limit=10"

# Mark a bin as emptied (also publishes to MQTT)
curl -X POST http://localhost:5000/bins/bin-01/emptied \
  -H "Content-Type: application/json" \
  -d '{"emptied_by": "maintenance-team-A"}'

# Publish to MQTT through the API
curl -X POST http://localhost:5000/mqtt/publish \
  -H "Content-Type: application/json" \
  -d '{"topic": "smartbin/bin-01/pir-01/motion", "payload": "detected", "qos": 1}'

# See all MQTT topics
curl http://localhost:5000/mqtt/topics

# Request a bin that does not exist
curl -i http://localhost:5000/bins/bin-99

Check that you get 200 for successful requests, 201 for the POST, and 404 for nonexistent resources.

Why this lab is structured this way

Until now, the only way to get data out of your system was to either watch MQTT messages in real time or open a JSONL file. Both require you to know the internals, which topic to subscribe to, where the file is, what the format looks like. An API puts a clean front door on the system. A mobile app, a web dashboard, a route-planning service, or a maintenance tool can all query the same API without knowing anything about MQTT topics or JSONL files. They just send HTTP requests and get JSON responses.

The MQTT endpoints take this further. By wrapping MQTT publish and topic state in REST endpoints, your API becomes a universal gateway. A facilities manager can mark a bin as emptied from a web form, and the MQTT ecosystem (Home Assistant, your consumer, other subscribers) all hear about it instantly. Your REST interface is documented by OpenAPI (auto-generated). Your MQTT interface is documented by AsyncAPI (written by hand). Together, they describe your entire system in standard, machine-readable formats that any developer can read and any tool can process.

This combination of MQTT and REST is a pattern you see in real IoT platforms. AWS IoT has MQTT for device communication and REST APIs for management. Azure IoT Hub does the same. Home Assistant itself exposes a REST API alongside its MQTT integration. Your Smart Wastebin now follows the same architecture.

Report questions

Answer the following in your labs/lab08/README.md after the implementation and experiments are complete.

API design

RQ1: Write down your complete API design, every endpoint, its HTTP method, the URI, what parameters it accepts, and what it returns. Present this as a table.
RQ2: Why do the event-listing endpoints use GET and not POST? RQ3: Why does the “mark as emptied” endpoint use POST and not PUT? Think about idempotency.
RQ4: How did you handle the case where a client requests a bin or sensor that does not exist? What status code do you return and why?

Implementation

RQ5: Where does your API read its data from? Trace the path of event data from the PIR sensor all the way to an API response.
RQ6: What query parameters does your events endpoint support? Show an example request and response.
RQ7: How do the Flask-RESTx models (api.model) relate to the Swagger UI documentation? What happens in the UI when you add a new field to a model?
RQ8: Show a screenshot of your Swagger UI with endpoints visible.

MQTT endpoints

RQ9: Explain how the POST /mqtt/publish endpoint works. What does the API do when it receives a publish request?
RQ10: You published a motion event through the API using POST /mqtt/publish. Describe the full path that message takes, from the HTTP request to the consumer’s JSONL file.
RQ11: What does GET /mqtt/topics return? Why does the API need to subscribe to smartbin/# for this to work?
RQ12: You call POST /bins/bin-01/emptied. This both saves a record and publishes to MQTT. What is the advantage of combining both actions in one endpoint?

AsyncAPI

RQ13: What is AsyncAPI and how does it relate to OpenAPI? Why do you need both for the Smart Wastebin?
RQ14: How many channels did you document in your AsyncAPI spec? For each, state who is the publisher and who is the subscriber.
RQ15: Show a screenshot of your AsyncAPI spec rendered in Swagger Editor or AsyncAPI Studio.
RQ16: Compare the MotionEvent message schema in your AsyncAPI spec with the event_model in your Flask-RESTx code. They describe the same data, what is different about the context in which each is used?

Testing

RQ17: Show the curl command and response for: (a) listing all bins, (b) getting events with a limit, (c) publishing an MQTT message, (d) requesting a nonexistent bin.
RQ18: What is the difference between testing with Swagger UI and testing with curl? When would you use each?

Reflection

RQ19: A new team member joins your project. They need to build a mobile app that shows bin status and lets users report full bins. What do you hand them? How do the Swagger UI and AsyncAPI spec help?
RQ20: In your own words, explain why the Smart Wastebin needs both a push-based system (MQTT) and a pull-based system (REST API). What would be missing if you only had one?

Project hint: Smart Wastebin

The API you built is the interface that other applications will use to interact with your system. As you add more sensors and functionality, both the REST API and the AsyncAPI spec grow with it: Think about your final project demo. You can show the Swagger UI with all your REST endpoints, the AsyncAPI documentation with all your MQTT channels, and the Home Assistant dashboard with live state, three views of one system, each serving a different audience (app developers, end users etc.).


What should be finished before you leave the lab

Before the end of the session you should have: designed your API endpoints, implemented the API with Flask and Flask-RESTx, created namespaces for bins, sensors, and MQTT, included GET endpoints for listing and detail views, included POST endpoints for recording events and publishing MQTT messages, added query parameter support for events, handled error cases with appropriate status codes, connected the API to the MQTT broker, written an AsyncAPI spec covering all your MQTT topics, tested all endpoints through Swagger UI and curl, run the full system together, updated labs/lab08/README.md, and pushed to GitHub.

Final checklist (Lab 08)

  • api.py created with Flask and Flask-RESTx
  • Swagger UI accessible at port 5000
  • bins namespace with list, detail, and events endpoints
  • sensors namespace with list and detail endpoints
  • Events endpoint with limit query parameter
  • POST /bins/<bin_id>/emptied endpoint working
  • mqtt namespace with publish and topics endpoints
  • API connected to Mosquitto broker
  • POST /mqtt/publish can trigger MQTT messages (verified in Home Assistant or consumer)
  • GET /mqtt/topics shows tracked topic state
  • Response models defined for Swagger documentation
  • 404 returned for nonexistent resources
  • API reads event data from consumer’s JSONL output
  • asyncapi.yaml created with channels, operations, and message schemas
  • AsyncAPI spec covers all MQTT topics used by the system
  • AsyncAPI spec viewable in Swagger Editor or AsyncAPI Studio
  • Tested via Swagger UI (screenshots in report)
  • Tested via curl
  • labs/lab08/README.md contains API design, code, screenshots, AsyncAPI spec, and report answers
  • Commit and push completed

Deliverables and submission

What must exist in the repository (by end of lab)

/
├── README.md
├── labs/
│   ├── lab01/
│   ├── ...
│   └── lab08/
│       ├── README.md
│       ├── requirements.txt
│       ├── api.py
│       ├── asyncapi.yaml
│       ├── producer.py
│       ├── consumer.py
│       └── pirlib/
│           ├── __init__.py
│           ├── sampler.py
│           └── interpreter.py

Do not include:

  • venv/
  • __pycache__/
  • *.pyc
  • output/ or *.jsonl
  • large temporary files unless explicitly requested

What labs/lab08/README.md must contain

Two clearly separated parts:

  1. Code / runbook — include your API design table, endpoint implementations (or relevant excerpts), Swagger UI screenshots, AsyncAPI spec (or relevant excerpts), rendered AsyncAPI screenshot, and how to run the full system
  2. Answers to report questions

Same style as previous labs.


End of lab session — GitHub checkpoint

Before leaving:

  • commit your progress
  • push to your team GitHub repository

Minimum expectation:

  • all deliverables tracked by Git
  • latest commit pushed
  • commit message is clear

Before next lab — eClass submission

Submit both:

  1. Code archive (.zip)
  2. PDF export of labs/lab08/README.md

Required PDF filename format:

  • lab08_REPORT_<team>.pdf

What follows is a greek version of the same lab

Lab 08 — Δημιουργία REST API για το Smart Wastebin

Προθεσμίες:

  • Τέλος εργαστηριακής συνεδρίας (GitHub checkpoint): κάντε commit & push την πρόοδό σας στο αποθετήριο της ομάδας σας.
  • Πριν το επόμενο εργαστήριο (υποβολή στο eClass): ανεβάστε (1) ένα .zip με τον κώδικά σας και (2) ένα PDF export του labs/lab08/README.md.

Περιεχόμενο υποβολής:

  • (1) ένα .zip με τον κώδικά σας, και
  • (2) ένα PDF export του labs/lab08/README.md.

Εισαγωγή στο τι πρέπει να κάνετε

Σε αυτό το εργαστήριο θα φτιάξετε ένα REST API που συνυπάρχει με το υπάρχον pipeline σας και σερβίρει δεδομένα από αυτό. Θα φτιάξετε επίσης MQTT endpoints μέσα στο API, ώστε ένας client να μπορεί να κάνει publish μηνύματα σε MQTT topics μέσω απλού HTTP request. Τέλος, θα τεκμηριώσετε το MQTT interface σας με AsyncAPI, το event-driven ισοδύναμο του OpenAPI. Τα REST endpoints σας τεκμηριώνονται αυτόματα από το Flask-RESTx. Τα MQTT topics σας τεκμηριώνονται από ένα AsyncAPI spec. Μαζί, περιγράφουν ολόκληρο το σύστημά σας.

Θα χρησιμοποιήσετε Flask ως web framework και Flask-RESTx για να ορίσετε τα endpoints σας και να δημιουργήσετε αυτόματα Swagger UI documentation.

Το σύστημα έχει τώρα τρία communication layers:

┌──────────────────────────────────────────────────────────────┐
│                    Smart Wastebin System                     │
│                                                              │
│  PIR sensor ──▶ producer ──▶ MQTT broker ──▶ consumer      │
│                              ▲    │             │            │
│                              │    ▼             ▼            │
│                         REST API  HA        JSONL file       │
│                         (publish  (live        │             │
│                          + query) dashboard)   │             │
│                              ▲                 │             │
│                              │                 ▼             │
│                          Swagger UI ◀────── read events      │
│                       (docs + test)                          │
│                                                              │
│  AsyncAPI spec ──▶ περιγράφει το MQTT interface              │
│  OpenAPI spec  ──▶ περιγράφει το REST interface (auto)       │
└──────────────────────────────────────────────────────────────┘

Δημιουργήστε την παρακάτω δομή:

/
├── README.md
├── labs/
│   ├── lab01/
│   ├── ...
│   └── lab08/
│       ├── README.md
│       ├── requirements.txt
│       ├── api.py
│       ├── asyncapi.yaml
│       ├── producer.py
│       ├── consumer.py
│       └── pirlib/
│           ├── __init__.py
│           ├── sampler.py
│           └── interpreter.py

Αντιγράψτε τα producer.py, consumer.py, και pirlib/ από το Lab 06/07. Τα νέα αρχεία είναι το api.py (ο REST API server) και το asyncapi.yaml (η τεκμηρίωση του MQTT interface).


Μέρος 1 — Ρύθμιση Flask και Flask-RESTx

Εγκαταστήστε τα απαιτούμενα πακέτα. Προσθέστε τα στο requirements.txt σας:

flask
flask-restx
...

Εγκατάσταση:

pip install flask flask-restx

Δημιουργήστε το api.py με μια minimal Flask-RESTx εφαρμογή για να βεβαιωθείτε ότι όλα λειτουργούν:

from flask import Flask
from flask_restx import Api, Resource

app = Flask(__name__)
api = Api(
    app,
    version="1.0",
    title="Smart Wastebin API",
    description="REST API for querying Smart Wastebin sensor data and bin status",
)

ns = api.namespace("bins", description="Wastebin operations")


@ns.route("/")
class BinList(Resource):
    def get(self):
        """List all registered bins."""
        return {"bins": []}, 200


if __name__ == "__main__":
    app.run(debug=True, host="0.0.0.0", port=5000)

Τρέξτε το:

python api.py

Ανοίξτε browser και πηγαίνετε στο http://<your-pi-ip>:5000. Θα πρέπει να δείτε το Swagger UI, μια interactive σελίδα documentation που εμφανίζει το API σας με ένα endpoint. Κάντε κλικ σε αυτό, κάντε κλικ “Try it out”, και κάντε κλικ “Execute.” Θα πρέπει να λάβετε {"bins": []} με status code 200.

Αυτό είναι το σημείο εκκίνησής σας. Κάθε endpoint που προσθέτετε θα εμφανίζεται αυτόματα στο Swagger UI, τεκμηριωμένο και δοκιμάσιμο. Δεν χρειάζεται ποτέ να γράψετε documentation ξεχωριστά (για το REST interface τουλάχιστον) — προέρχεται από τον κώδικα.


Μέρος 2 — Σχεδιασμός του API σας

Πριν γράψετε περισσότερο κώδικα, σκεφτείτε τι πρέπει να εκθέτει το API σας. Η διάλεξη τόνισε τον σχεδιασμό του API πρώτα, δηλαδή την απόφαση για τα resources, τα URIs, και τα verbs πριν υλοποιήσετε οτιδήποτε.

Τα resources σας είναι:

  • Bins — οι ίδιοι οι κάδοι απορριμμάτων (status, τοποθεσία, metadata)
  • Sensors — οι sensors που είναι τοποθετημένοι στους κάδους
  • Events — τα motion events που παράγει το pipeline σας (αποθηκευμένα στο JSONL αρχείο)
  • MQTT — ένα interface για publishing και ανάγνωση από MQTT topics μέσω HTTP

Σχεδιάστε τα endpoints σας ακολουθώντας REST conventions. Ορίστε ένα σημείο εκκίνησης — πρέπει να το επεκτείνετε:

Method URI Περιγραφή
GET /bins Λίστα όλων των bins
GET /bins/<bin_id> Λεπτομέρειες για συγκεκριμένο bin
GET /bins/<bin_id>/sensors Λίστα sensors σε συγκεκριμένο bin
GET /bins/<bin_id>/events Motion events για συγκεκριμένο bin
GET /sensors Λίστα όλων των sensors
GET /sensors/<sensor_id> Λεπτομέρειες για συγκεκριμένο sensor
POST /bins/<bin_id>/emptied Καταγραφή αδειάσματος bin
POST /mqtt/publish Publish μηνύματος σε MQTT topic
GET /mqtt/topics Λίστα γνωστών MQTT topics και τελευταία retained τιμή

Σκεφτείτε:

  • Ποιες παραμέτρους πρέπει να δέχεται το events endpoint; Τουλάχιστον ένα limit για τον έλεγχο του αριθμού των records που επιστρέφονται. Ίσως και ένα εύρος ημερομηνιών (start, end);
  • Πώς πρέπει να μοιάζει η response; Ποια fields χρειάζεται κάθε resource;
  • Ποιους status codes πρέπει να επιστρέφει κάθε endpoint; 200 για επιτυχία, 404 αν ένα bin δεν υπάρχει, 400 αν οι query parameters είναι άκυροι;
  • Για το MQTT publish endpoint, τι χρειάζεται να στείλει ο client (topic, payload, QoS, retain);

Καταγράψτε τον σχεδιασμό του API σας στην αναφορά σας πριν το υλοποιήσετε.


Μέρος 3 — Υλοποίηση των data endpoints

Τώρα φτιάξτε την πλευρά του API που σερβίρει δεδομένα. Θα διαβάζετε πληροφορίες bin και sensor από τα JSON-LD models σας (από το Lab 05), και event data από τα JSONL output αρχεία σας.

Φορτώστε τα δεδομένα σας

Το API σας χρειάζεται να διαβάζει από δύο πηγές:

  1. Τα JSON-LD model αρχεία (models/sensor.jsonld, models/wastebin.jsonld, models/environment.jsonld) για metadata bin και sensor
  2. Το JSONL event log που γράφει ο consumer για motion events

Για τα model αρχεία, φορτώστε τα κατά την εκκίνηση:

IMPORT json module
IMPORT os module

SET DATA_DIR ΩΣ τη διαδρομή του τρέχοντος αρχείου συνδυασμένη με "data"
SET EVENTS_FILE ΩΣ τη διαδρομή του DATA_DIR συνδυασμένη με "motion_events.jsonl"


FUNCTION load_json(filepath):
    OPEN το αρχείο στο filepath σε read mode
    READ και parse τα περιεχόμενα του αρχείου ως JSON
    RETURN τα parsed JSON δεδομένα


FUNCTION load_events(filepath, limit = none, sensor_id = none):
    CREATE κενή λίστα events

    IF το αρχείο στο filepath δεν υπάρχει:
        RETURN events

    OPEN το αρχείο στο filepath σε read mode

    FOR κάθε γραμμή στο αρχείο:
        REMOVE leading και trailing whitespace από τη γραμμή

        IF η γραμμή είναι κενή:
            SKIP στην επόμενη γραμμή

        TRY:
            PARSE τη γραμμή ως JSON και αποθήκευσέ τη ως record

            IF δόθηκε sensor_id ΚΑΙ η τιμή "madeBySensor" του record δεν ισούται με sensor_id:
                SKIP στην επόμενη γραμμή

            ADD record στα events

        IF η γραμμή δεν μπορεί να parsed ως JSON:
            SKIP στην επόμενη γραμμή

    REVERSE events ώστε τα πιο πρόσφατα records να έρθουν πρώτα

    IF δόθηκε limit:
        KEEP μόνο τα πρώτα limit records στα events

    RETURN events

Ορίστε τα models για το Swagger

Το Flask-RESTx σας επιτρέπει να ορίζετε response models ώστε το Swagger UI να εμφανίζει την αναμενόμενη δομή response:

bin_model = api.model("Bin", {
    "id": fields.String(required=True, description="Bin unique identifier"),
    "name": fields.String(description="Human-readable name"),
    "location": fields.String(description="Deployment location"),
    "status": fields.String(description="Current status"),
})

event_model = api.model("Event", {
    "resultTime": fields.String(description="ISO timestamp of the event"),
    "madeBySensor": fields.String(description="Sensor ID that produced this event"),
    "hasSimpleResult": fields.String(description="Motion state (detected/clear)"),
    "pipeline_latency_ms": fields.Float(description="Pipeline latency in ms"),
})

Αυτά τα models δεν επιβάλλουν τίποτα κατά την εκτέλεση — είναι documentation. Λένε σε όποιον διαβάζει το Swagger UI ακριβώς πώς μοιάζει η response.

Υλοποιήστε τα endpoints

Ορίστε ένα παράδειγμα πιο πλήρους bins endpoint. Πρέπει να το επεκτείνετε και να το προσαρμόσετε στα δεδομένα σας:

from flask_restx import reqparse

# Parser για query parameters
events_parser = reqparse.RequestParser()
events_parser.add_argument("limit", type=int, default=50, help="Max events to return")
events_parser.add_argument("start", type=str, help="Start datetime (ISO format)")
events_parser.add_argument("end", type=str, help="End datetime (ISO format)")


@ns.route("/")
class BinList(Resource):
    @ns.marshal_list_with(bin_model)
    def get(self):
        """List all registered bins."""
        # Φόρτωση από τα model αρχεία σας ή ένα registry
        return bins_registry


@ns.route("/<string:bin_id>")
@ns.param("bin_id", "The bin identifier")
@ns.response(404, "Bin not found")
class Bin(Resource):
    @ns.marshal_with(bin_model)
    def get(self, bin_id):
        """Get details for a specific bin."""
        bin_data = find_bin(bin_id)
        if not bin_data:
            api.abort(404, f"Bin {bin_id} not found")
        return bin_data


@ns.route("/<string:bin_id>/events")
@ns.param("bin_id", "The bin identifier")
class BinEvents(Resource):
    @ns.expect(events_parser)
    @ns.marshal_list_with(event_model)
    def get(self, bin_id):
        """Get motion events for a specific bin."""
        args = events_parser.parse_args()
        events = load_events(
            EVENTS_FILE,
            limit=args["limit"],
            sensor_id=get_sensor_for_bin(bin_id),
        )
        return events

Μερικά πράγματα να παρατηρήσετε:

Το @ns.marshal_with(bin_model) λέει στο Flask-RESTx να μορφοποιεί την response σύμφωνα με το model και να την εμφανίζει στο Swagger UI. Η documentation παραμένει συγχρονισμένη με τον κώδικα και δεν υπάρχει ξεχωριστό doc αρχείο για συντήρηση.

Το @ns.expect(events_parser) τεκμηριώνει τα query parameters στο Swagger. Όταν κάποιος ανοίξει το UI, βλέπει ότι το /bins/bin-01/events δέχεται παραμέτρους limit, start, και end και μπορεί να τις συμπληρώσει πριν πατήσει “Execute.”

Το api.abort(404, ...) επιστρέφει response 404 με μήνυμα. Έτσι λέτε στον client “αυτό το resource δεν υπάρχει”, χρησιμοποιώντας τους ίδιους status codes από τη διάλεξη.

Προσθέστε το POST endpoint

Προσθέστε endpoint για καταγραφή αδειάσματος bin (γενικά έχω δώσει pseudocode παραδείγματα — τροποποιήστε ή προσθέστε ό,τι κρίνετε κατάλληλο):

DEFINE emptied_model ΩΣ API model με όνομα "EmptiedRecord" ΜΕ:
    FIELD "bin_id" ΩΣ string ΜΕ description "Bin identifier"
    FIELD "emptied_at" ΩΣ string ΜΕ description "ISO timestamp αδειάσματος"
    FIELD "emptied_by" ΩΣ string ΜΕ description "Ποιος άδειασε τον κάδο"


DEFINE API ROUTE "/<bin_id>/emptied"
DOCUMENT route parameter "bin_id" ΩΣ "The bin identifier"

CLASS BinEmptied EXTENDS Resource:

    EXPECT request body να ταιριάζει με emptied_model
    DOCUMENT response 201 ΩΣ "Bin marked as emptied"
    DOCUMENT response 404 ΩΣ "Bin not found"

    FUNCTION post(bin_id):
        RECORD ότι ένας κάδος αδειάστηκε

        SET bin_data ΩΣ αποτέλεσμα find_bin(bin_id)

        IF bin_data δεν υπάρχει:
            STOP το request ΜΕ status 404 ΚΑΙ μήνυμα "Bin <bin_id> not found"

        SET data ΩΣ το εισερχόμενο API request payload

        CREATE record ΜΕ:
            "bin_id" SET TO bin_id
            "emptied_at" SET TO data["emptied_at"] ΑΝ δόθηκε,
                ΑΛΛΙΩΣ SET TO τρέχον UTC timestamp σε ISO format που τελειώνει με "Z"
            "emptied_by" SET TO data["emptied_by"] ΑΝ δόθηκε,
                ΑΛΛΙΩΣ SET TO "unknown"

        SAVE record

        RETURN record ΜΕ status code 201

Αυτό επιστρέφει 201 (Created) σε επιτυχία. Το POST δεν είναι idempotent — καλώντας το δύο φορές δημιουργούνται δύο emptied records, κάτι που είναι σωστή συμπεριφορά (ο κάδος αδειάστηκε δύο φορές).

Προσθέστε sensors namespace

Δημιουργήστε ένα δεύτερο namespace για sensor endpoints:

DEFINE sensors_ns ΩΣ API namespace με όνομα "sensors"
SET description ΩΣ "Sensor operations"


DEFINE sensor_model ΩΣ API model με όνομα "Sensor" ΜΕ:
    FIELD "id" ΩΣ string, required, ΜΕ description "Sensor unique identifier"
    FIELD "type" ΩΣ string ΜΕ description "Τύπος sensor (PIR, ultrasonic, κ.λπ.)"
    FIELD "model" ΩΣ string ΜΕ description "Hardware model"
    FIELD "mounted_on" ΩΣ string ΜΕ description "Bin στον οποίο είναι τοποθετημένος ο sensor"
    FIELD "status" ΩΣ string ΜΕ description "Τρέχουσα κατάσταση sensor"


DEFINE API ROUTE "/sensors/"

CLASS SensorList EXTENDS Resource:

    FORMAT response ΩΣ λίστα sensor_model objects

    FUNCTION get():
        LIST όλους τους καταχωρημένους sensors

        RETURN sensors_registry


DEFINE API ROUTE "/sensors/<sensor_id>"
DOCUMENT route parameter "sensor_id" ΩΣ "The sensor identifier"
DOCUMENT response 404 ΩΣ "Sensor not found"

CLASS Sensor EXTENDS Resource:

    FORMAT response ΩΣ sensor_model object

    FUNCTION get(sensor_id):
        GET λεπτομέρειες για συγκεκριμένο sensor

        SET sensor ΩΣ αποτέλεσμα find_sensor(sensor_id)

        IF sensor δεν υπάρχει:
            STOP το request ΜΕ status 404 ΚΑΙ μήνυμα "Sensor <sensor_id> not found"

        RETURN sensor

Μέρος 4 — Προσθήκη MQTT endpoints

Το REST API σας σερβίρει δεδομένα από αρχεία. Τώρα θα του δώσετε τη δυνατότητα να επικοινωνεί με τον MQTT broker σας (κυρίως ως άσκηση), ώστε ένας client να μπορεί να κάνει publish μηνύματα και να διαβάζει topic state μέσω απλών HTTP requests.

Συνδέστε το API στον broker

Ρυθμίστε έναν MQTT client μέσα στην Flask εφαρμογή σας. Αυτός ο client παραμένει συνδεδεμένος στον broker για όλη τη διάρκεια ζωής του API server:

IMPORT MQTT client library
IMPORT threading library

CREATE MQTT client με όνομα "wastebin-api"
SET clean_session ΣΕ false ώστε το session να μπορεί να διατηρηθεί

CREATE κενό dictionary topic_store
CREATE thread lock topic_lock


FUNCTION on_message(client, userdata, msg):
    STORE κάθε μήνυμα που λαμβάνει ο API client

    LOCK topic_lock ώστε μόνο ένα thread να μπορεί να ενημερώνει το topic_store τη φορά

    SET topic_store[msg.topic] ΩΣ record ΜΕ:
        "topic" SET TO msg.topic
        "payload" SET TO msg.payload decoded ως UTF-8,
            με αντικατάσταση άκυρων χαρακτήρων αν χρειάζεται
        "qos" SET TO msg.qos
        "retain" SET TO msg.retain
        "timestamp" SET TO τρέχον UTC timestamp σε ISO format που τελειώνει με "Z"

    UNLOCK topic_lock


SET mqtt_client's message handler ΩΣ on_message

CONNECT mqtt_client ΣΤΟ MQTT broker στο "localhost" στη port 1883
SET keepalive ΩΣ 60 seconds

SUBSCRIBE mqtt_client ΣΕ όλα τα topics κάτω από "smartbin/"
SET subscription QoS ΩΣ 1

START mqtt_client's background network loop

Το API κάνει subscribe στο smartbin/# ώστε να παρακολουθεί το τελευταίο μήνυμα σε κάθε topic. Αυτό δημιουργεί μια live εικόνα του MQTT state που το API μπορεί να σερβίρει on demand.

Publish endpoint

Δημιουργήστε ένα namespace για MQTT operations:

DEFINE mqtt_ns ΩΣ API namespace με όνομα "mqtt"
SET description ΩΣ "MQTT broker interaction"


DEFINE publish_model ΩΣ API model με όνομα "MQTTPublish" ΜΕ:
    FIELD "topic" ΩΣ string, required, ΜΕ description "MQTT topic για publish"
    FIELD "payload" ΩΣ string, required, ΜΕ description "Message payload"
    FIELD "qos" ΩΣ integer ΜΕ description "Quality of Service (0, 1, ή 2)" ΚΑΙ default τιμή 1
    FIELD "retain" ΩΣ boolean ΜΕ description "Retain αυτό το μήνυμα στον broker" ΚΑΙ default τιμή false


DEFINE API ROUTE "/mqtt/publish"

CLASS MQTTPublish EXTENDS Resource:

    EXPECT request body να ταιριάζει με publish_model
    DOCUMENT response 200 ΩΣ "Message published"
    DOCUMENT response 400 ΩΣ "Invalid request"

    FUNCTION post():
        PUBLISH μήνυμα σε MQTT topic

        SET data ΩΣ το εισερχόμενο API request payload

        SET topic ΩΣ data["topic"]
        SET payload ΩΣ data["payload"]
        SET qos ΩΣ data["qos"] ΑΝ δόθηκε, ΑΛΛΙΩΣ 1
        SET retain ΩΣ data["retain"] ΑΝ δόθηκε, ΑΛΛΙΩΣ false

        IF topic λείπει Ή payload λείπει:
            STOP το request ΜΕ status 400 ΚΑΙ μήνυμα "Both 'topic' and 'payload' are required"

        IF qos δεν είναι 0, 1, ή 2:
            STOP το request ΜΕ status 400 ΚΑΙ μήνυμα "QoS must be 0, 1, or 2"

        PUBLISH payload ΣΤΟ topic χρησιμοποιώντας mqtt_client
            ΜΕ qos set to qos
            ΜΕ retain set to retain

        STORE αποτέλεσμα publish ΩΣ result

        RETURN response ΜΕ:
            "status" SET TO "published"
            "topic" SET TO topic
            "payload" SET TO payload
            "qos" SET TO qos
            "retain" SET TO retain
            "mqtt_rc" SET TO result.rc

        RETURN status code 200

Δοκιμάστε το. Ανοίξτε Swagger UI, πηγαίνετε στο mqtt namespace, και κάντε publish ένα test μήνυμα:

{
  "topic": "smartbin/bin-01/pir-01/motion",
  "payload": "detected",
  "qos": 1,
  "retain": false
}

Θα πρέπει να δείτε το motion sensor entity να ενημερώνεται. Μόλις ενεργοποιήσατε ένα MQTT event από web browser.

Topics endpoint

Προσθέστε endpoint που εμφανίζει την τρέχουσα κατάσταση όλων των γνωστών topics:

DEFINE API ROUTE "/mqtt/topics"

CLASS MQTTTopics EXTENDS Resource:

    FUNCTION get():
        LIST όλα τα γνωστά MQTT topics και το τελευταίο μήνυμα που ελήφθη

        LOCK topic_lock ώστε το topic_store να διαβαστεί με ασφάλεια

        CREATE response ΜΕ:
            "topic_count" SET TO αριθμό εγγραφών στο topic_store
            "topics" SET TO λίστα όλων των αποθηκευμένων topic message records

        RETURN response ΜΕ status code 200

        UNLOCK topic_lock


DEFINE API ROUTE "/mqtt/topics/<topic>"
DOCUMENT route parameter "topic" ΩΣ "MQTT topic path, π.χ. smartbin/bin-01/pir-01/motion"

CLASS MQTTTopicDetail EXTENDS Resource:

    DOCUMENT response 404 ΩΣ "Topic not found or no message received yet"

    FUNCTION get(topic):
        GET το τελευταίο ληφθέν μήνυμα για συγκεκριμένο MQTT topic

        LOCK topic_lock ώστε το topic_store να διαβαστεί με ασφάλεια

        IF topic δεν υπάρχει στο topic_store:
            STOP το request ΜΕ status 404
            INCLUDE μήνυμα "No message received on topic '<topic>'"

        RETURN topic_store[topic] ΜΕ status code 200

        UNLOCK topic_lock

Τώρα μπορείτε να ελέγξετε ολόκληρο το MQTT state από τον browser σας χωρίς να χρειάζεστε mosquitto_sub:

curl http://localhost:5000/mqtt/topics
curl http://localhost:5000/mqtt/topics/smartbin/bin-01/pir-01/motion

Συνδυασμός REST actions με MQTT

Το endpoint “mark bin as emptied” μπορεί επίσης να κάνει publish ένα status update στο MQTT, ώστε το Home Assistant και άλλοι subscribers να ενημερωθούν:

DEFINE API ROUTE "/<bin_id>/emptied"
DOCUMENT route parameter "bin_id" ΩΣ "The bin identifier"

CLASS BinEmptied EXTENDS Resource:

    EXPECT request body να ταιριάζει με emptied_model
    DOCUMENT response 201 ΩΣ "Bin marked as emptied"
    DOCUMENT response 404 ΩΣ "Bin not found"

    FUNCTION post(bin_id):
        RECORD ότι ένας κάδος αδειάστηκε
        PUBLISH status update μέσω MQTT

        SET bin_data ΩΣ αποτέλεσμα find_bin(bin_id)

        IF bin_data δεν υπάρχει:
            STOP το request ΜΕ status 404 ΚΑΙ μήνυμα "Bin <bin_id> not found"

        SET data ΩΣ το εισερχόμενο API request payload

        CREATE record ΜΕ:
            "bin_id" SET TO bin_id
            "emptied_at" SET TO data["emptied_at"] ΑΝ δόθηκε,
                ΑΛΛΙΩΣ SET TO τρέχον UTC timestamp σε ISO format που τελειώνει με "Z"
            "emptied_by" SET TO data["emptied_by"] ΑΝ δόθηκε,
                ΑΛΛΙΩΣ SET TO "unknown"

        SAVE record σε αρχείο

        PUBLISH MQTT μήνυμα:
            TOPIC: "smartbin/<bin_id>/status"
            PAYLOAD: JSON object ΜΕ:
                "state" SET TO "emptied"
                "emptied_at" SET TO record["emptied_at"]
            QOS: 1
            RETAIN: true

        RETURN record ΜΕ status code 201

Μια μόνο API κλήση καταγράφει το event και ειδοποιεί το MQTT ecosystem. Το API λειτουργεί ως gateway μεταξύ των κόσμων HTTP και MQTT.


Μέρος 5 — Τεκμηρίωση του MQTT interface με AsyncAPI

Τα REST endpoints σας τεκμηριώνονται αυτόματα χάρη στο Flask-RESTx και το Swagger UI. Αλλά τι γίνεται με το MQTT interface σας; Ο producer σας κάνει publish σε topics όπως smartbin/bin-01/pir-01/events, ο consumer κάνει subscribe, το Home Assistant ακούει — αλλά τίποτα από αυτά δεν τεκμηριώνεται επίσημα πουθενά. Κάποιος που μπαίνει στο project σας πρέπει να διαβάσει τον κώδικα για να καταλάβει ποια topics υπάρχουν, ποια είναι η μορφή μηνύματος, και τι QoS να χρησιμοποιήσει.

Το AsyncAPI λύνει αυτό. Είναι το event-driven ισοδύναμο του OpenAPI. Όπου το OpenAPI περιγράφει REST endpoints (request/response), το AsyncAPI περιγράφει message channels (publish/subscribe). Γράφετε ένα spec σε YAML, και τα εργαλεία δημιουργούν interactive documentation, ακριβώς όπως το Swagger UI κάνει αυτόματα για το REST API σας. Το Swagger Editor υποστηρίζει πλέον AsyncAPI specifications, οπότε μπορείτε να χρησιμοποιήσετε το ίδιο εργαλείο και για τα δύο.

Γράψτε το spec

Δημιουργήστε το asyncapi.yaml στον κατάλογο labs/lab08/. Αυτό το αρχείο περιγράφει το MQTT interface σας: ποια topics υπάρχουν, ποιος κάνει publish και ποιος subscribe, πώς μοιάζουν τα μηνύματα, και σε ποιον broker ρέουν.

Ορίστε ένα σημείο εκκίνησης — πρέπει να το επεκτείνετε ώστε να καλύπτει όλα τα topics σας:

asyncapi: 3.0.0
info:
  title: Smart Wastebin MQTT Interface
  version: 1.0.0
  description: |
    Event-driven messaging interface for the Smart Wastebin system.
    All communication goes through an MQTT broker (Mosquitto).    

servers:
  mosquitto:
    host: localhost:1883
    protocol: mqtt
    description: Local Mosquitto broker on the Raspberry Pi

channels:
  motionEvents:
    address: smartbin/{bin_id}/{sensor_id}/events
    description: Full motion event records published by the producer
    parameters:
      bin_id:
        description: Wastebin identifier (e.g., bin-01)
      sensor_id:
        description: Sensor identifier (e.g., pir-01)
    messages:
      motionEvent:
        $ref: '#/components/messages/MotionEvent'

  motionState:
    address: smartbin/{bin_id}/{sensor_id}/motion
    description: Simple motion state for Home Assistant (detected/clear)
    parameters:
      bin_id:
        description: Wastebin identifier
      sensor_id:
        description: Sensor identifier
    messages:
      motionState:
        $ref: '#/components/messages/MotionState'

  binStatus:
    address: smartbin/{bin_id}/status
    description: Retained message with current bin status and metadata
    parameters:
      bin_id:
        description: Wastebin identifier
    messages:
      binStatus:
        $ref: '#/components/messages/BinStatus'

operations:
  publishMotionEvent:
    action: send
    channel:
      $ref: '#/channels/motionEvents'
    summary: Producer publishes a full motion event record
    description: |
      Published by the producer whenever the PIR sensor detects motion.
      Contains the full observation record with timestamps, sensor reference,
      and JSON-LD context.      

  consumeMotionEvent:
    action: receive
    channel:
      $ref: '#/channels/motionEvents'
    summary: Consumer receives motion events and writes them to JSONL
    description: |
      The consumer subscribes to this channel, enriches each event with
      ingest_time and pipeline_latency_ms, and writes the result to a
      JSONL file.      

  publishMotionState:
    action: send
    channel:
      $ref: '#/channels/motionState'
    summary: Producer publishes simple motion state for Home Assistant

  publishBinStatus:
    action: send
    channel:
      $ref: '#/channels/binStatus'
    summary: Status update published as a retained message

components:
  messages:
    MotionEvent:
      name: MotionEvent
      contentType: application/json
      payload:
        type: object
        properties:
          "@context":
            type: string
            description: JSON-LD context reference
          "@type":
            type: string
            description: Observation type
            example: sosa:Observation
          madeBySensor:
            type: string
            description: URI of the sensor that produced this event
            example: "urn:dev:pir-01"
          observedIn:
            type: string
            description: URI of the deployment environment
            example: "urn:env:lab-room-101"
          resultTime:
            type: string
            format: date-time
            description: ISO 8601 timestamp of when the event occurred
          hasSimpleResult:
            type: string
            description: Motion state
            enum: [detected, clear]
          seq:
            type: integer
            description: Sequence number within this pipeline run
          run_id:
            type: string
            description: UUID identifying this pipeline run

    MotionState:
      name: MotionState
      contentType: text/plain
      payload:
        type: string
        enum: [detected, clear]
        description: Simple motion state string for Home Assistant

    BinStatus:
      name: BinStatus
      contentType: application/json
      payload:
        type: object
        properties:
          state:
            type: string
            description: Current bin state
            enum: [active, emptied, full, maintenance]
          location:
            type: string
            description: Human-readable location
          last_motion:
            type: string
            format: date-time
            description: Timestamp of most recent motion event

Ας δούμε τι περιγράφει αυτό:

servers — ο Mosquitto broker, η διεύθυνσή του, και το πρωτόκολλο. Ισοδύναμο του base URL στο OpenAPI.

channels — τα MQTT topics σας, με παραμέτρους για τα μεταβλητά τμήματα ({bin_id}, {sensor_id}). Κάθε channel αναφέρεται στο message schema που μεταφέρει.

operations — ποιος κάνει τι. Το publishMotionEvent είναι send action (ο producer κάνει publish), το consumeMotionEvent είναι receive action (ο consumer κάνει subscribe). Αυτό κάνει ρητό ποια components είναι publishers και ποια subscribers.

components/messages — τα message schemas. Ακριβώς όπως οι ορισμοί api.model στο Flask-RESTx, αυτά περιγράφουν τη μορφή κάθε μηνύματος. Το MotionEvent μήνυμα έχει όλα τα fields από τις JSON-LD observations σας. Το MotionState μήνυμα είναι απλώς ένα string.

Επεκτείνετέ το

Το παραπάνω παράδειγμα καλύπτει τρία channels. Σκεφτείτε τι άλλο χρησιμοποιεί το σύστημά σας:

  • Οποιαδήποτε άλλα topics έχετε προσθέσει

Προσθέστε channels, operations, και message schemas για κάθε ένα.

Δείτε το

Ανοίξτε το Swagger Editor στο browser σας. Κάντε κλικ FileImport File και φορτώστε το asyncapi.yaml. Ο editor θα το αναλύσει και θα εμφανίσει interactive documentation — τα channels σας, τα message schemas, τις πληροφορίες server, και τα operations — όλα σε browsable interface.

Τραβήξτε screenshot της rendered AsyncAPI documentation σας για την αναφορά.

Γιατί και OpenAPI και AsyncAPI;

Το Smart Wastebin σύστημά σας έχει δύο interfaces:

REST API (OpenAPI) MQTT Interface (AsyncAPI)
Pattern Request/Response Publish/Subscribe
Protocol HTTP MQTT
Χρήση On-demand queries, commands Real-time events, state updates
Παράδειγμα GET /bins/bin-01/events?limit=10 Subscribe στο smartbin/bin-01/pir-01/events
Spec OpenAPI (auto-generated από Flask-RESTx) AsyncAPI (γραμμένο χειροκίνητα)

Και τα δύο specs εξυπηρετούν τον ίδιο σκοπό: περιγράφουν επίσημα το interface σας ώστε κάποιος που δεν έχει δει ποτέ τον κώδικά σας να μπορεί να καταλάβει πώς να αλληλεπιδράσει με το σύστημά σας. Το OpenAPI καλύπτει την pull πλευρά. Το AsyncAPI καλύπτει την push πλευρά.


Μέρος 6 — Δοκιμή με Swagger UI και curl

Με το API σε λειτουργία, ανοίξτε http://<your-pi-ip>:5000 στο browser σας. Θα πρέπει να βλέπετε όλα τα endpoints οργανωμένα ανά namespace (bins, sensors, mqtt).

Δοκιμάστε κάθε endpoint:

  1. GET /bins — πρέπει να επιστρέψει τη λίστα με τα bins σας
  2. GET /bins/bin-01 — πρέπει να επιστρέψει λεπτομέρειες για το bin-01. Τι συμβαίνει αν ζητήσετε bin που δεν υπάρχει;
  3. GET /bins/bin-01/events — πρέπει να επιστρέψει motion events. Δοκιμάστε με διαφορετικές τιμές limit
  4. POST /bins/bin-01/emptied — συμπληρώστε το JSON body και υποβάλετε. Ελέγξτε ότι έρχεται response 201 και ότι το Home Assistant ενημερώνεται
  5. GET /sensors — πρέπει να εμφανίσει τους sensors σας
  6. GET /sensors/pir-01 — πρέπει να επιστρέψει λεπτομέρειες sensor
  7. POST /mqtt/publish — κάντε publish ένα motion event. Ελέγξτε ότι το Home Assistant και ο consumer σας το λαμβάνουν
  8. GET /mqtt/topics — δείτε όλα τα topics που παρακολουθεί το API
  9. GET /mqtt/topics/smartbin/bin-01/pir-01/motion — ελέγξτε το τελευταίο μήνυμα σε συγκεκριμένο topic

Τραβήξτε screenshots του Swagger UI που εμφανίζουν τα endpoints σας και τουλάχιστον μία επιτυχή response.

Δοκιμή με curl

# Λίστα όλων των bins
curl http://localhost:5000/bins/

# Ανάκτηση συγκεκριμένου bin
curl http://localhost:5000/bins/bin-01

# Ανάκτηση events με limit
curl "http://localhost:5000/bins/bin-01/events?limit=10"

# Σήμανση bin ως αδειασμένο (κάνει επίσης publish στο MQTT)
curl -X POST http://localhost:5000/bins/bin-01/emptied \
  -H "Content-Type: application/json" \
  -d '{"emptied_by": "maintenance-team-A"}'

# Publish στο MQTT μέσω API
curl -X POST http://localhost:5000/mqtt/publish \
  -H "Content-Type: application/json" \
  -d '{"topic": "smartbin/bin-01/pir-01/motion", "payload": "detected", "qos": 1}'

# Εμφάνιση όλων των MQTT topics
curl http://localhost:5000/mqtt/topics

# Αίτημα για bin που δεν υπάρχει
curl -i http://localhost:5000/bins/bin-99

Ελέγξτε ότι παίρνετε 200 για επιτυχημένα αιτήματα, 201 για το POST, και 404 για resources που δεν υπάρχουν.


Γιατί αυτό το εργαστήριο είναι δομημένο έτσι

Μέχρι τώρα, ο μόνος τρόπος για να βγάλετε δεδομένα από το σύστημά σας ήταν να παρακολουθείτε MQTT μηνύματα σε πραγματικό χρόνο ή να ανοίγετε ένα JSONL αρχείο. Και τα δύο απαιτούν να γνωρίζετε τα εσωτερικά του συστήματος — σε ποιο topic να κάνετε subscribe, πού βρίσκεται το αρχείο, πώς μοιάζει η μορφή. Ένα API βάζει μια καθαρή “μπροστινή πόρτα” στο σύστημα. Μια mobile εφαρμογή, ένα web dashboard, μια υπηρεσία route-planning, ή ένα εργαλείο συντήρησης μπορούν όλα να κάνουν query στο ίδιο API χωρίς να ξέρουν τίποτα για MQTT topics ή JSONL αρχεία. Απλώς στέλνουν HTTP requests και λαμβάνουν JSON responses.

Τα MQTT endpoints πηγαίνουν αυτό παρακάτω. Τυλίγοντας το MQTT publish και το topic state σε REST endpoints, το API σας γίνεται universal gateway. Ένας υπεύθυνος εγκαταστάσεων μπορεί να σημειώσει ότι ένας κάδος αδειάστηκε από μια web φόρμα, και το MQTT ecosystem (Home Assistant, ο consumer σας, άλλοι subscribers) ακούνε για αυτό αμέσως. Το REST interface σας τεκμηριώνεται από OpenAPI (auto-generated). Το MQTT interface σας τεκμηριώνεται από AsyncAPI (γραμμένο χειροκίνητα). Μαζί, περιγράφουν ολόκληρο το σύστημά σας σε τυπικές, machine-readable μορφές που οποιοσδήποτε developer μπορεί να διαβάσει και οποιοδήποτε εργαλείο να επεξεργαστεί.

Αυτός ο συνδυασμός MQTT και REST είναι ένα pattern που βλέπετε σε πραγματικές IoT πλατφόρμες. Το AWS IoT έχει MQTT για επικοινωνία συσκευών και REST APIs για διαχείριση. Το Azure IoT Hub κάνει το ίδιο. Το ίδιο το Home Assistant εκθέτει REST API παράλληλα με το MQTT integration. Το Smart Wastebin σας ακολουθεί τώρα την ίδια αρχιτεκτονική.


Ερωτήσεις αναφοράς

Απαντήστε τα παρακάτω στο labs/lab08/README.md σας αφού ολοκληρωθεί η υλοποίηση και τα πειράματα.

Σχεδιασμός API

RQ1: Καταγράψτε τον πλήρη σχεδιασμό του API σας — κάθε endpoint, η HTTP method του, το URI, ποιες παραμέτρους δέχεται, και τι επιστρέφει. Παρουσιάστε αυτό ως πίνακα.
RQ2: Γιατί τα endpoints λίστας events χρησιμοποιούν GET και όχι POST;
RQ3: Γιατί το endpoint “mark as emptied” χρησιμοποιεί POST και όχι PUT; Σκεφτείτε το idempotency.
RQ4: Πώς χειριστήκατε την περίπτωση όπου ένας client ζητά bin ή sensor που δεν υπάρχει; Ποιον status code επιστρέφετε και γιατί;

Υλοποίηση

RQ5: Από πού διαβάζει το API τα δεδομένα του; Ιχνηλατήστε τη διαδρομή των event data από τον PIR sensor μέχρι μια API response.
RQ6: Ποιες query parameters υποστηρίζει το events endpoint σας; Δείξτε παράδειγμα request και response.
RQ7: Πώς σχετίζονται τα Flask-RESTx models (api.model) με την τεκμηρίωση Swagger UI; Τι συμβαίνει στο UI όταν προσθέτετε νέο field σε ένα model;
RQ8: Δείξτε screenshot του Swagger UI σας με ορατά endpoints.

MQTT endpoints

RQ9: Εξηγήστε πώς λειτουργεί το POST /mqtt/publish endpoint. Τι κάνει το API όταν λαμβάνει αίτημα publish;
RQ10: Κάνατε publish ένα motion event μέσω του API χρησιμοποιώντας POST /mqtt/publish. Περιγράψτε τη διαδρομή που ακολουθεί αυτό το μήνυμα, από το HTTP request μέχρι το JSONL αρχείο του consumer.
RQ11: Τι επιστρέφει το GET /mqtt/topics; Γιατί πρέπει το API να κάνει subscribe στο smartbin/# για να λειτουργεί αυτό;
RQ12: Καλείτε POST /bins/bin-01/emptied. Αυτό τόσο αποθηκεύει ένα record όσο και κάνει publish στο MQTT. Ποιο είναι το πλεονέκτημα του συνδυασμού και των δύο actions σε ένα endpoint;

AsyncAPI

RQ13: Τι είναι το AsyncAPI και πώς σχετίζεται με το OpenAPI; Γιατί χρειάζεστε και τα δύο για το Smart Wastebin;
RQ14: Πόσα channels τεκμηριώσατε στο AsyncAPI spec σας; Για κάθε ένα, αναφέρετε ποιος είναι ο publisher και ποιος ο subscriber.
RQ15: Δείξτε screenshot του AsyncAPI spec σας rendered σε Swagger Editor ή AsyncAPI Studio.
RQ16: Συγκρίνετε το MotionEvent message schema στο AsyncAPI spec σας με το event_model στον Flask-RESTx κώδικά σας. Περιγράφουν τα ίδια δεδομένα — τι διαφέρει στο context χρήσης του καθενός;

Δοκιμή

RQ17: Δείξτε την εντολή curl και response για: (α) λίστα όλων των bins, (β) ανάκτηση events με limit, (γ) publish MQTT μηνύματος, (δ) αίτημα για bin που δεν υπάρχει.
RQ18: Ποια είναι η διαφορά μεταξύ δοκιμής με Swagger UI και με curl; Πότε θα χρησιμοποιούσατε το καθένα;

Αναστοχασμός

RQ19: Ένα νέο μέλος ομάδας μπαίνει στο project σας. Πρέπει να φτιάξει mobile εφαρμογή που εμφανίζει κατάσταση bin και επιτρέπει σε χρήστες να αναφέρουν γεμάτους κάδους. Τι του δίνετε; Πώς βοηθούν το Swagger UI και το AsyncAPI spec;
RQ20: Με δικά σας λόγια, εξηγήστε γιατί το Smart Wastebin χρειάζεται τόσο push-based σύστημα (MQTT) όσο και pull-based σύστημα (REST API). Τι θα έλειπε αν είχατε μόνο το ένα;


Υπόδειξη project: Smart Wastebin

Το API που φτιάξατε είναι το interface που θα χρησιμοποιούν άλλες εφαρμογές για να αλληλεπιδρούν με το σύστημά σας. Καθώς προσθέτετε περισσότερους sensors και λειτουργικότητα, και το REST API και το AsyncAPI spec μεγαλώνουν μαζί.

Σκεφτείτε το τελικό project demo σας. Μπορείτε να εμφανίσετε το Swagger UI με όλα τα REST endpoints, την AsyncAPI documentation με όλα τα MQTT channels, και το Home Assistant dashboard με live state — τρεις απόψεις ενός συστήματος, καθεμία εξυπηρετεί διαφορετικό κοινό (app developers, τελικοί χρήστες κ.λπ.).


Τι πρέπει να έχει ολοκληρωθεί πριν φύγετε από το εργαστήριο

Πριν το τέλος της συνεδρίας θα πρέπει να έχετε: σχεδιάσει τα API endpoints σας, υλοποιήσει το API με Flask και Flask-RESTx, δημιουργήσει namespaces για bins, sensors, και MQTT, συμπεριλάβει GET endpoints για λίστες και detail views, συμπεριλάβει POST endpoints για καταγραφή events και publishing MQTT μηνυμάτων, προσθέσει υποστήριξη query parameters για events, χειριστεί error cases με κατάλληλους status codes, συνδέσει το API στον MQTT broker, γράψει AsyncAPI spec που καλύπτει όλα τα MQTT topics σας, δοκιμάσει όλα τα endpoints μέσω Swagger UI και curl, τρέξει το πλήρες σύστημα μαζί, ενημερώσει το labs/lab08/README.md, και κάνει push στο GitHub.


Τελικό checklist (Lab 08)

  • api.py δημιουργημένο με Flask και Flask-RESTx
  • Swagger UI προσβάσιμο στη port 5000
  • bins namespace με list, detail, και events endpoints
  • sensors namespace με list και detail endpoints
  • Events endpoint με query parameter limit
  • Endpoint POST /bins/<bin_id>/emptied λειτουργικό
  • mqtt namespace με publish και topics endpoints
  • API συνδεδεμένο στον Mosquitto broker
  • POST /mqtt/publish μπορεί να ενεργοποιήσει MQTT μηνύματα (επαληθευμένο στο Home Assistant ή consumer)
  • GET /mqtt/topics εμφανίζει tracked topic state
  • Response models ορισμένα για Swagger documentation
  • 404 επιστρέφεται για resources που δεν υπάρχουν
  • API διαβάζει event data από JSONL output του consumer
  • asyncapi.yaml δημιουργημένο με channels, operations, και message schemas
  • AsyncAPI spec καλύπτει όλα τα MQTT topics που χρησιμοποιεί το σύστημα
  • AsyncAPI spec προβλέψιμο σε Swagger Editor ή AsyncAPI Studio
  • Δοκιμασμένο μέσω Swagger UI (screenshots στην αναφορά)
  • Δοκιμασμένο μέσω curl
  • labs/lab08/README.md περιέχει API design, κώδικα, screenshots, AsyncAPI spec, και απαντήσεις αναφοράς
  • Commit και push ολοκληρωμένα

Παραδοτέα και υποβολή

Τι πρέπει να υπάρχει στο αποθετήριο (έως το τέλος του εργαστηρίου)

/
├── README.md
├── labs/
│   ├── lab01/
│   ├── ...
│   └── lab08/
│       ├── README.md
│       ├── requirements.txt
│       ├── api.py
│       ├── asyncapi.yaml
│       ├── producer.py
│       ├── consumer.py
│       └── pirlib/
│           ├── __init__.py
│           ├── sampler.py
│           └── interpreter.py

Μην συμπεριλάβετε:

  • venv/
  • __pycache__/
  • *.pyc
  • output/ ή *.jsonl
  • μεγάλα προσωρινά αρχεία εκτός αν ζητηθεί ρητά

Τι πρέπει να περιέχει το labs/lab08/README.md

Δύο σαφώς διαχωρισμένα μέρη:

  1. Κώδικας / runbook — συμπεριλάβετε τον πίνακα σχεδιασμού API, υλοποιήσεις endpoints (ή σχετικά αποσπάσματα), screenshots Swagger UI, AsyncAPI spec (ή σχετικά αποσπάσματα), screenshot rendered AsyncAPI, και πώς να τρέξετε το πλήρες σύστημα
  2. Απαντήσεις στις ερωτήσεις αναφοράς

Ίδιο στυλ με τα προηγούμενα εργαστήρια.


Τέλος εργαστηριακής συνεδρίας — GitHub checkpoint

Πριν φύγετε:

  • κάντε commit την πρόοδό σας
  • κάντε push στο αποθετήριο GitHub της ομάδας σας

Ελάχιστη προσδοκία:

  • όλα τα παραδοτέα παρακολουθούνται από το Git
  • το τελευταίο commit έχει γίνει push
  • το commit message είναι σαφές

Πριν το επόμενο εργαστήριο — υποβολή στο eClass

Υποβάλετε και τα δύο:

  1. Αρχείο κώδικα (.zip)
  2. PDF export του labs/lab08/README.md

Απαιτούμενη μορφή ονόματος PDF αρχείου:

  • lab08_REPORT_<team>.pdf
Previous
Next