Concurrent Data Pipelines

Lab 03 — Concurrent Data Pipelines with Queue Management

Deadlines:

  • End of lab session (GitHub checkpoint): commit & push your progress to your team repository.
  • Before next lab (eClass submission): upload (1) a .zip with your code and (2) a PDF export of labs/lab03/README.md.

Submission contents:

  • (1) a .zip with your code, and
  • (2) a PDF export of labs/lab03/README.md.

Intro to what you need to code

In this lab you will build a small producer–consumer pipeline for motion events on the Raspberry Pi. The goal is to stop treating acquisition, interpretation, and storage as one long sequential loop and instead separate them into components with clear responsibilities.

You will reuse the PIR logic from Lab 02, but this time you will place it inside a pipeline with a bounded queue between the producer and the consumer. One side of the system will detect motion events and enqueue them. The other side will dequeue them, enrich them, and write them to JSON Lines output.

By the end of the exercise, your system should follow this flow:

PIR sensor
sampler / interpreter
producer thread
bounded queue
consumer thread
JSONL writer
JSONL event log

This lab is about both the architecture and its implementation. The PIR sensor and the code you created to read it in lab 2 can be the same. What changes is the way you organize the software around it. Do not start from zero. Reuse your sampler.py and interpreter.py from Lab 02, move them into labs/lab03/pirlib/, and build the new pipeline around them.

Create the following structure:

/
├── README.md
├── labs/
│   ├── lab01/
│   ├── lab02/
│   └── lab03/
│       ├── README.md
│       ├── requirements.txt
│       ├── run_pipeline.py
│       └── pirlib/
│           ├── __init__.py
│           ├── sampler.py
│           └── interpreter.py

Your Lab 03 work should live in labs/lab03/. The new part is not the PIR detection logic itself but the pipeline that re-uses that logic correctly.

Write the code in this order

You will work on run_pipeline.py. In a larger system, the producer and consumer would usually be separate components or services with clear interfaces between them. For this lab, to keep the implementation manageable, you will run them as separate threads inside the same file. This file should create the queue, set up shared state, start the producer and consumer threads, handle command-line arguments, print optional status information, and stop cleanly when the run finishes or the user presses Ctrl-C.

Use a bounded queue created directly in run_pipeline.py:

from queue import Queue

event_q = Queue(maxsize=args.queue_size)

A bounded queue matters because the Raspberry Pi has limited memory and overload should be visible. An unbounded queue can hide problems until the system becomes unstable.

Use a small shared metrics structure and a simple stop flag:

metrics = {
    "produced": 0,
    "consumed": 0,
    "dropped": 0,
    "max_queue": 0,
}

stop_flag = {"stop": False}

If you prefer threading.Event (instead of a stop_flag), that is also acceptable.

Next, add a helper for UTC timestamps. Use the same helper everywhere so your records stay consistent:

from datetime import datetime, timezone


def utc_now_iso() -> str:
    return (
        datetime.now(timezone.utc)
        .isoformat(timespec="milliseconds")
        .replace("+00:00", "Z")
    )

Build the producer

The producer thread should read the PIR state, pass the raw signal to the interpreter, build structured event records, and try to place them into the queue. If the queue is full, the record should be dropped and the dropped counter should increase.

A suitable producer loop might look like this (beware I am providing you pseudocode for this dont copy paste it):

from queue import Full
import time
import uuid

WHAT FOLLOWS IS PSEUDOCODE!

Set run_id to a new unique identifier
Set seq to 0

While stop flag is not set:
    Get the current time
    Read one sample from the sampler

    Update the interpreter with the sample and current time
    For each event returned by the interpreter:
        Increase seq by 1

        Create an event record containing:
            the current UTC timestamp
            the device identifier
            the event type as "motion"
            the motion state as "detected"
            the sequence number
            the run identifier

        Try to add the record to the queue without waiting
            If successful:
                Increase the produced counter
            If the queue is full:
                Increase the dropped counter

    Sleep for the sample interval

This lab uses a drop-newest policy. In practice that means the producer calls put_nowait(...); if the queue is full, the newest record is discarded. This is simple, visible, and easy to explain.

Build the consumer

The consumer thread should wait for records, dequeue them, add ingest_time, calculate pipeline_latency_ms, and write valid JSONL output. It may also simulate a slowdown if consumer_delay is greater than zero.

Use this helper for parsing UTC timestamps:

from datetime import datetime


def parse_iso_utc(s: str):
    return datetime.fromisoformat(s.replace("Z", "+00:00"))

A suitable consumer loop looks like this (beware I am providing you pseudocode for this dont copy paste it):

from queue import Empty
import json
import time

WHAT FOLLOWS IS PSEUDOCODE!

Open the output file in append mode

