Lab: data pipeline
Build a small ETL pipeline — read CSV, validate, transform, group, and serialise to JSON.
- Parse and type-convert raw CSV row data
- Validate records with boolean logic and string methods
- Normalise string fields (strip, upper)
- Group records by a field using defaultdict
- Serialise a summary structure to JSON
Optional lab. You'll connect the four lessons of this module into one small pipeline: read rows, validate them, transform fields, group by city, and serialise the result to JSON. Each checkpoint builds on the previous one.
The data
Imagine you're ingesting a CSV export from a registration system. The raw rows
arrive as dictionaries of strings — exactly what csv.DictReader produces:
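A sample consistent with the rest of this lab might look like the sketch below. The names Alice, Bob, Carol and Eve and the values in Alice's row appear later in the lab. Everything else (the row named Dave, the remaining scores, the Paris and Berlin assignments) is an assumption made for illustration, as is the variable name `RAW_ROWS`.

```python
# Hypothetical sample rows; most specific values are illustrative assumptions.
# Every value is a string, which is what csv.DictReader yields for each row.
RAW_ROWS = [
    {"name": " Alice ", "score": "91", "city": "london"},
    {"name": "Bob", "score": "78", "city": "Paris"},
    {"name": "  Carol", "score": "85", "city": "LONDON"},
    {"name": "", "score": "64", "city": "berlin"},       # empty name
    {"name": "Dave", "score": "105", "city": "paris"},   # score above 100
    {"name": "Eve", "score": "99", "city": " Berlin "},
]
```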
Notice the problems: names with whitespace, mixed-case cities, an empty name, and a score over 100. A real pipeline handles these before the data reaches any downstream system.
Checkpoint 1 — parse a row
Write parse_row to convert one raw dict (all strings) into a cleaned,
typed record: strip the name, convert the score to int, and strip and
upper-case the city.
Write parse_row(row) that returns a new dict with: "name" stripped of whitespace, "score" converted to int, and "city" stripped and upper-cased.
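One possible solution, offered as a minimal sketch rather than the reference answer. It assumes every row has the three keys and that the score is numeric text; anything else raises an exception instead of being handled.

```python
def parse_row(row):
    """Return a new cleaned, typed record; the input dict is not modified."""
    return {
        "name": row["name"].strip(),
        "score": int(row["score"]),           # ValueError if not numeric text
        "city": row["city"].strip().upper(),
    }
```

Building a new dict, instead of mutating `row`, keeps the raw input available for logging or debugging.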
parse_row({"name": " Alice ", "score": "91", "city": "london"}) → {"name": "Alice", "score": 91, "city": "LONDON"}
Checkpoint 2 — validate a record
After parsing, filter out bad records. Write is_valid_record that returns
True only when the name is non-empty (after stripping) and the score is
between 0 and 100 inclusive.
Write is_valid_record(record) that returns True if record["name"] is non-empty (after stripping) and record["score"] is between 0 and 100 inclusive.
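A minimal sketch of one way to write the check, assuming the record has already been parsed so that the score is an int. The intermediate names `has_name` and `score_in_range` are only there for readability.

```python
def is_valid_record(record):
    """True when the name is non-empty after stripping and 0 <= score <= 100."""
    has_name = bool(record["name"].strip())        # "" and "   " are both falsy
    score_in_range = 0 <= record["score"] <= 100   # chained comparison, inclusive
    return has_name and score_in_range
```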
is_valid_record({"name": "Alice", "score": 85}) → True
is_valid_record({"name": "", "score": 85}) → False
The pipeline so far
With both pieces in place, run the transformation and validation together. Verify that two bad rows are dropped:
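The combined step could be sketched as follows. The sample rows are hypothetical (only the surviving names and Alice's values come from the lab), and the two functions are one possible implementation of Checkpoints 1 and 2, repeated here so the block runs on its own.

```python
# Hypothetical sample data; most values are illustrative assumptions.
raw_rows = [
    {"name": " Alice ", "score": "91", "city": "london"},
    {"name": "Bob", "score": "78", "city": "Paris"},
    {"name": "  Carol", "score": "85", "city": "LONDON"},
    {"name": "", "score": "64", "city": "berlin"},       # empty name
    {"name": "Dave", "score": "105", "city": "paris"},   # score above 100
    {"name": "Eve", "score": "99", "city": " Berlin "},
]


def parse_row(row):
    """Strip the name, convert the score to int, strip and upper-case the city."""
    return {
        "name": row["name"].strip(),
        "score": int(row["score"]),
        "city": row["city"].strip().upper(),
    }


def is_valid_record(record):
    """True when the name is non-empty and the score is within 0..100."""
    return bool(record["name"].strip()) and 0 <= record["score"] <= 100


# Transform every row first, then keep only the records that pass validation.
parsed = [parse_row(row) for row in raw_rows]
valid = [record for record in parsed if is_valid_record(record)]

print(len(parsed) - len(valid), "rows dropped")   # 2 rows dropped
print([record["name"] for record in valid])       # ['Alice', 'Bob', 'Carol', 'Eve']
```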
Four records survive — Alice, Bob, Carol, and Eve.
Checkpoint 3 — group by city
Now group the valid records. Write group_by_city that uses a
collections.defaultdict(list) to bucket each record's name under its city key,
and returns a plain dict.
Write group_by_city(records) that returns a dict mapping each city string to a list of the names of records in that city. Use collections.defaultdict(list).
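A minimal sketch of the grouping step, assuming each record already carries a normalised "city" value. It is one possible solution, not necessarily the lab's reference answer.

```python
from collections import defaultdict


def group_by_city(records):
    """Map each city to the list of names recorded for it, in input order."""
    groups = defaultdict(list)
    for record in records:
        # A missing city key is created as an empty list on first access.
        groups[record["city"]].append(record["name"])
    # Convert to a plain dict so later lookups of unknown cities raise KeyError
    # instead of silently adding empty lists.
    return dict(groups)
```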
group_by_city([{"name":"Alice","score":91,"city":"LONDON"},{"name":"Carol","score":85,"city":"LONDON"}]) → {"LONDON": ["Alice", "Carol"]}
Checkpoint 4 — serialise the summary
The final step: take the grouped dict and produce a JSON string. Write
serialize_summary that returns json.dumps(...) with sort_keys=True and
indent=2. sort_keys orders only the dictionary keys, so sort each list of
names first to make the output deterministic.
Write serialize_summary(groups) that takes a dict of {city: [names]} and returns a JSON string (indent=2, sort_keys=True). Sort each name list before serialising.
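One way this could be written, as a sketch. It builds a new dict with sorted copies of the name lists, so the caller's `groups` is left untouched; that choice is an assumption, since the task does not say whether sorting in place is acceptable.

```python
import json


def serialize_summary(groups):
    """Return the {city: [names]} mapping as deterministic, indented JSON."""
    # sorted() returns a new list, so the input lists are not reordered.
    sorted_groups = {city: sorted(names) for city, names in groups.items()}
    # sort_keys=True orders the city keys; indent=2 pretty-prints the result.
    return json.dumps(sorted_groups, indent=2, sort_keys=True)
```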
serialize_summary({"LONDON": ["Carol", "Alice"], "PARIS": ["Bob"]}) → {
  "LONDON": [
    "Alice",
    "Carol"
  ],
  "PARIS": [
    "Bob"
  ]
}
Putting it all together
Four green checks. Here is the complete pipeline assembled — this is the shape real ETL code takes, just at a larger scale:
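A sketch of how the four functions might be wired together. The CSV text, the helper name `run_pipeline`, and the use of `io.StringIO` in place of a real file are assumptions made so the example is self-contained; the four step functions are one possible set of checkpoint solutions.

```python
import csv
import io
import json
from collections import defaultdict

# Hypothetical CSV export; most values are illustrative assumptions.
CSV_TEXT = (
    "name,score,city\n"
    " Alice ,91,london\n"
    "Bob,78,Paris\n"
    "  Carol,85,LONDON\n"
    ",64,berlin\n"          # empty name
    "Dave,105,paris\n"      # score above 100
    "Eve,99, Berlin \n"
)


def parse_row(row):
    return {
        "name": row["name"].strip(),
        "score": int(row["score"]),
        "city": row["city"].strip().upper(),
    }


def is_valid_record(record):
    return bool(record["name"].strip()) and 0 <= record["score"] <= 100


def group_by_city(records):
    groups = defaultdict(list)
    for record in records:
        groups[record["city"]].append(record["name"])
    return dict(groups)


def serialize_summary(groups):
    sorted_groups = {city: sorted(names) for city, names in groups.items()}
    return json.dumps(sorted_groups, indent=2, sort_keys=True)


def run_pipeline(csv_text):
    """Extract rows from CSV text, transform them, and load them into JSON."""
    rows = csv.DictReader(io.StringIO(csv_text))        # Extract
    parsed = [parse_row(row) for row in rows]           # Transform: type and clean
    valid = [r for r in parsed if is_valid_record(r)]   # Transform: validate
    groups = group_by_city(valid)                       # Transform: group
    return serialize_summary(groups)                    # Load: serialise


print(run_pipeline(CSV_TEXT))
```

With this sample the output contains three cities (BERLIN, LONDON, PARIS) and the four valid names; the rows with the empty name and the score of 105 are absent.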
This is ETL in miniature: Extract (parse raw strings), Transform (validate, normalise, group), Load (serialise to a target format). The same pattern scales to thousands of rows and richer schemas — only the domain logic inside each step changes.