Lab: database-backed data pipeline
Build an end-to-end pipeline — discover files, validate records, store in SQLite, and aggregate results.
- Parse and validate records with boolean logic
- Bulk-load clean rows into an in-memory SQLite database
- Query and aggregate stored data with parameterised SQL
- Chain pathlib, sqlite3, and any/all into a coherent pipeline
Optional lab. The Data Engineering lessons gave you four tools in isolation. This lab puts them in sequence: the output of one stage is the input to the next. Work through the checkpoints in order — each one builds on the last.
A data pipeline has three stages that appear everywhere, from tiny scripts to production ETL jobs: extract (read raw data), transform (validate and clean), and load (store the result). Here, your CSV lives in memory as a list of dicts, your validator is a boolean predicate, your store is SQLite, and your output is an aggregation query. The wiring is yours to write.
Checkpoint 1 — validate a record
Before loading anything, decide whether a record is worth keeping. A validator
is just a predicate — a function that returns True or False.
Write validate_record(record) that returns True only if: (1) "name" is a non-empty string, and (2) "score" is a number (int or float) that is >= 0. Use all() over a list of conditions.
validate_record({"name": "Alice", "score": 95}) → Truevalidate_record({"name": "", "score": 95}) → FalseNotice that missing name key — record.get("name") returns None, which fails
the isinstance(…, str) check cleanly. That's the kind of defensive composition
all() gives you for free.
Checkpoint 2 — load clean records into SQLite
Now connect the validator to the database. Filter out bad rows before inserting,
use executemany for the bulk load, and return results as tuples.
Write load_records(records) that: (1) filters records using validate_record, (2) creates an in-memory SQLite database with a "scores" table (name TEXT, score REAL), (3) bulk-inserts the valid rows with executemany and ? placeholders, and (4) returns all rows as a list of (name, score) tuples ordered by score DESC.
load_records([{"name": "A", "score": 90}, {"name": "", "score": 80}]) → [('A', 90.0)]The separation of concerns here is deliberate: validate_record knows nothing
about SQLite; load_records knows nothing about what makes a record valid. Each
function does one thing — the architecture lesson's single-responsibility
principle, applied to a pipeline.
Checkpoint 3 — aggregate with SQL
Your data is in the database. Now answer a question: who are the top scorers?
Write a query that uses LIMIT and an ORDER BY to pull the top N rows.
Write top_n(conn, n) that queries a "scores" table (name TEXT, score REAL) and returns the top n rows ordered by score descending as a list of (name, score) tuples. Use a ? placeholder for n.
top_n(conn, 2) → [('A', 90.0), ('B', 80.0)]Putting it all together
Here is the complete pipeline in about fifteen lines — each stage clearly separated and each tool doing its job:
import sqlite3
from pathlib import Path
RAW_RECORDS = [
{"name": "Alice", "score": 95},
{"name": "Bob", "score": 72},
{"name": "", "score": 80}, # will be rejected
{"name": "Carol", "score": 88},
{"name": "David", "score": -5}, # will be rejected
]
def validate_record(r):
return all([
isinstance(r.get("name"), str) and len(r["name"]) > 0,
isinstance(r.get("score"), (int, float)) and r["score"] >= 0,
])
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE scores (name TEXT, score REAL)")
valid = [r for r in RAW_RECORDS if validate_record(r)]
conn.executemany("INSERT INTO scores VALUES (?, ?)",
[(r["name"], r["score"]) for r in valid])
conn.commit()
top = conn.execute(
"SELECT name, score FROM scores ORDER BY score DESC LIMIT ?", (3,)
).fetchall()
print(top) # [('Alice', 95.0), ('Carol', 88.0), ('Bob', 72.0)]
conn.close()Done?
All three checkpoints green? You've built a complete ETL pipeline — the same shape used in production jobs that process millions of rows, just at a smaller scale. The patterns hold: validate at the boundary, insert in bulk with parameters, answer questions with SQL. That completes the Data Engineering module.