While the stop flag is not set or the queue is not empty:
    Try to remove one record from the queue, waiting up to 0.5 seconds
    If no record is available:
        continue to the next loop iteration

    Add the current UTC time as the ingest time

    Parse the event time
    Parse the ingest time
    Compute pipeline latency in milliseconds
    Store the latency in the record

    Write the record to the output file as one JSON line
    Flush the file so the data is written immediately

    Increase the consumed counter by 1
    Update the maximum queue size seen so far
    Mark the queue item as done

    If a consumer delay is configured:
        sleep for that delay

The consumer must continue until stop has been requested and the queue has been drained. That is why the loop condition checks both the stop flag and the queue state.

Remember that JSONL means one valid JSON object per line. Status messages belong in the terminal, not in the output file.

Required record fields

Each output record must contain these fields:

event_time, ingest_time, device_id, event_type, motion_state, seq, run_id, and pipeline_latency_ms.

A typical record will look like this:

record = {
    "event_time": utc_now_iso(),
    "device_id": device_id,
    "event_type": "motion",
    "motion_state": "detected",
    "seq": seq,
    "run_id": run_id,
}

The consumer then adds ingest_time and pipeline_latency_ms before writing the line to disk.

Start the threads and stop cleanly

Create one producer thread and one consumer thread in run_pipeline.py.

import threading

producer_t = threading.Thread(
    target=producer_loop,
    args=(event_q, sampler, interp, args, metrics, stop_flag),
    daemon=True,
)

consumer_t = threading.Thread(
    target=consumer_loop,
    args=(event_q, args.out, args, metrics, stop_flag),
    daemon=True,
)

producer_t.start()
consumer_t.start()

A clean shutdown should work like this: start both threads, let the pipeline run until duration expires or Ctrl-C is pressed, then set the stop flag and wait for both threads to finish.

Since I am not sure of your familiarity with parallel processes/threads one hint is:

start_t = time.time()

try:
    while (time.time() - start_t) < args.duration:
        if args.verbose:
            print(
                f"[status] produced={metrics['produced']} "
                f"consumed={metrics['consumed']} "
                f"dropped={metrics['dropped']} "
                f"queue={event_q.qsize()} "
                f"max_queue={metrics['max_queue']}"
            )
        time.sleep(1.0)
except KeyboardInterrupt:
    print("\n[main] Ctrl-C: stopping...")
finally:
    stop_flag["stop"] = True
    producer_t.join()
    consumer_t.join()

Equivalent designs are acceptable as long as the behavior is correct.

Command-line interface

Your main program should accept arguments such as these:

python run_pipeline.py \
  --device-id pir-01 \
  --pin 18 \
  --sample-interval 0.1 \
  --cooldown 5 \
  --min-high 0.2 \
  --queue-size 100 \
  --consumer-delay 0.0 \
  --duration 60 \
  --out motion_pipeline.jsonl \
  --verbose

Include at least --device-id, --pin, --sample-interval, --cooldown, --min-high, --queue-size, --consumer-delay, --duration, --out, and --verbose.

Run the pipeline

Remember that we introduced sample interval and cooldown in the code of the previous lab, so make sure to use values that would lead to sampling fast enough to see.

Once the code is in place, run the pipeline without artificial slowdown:

python run_pipeline.py \
  ...
  --consumer-delay 0.0 \
  ...

Trigger motion several times and verify that the output file contains valid JSONL records, that each record has both event_time and ingest_time, that pipeline_latency_ms is present, and that terminal status lines are not mixed into the JSONL file.

If --verbose is enabled, print periodic status lines such as:

[status] produced=12 consumed=12 dropped=0 queue=0 max_queue=2

Run the slow-consumer experiment

After the normal run works, repeat the experiment with an artificial consumer delay:

python run_pipeline.py \
  ...
  --consumer-delay 0.5 \
  ...  

try to modify this to check out its effects.

This delay simulates slow storage, slow downstream processing, or temporary overload. Watch what happens to queue length, produced and consumed counts, dropped records, and pipeline_latency_ms.

Environment and dependencies

Follow the same setup process as in the previous labs and make sure requirements.txt includes every external library your implementation depends on.

Why this lab is structured this way

In Lab 02, your program already read sensor input, interpreted signal behavior, created event records, and wrote output. That worked, but everything happened in one place. As systems grow, that design becomes fragile.

A sequential loop such as

read → interpret → write → read → interpret → write

is easy to start with, but if writing becomes slow, acquisition also becomes slow. Timing becomes less predictable, output is delayed, and short-lived events may be missed.

The pipeline design is different:

producer (read / package)
       queue
consumer (write)

Here the queue acts as a buffer between acquisition and output. The producer can continue for a while even if the consumer slows down.

Theory: queues, backpressure, and pipelines

A central idea in this lab is the queue. A queue decouples the rate at which events are produced from the rate at which they are consumed. This does not remove overload, but it makes overload visible and manageable.

If the producer generates events faster than the consumer can process them, the queue grows. If that mismatch continues, the queue eventually becomes full. At that point the system must apply a policy. This is backpressure.

Typical backpressure policies include blocking the producer, dropping the newest record, or dropping the oldest one. In this lab you will use a drop-newest policy because it is easy to implement and easy to observe.

