Prefect in practice
Define a two-task Prefect flow, run it locally, and observe task states — all in ordinary Python without any external infrastructure.
- Define a Prefect flow with two tasks that pass data between them
- Run the flow locally and observe task states printed to the terminal
- Add retry configuration to a task and confirm it fires on simulated failure
The previous lesson explained Prefect's flow/task model. Here you will write one. Because Prefect is not available in the in-browser runtime, the demo simulates its decorator behaviour using a lightweight wrapper. The structure and output closely mirror what you would see running real Prefect locally, although real Prefect logs also carry timestamps and generated run names.
Run the snippet and observe:
- fetch_data fails on its first attempt and retries automatically.
- process_data starts only after fetch_data succeeds: Prefect inferred the dependency from the fact that process_data receives fetch_data's return value.
- The flow prints a completion time and the task state log shows the retry.
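The observations above can be reproduced with a minimal sketch of such a simulation. The task and flow decorators, the state_log list, and the state labels below are hypothetical stand-ins written for illustration; they are not Prefect's implementation, and real Prefect tracks more states than the four used here.

```python
import functools
import time

# Hypothetical stand-ins for Prefect's decorators (illustration only).
state_log = []  # (task name, state label) pairs in the order they occurred


def task(retries=0, retry_delay_seconds=0.0):
    """Simulated @task: rerun the function up to `retries` extra times on failure."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(retries + 1):
                state_log.append((fn.__name__, "Running"))
                try:
                    result = fn(*args, **kwargs)
                except Exception as exc:
                    if attempt == retries:
                        # No attempts left: record the failure and re-raise.
                        state_log.append((fn.__name__, "Failed"))
                        raise
                    state_log.append((fn.__name__, "Retrying"))
                    print(f"{fn.__name__} failed ({exc}); retrying")
                    time.sleep(retry_delay_seconds)
                else:
                    state_log.append((fn.__name__, "Completed"))
                    return result
        return wrapper
    return decorator


def flow(fn):
    """Simulated @flow: run the function and print how long it took."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = fn(*args, **kwargs)
        print(f"Flow {fn.__name__} completed in {time.perf_counter() - start:.3f}s")
        return result
    return wrapper


attempts = {"fetch_data": 0}  # call counter used to fail the first attempt only


@task(retries=2)
def fetch_data():
    attempts["fetch_data"] += 1
    if attempts["fetch_data"] == 1:
        raise ConnectionError("simulated network failure")
    return [{"value": 10}, {"value": 25}, {"value": 40}]


@task()
def process_data(records):
    return [r for r in records if r["value"] >= 20]


@flow
def pipeline():
    raw = fetch_data()         # must finish before process_data can receive its output
    return process_data(raw)


result = pipeline()
for name, state in state_log:
    print(f"{name}: {state}")
```

In this sketch process_data waits for fetch_data simply because Python evaluates the calls in order; the point is that the flow body reads the same way it would with the real decorators.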
Real Prefect code
With Prefect and requests installed (pip install prefect requests), the real
version of this pipeline is nearly identical:
```python
import requests
from prefect import flow, task


# Retry up to twice, waiting 10 seconds between attempts.
@task(retries=2, retry_delay_seconds=10, log_prints=True)
def fetch_data(url: str) -> list:
    response = requests.get(url, timeout=30)
    response.raise_for_status()  # raise on 4xx/5xx so the retry fires
    records = response.json()
    print(f"Fetched {len(records)} records")
    return records


@task(log_prints=True)
def process_data(records: list) -> list:
    # Keep only records whose "value" field is at least 20.
    filtered = [r for r in records if r["value"] >= 20]
    print(f"Filtered to {len(filtered)} records")
    return filtered


@flow(name="example-pipeline", log_prints=True)
def pipeline(url: str = "https://api.example.com/data") -> list:
    raw = fetch_data(url)
    return process_data(raw)


if __name__ == "__main__":
    pipeline()
```

log_prints=True routes print() calls to the Prefect logger, making them
visible in the Prefect UI. Run with python pipeline.py or schedule it via
prefect deploy.
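Prefect infers task dependencies from data flow. Calling a task directly, as above, blocks until it finishes, so direct calls run one after another. To run two independent tasks concurrently, start each with .submit(), which returns a future immediately, then call .result() on each future (or pass the futures to a downstream task) so the flow waits for both to complete before proceeding.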
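In Prefect, concurrent execution is requested by calling .submit() on a task, which returns a future that is later resolved with .result(). The same submit-then-wait shape can be sketched with the standard library's concurrent.futures as a stand-in, so it runs without Prefect installed. The functions fetch_users, fetch_orders, and combine are hypothetical and exist only for this illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fetch_users():
    time.sleep(0.2)  # simulated I/O wait
    return ["ada", "grace"]


def fetch_orders():
    time.sleep(0.2)  # simulated I/O wait
    return [101, 102, 103]


def combine(users, orders):
    return {"users": len(users), "orders": len(orders)}


def pipeline():
    with ThreadPoolExecutor(max_workers=2) as pool:
        # Neither call depends on the other's output, so both start right away.
        users_future = pool.submit(fetch_users)
        orders_future = pool.submit(fetch_orders)
        # .result() blocks until each future is done, so combine sees both outputs.
        return combine(users_future.result(), orders_future.result())


start = time.perf_counter()
summary = pipeline()
elapsed = time.perf_counter() - start
print(summary, f"in {elapsed:.2f}s")
```

Inside a real flow, the equivalent calls would be fetch_users.submit() and fetch_orders.submit() on @task-decorated functions, with .result() on each returned future.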
Parametrised runs
Because the flow accepts url as a parameter, you can run it against different
endpoints without touching the code:
```shell
# CLI (requires a Prefect deployment; <deployment-name> is a placeholder for yours)
prefect deployment run 'example-pipeline/<deployment-name>' --param url=https://staging.api.example.com/data
```

Or from Python:

```python
pipeline(url="https://staging.api.example.com/data")
```

This makes testing against staging straightforward and production runs auditable: the parameters used for each run are stored in the Prefect run record.
Where to go next
Next: lab — DAG pipeline — express a four-step pipeline as both a Makefile and a Prefect flow, then compare the developer experience of each approach.