Lab 03 — Concurrent Data Pipelines with Queue Management
Deadlines:
- End of lab session (GitHub checkpoint): commit & push your progress to your team repository.
- Before next lab (eClass submission): upload (1) a
.zipwith your code and (2) a PDF export oflabs/lab03/README.md.
Submission contents:
- (1) a
.zipwith your code, and - (2) a PDF export of
labs/lab03/README.md.
Intro to what you need to code
In this lab you will build a small producer–consumer pipeline for motion events on the Raspberry Pi. The goal is to stop treating acquisition, interpretation, and storage as one long sequential loop and instead separate them into components with clear responsibilities.
You will reuse the PIR logic from Lab 02, but this time you will place it inside a pipeline with a bounded queue between the producer and the consumer. One side of the system will detect motion events and enqueue them. The other side will dequeue them, enrich them, and write them to JSON Lines output.
By the end of the exercise, your system should follow this flow:
PIR sensor
↓
sampler / interpreter
↓
producer thread
↓
bounded queue
↓
consumer thread
↓
JSONL writer
↓
JSONL event log
This lab is about both the architecture and its implementation. The PIR sensor and the code you created to read it in lab 2 can be the same. What changes is the way you organize the software around it.
Do not start from zero. Reuse your sampler.py and interpreter.py from Lab 02, move them into labs/lab03/pirlib/, and build the new pipeline around them.
Create the following structure:
/
├── README.md
├── labs/
│ ├── lab01/
│ ├── lab02/
│ └── lab03/
│ ├── README.md
│ ├── requirements.txt
│ ├── run_pipeline.py
│ └── pirlib/
│ ├── __init__.py
│ ├── sampler.py
│ └── interpreter.py
Your Lab 03 work should live in labs/lab03/. The new part is not the PIR detection logic itself but the pipeline that re-uses that logic correctly.
Write the code in this order
You will work on run_pipeline.py. In a larger system, the producer and consumer would usually be separate components or services with clear interfaces between them. For this lab, to keep the implementation manageable, you will run them as separate threads inside the same file. This file should create the queue, set up shared state, start the producer and consumer threads, handle command-line arguments, print optional status information, and stop cleanly when the run finishes or the user presses Ctrl-C.
Use a bounded queue created directly in run_pipeline.py:
from queue import Queue
event_q = Queue(maxsize=args.queue_size)
A bounded queue matters because the Raspberry Pi has limited memory and overload should be visible. An unbounded queue can hide problems until the system becomes unstable.
Use a small shared metrics structure and a simple stop flag:
metrics = {
"produced": 0,
"consumed": 0,
"dropped": 0,
"max_queue": 0,
}
stop_flag = {"stop": False}
If you prefer threading.Event (instead of a stop_flag), that is also acceptable.
Next, add a helper for UTC timestamps. Use the same helper everywhere so your records stay consistent:
from datetime import datetime, timezone
def utc_now_iso() -> str:
return (
datetime.now(timezone.utc)
.isoformat(timespec="milliseconds")
.replace("+00:00", "Z")
)
Build the producer
The producer thread should read the PIR state, pass the raw signal to the interpreter, build structured event records, and try to place them into the queue. If the queue is full, the record should be dropped and the dropped counter should increase.
A suitable producer loop might look like this (beware I am providing you pseudocode for this dont copy paste it):
from queue import Full
import time
import uuid
WHAT FOLLOWS IS PSEUDOCODE!
Set run_id to a new unique identifier
Set seq to 0
While stop flag is not set:
Get the current time
Read one sample from the sampler
Update the interpreter with the sample and current time
For each event returned by the interpreter:
Increase seq by 1
Create an event record containing:
the current UTC timestamp
the device identifier
the event type as "motion"
the motion state as "detected"
the sequence number
the run identifier
Try to add the record to the queue without waiting
If successful:
Increase the produced counter
If the queue is full:
Increase the dropped counter
Sleep for the sample interval
This lab uses a drop-newest policy. In practice that means the producer calls put_nowait(...); if the queue is full, the newest record is discarded. This is simple, visible, and easy to explain.
Build the consumer
The consumer thread should wait for records, dequeue them, add ingest_time, calculate pipeline_latency_ms, and write valid JSONL output. It may also simulate a slowdown if consumer_delay is greater than zero.
Use this helper for parsing UTC timestamps:
from datetime import datetime
def parse_iso_utc(s: str):
return datetime.fromisoformat(s.replace("Z", "+00:00"))
A suitable consumer loop looks like this (beware I am providing you pseudocode for this dont copy paste it):
from queue import Empty
import json
import time
WHAT FOLLOWS IS PSEUDOCODE!
Open the output file in append mode
While the stop flag is not set or the queue is not empty:
Try to remove one record from the queue, waiting up to 0.5 seconds
If no record is available:
continue to the next loop iteration
Add the current UTC time as the ingest time
Parse the event time
Parse the ingest time
Compute pipeline latency in milliseconds
Store the latency in the record
Write the record to the output file as one JSON line
Flush the file so the data is written immediately
Increase the consumed counter by 1
Update the maximum queue size seen so far
Mark the queue item as done
If a consumer delay is configured:
sleep for that delay
The consumer must continue until stop has been requested and the queue has been drained. That is why the loop condition checks both the stop flag and the queue state.
Remember that JSONL means one valid JSON object per line. Status messages belong in the terminal, not in the output file.
Required record fields
Each output record must contain these fields:
event_time, ingest_time, device_id, event_type, motion_state, seq, run_id, and pipeline_latency_ms.
A typical record will look like this:
record = {
"event_time": utc_now_iso(),
"device_id": device_id,
"event_type": "motion",
"motion_state": "detected",
"seq": seq,
"run_id": run_id,
}
The consumer then adds ingest_time and pipeline_latency_ms before writing the line to disk.
Start the threads and stop cleanly
Create one producer thread and one consumer thread in run_pipeline.py.
import threading
producer_t = threading.Thread(
target=producer_loop,
args=(event_q, sampler, interp, args, metrics, stop_flag),
daemon=True,
)
consumer_t = threading.Thread(
target=consumer_loop,
args=(event_q, args.out, args, metrics, stop_flag),
daemon=True,
)
producer_t.start()
consumer_t.start()
A clean shutdown should work like this: start both threads, let the pipeline run until duration expires or Ctrl-C is pressed, then set the stop flag and wait for both threads to finish.
Since I am not sure of your familiarity with parallel processes/threads one hint is:
start_t = time.time()
try:
while (time.time() - start_t) < args.duration:
if args.verbose:
print(
f"[status] produced={metrics['produced']} "
f"consumed={metrics['consumed']} "
f"dropped={metrics['dropped']} "
f"queue={event_q.qsize()} "
f"max_queue={metrics['max_queue']}"
)
time.sleep(1.0)
except KeyboardInterrupt:
print("\n[main] Ctrl-C: stopping...")
finally:
stop_flag["stop"] = True
producer_t.join()
consumer_t.join()
Equivalent designs are acceptable as long as the behavior is correct.
Command-line interface
Your main program should accept arguments such as these:
python run_pipeline.py \
--device-id pir-01 \
--pin 18 \
--sample-interval 0.1 \
--cooldown 5 \
--min-high 0.2 \
--queue-size 100 \
--consumer-delay 0.0 \
--duration 60 \
--out motion_pipeline.jsonl \
--verbose
Include at least --device-id, --pin, --sample-interval, --cooldown, --min-high, --queue-size, --consumer-delay, --duration, --out, and --verbose.
Run the pipeline
Remember that we introduced sample interval and cooldown in the code of the previous lab, so make sure to use values that would lead to sampling fast enough to see.
Once the code is in place, run the pipeline without artificial slowdown:
python run_pipeline.py \
...
--consumer-delay 0.0 \
...
Trigger motion several times and verify that the output file contains valid JSONL records, that each record has both event_time and ingest_time, that pipeline_latency_ms is present, and that terminal status lines are not mixed into the JSONL file.
If --verbose is enabled, print periodic status lines such as:
[status] produced=12 consumed=12 dropped=0 queue=0 max_queue=2
Run the slow-consumer experiment
After the normal run works, repeat the experiment with an artificial consumer delay:
python run_pipeline.py \
...
--consumer-delay 0.5 \
...
try to modify this to check out its effects.
This delay simulates slow storage, slow downstream processing, or temporary overload. Watch what happens to queue length, produced and consumed counts, dropped records, and pipeline_latency_ms.
Environment and dependencies
Follow the same setup process as in the previous labs and make sure requirements.txt includes every external library your implementation depends on.
Why this lab is structured this way
In Lab 02, your program already read sensor input, interpreted signal behavior, created event records, and wrote output. That worked, but everything happened in one place. As systems grow, that design becomes fragile.
A sequential loop such as
read → interpret → write → read → interpret → write
is easy to start with, but if writing becomes slow, acquisition also becomes slow. Timing becomes less predictable, output is delayed, and short-lived events may be missed.
The pipeline design is different:
producer (read / package)
↓
queue
↓
consumer (write)
Here the queue acts as a buffer between acquisition and output. The producer can continue for a while even if the consumer slows down.
Theory: queues, backpressure, and pipelines
A central idea in this lab is the queue. A queue decouples the rate at which events are produced from the rate at which they are consumed. This does not remove overload, but it makes overload visible and manageable.
If the producer generates events faster than the consumer can process them, the queue grows. If that mismatch continues, the queue eventually becomes full. At that point the system must apply a policy. This is backpressure.
Typical backpressure policies include blocking the producer, dropping the newest record, or dropping the oldest one. In this lab you will use a drop-newest policy because it is easy to implement and easy to observe.
The lecture also introduced pipeline phases such as Collect, Ingest, Compute or Transform, Store, and Consume. You have already implemented some of these ideas, even if you did not use those names in Lab 02. The difference now is that the pipeline is explicit and concurrent.
The lecture also discussed ETL and ELT:
ETL = Extract → Transform → Load
ELT = Extract → Load → Transform
Your edge pipeline is much smaller than a warehouse-scale system, but the same concepts still apply. Right now your system already performs some transformation before writing to disk, so you should think about whether your design is closer to ETL, ELT, or a hybrid.
Report questions
Answer the following in your labs/lab03/README.md after the implementation and experiments are complete.
Architecture and reuse
RQ1: Which lecture pipeline phases do you believe you had already implemented in Lab 02?
RQ2: Which part of your Lab 02 code did you reuse directly?
RQ3: Which part did you have to adapt for the pipeline architecture?
Pipeline concepts
RQ4: In your own words, why is a queue useful between acquisition and writing?
RQ5: What is backpressure?
RQ6: Why can a slow writer become a data acquisition problem and not just a storage problem?
ETL and transformation
RQ7: Is your current edge pipeline closer to ETL or ELT? Explain briefly.
RQ8: What transformation already happens before your data is written to disk?
RQ9: Give one example of a transformation that could be moved later to another stage of the system.
Implementation
RQ10: Explain the responsibility of the producer in one sentence.
RQ11: Explain the responsibility of the consumer in one sentence.
RQ12: Show two example JSONL records from your output and explain their fields briefly.
RQ13: What does pipeline_latency_ms mean in your system?
Experimental observations
RQ14: What changed when you introduced --consumer-delay 0.5?
RQ15: Did the queue absorb the slowdown? Explain briefly using your own observations.
RQ16: What is one clear sign, from your terminal status output, that the producer is outrunning the consumer?
RQ17: Why is a bounded queue more informative than an unbounded queue during overload?
RQ18: Why should status lines stay in the terminal instead of being mixed into the JSONL file?
RQ19: Which field lets you group records from the same program execution, and why is that useful?
RQ20: Why would an unbounded queue be dangerous on a Raspberry Pi?
RQ21: If you later replaced the JSONL writer with another output component, which part of your system could stay almost unchanged and why?
Project hint: Smart Wastebin
As you work through this lab, think beyond the simple motion sensor reading that we have done so far. In the full Smart Wastebin project, this code could be only one small part of a larger system. That means you should start thinking now about modular design.
Try to separate sensing, event detection, queueing, logging, and decision-making into clear components with well-defined responsibilities. A good design makes it easier to replace one part later without breaking the rest of the system. For example, in a project such as the one we do here, you may later want to change the sensor (we are not changing the sensor but as an example), add more event types, send data to another service, or connect the motion pipeline to higher-level wastebin logic such as lid control, fill monitoring, or usage analytics. If your code produces clean, well-defined events, the rest of the Smart Wastebin system can consume those events without needing to know how the sensor data was collected. In other words, the more modular your design is now, the easier it will be to extend, test, and integrate later.
What should be finished before you leave the lab
Before the end of the session, you should have reused sampler.py and interpreter.py, created run_pipeline.py, implemented one producer thread and one consumer thread, used a bounded queue, handled queue-full cases with drop counting, written valid JSONL records, added ingest_time and pipeline_latency_ms, run both the normal and slow-consumer cases, updated labs/lab03/README.md, and pushed your work to GitHub.
Final checklist (Lab 03)
- Lab 02 PIR logic reused inside
labs/lab03/pirlib/ - bounded queue implemented in
run_pipeline.py - producer and consumer run concurrently
- queue-full behavior handled with drop counting
- JSONL output contains structured records only
-
pipeline_latency_msadded to records - normal run completed
- slow-consumer run completed
-
labs/lab03/README.mdcontains code, run steps, and report answers - commit and push completed
Deliverables and submission
What must exist in the repository (by end of lab)
Your repository must contain at least:
/
├── README.md
├── labs/
│ ├── lab01/
│ ├── lab02/
│ └── lab03/
│ ├── README.md
│ ├── requirements.txt
│ ├── run_pipeline.py
│ └── pirlib/
│ ├── __init__.py
│ ├── sampler.py
│ └── interpreter.py
Do not include:
venv/__pycache__/*.pyc- large temporary files unless explicitly requested
What labs/lab03/README.md must contain
Your labs/lab03/README.md must contain two clearly separated parts:
- Code / runbook
- Answers to report questions
Use the same general style you used in previous labs.
End of lab session — GitHub checkpoint
Before leaving the lab:
- commit your progress
- push to your team GitHub repository
Minimum expectation:
- all required deliverables are tracked by Git
- latest commit is pushed
- commit message is clear and meaningful
Before next lab — eClass submission
Submit both of the following:
- Code archive (
.zip) - PDF export of
labs/lab03/README.md
Required PDF filename format:
lab03_REPORT_<team>.pdf
What follows is a greek version of the same lab
Lab 03 — Concurrent Data Pipelines with Queue Management
Προθεσμίες:
- Τέλος της εργαστηριακής συνεδρίας (GitHub checkpoint): κάντε commit & push την πρόοδό σας στο team repository σας.
- Πριν από το επόμενο lab (eClass submission): ανεβάστε (1) ένα
.zipμε τον κώδικά σας και (2) ένα PDF export τουlabs/lab03/README.md.
Περιεχόμενα υποβολής:
- (1) ένα
.zipμε τον κώδικά σας, και - (2) ένα PDF export του
labs/lab03/README.md.
Εισαγωγή σε αυτό που πρέπει να κωδικοποιήσετε
Σε αυτό το lab θα υλοποιήσετε ένα μικρό producer–consumer pipeline για motion events στο Raspberry Pi. Ο στόχος είναι να σταματήσετε να αντιμετωπίζετε την acquisition, interpretation και storage ως έναν μεγάλο sequential loop και αντί γι’ αυτό να τα διαχωρίσετε σε components με σαφείς αρμοδιότητες.
Θα επαναχρησιμοποιήσετε τη λογική PIR από το Lab 02, αλλά αυτή τη φορά θα την τοποθετήσετε μέσα σε ένα pipeline με ένα bounded queue ανάμεσα στον producer και τον consumer. Η μία πλευρά του συστήματος θα ανιχνεύει motion events και θα τα κάνει enqueue. Η άλλη πλευρά θα τα κάνει dequeue, θα τα εμπλουτίζει και θα τα γράφει σε JSON Lines output.
Μέχρι το τέλος της άσκησης, το σύστημά σας θα πρέπει να ακολουθεί την εξής ροή:
PIR sensor
↓
sampler / interpreter
↓
producer thread
↓
bounded queue
↓
consumer thread
↓
JSONL writer
↓
JSONL event log
Αυτό το lab αφορά την αρχιτεκτονική καθώς και την υλοποίηση της. Ο PIR sensor και ο κώδικας που δημιουργήσατε για να τον διαβάζετε στο lab 2 μπορούν να παραμείνουν ίδιοι. Αυτό που αλλάζει είναι ο τρόπος με τον οποίο οργανώνετε το software γύρω από αυτόν.
Μην ξεκινήσετε από το μηδέν. Επαναχρησιμοποιήστε τα sampler.py και interpreter.py από το Lab 02, μετακινήστε τα στο labs/lab03/pirlib/, και χτίστε το νέο pipeline γύρω από αυτά.
Δημιουργήστε την ακόλουθη δομή:
/
├── README.md
├── labs/
│ ├── lab01/
│ ├── lab02/
│ └── lab03/
│ ├── README.md
│ ├── requirements.txt
│ ├── run_pipeline.py
│ └── pirlib/
│ ├── __init__.py
│ ├── sampler.py
│ └── interpreter.py
Η εργασία σας για το Lab 03 πρέπει να βρίσκεται στο labs/lab03/. Το νέο κομμάτι δεν είναι η ίδια η λογική PIR detection, αλλά το pipeline που επαναχρησιμοποιεί σωστά αυτή τη λογική.
Γράψτε τον κώδικα με αυτή τη σειρά
Θα δουλέψετε στο run_pipeline.py. Σε ένα μεγαλύτερο σύστημα, ο producer και ο consumer θα ήταν συνήθως ξεχωριστά components ή services με σαφή interfaces μεταξύ τους. Για αυτό το lab, ώστε η υλοποίηση να παραμείνει διαχειρίσιμη, θα τους εκτελέσετε ως ξεχωριστά threads μέσα στο ίδιο αρχείο. Αυτό το αρχείο πρέπει να δημιουργεί το queue, να ρυθμίζει το shared state, να ξεκινά τα producer και consumer threads, να χειρίζεται command-line arguments, να εκτυπώνει προαιρετικές status πληροφορίες και να σταματά καθαρά όταν ολοκληρώνεται η εκτέλεση ή όταν ο χρήστης πατήσει Ctrl-C.
Χρησιμοποιήστε ένα bounded queue που δημιουργείται απευθείας μέσα στο run_pipeline.py:
from queue import Queue
event_q = Queue(maxsize=args.queue_size)
Ένα bounded queue είναι σημαντικό επειδή το Raspberry Pi έχει περιορισμένη μνήμη και το overload πρέπει να είναι ορατό. Ένα unbounded queue μπορεί να κρύψει προβλήματα μέχρι το σύστημα να γίνει ασταθές.
Χρησιμοποιήστε μια μικρή shared metrics structure και ένα απλό stop flag:
metrics = {
"produced": 0,
"consumed": 0,
"dropped": 0,
"max_queue": 0,
}
stop_flag = {"stop": False}
Αν προτιμάτε threading.Event (αντί για stop_flag), αυτό είναι επίσης αποδεκτό.
Στη συνέχεια, προσθέστε ένα helper για UTC timestamps. Χρησιμοποιήστε το ίδιο helper παντού ώστε τα records σας να παραμένουν συνεπή:
from datetime import datetime, timezone
def utc_now_iso() -> str:
return (
datetime.now(timezone.utc)
.isoformat(timespec="milliseconds")
.replace("+00:00", "Z")
)
Δημιουργήστε τον producer
Το producer thread πρέπει να διαβάζει το PIR state, να περνά το raw signal στον interpreter, να δημιουργεί structured event records και να προσπαθεί να τα τοποθετήσει στο queue. Αν το queue είναι full, το record πρέπει να απορρίπτεται και ο dropped counter να αυξάνεται.
Ένα κατάλληλο producer loop θα μπορούσε να μοιάζει ως εξής (προσοχή: σας δίνω pseudocode για αυτό, μην το κάνετε copy paste):
from queue import Full
import time
import uuid
WHAT FOLLOWS IS PSEUDOCODE!
Set run_id to a new unique identifier
Set seq to 0
While stop flag is not set:
Get the current time
Read one sample from the sampler
Update the interpreter with the sample and current time
For each event returned by the interpreter:
Increase seq by 1
Create an event record containing:
the current UTC timestamp
the device identifier
the event type as "motion"
the motion state as "detected"
the sequence number
the run identifier
Try to add the record to the queue without waiting
If successful:
Increase the produced counter
If the queue is full:
Increase the dropped counter
Sleep for the sample interval
Αυτό το lab χρησιμοποιεί policy drop-newest. Στην πράξη αυτό σημαίνει ότι ο producer καλεί put_nowait(...); αν το queue είναι full, το νεότερο record απορρίπτεται. Αυτό είναι απλό, ορατό και εύκολο να εξηγηθεί.
Δημιουργήστε τον consumer
Το consumer thread πρέπει να περιμένει records, να τα κάνει dequeue, να προσθέτει ingest_time, να υπολογίζει pipeline_latency_ms και να γράφει έγκυρο JSONL output. Μπορεί επίσης να προσομοιώνει slowdown αν το consumer_delay είναι μεγαλύτερο από 0.
Χρησιμοποιήστε αυτό το helper για parsing UTC timestamps:
from datetime import datetime
def parse_iso_utc(s: str):
return datetime.fromisoformat(s.replace("Z", "+00:00"))
Ένα κατάλληλο consumer loop μοιάζει ως εξής (προσοχή: σας δίνω pseudocode για αυτό, μην το κάνετε copy paste):
from queue import Empty
import json
import time
WHAT FOLLOWS IS PSEUDOCODE!
Open the output file in append mode
While the stop flag is not set or the queue is not empty:
Try to remove one record from the queue, waiting up to 0.5 seconds
If no record is available:
continue to the next loop iteration
Add the current UTC time as the ingest time
Parse the event time
Parse the ingest time
Compute pipeline latency in milliseconds
Store the latency in the record
Write the record to the output file as one JSON line
Flush the file so the data is written immediately
Increase the consumed counter by 1
Update the maximum queue size seen so far
Mark the queue item as done
If a consumer delay is configured:
sleep for that delay
Ο consumer πρέπει να συνεχίσει μέχρι να ζητηθεί stop και να έχει αδειάσει το queue. Γι’ αυτό η συνθήκη του loop ελέγχει τόσο το stop flag όσο και την κατάσταση του queue.
Θυμηθείτε ότι JSONL σημαίνει ένα έγκυρο JSON object ανά γραμμή. Τα status messages ανήκουν στο terminal, όχι στο output file.
Απαιτούμενα record fields
Κάθε output record πρέπει να περιέχει τα εξής fields:
event_time, ingest_time, device_id, event_type, motion_state, seq, run_id, και pipeline_latency_ms.
Ένα τυπικό record θα μοιάζει ως εξής:
record = {
"event_time": utc_now_iso(),
"device_id": device_id,
"event_type": "motion",
"motion_state": "detected",
"seq": seq,
"run_id": run_id,
}
Στη συνέχεια ο consumer προσθέτει τα ingest_time και pipeline_latency_ms πριν γράψει τη γραμμή στο disk.
Ξεκινήστε τα threads και σταματήστε καθαρά
Δημιουργήστε ένα producer thread και ένα consumer thread στο run_pipeline.py.
import threading
producer_t = threading.Thread(
target=producer_loop,
args=(event_q, sampler, interp, args, metrics, stop_flag),
daemon=True,
)
consumer_t = threading.Thread(
target=consumer_loop,
args=(event_q, args.out, args, metrics, stop_flag),
daemon=True,
)
producer_t.start()
consumer_t.start()
Ένα clean shutdown πρέπει να λειτουργεί ως εξής: ξεκινήστε και τα δύο threads, αφήστε το pipeline να τρέχει μέχρι να λήξει το duration ή να πατηθεί Ctrl-C, μετά θέστε το stop flag και περιμένετε να ολοκληρωθούν και τα δύο threads.
Επειδή δεν είμαι βέβαιος για την εξοικείωσή σας με parallel processes/threads, ένα hint είναι:
start_t = time.time()
try:
while (time.time() - start_t) < args.duration:
if args.verbose:
print(
f"[status] produced={metrics['produced']} "
f"consumed={metrics['consumed']} "
f"dropped={metrics['dropped']} "
f"queue={event_q.qsize()} "
f"max_queue={metrics['max_queue']}"
)
time.sleep(1.0)
except KeyboardInterrupt:
print("\n[main] Ctrl-C: stopping...")
finally:
stop_flag["stop"] = True
producer_t.join()
consumer_t.join()
Παρόμοια designs που κάνουν το ίδιο πράγμα είναι αποδεκτά, αρκεί η συμπεριφορά να είναι σωστή.
Command-line interface
Το main program σας πρέπει να δέχεται arguments όπως τα παρακάτω:
python run_pipeline.py \
--device-id pir-01 \
--pin 18 \
--sample-interval 0.1 \
--cooldown 5 \
--min-high 0.2 \
--queue-size 100 \
--consumer-delay 0.0 \
--duration 60 \
--out motion_pipeline.jsonl \
--verbose
Συμπεριλάβετε τουλάχιστον τα --device-id, --pin, --sample-interval, --cooldown, --min-high, --queue-size, --consumer-delay, --duration, --out, και --verbose.
Εκτελέστε το pipeline
Θυμηθείτε ότι εισαγάγαμε sample interval και cooldown στον κώδικα του προηγούμενου lab, οπότε βεβαιωθείτε ότι χρησιμοποιείτε τιμές που θα οδηγούν σε αρκετά γρήγορο sampling ώστε να είναι ορατό.
Μόλις ο κώδικας είναι έτοιμος, εκτελέστε το pipeline χωρίς artificial slowdown:
python run_pipeline.py \
...
--consumer-delay 0.0 \
...
Προκαλέστε motion αρκετές φορές και επαληθεύστε ότι το output file περιέχει έγκυρα JSONL records, ότι κάθε record έχει και event_time και ingest_time, ότι το pipeline_latency_ms υπάρχει, και ότι οι terminal status lines δεν αναμειγνύονται μέσα στο JSONL file.
Αν το --verbose είναι ενεργοποιημένο, εκτυπώστε περιοδικά status lines όπως:
[status] produced=12 consumed=12 dropped=0 queue=0 max_queue=2
Εκτελέστε το slow-consumer experiment
Αφού το κανονικό run λειτουργήσει, επαναλάβετε το experiment με ένα artificial consumer delay:
python run_pipeline.py \
...
--consumer-delay 0.5 \
...
προσπαθήστε να το τροποποιήσετε αυτό για να παρατηρήσετε τα effects του.
Αυτό το delay προσομοιώνει slow storage, slow downstream processing ή temporary overload. Παρατηρήστε τι συμβαίνει στο queue length, στα produced και consumed counts, στα dropped records, και στο pipeline_latency_ms.
Environment και dependencies
Ακολουθήστε την ίδια διαδικασία setup όπως στα προηγούμενα labs και βεβαιωθείτε ότι το requirements.txt περιλαμβάνει κάθε external library από το οποίο εξαρτάται η implementation σας.
Γιατί αυτό το lab είναι δομημένο με αυτόν τον τρόπο
Στο Lab 02, το πρόγραμμά σας ήδη διάβαζε sensor input, ερμήνευε signal behavior, δημιουργούσε event records και έγραφε output. Αυτό λειτουργούσε, αλλά όλα συνέβαιναν σε ένα μόνο σημείο. Καθώς τα συστήματα μεγαλώνουν, αυτό το design γίνεται fragile.
Ένα sequential loop όπως
read → interpret → write → read → interpret → write
είναι εύκολο για να ξεκινήσει κανείς, αλλά αν το writing γίνει αργό, τότε και το acquisition γίνεται αργό. Το timing γίνεται λιγότερο προβλέψιμο, το output καθυστερεί και short-lived events μπορεί να χαθούν.
Το concurrent pipeline design είναι διαφορετικό:
producer (read / package)
↓
queue
↓
consumer (write)
Εδώ το queue λειτουργεί ως buffer ανάμεσα στην acquisition και το output. Ο producer μπορεί να συνεχίσει για λίγο ακόμη ακόμη κι αν ο consumer επιβραδυνθεί.
Theory: queues, backpressure, και pipelines
Μια κεντρική ιδέα σε αυτό το lab είναι το queue (ουρά). Ένα queue αποσυνδέει τον ρυθμό με τον οποίο παράγονται events από τον ρυθμό με τον οποίο καταναλώνονται. Αυτό δεν εξαλείφει το overload, αλλά το κάνει ορατό και διαχειρίσιμο.
Αν ο producer δημιουργεί events πιο γρήγορα από όσο ο consumer μπορεί να τα επεξεργαστεί, το queue μεγαλώνει. Αν αυτή η ασυμφωνία συνεχιστεί, το queue τελικά γεμίζει. Σε εκείνο το σημείο το σύστημα πρέπει να εφαρμόσει ένα policy. Αυτό είναι το backpressure.
Τυπικά backpressure policies περιλαμβάνουν blocking του producer, dropping του νεότερου record ή dropping του παλαιότερου. Σε αυτό το lab θα χρησιμοποιήσετε policy drop-newest επειδή είναι εύκολο να υλοποιηθεί και εύκολο να παρατηρηθεί.
Η διάλεξη εισήγαγε επίσης pipeline phases όπως Collect, Ingest, Compute ή Transform, Store και Consume. Έχετε ήδη υλοποιήσει κάποιες από αυτές τις ιδέες, ακόμη κι αν δεν χρησιμοποιούσατε αυτά τα ονόματα στο Lab 02. Η διαφορά τώρα είναι ότι το pipeline είναι explicit και concurrent (παράλληλο)
Στη διάλεξη είδαμε επίσης τα ETL και ELT:
ETL = Extract → Transform → Load
ELT = Extract → Load → Transform
Το edge pipeline σας είναι πολύ μικρότερο από ένα warehouse-scale system, αλλά οι ίδιες έννοιες εξακολουθούν να ισχύουν. Αυτή τη στιγμή το σύστημά σας ήδη εκτελεί κάποιο transformation (επεξεργασία) πριν γράψει στο δίσκο, οπότε θα πρέπει να σκεφτείτε αν το design σας είναι πιο κοντά σε ETL, ELT ή κάποιο hybrid.
Ερωτήσεις report (αναφοράς)
Απαντήστε στα παρακάτω στο labs/lab03/README.md αφού ολοκληρωθούν η implementation και τα experiments.
Architecture και reuse
RQ1: Ποιες lecture pipeline phases πιστεύετε ότι είχατε ήδη υλοποιήσει στο Lab 02;
RQ2: Ποιο μέρος του κώδικα του Lab 02 επαναχρησιμοποιήσατε άμεσα;
RQ3: Ποιο μέρος χρειάστηκε να προσαρμόσετε για την pipeline αρχιτεκτονική;
Pipeline concepts
RQ4: Με δικά σας λόγια, γιατί είναι χρήσιμο ένα queue ανάμεσα στο acquisition και το writing;
RQ5: Τι είναι το backpressure;
RQ6: Γιατί ένας αργός writer μπορεί να γίνει πρόβλημα data acquisition και όχι μόνο storage;
ETL και transformation
RQ7: Είναι το τρέχον edge pipeline σας πιο κοντά σε ETL ή ELT; Εξηγήστε σύντομα.
RQ8: Ποιο transformation συμβαίνει ήδη πριν γραφτούν τα δεδομένα σας στο disk;
RQ9: Δώστε ένα παράδειγμα transformation που θα μπορούσε να μεταφερθεί αργότερα σε άλλο stage του συστήματος.
Implementation
RQ10: Εξηγήστε την ευθύνη του producer σε μία πρόταση.
RQ11: Εξηγήστε την ευθύνη του consumer σε μία πρόταση.
RQ12: Δείξτε δύο example JSONL records από το output σας και εξηγήστε σύντομα τα πεδία τους.
RQ13: Τι σημαίνει το pipeline_latency_ms στο σύστημά σας;
Experimental observations
RQ14: Τι άλλαξε όταν εισαγάγατε --consumer-delay 0.5;
RQ15: Απορρόφησε το queue την επιβράδυνση; Εξηγήστε σύντομα χρησιμοποιώντας τις δικές σας παρατηρήσεις.
RQ16: Ποιο είναι ένα σαφές σημάδι, από το terminal status output σας, ότι ο producer τρέχει πιο γρήγορα από τον consumer;
RQ17: Γιατί ένα bounded queue είναι πιο κατατοπιστικό από ένα unbounded queue κατά τη διάρκεια overload;
RQ18: Γιατί τα status μηνύματα πρέπει να παραμένουν στο terminal αντί να αναμειγνύονται στο JSONL file;
RQ19: Ποιο field σας επιτρέπει να ομαδοποιείτε records από την ίδια εκτέλεση του προγράμματος, και γιατί αυτό είναι χρήσιμο;
RQ20: Γιατί ένα unbounded queue θα ήταν επικίνδυνο σε ένα Raspberry Pi;
RQ21: Αν αργότερα αντικαθιστούσατε τον JSONL writer με κάποιο άλλο output component, ποιο μέρος του συστήματός σας θα μπορούσε να παραμείνει σχεδόν αμετάβλητο και γιατί;
Project hint: Smart Wastebin
Καθώς δουλεύετε αυτό το lab, σκεφτείτε πέρα από την απλή ανάγνωση motion sensor που έχουμε κάνει μέχρι τώρα. Στο πλήρες project Smart Wastebin, αυτός ο κώδικας μπορεί να είναι μόνο ένα μικρό μέρος ενός μεγαλύτερου συστήματος. Αυτό σημαίνει ότι πρέπει να αρχίσετε από τώρα να σκέφτεστε modular design.
Προσπαθήστε να διαχωρίσετε sensing, event detection, queueing, logging και decision-making σε σαφή components με καλά ορισμένες αρμοδιότητες. Ένα καλό design διευκολύνει την αντικατάσταση ενός μέρους αργότερα χωρίς να σπάσει το υπόλοιπο σύστημα. Για παράδειγμα, σε ένα project όπως αυτό που κάνουμε εδώ, μπορεί αργότερα να θέλετε να αλλάξετε τον sensor (δεν αλλάζουμε τον sensor αλλά ως παράδειγμα), να προσθέσετε περισσότερα event types, να στείλετε δεδομένα σε άλλο service, ή να συνδέσετε το motion pipeline με higher-level wastebin λογική όπως lid control, fill monitoring ή usage analytics. Αν ο κώδικάς σας παράγει καθαρά, καλά ορισμένα events, το υπόλοιπο σύστημα Smart Wastebin μπορεί να καταναλώνει αυτά τα events χωρίς να χρειάζεται να γνωρίζει πώς συλλέχθηκαν τα sensor data. Με άλλα λόγια, όσο πιο modular είναι το design σας τώρα, τόσο πιο εύκολο θα είναι να το επεκτείνετε, να το ελέγξετε και να το ενσωματώσετε αργότερα.
Τι πρέπει να έχει ολοκληρωθεί πριν φύγετε από το lab
Πριν από το τέλος της συνεδρίας, θα πρέπει να έχετε επαναχρησιμοποιήσει τα sampler.py και interpreter.py, να έχετε δημιουργήσει το run_pipeline.py, να έχετε υλοποιήσει ένα producer thread και ένα consumer thread, να έχετε χρησιμοποιήσει bounded queue, να έχετε χειριστεί queue-full cases με drop counting, να έχετε γράψει έγκυρα JSONL records, να έχετε προσθέσει ingest_time και pipeline_latency_ms, να έχετε εκτελέσει τόσο τις normal όσο και τις slow-consumer cases, να έχετε ενημερώσει το labs/lab03/README.md, και να έχετε κάνει push την εργασία σας στο GitHub.
Τελικό checklist (Lab 03)
- Η λογική PIR του Lab 02 επαναχρησιμοποιήθηκε μέσα στο
labs/lab03/pirlib/ - bounded queue υλοποιήθηκε στο
run_pipeline.py - producer και consumer εκτελούνται concurrently
- η συμπεριφορά queue-full χειρίζεται με drop counting
- το JSONL output περιέχει μόνο structured records
- το
pipeline_latency_msπροστέθηκε στα records - το normal run ολοκληρώθηκε
- το slow-consumer run ολοκληρώθηκε
- το
labs/lab03/README.mdπεριέχει code, run steps και report answers - το commit και push ολοκληρώθηκαν
Deliverables και submission
Τι πρέπει να υπάρχει στο repository (μέχρι το τέλος του lab)
Το repository σας πρέπει να περιέχει τουλάχιστον:
/
├── README.md
├── labs/
│ ├── lab01/
│ ├── lab02/
│ └── lab03/
│ ├── README.md
│ ├── requirements.txt
│ ├── run_pipeline.py
│ └── pirlib/
│ ├── __init__.py
│ ├── sampler.py
│ └── interpreter.py
Μην συμπεριλάβετε:
venv/__pycache__/*.pyc- μεγάλα temporary files εκτός αν ζητηθούν ρητά
Τι πρέπει να περιέχει το labs/lab03/README.md
Το labs/lab03/README.md πρέπει να περιέχει δύο σαφώς διαχωρισμένα μέρη:
- Code / runbook (Κώδικας/πώς τον τρέχουμε)
- Answers to report questions (Απαντήσεις στις ερωτήσεις)
Χρησιμοποιήστε το ίδιο γενικό style που χρησιμοποιήσατε στα προηγούμενα labs.
Τέλος εργαστηριακής συνεδρίας — GitHub checkpoint
Πριν φύγετε από το lab:
- κάντε commit την πρόοδό σας
- κάντε push στο team GitHub repository σας
Ελάχιστη απαίτηση:
- όλα τα απαιτούμενα deliverables παρακολουθούνται από το Git
- το latest commit έχει γίνει push
- το commit message είναι σαφές
Πριν από το επόμενο lab — eClass submission
Υποβάλετε και τα δύο από τα παρακάτω:
- Code archive (
.zip) - PDF export του
labs/lab03/README.md
Απαιτούμενο format ονόματος αρχείου PDF:
lab03_REPORT_<team>.pdf