The lecture also introduced pipeline phases such as Collect, Ingest, Compute or Transform, Store, and Consume. You have already implemented some of these ideas, even if you did not use those names in Lab 02. The difference now is that the pipeline is explicit and concurrent.

The lecture also discussed ETL and ELT:

ETL = Extract → Transform → Load
ELT = Extract → Load → Transform

Your edge pipeline is much smaller than a warehouse-scale system, but the same concepts still apply. Right now your system already performs some transformation before writing to disk, so you should think about whether your design is closer to ETL, ELT, or a hybrid.

Report questions

Answer the following in your labs/lab03/README.md after the implementation and experiments are complete.

Architecture and reuse

RQ1: Which lecture pipeline phases do you believe you had already implemented in Lab 02?
RQ2: Which part of your Lab 02 code did you reuse directly?
RQ3: Which part did you have to adapt for the pipeline architecture?

Pipeline concepts

RQ4: In your own words, why is a queue useful between acquisition and writing?
RQ5: What is backpressure?
RQ6: Why can a slow writer become a data acquisition problem and not just a storage problem?

ETL and transformation

RQ7: Is your current edge pipeline closer to ETL or ELT? Explain briefly.
RQ8: What transformation already happens before your data is written to disk?
RQ9: Give one example of a transformation that could be moved later to another stage of the system.

Implementation

RQ10: Explain the responsibility of the producer in one sentence.
RQ11: Explain the responsibility of the consumer in one sentence.
RQ12: Show two example JSONL records from your output and explain their fields briefly.
RQ13: What does pipeline_latency_ms mean in your system?

Experimental observations

RQ14: What changed when you introduced --consumer-delay 0.5?
RQ15: Did the queue absorb the slowdown? Explain briefly using your own observations.
RQ16: What is one clear sign, from your terminal status output, that the producer is outrunning the consumer?
RQ17: Why is a bounded queue more informative than an unbounded queue during overload?
RQ18: Why should status lines stay in the terminal instead of being mixed into the JSONL file?
RQ19: Which field lets you group records from the same program execution, and why is that useful?
RQ20: Why would an unbounded queue be dangerous on a Raspberry Pi?
RQ21: If you later replaced the JSONL writer with another output component, which part of your system could stay almost unchanged and why?

Project hint: Smart Wastebin

As you work through this lab, think beyond the simple motion sensor reading that we have done so far. In the full Smart Wastebin project, this code could be only one small part of a larger system. That means you should start thinking now about modular design.

Try to separate sensing, event detection, queueing, logging, and decision-making into clear components with well-defined responsibilities. A good design makes it easier to replace one part later without breaking the rest of the system. For example, in a project such as the one we do here, you may later want to change the sensor (we are not changing the sensor but as an example), add more event types, send data to another service, or connect the motion pipeline to higher-level wastebin logic such as lid control, fill monitoring, or usage analytics. If your code produces clean, well-defined events, the rest of the Smart Wastebin system can consume those events without needing to know how the sensor data was collected. In other words, the more modular your design is now, the easier it will be to extend, test, and integrate later.


What should be finished before you leave the lab

Before the end of the session, you should have reused sampler.py and interpreter.py, created run_pipeline.py, implemented one producer thread and one consumer thread, used a bounded queue, handled queue-full cases with drop counting, written valid JSONL records, added ingest_time and pipeline_latency_ms, run both the normal and slow-consumer cases, updated labs/lab03/README.md, and pushed your work to GitHub.

Final checklist (Lab 03)

  • Lab 02 PIR logic reused inside labs/lab03/pirlib/
  • bounded queue implemented in run_pipeline.py
  • producer and consumer run concurrently
  • queue-full behavior handled with drop counting
  • JSONL output contains structured records only
  • pipeline_latency_ms added to records
  • normal run completed
  • slow-consumer run completed
  • labs/lab03/README.md contains code, run steps, and report answers
  • commit and push completed

Deliverables and submission

What must exist in the repository (by end of lab)

Your repository must contain at least:

/
├── README.md
├── labs/
│   ├── lab01/
│   ├── lab02/
│   └── lab03/
│       ├── README.md
│       ├── requirements.txt
│       ├── run_pipeline.py
│       └── pirlib/
│           ├── __init__.py
│           ├── sampler.py
│           └── interpreter.py

Do not include:

  • venv/
  • __pycache__/
  • *.pyc
  • large temporary files unless explicitly requested

What labs/lab03/README.md must contain

Your labs/lab03/README.md must contain two clearly separated parts:

  1. Code / runbook
  2. Answers to report questions

Use the same general style you used in previous labs.


End of lab session — GitHub checkpoint

Before leaving the lab:

  • commit your progress
  • push to your team GitHub repository

Minimum expectation:

  • all required deliverables are tracked by Git
  • latest commit is pushed
  • commit message is clear and meaningful

Before next lab — eClass submission

Submit both of the following:

  1. Code archive (.zip)
  2. PDF export of labs/lab03/README.md

Required PDF filename format:

  • lab03_REPORT_<team>.pdf

What follows is a greek version of the same lab

Lab 03 — Concurrent Data Pipelines with Queue Management

Προθεσμίες:

  • Τέλος της εργαστηριακής συνεδρίας (GitHub checkpoint): κάντε commit & push την πρόοδό σας στο team repository σας.
  • Πριν από το επόμενο lab (eClass submission): ανεβάστε (1) ένα .zip με τον κώδικά σας και (2) ένα PDF export του labs/lab03/README.md.

Περιεχόμενα υποβολής:

  • (1) ένα .zip με τον κώδικά σας, και
  • (2) ένα PDF export του labs/lab03/README.md.

Εισαγωγή σε αυτό που πρέπει να κωδικοποιήσετε

Σε αυτό το lab θα υλοποιήσετε ένα μικρό producer–consumer pipeline για motion events στο Raspberry Pi. Ο στόχος είναι να σταματήσετε να αντιμετωπίζετε την acquisition, interpretation και storage ως έναν μεγάλο sequential loop και αντί γι’ αυτό να τα διαχωρίσετε σε components με σαφείς αρμοδιότητες.

Θα επαναχρησιμοποιήσετε τη λογική PIR από το Lab 02, αλλά αυτή τη φορά θα την τοποθετήσετε μέσα σε ένα pipeline με ένα bounded queue ανάμεσα στον producer και τον consumer. Η μία πλευρά του συστήματος θα ανιχνεύει motion events και θα τα κάνει enqueue. Η άλλη πλευρά θα τα κάνει dequeue, θα τα εμπλουτίζει και θα τα γράφει σε JSON Lines output.

Μέχρι το τέλος της άσκησης, το σύστημά σας θα πρέπει να ακολουθεί την εξής ροή:

PIR sensor
sampler / interpreter
producer thread
bounded queue
consumer thread
JSONL writer
JSONL event log

Αυτό το lab αφορά την αρχιτεκτονική καθώς και την υλοποίηση της. Ο PIR sensor και ο κώδικας που δημιουργήσατε για να τον διαβάζετε στο lab 2 μπορούν να παραμείνουν ίδιοι. Αυτό που αλλάζει είναι ο τρόπος με τον οποίο οργανώνετε το software γύρω από αυτόν. Μην ξεκινήσετε από το μηδέν. Επαναχρησιμοποιήστε τα sampler.py και interpreter.py από το Lab 02, μετακινήστε τα στο labs/lab03/pirlib/, και χτίστε το νέο pipeline γύρω από αυτά.

Δημιουργήστε την ακόλουθη δομή:

/
├── README.md
├── labs/
│   ├── lab01/
│   ├── lab02/
│   └── lab03/
│       ├── README.md
│       ├── requirements.txt
│       ├── run_pipeline.py
│       └── pirlib/
│           ├── __init__.py
│           ├── sampler.py
│           └── interpreter.py

Η εργασία σας για το Lab 03 πρέπει να βρίσκεται στο labs/lab03/. Το νέο κομμάτι δεν είναι η ίδια η λογική PIR detection, αλλά το pipeline που επαναχρησιμοποιεί σωστά αυτή τη λογική.

Γράψτε τον κώδικα με αυτή τη σειρά

Θα δουλέψετε στο run_pipeline.py. Σε ένα μεγαλύτερο σύστημα, ο producer και ο consumer θα ήταν συνήθως ξεχωριστά components ή services με σαφή interfaces μεταξύ τους. Για αυτό το lab, ώστε η υλοποίηση να παραμείνει διαχειρίσιμη, θα τους εκτελέσετε ως ξεχωριστά threads μέσα στο ίδιο αρχείο. Αυτό το αρχείο πρέπει να δημιουργεί το queue, να ρυθμίζει το shared state, να ξεκινά τα producer και consumer threads, να χειρίζεται command-line arguments, να εκτυπώνει προαιρετικές status πληροφορίες και να σταματά καθαρά όταν ολοκληρώνεται η εκτέλεση ή όταν ο χρήστης πατήσει Ctrl-C.

Χρησιμοποιήστε ένα bounded queue που δημιουργείται απευθείας μέσα στο run_pipeline.py:

from queue import Queue

event_q = Queue(maxsize=args.queue_size)

Ένα bounded queue είναι σημαντικό επειδή το Raspberry Pi έχει περιορισμένη μνήμη και το overload πρέπει να είναι ορατό. Ένα unbounded queue μπορεί να κρύψει προβλήματα μέχρι το σύστημα να γίνει ασταθές.

Χρησιμοποιήστε μια μικρή shared metrics structure και ένα απλό stop flag:

metrics = {
    "produced": 0,
    "consumed": 0,
    "dropped": 0,
    "max_queue": 0,
}

stop_flag = {"stop": False}

Αν προτιμάτε threading.Event (αντί για stop_flag), αυτό είναι επίσης αποδεκτό.

Στη συνέχεια, προσθέστε ένα helper για UTC timestamps. Χρησιμοποιήστε το ίδιο helper παντού ώστε τα records σας να παραμένουν συνεπή:

from datetime import datetime, timezone


def utc_now_iso() -> str:
    return (
        datetime.now(timezone.utc)
        .isoformat(timespec="milliseconds")
        .replace("+00:00", "Z")
    )

Δημιουργήστε τον producer

Το producer thread πρέπει να διαβάζει το PIR state, να περνά το raw signal στον interpreter, να δημιουργεί structured event records και να προσπαθεί να τα τοποθετήσει στο queue. Αν το queue είναι full, το record πρέπει να απορρίπτεται και ο dropped counter να αυξάνεται.

Ένα κατάλληλο producer loop θα μπορούσε να μοιάζει ως εξής (προσοχή: σας δίνω pseudocode για αυτό, μην το κάνετε copy paste):

from queue import Full
import time
import uuid

WHAT FOLLOWS IS PSEUDOCODE!

Set run_id to a new unique identifier
Set seq to 0

While stop flag is not set:
    Get the current time
    Read one sample from the sampler

    Update the interpreter with the sample and current time
    For each event returned by the interpreter:
        Increase seq by 1

        Create an event record containing:
            the current UTC timestamp
            the device identifier
            the event type as "motion"
            the motion state as "detected"
            the sequence number
            the run identifier

        Try to add the record to the queue without waiting
            If successful:
                Increase the produced counter
            If the queue is full:
                Increase the dropped counter

    Sleep for the sample interval

Αυτό το lab χρησιμοποιεί policy drop-newest. Στην πράξη αυτό σημαίνει ότι ο producer καλεί put_nowait(...); αν το queue είναι full, το νεότερο record απορρίπτεται. Αυτό είναι απλό, ορατό και εύκολο να εξηγηθεί.

Δημιουργήστε τον consumer

Το consumer thread πρέπει να περιμένει records, να τα κάνει dequeue, να προσθέτει ingest_time, να υπολογίζει pipeline_latency_ms και να γράφει έγκυρο JSONL output. Μπορεί επίσης να προσομοιώνει slowdown αν το consumer_delay είναι μεγαλύτερο από 0.

Χρησιμοποιήστε αυτό το helper για parsing UTC timestamps:

from datetime import datetime


def parse_iso_utc(s: str):
    return datetime.fromisoformat(s.replace("Z", "+00:00"))

Ένα κατάλληλο consumer loop μοιάζει ως εξής (προσοχή: σας δίνω pseudocode για αυτό, μην το κάνετε copy paste):

from queue import Empty
import json
import time

WHAT FOLLOWS IS PSEUDOCODE!

Open the output file in append mode

While the stop flag is not set or the queue is not empty:
    Try to remove one record from the queue, waiting up to 0.5 seconds
    If no record is available:
        continue to the next loop iteration

    Add the current UTC time as the ingest time

    Parse the event time
    Parse the ingest time
    Compute pipeline latency in milliseconds
    Store the latency in the record

    Write the record to the output file as one JSON line
    Flush the file so the data is written immediately

    Increase the consumed counter by 1
    Update the maximum queue size seen so far
    Mark the queue item as done

    If a consumer delay is configured:
        sleep for that delay

Ο consumer πρέπει να συνεχίσει μέχρι να ζητηθεί stop και να έχει αδειάσει το queue. Γι’ αυτό η συνθήκη του loop ελέγχει τόσο το stop flag όσο και την κατάσταση του queue.

Θυμηθείτε ότι JSONL σημαίνει ένα έγκυρο JSON object ανά γραμμή. Τα status messages ανήκουν στο terminal, όχι στο output file.

Απαιτούμενα record fields

Κάθε output record πρέπει να περιέχει τα εξής fields:

event_time, ingest_time, device_id, event_type, motion_state, seq, run_id, και pipeline_latency_ms.

Ένα τυπικό record θα μοιάζει ως εξής:

record = {
    "event_time": utc_now_iso(),
    "device_id": device_id,
    "event_type": "motion",
    "motion_state": "detected",
    "seq": seq,
    "run_id": run_id,
}

Στη συνέχεια ο consumer προσθέτει τα ingest_time και pipeline_latency_ms πριν γράψει τη γραμμή στο disk.

Ξεκινήστε τα threads και σταματήστε καθαρά

Δημιουργήστε ένα producer thread και ένα consumer thread στο run_pipeline.py.

import threading

producer_t = threading.Thread(
    target=producer_loop,
    args=(event_q, sampler, interp, args, metrics, stop_flag),
    daemon=True,
)

consumer_t = threading.Thread(
    target=consumer_loop,
    args=(event_q, args.out, args, metrics, stop_flag),
    daemon=True,
)

producer_t.start()
consumer_t.start()

Ένα clean shutdown πρέπει να λειτουργεί ως εξής: ξεκινήστε και τα δύο threads, αφήστε το pipeline να τρέχει μέχρι να λήξει το duration ή να πατηθεί Ctrl-C, μετά θέστε το stop flag και περιμένετε να ολοκληρωθούν και τα δύο threads.

Επειδή δεν είμαι βέβαιος για την εξοικείωσή σας με parallel processes/threads, ένα hint είναι:

start_t = time.time()

try:
    while (time.time() - start_t) < args.duration:
        if args.verbose:
            print(
                f"[status] produced={metrics['produced']} "
                f"consumed={metrics['consumed']} "
                f"dropped={metrics['dropped']} "
                f"queue={event_q.qsize()} "
                f"max_queue={metrics['max_queue']}"
            )
        time.sleep(1.0)
except KeyboardInterrupt:
    print("\n[main] Ctrl-C: stopping...")
finally:
    stop_flag["stop"] = True
    producer_t.join()
    consumer_t.join()

Παρόμοια designs που κάνουν το ίδιο πράγμα είναι αποδεκτά, αρκεί η συμπεριφορά να είναι σωστή.

Command-line interface

Το main program σας πρέπει να δέχεται arguments όπως τα παρακάτω:

python run_pipeline.py \
  --device-id pir-01 \
  --pin 18 \
  --sample-interval 0.1 \
  --cooldown 5 \
  --min-high 0.2 \
  --queue-size 100 \
  --consumer-delay 0.0 \
  --duration 60 \
  --out motion_pipeline.jsonl \
  --verbose

Συμπεριλάβετε τουλάχιστον τα --device-id, --pin, --sample-interval, --cooldown, --min-high, --queue-size, --consumer-delay, --duration, --out, και --verbose.

Εκτελέστε το pipeline

Θυμηθείτε ότι εισαγάγαμε sample interval και cooldown στον κώδικα του προηγούμενου lab, οπότε βεβαιωθείτε ότι χρησιμοποιείτε τιμές που θα οδηγούν σε αρκετά γρήγορο sampling ώστε να είναι ορατό.

Μόλις ο κώδικας είναι έτοιμος, εκτελέστε το pipeline χωρίς artificial slowdown:

python run_pipeline.py \
  ...
  --consumer-delay 0.0 \
  ...

Προκαλέστε motion αρκετές φορές και επαληθεύστε ότι το output file περιέχει έγκυρα JSONL records, ότι κάθε record έχει και event_time και ingest_time, ότι το pipeline_latency_ms υπάρχει, και ότι οι terminal status lines δεν αναμειγνύονται μέσα στο JSONL file.

Αν το --verbose είναι ενεργοποιημένο, εκτυπώστε περιοδικά status lines όπως:

[status] produced=12 consumed=12 dropped=0 queue=0 max_queue=2

Εκτελέστε το slow-consumer experiment

Αφού το κανονικό run λειτουργήσει, επαναλάβετε το experiment με ένα artificial consumer delay:

python run_pipeline.py \
  ...
  --consumer-delay 0.5 \
  ...  

προσπαθήστε να το τροποποιήσετε αυτό για να παρατηρήσετε τα effects του.

Αυτό το delay προσομοιώνει slow storage, slow downstream processing ή temporary overload. Παρατηρήστε τι συμβαίνει στο queue length, στα produced και consumed counts, στα dropped records, και στο pipeline_latency_ms.

Environment και dependencies

Ακολουθήστε την ίδια διαδικασία setup όπως στα προηγούμενα labs και βεβαιωθείτε ότι το requirements.txt περιλαμβάνει κάθε external library από το οποίο εξαρτάται η implementation σας.

Γιατί αυτό το lab είναι δομημένο με αυτόν τον τρόπο

Στο Lab 02, το πρόγραμμά σας ήδη διάβαζε sensor input, ερμήνευε signal behavior, δημιουργούσε event records και έγραφε output. Αυτό λειτουργούσε, αλλά όλα συνέβαιναν σε ένα μόνο σημείο. Καθώς τα συστήματα μεγαλώνουν, αυτό το design γίνεται fragile.

Ένα sequential loop όπως

read → interpret → write → read → interpret → write

είναι εύκολο για να ξεκινήσει κανείς, αλλά αν το writing γίνει αργό, τότε και το acquisition γίνεται αργό. Το timing γίνεται λιγότερο προβλέψιμο, το output καθυστερεί και short-lived events μπορεί να χαθούν.

Το concurrent pipeline design είναι διαφορετικό:

producer (read / package)
       queue
consumer (write)

Εδώ το queue λειτουργεί ως buffer ανάμεσα στην acquisition και το output. Ο producer μπορεί να συνεχίσει για λίγο ακόμη ακόμη κι αν ο consumer επιβραδυνθεί.

Theory: queues, backpressure, και pipelines

Μια κεντρική ιδέα σε αυτό το lab είναι το queue (ουρά). Ένα queue αποσυνδέει τον ρυθμό με τον οποίο παράγονται events από τον ρυθμό με τον οποίο καταναλώνονται. Αυτό δεν εξαλείφει το overload, αλλά το κάνει ορατό και διαχειρίσιμο.

Αν ο producer δημιουργεί events πιο γρήγορα από όσο ο consumer μπορεί να τα επεξεργαστεί, το queue μεγαλώνει. Αν αυτή η ασυμφωνία συνεχιστεί, το queue τελικά γεμίζει. Σε εκείνο το σημείο το σύστημα πρέπει να εφαρμόσει ένα policy. Αυτό είναι το backpressure.

Τυπικά backpressure policies περιλαμβάνουν blocking του producer, dropping του νεότερου record ή dropping του παλαιότερου. Σε αυτό το lab θα χρησιμοποιήσετε policy drop-newest επειδή είναι εύκολο να υλοποιηθεί και εύκολο να παρατηρηθεί.

Η διάλεξη εισήγαγε επίσης pipeline phases όπως Collect, Ingest, Compute ή Transform, Store και Consume. Έχετε ήδη υλοποιήσει κάποιες από αυτές τις ιδέες, ακόμη κι αν δεν χρησιμοποιούσατε αυτά τα ονόματα στο Lab 02. Η διαφορά τώρα είναι ότι το pipeline είναι explicit και concurrent (παράλληλο)

Στη διάλεξη είδαμε επίσης τα ETL και ELT:

ETL = Extract → Transform → Load
ELT = Extract → Load → Transform

Το edge pipeline σας είναι πολύ μικρότερο από ένα warehouse-scale system, αλλά οι ίδιες έννοιες εξακολουθούν να ισχύουν. Αυτή τη στιγμή το σύστημά σας ήδη εκτελεί κάποιο transformation (επεξεργασία) πριν γράψει στο δίσκο, οπότε θα πρέπει να σκεφτείτε αν το design σας είναι πιο κοντά σε ETL, ELT ή κάποιο hybrid.

Ερωτήσεις report (αναφοράς)

Απαντήστε στα παρακάτω στο labs/lab03/README.md αφού ολοκληρωθούν η implementation και τα experiments.

Architecture και reuse

RQ1: Ποιες lecture pipeline phases πιστεύετε ότι είχατε ήδη υλοποιήσει στο Lab 02;
RQ2: Ποιο μέρος του κώδικα του Lab 02 επαναχρησιμοποιήσατε άμεσα;
RQ3: Ποιο μέρος χρειάστηκε να προσαρμόσετε για την pipeline αρχιτεκτονική;

Pipeline concepts

RQ4: Με δικά σας λόγια, γιατί είναι χρήσιμο ένα queue ανάμεσα στο acquisition και το writing;
RQ5: Τι είναι το backpressure;
RQ6: Γιατί ένας αργός writer μπορεί να γίνει πρόβλημα data acquisition και όχι μόνο storage;

ETL και transformation

RQ7: Είναι το τρέχον edge pipeline σας πιο κοντά σε ETL ή ELT; Εξηγήστε σύντομα.
RQ8: Ποιο transformation συμβαίνει ήδη πριν γραφτούν τα δεδομένα σας στο disk;
RQ9: Δώστε ένα παράδειγμα transformation που θα μπορούσε να μεταφερθεί αργότερα σε άλλο stage του συστήματος.

Implementation

RQ10: Εξηγήστε την ευθύνη του producer σε μία πρόταση.
RQ11: Εξηγήστε την ευθύνη του consumer σε μία πρόταση.
RQ12: Δείξτε δύο example JSONL records από το output σας και εξηγήστε σύντομα τα πεδία τους.
RQ13: Τι σημαίνει το pipeline_latency_ms στο σύστημά σας;

Experimental observations

RQ14: Τι άλλαξε όταν εισαγάγατε --consumer-delay 0.5;
RQ15: Απορρόφησε το queue την επιβράδυνση; Εξηγήστε σύντομα χρησιμοποιώντας τις δικές σας παρατηρήσεις.
RQ16: Ποιο είναι ένα σαφές σημάδι, από το terminal status output σας, ότι ο producer τρέχει πιο γρήγορα από τον consumer;
RQ17: Γιατί ένα bounded queue είναι πιο κατατοπιστικό από ένα unbounded queue κατά τη διάρκεια overload;
RQ18: Γιατί τα status μηνύματα πρέπει να παραμένουν στο terminal αντί να αναμειγνύονται στο JSONL file;
RQ19: Ποιο field σας επιτρέπει να ομαδοποιείτε records από την ίδια εκτέλεση του προγράμματος, και γιατί αυτό είναι χρήσιμο;
RQ20: Γιατί ένα unbounded queue θα ήταν επικίνδυνο σε ένα Raspberry Pi;
RQ21: Αν αργότερα αντικαθιστούσατε τον JSONL writer με κάποιο άλλο output component, ποιο μέρος του συστήματός σας θα μπορούσε να παραμείνει σχεδόν αμετάβλητο και γιατί;

Project hint: Smart Wastebin

Καθώς δουλεύετε αυτό το lab, σκεφτείτε πέρα από την απλή ανάγνωση motion sensor που έχουμε κάνει μέχρι τώρα. Στο πλήρες project Smart Wastebin, αυτός ο κώδικας μπορεί να είναι μόνο ένα μικρό μέρος ενός μεγαλύτερου συστήματος. Αυτό σημαίνει ότι πρέπει να αρχίσετε από τώρα να σκέφτεστε modular design.

Προσπαθήστε να διαχωρίσετε sensing, event detection, queueing, logging και decision-making σε σαφή components με καλά ορισμένες αρμοδιότητες. Ένα καλό design διευκολύνει την αντικατάσταση ενός μέρους αργότερα χωρίς να σπάσει το υπόλοιπο σύστημα. Για παράδειγμα, σε ένα project όπως αυτό που κάνουμε εδώ, μπορεί αργότερα να θέλετε να αλλάξετε τον sensor (δεν αλλάζουμε τον sensor αλλά ως παράδειγμα), να προσθέσετε περισσότερα event types, να στείλετε δεδομένα σε άλλο service, ή να συνδέσετε το motion pipeline με higher-level wastebin λογική όπως lid control, fill monitoring ή usage analytics. Αν ο κώδικάς σας παράγει καθαρά, καλά ορισμένα events, το υπόλοιπο σύστημα Smart Wastebin μπορεί να καταναλώνει αυτά τα events χωρίς να χρειάζεται να γνωρίζει πώς συλλέχθηκαν τα sensor data. Με άλλα λόγια, όσο πιο modular είναι το design σας τώρα, τόσο πιο εύκολο θα είναι να το επεκτείνετε, να το ελέγξετε και να το ενσωματώσετε αργότερα.


Τι πρέπει να έχει ολοκληρωθεί πριν φύγετε από το lab

Πριν από το τέλος της συνεδρίας, θα πρέπει να έχετε επαναχρησιμοποιήσει τα sampler.py και interpreter.py, να έχετε δημιουργήσει το run_pipeline.py, να έχετε υλοποιήσει ένα producer thread και ένα consumer thread, να έχετε χρησιμοποιήσει bounded queue, να έχετε χειριστεί queue-full cases με drop counting, να έχετε γράψει έγκυρα JSONL records, να έχετε προσθέσει ingest_time και pipeline_latency_ms, να έχετε εκτελέσει τόσο τις normal όσο και τις slow-consumer cases, να έχετε ενημερώσει το labs/lab03/README.md, και να έχετε κάνει push την εργασία σας στο GitHub.

Τελικό checklist (Lab 03)

  • Η λογική PIR του Lab 02 επαναχρησιμοποιήθηκε μέσα στο labs/lab03/pirlib/
  • bounded queue υλοποιήθηκε στο run_pipeline.py
  • producer και consumer εκτελούνται concurrently
  • η συμπεριφορά queue-full χειρίζεται με drop counting
  • το JSONL output περιέχει μόνο structured records
  • το pipeline_latency_ms προστέθηκε στα records
  • το normal run ολοκληρώθηκε
  • το slow-consumer run ολοκληρώθηκε
  • το labs/lab03/README.md περιέχει code, run steps και report answers
  • το commit και push ολοκληρώθηκαν

Deliverables και submission

Τι πρέπει να υπάρχει στο repository (μέχρι το τέλος του lab)

Το repository σας πρέπει να περιέχει τουλάχιστον:

/
├── README.md
├── labs/
│   ├── lab01/
│   ├── lab02/
│   └── lab03/
│       ├── README.md
│       ├── requirements.txt
│       ├── run_pipeline.py
│       └── pirlib/
│           ├── __init__.py
│           ├── sampler.py
│           └── interpreter.py

Μην συμπεριλάβετε:

  • venv/
  • __pycache__/
  • *.pyc
  • μεγάλα temporary files εκτός αν ζητηθούν ρητά

Τι πρέπει να περιέχει το labs/lab03/README.md

Το labs/lab03/README.md πρέπει να περιέχει δύο σαφώς διαχωρισμένα μέρη:

  1. Code / runbook (Κώδικας/πώς τον τρέχουμε)
  2. Answers to report questions (Απαντήσεις στις ερωτήσεις)

Χρησιμοποιήστε το ίδιο γενικό style που χρησιμοποιήσατε στα προηγούμενα labs.


Τέλος εργαστηριακής συνεδρίας — GitHub checkpoint

Πριν φύγετε από το lab:

  • κάντε commit την πρόοδό σας
  • κάντε push στο team GitHub repository σας

Ελάχιστη απαίτηση:

  • όλα τα απαιτούμενα deliverables παρακολουθούνται από το Git
  • το latest commit έχει γίνει push
  • το commit message είναι σαφές

Πριν από το επόμενο lab — eClass submission

Υποβάλετε και τα δύο από τα παρακάτω:

  1. Code archive (.zip)
  2. PDF export του labs/lab03/README.md

Απαιτούμενο format ονόματος αρχείου PDF:

  • lab03_REPORT_<team>.pdf
Previous