Data Orchestration Compared: Airflow vs Dagster vs Prefect — A Modern Stack Guide

python dev.to

Data orchestration is the discipline of turning a tangle of ingestion jobs, transformations, machine-learning steps, reverse-ETL pushes, and freshness sensors into one observable, retryable, scheduled graph. In 2026 the three production-grade choices are Apache Airflow, Dagster, and Prefect. Each solves the same orchestration problem with a different mental model: Airflow thinks in DAGs and operators, Dagster thinks in software-defined assets, and Prefect thinks in Pythonic flows and tasks with sub-flows and dynamic mapping built in. The question is not "which tool is best" but "which mental model matches my team's pipeline shape, asset literacy, and on-call appetite", and almost every modern data pipeline orchestration review boils down to two comparisons: Airflow vs Dagster and Dagster vs Prefect.

This guide is a deep-dive anatomy comparison for the engineer who has to defend a tool choice in a design review, migrate a legacy DAG scheduler stack onto a newer asset-aware platform, or pick the right Airflow alternative for an ML team that lives in Python. Section by section, we walk through the anatomy of each orchestrator (the runtime parts, the developer-facing primitives, and the operational tax), then close with a five-dimension decision matrix plus three worked migration examples: an Airflow DAG ported to a Dagster asset graph, a cron-style Airflow loop ported to a Prefect flow, and a Dagster asset graph translated into a Prefect deployment. Each section follows the same teaching shape (explanation, question, input, code, traced execution, output, and why this works), which is also the shape interviewers expect when they ask you to whiteboard an orchestrator design.

1. Why data orchestration is its own interview track

Data orchestration: a distinct discipline from cron, ETL tools, and pipeline frameworks

The one-sentence invariant: data orchestration is the layer that turns a set of jobs into a graph — with dependencies, retries, schedules, sensors, backfills, and observability — and it is a distinct discipline because the failure modes (skipped runs, partial-state pipelines, silent freshness rot, broken backfills) are graph-shaped, not script-shaped. A senior orchestration engineer is not a generalist scripter who happens to use cron; they think in DAGs, assets, and flows, and they automate dependency-aware retries, partition-aware backfills, and observability hooks as first-class artefacts in the platform.

What interviewers actually score on data pipeline orchestration rounds.

  • Anatomy fluency: can you draw the Airflow runtime (scheduler + executor + webserver + metadata DB) on a whiteboard from memory, then do the same for Dagster (daemon + webserver + sensors + IO managers) and Prefect (Cloud / server + work pools + workers + deployments)?
  • Mental-model literacy: can you explain task-first vs asset-first vs flow-first in one sentence each, and pick the right mental model for a given pipeline?
  • DAG scheduler mechanics: what triggers a DAG run? How is scheduling decoupled from execution? What happens when the scheduler crashes mid-run? What is the start_date gotcha?
  • Retry + backfill discipline: given a 30-day backfill that failed on day 12, what do you re-run, and why?
  • Tooling tradeoffs: when would you pick Airflow alternatives like Dagster or Prefect, and what are the migration costs?
  • Production-safety patterns: idempotency, dead-letter queues, late-arriving data, partitioned assets, sensors vs schedules. Can you wire them in the platform of choice?

The 5-dimension comparison map this guide walks through.

  • Dimension 1 — Maturity / ecosystem — Airflow has 10+ years of operators, providers, and managed services (MWAA, Astronomer, Cloud Composer); Dagster and Prefect are growing fast but their plugin libraries are smaller.
  • Dimension 2 — Asset awareness — Dagster is asset-first by construction; Airflow added Datasets as a lightweight asset signal; Prefect handles assets via artifacts and downstream wiring, not as a primary primitive.
  • Dimension 3 — Dynamic flows — Prefect makes dynamic flow generation and sub-flows feel native; Airflow added the TaskFlow API and dynamic task mapping; Dagster supports DynamicOut but the asset model is the more idiomatic path.
  • Dimension 4 — Hosting options — All three offer hosted SaaS (Astronomer / MWAA / Composer; Dagster Cloud; Prefect Cloud) plus open-source self-hosting paths.
  • Dimension 5 — Best for — Airflow excels at cron-style ETL plus large teams; Dagster shines for data-product graphs and lineage; Prefect is the ergonomic winner for Pythonic ML and dynamic API workflows.

Why orchestration is its own track and not a Python round.

  • Schedules are not crons — a data orchestration system has to know what depends on what, not just when to fire — that's the difference between cron and a DAG scheduler.
  • Retries are graph-aware — when task B depends on task A and A fails, you re-run A and only the downstream tasks; cron has no concept of this.
  • Backfills are partition-aware — re-running a 30-day window means filling 30 daily partitions in the right order with the right inputs; a script can't do this without you re-implementing the orchestrator.
  • Observability is structural — a good orchestrator gives you per-task logs, per-DAG SLA monitors, per-asset freshness alerts, and lineage out-of-the-box; you don't bolt that on after.
  • Asset awareness is the senior shift — task-first orchestrators (Airflow's original model) think in jobs; asset-first orchestrators (Dagster) think in tables; the second mental model maps better to data-product teams.
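To make "retries are graph-aware" concrete, here is a minimal sketch in plain Python, with no orchestrator involved. It assumes a hypothetical pipeline held as an adjacency list (the archive_raw branch is added purely for illustration) and computes which tasks must re-run when one fails: the failed task plus everything downstream of it, in dependency order via the standard library's graphlib.

```python
from graphlib import TopologicalSorter

# Hypothetical pipeline: each task maps to the tasks that consume its output.
PIPELINE = {
    "fetch_api": ["validate", "archive_raw"],
    "validate": ["load_warehouse"],
    "load_warehouse": ["notify"],
    "archive_raw": [],
    "notify": [],
}


def tasks_to_rerun(graph: dict[str, list[str]], failed: str) -> list[str]:
    """Return the failed task plus all of its downstream tasks, in dependency order."""
    # Depth-first walk from the failed task collects everything downstream of it.
    affected: set[str] = set()
    stack = [failed]
    while stack:
        node = stack.pop()
        if node not in affected:
            affected.add(node)
            stack.extend(graph.get(node, []))
    # TopologicalSorter expects node -> predecessors, restricted to the affected set.
    predecessors = {
        node: [parent for parent in affected if node in graph.get(parent, [])]
        for node in affected
    }
    return list(TopologicalSorter(predecessors).static_order())


# fetch_api already succeeded and archive_raw does not depend on validate,
# so neither is re-run.
print(tasks_to_rerun(PIPELINE, "validate"))
# → ['validate', 'load_warehouse', 'notify']
```

A cron-driven script has no such graph to consult, so it can only re-run everything or nothing.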

Worked example — same pipeline expressed in three orchestrators

Detailed explanation. Real interviews probe whether you can express the same business pipeline in all three tools. Below is a canonical four-step ETL — fetch_api → validate → load_warehouse → notify — and how it lands in Airflow, Dagster, and Prefect.

Question. Express the same four-step daily ETL pipeline (fetch from API → validate rows → load into the warehouse → notify Slack) as a minimal pipeline definition in each of Airflow, Dagster, and Prefect. Highlight the shape difference (task graph vs asset graph vs flow).

Input. A scheduled-daily pipeline that hits a REST endpoint, validates 1k–10k rows in memory, loads them into warehouse.fact_events, and posts a Slack message.

Code.

# Airflow — task-first DAG (Airflow 2.x TaskFlow API)
from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule="@daily", start_date=datetime(2026, 5, 1), catchup=False)
def daily_etl():
    @task
    def fetch_api():
        return {"rows": [{"id": i} for i in range(1000)]}
    @task
    def validate(payload):
        return [r for r in payload["rows"] if r["id"] is not None]
    @task
    def load_warehouse(rows):
        # INSERT INTO warehouse.fact_events ...
        return len(rows)
    @task
    def notify(n):
        # Slack post: f"Loaded {n} rows"
        return "ok"
    notify(load_warehouse(validate(fetch_api())))

daily_etl()

# Dagster — asset-first graph
from dagster import asset, Definitions

@asset
def raw_events() -> dict:
    return {"rows": [{"id": i} for i in range(1000)]}

@asset
def clean_events(raw_events: dict) -> list:
    return [r for r in raw_events["rows"] if r["id"] is not None]

@asset
def fact_events(clean_events: list) -> int:
    # INSERT INTO warehouse.fact_events ...
    return len(clean_events)

@asset
def notify_slack(fact_events: int) -> str:
    return f"Loaded {fact_events} rows"

defs = Definitions(assets=[raw_events, clean_events, fact_events, notify_slack])

# Prefect — flow-first, Pythonic
from prefect import flow, task

@task
def fetch_api():
    return {"rows": [{"id": i} for i in range(1000)]}

@task
def validate(payload):
    return [r for r in payload["rows"] if r["id"] is not None]

@task
def load_warehouse(rows):
    return len(rows)

@task
def notify(n):
    return "ok"

@flow(name="etl_pipeline")
def etl_pipeline():
    payload = fetch_api()
    clean = validate(payload)
    n = load_warehouse(clean)
    return notify(n)

Step-by-step explanation.

  1. Airflow wraps each step as a @task; the DAG's schedule="@daily" is owned by the scheduler; start_date plus catchup=False controls the first-run semantics.
  2. Dagster flips the mental model: each step is an @asset, the dependency graph is inferred from function arguments (clean_events(raw_events) implies clean_events depends on raw_events), and each asset's materialisation is recorded in the asset catalog, where you can browse it.
  3. Prefect sits closest to plain Python: the @flow is a regular function, @task decorators add retries + observability, and calling a task inside the flow runs it and returns its result, just like a normal Python call.
  4. The three runtimes produce the same business outcome — but the mental model of what you are building is different in each case.
  5. The choice between them is rarely about whether they can run the pipeline; it is about which mental model your team finds natural and which platform features (catalog, partitioning, sub-flows) you need on day 90.

Output (the run-summary view in each tool).

| tool | shape | the entity you click | what shows up in the UI |
|---|---|---|---|
| Airflow | DAG of tasks | a DAG Run | per-task logs, retry buttons, Gantt |
| Dagster | asset graph | an asset | materialisations, asset checks, lineage |
| Prefect | flow run | a flow + sub-flow | task states, sub-flow timeline, artifacts |

Rule of thumb: the shape the tool surfaces is the shape your team will end up thinking in. Pick the shape first, then evaluate ecosystem and hosting second.

Airflow vs Dagster and Dagster vs Prefect: the four senior signals

Signal 1: opinionated tool choice with a one-sentence reason. Senior orchestration engineers do not say "all three are good"; they say "I run Airflow for our cron-style ETL because the operator library is unbeatable; I run Dagster on the data-product graph because the asset model + catalog give me lineage for free; I'd reach for Prefect on ML / API-heavy workflows that need dynamic mapping and sub-flows."

Signal 2: anatomy over feature lists. Junior engineers list features. Seniors describe the runtime, because anatomy is what predicts production behaviour: "Airflow has a scheduler, an executor (Celery / Kubernetes / Local), a webserver, and a metadata DB (Postgres); when the scheduler dies, runs stop being scheduled but in-flight tasks continue on the executor, and recovery is driven by the state in the metadata DB."

Signal 3: migration-cost awareness. Senior engineers know that moving from a DAG scheduler to an asset-first tool is not a script port but a re-modelling exercise. Junior engineers underestimate the cost of re-teaching the team to think in assets instead of tasks.

Signal 4: partitioning + backfill reasoning. When asked for a backfill, senior engineers describe the partition strategy (daily, hourly, static), the concurrency cap, and the cost; junior engineers give only a wall-clock estimate.
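A partition-aware backfill with a concurrency cap can be sketched in a few lines of standard-library Python. This is a toy model, not any orchestrator's API: daily_partitions, backfill, and the flaky_load callable are hypothetical names, and a real platform would also persist per-partition status and apply its own retry policy. The point is the shape of the senior answer to "the 30-day backfill failed on day 12": enumerate partitions, run them under a cap, and re-run only the ones that failed.

```python
from concurrent.futures import ThreadPoolExecutor
from datetime import date, timedelta
from typing import Callable


def daily_partitions(start: date, days: int) -> list[str]:
    """ISO-date partition keys, one per day, starting at `start`."""
    return [(start + timedelta(days=i)).isoformat() for i in range(days)]


def backfill(partitions: list[str], run_partition: Callable[[str], None],
             max_concurrency: int = 4) -> list[str]:
    """Run every partition with at most `max_concurrency` in flight; return failed keys."""
    def attempt(key: str) -> tuple[str, bool]:
        try:
            run_partition(key)
            return key, True
        except Exception:
            return key, False

    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        outcomes = list(pool.map(attempt, partitions))  # map preserves input order
    return [key for key, ok in outcomes if not ok]


# Hypothetical loader that fails once, transiently, on day 12 of the window.
attempts: dict[str, int] = {}

def flaky_load(key: str) -> None:
    attempts[key] = attempts.get(key, 0) + 1
    if key == "2026-05-12" and attempts[key] == 1:
        raise RuntimeError("transient warehouse error")


window = daily_partitions(date(2026, 5, 1), 30)
failed = backfill(window, flaky_load, max_concurrency=4)
print(failed)  # → ['2026-05-12']

# Re-run only the failed partition, not the whole 30-day window.
retry_failed = backfill(failed, flaky_load)
print(retry_failed)  # → []
```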

Solution Using a 5-dimension decision matrix

Code.

-- One canonical decision matrix: every row maps one dimension to all three tools.
CREATE TABLE orchestrator_decision_matrix AS
SELECT * FROM (VALUES
    ('maturity_ecosystem', 'massive',                         'growing',                         'growing'),
    ('asset_awareness',    'datasets (lightweight)',          'asset-first (native)',            'artifacts (lightweight)'),
    ('dynamic_flows',      'TaskFlow + dynamic task mapping', 'DynamicOut + partitioned asset',  'native (sub-flows + .map)'),
    ('hosting_options',    'MWAA + Astronomer + Composer',    'Dagster Cloud + self-host',       'Prefect Cloud + OSS server'),
    ('best_for',           'cron-style ETL + large teams',    'data product graph + lineage',    'Pythonic ML / API workflows')
) AS t(dimension, airflow, dagster, prefect);

Step-by-step trace.

| dimension | airflow | dagster | prefect |
|---|---|---|---|
| maturity_ecosystem | massive | growing | growing |
| asset_awareness | datasets (lightweight) | asset-first (native) | artifacts (lightweight) |
| dynamic_flows | TaskFlow + dynamic task mapping | DynamicOut + partitioned asset | native (sub-flows + .map) |
| hosting_options | MWAA + Astronomer + Composer | Dagster Cloud + self-host | Prefect Cloud + OSS server |
| best_for | cron-style ETL + large teams | data product graph + lineage | Pythonic ML / API workflows |

  1. Row 1 (maturity_ecosystem): Airflow has the deepest plugin library (dozens of official provider packages bundling a very large catalogue of operators, hooks, and sensors) and the most managed-service options; Dagster and Prefect have smaller but actively maintained integration libraries.
  2. Row 2 (asset_awareness): Dagster's software-defined assets are first-class; Airflow Datasets and Prefect artifacts are lighter, secondary signals.
  3. Row 3 (dynamic_flows): Prefect's sub-flows + .map make dynamic patterns idiomatic; Airflow's dynamic task mapping (.expand()) works but was added on top of the task model; Dagster usually favours modelling work as (often partitioned) assets over building dynamic graphs.
  4. Row 4 (hosting_options): all three run first-class on hosted SaaS and self-hosted; nobody is locked out by deployment shape.
  5. Row 5 (best_for): the synthesis row; pick by team shape, not by feature count.

Output.

| dimension | winner | tie-breaker |
|---|---|---|
| maturity_ecosystem | Airflow | operator count + managed services |
| asset_awareness | Dagster | catalog, lineage, asset checks |
| dynamic_flows | Prefect | sub-flow + .map ergonomics |
| hosting_options | All three | tie |
| best_for | depends | team mental model |

Why this works — concept by concept:

  • Decision matrix: turns the vague "which tool is best?" into a one-row lookup; interviewers like a candidate who has internalised the tradeoffs as data, not opinion.
  • Per-dimension winner: admits there is no universal winner; the senior signal is naming a winner per dimension, not crowning one tool overall.
  • Tie-breaker column: surfaces the feature that actually closes the deal on each row.
  • "depends" is allowed: the synthesis row admits ambiguity rather than over-claiming, which reviewers read as maturity.
  • Cost: O(1) to read the matrix; the real evaluation cost is meetings plus a one-month spike that models two example pipelines in your top two candidates.

2. Apache Airflow anatomy — DAGs, operators, scheduler, executor, metadata DB

Apache Airflow: the five-piece runtime every interview tests

Apache Airflow is the original task-first orchestrator and still has the largest installed base in 2026. The runtime breaks into five pieces (scheduler, executor, webserver, metadata DB on Postgres or MySQL, and worker processes when using Celery or Kubernetes), and the job of a senior Airflow engineer is to understand how each piece fails independently and what the recovery story looks like. Every Airflow vs Dagster interview eventually circles back to "draw the Airflow runtime on the board"; if you cannot, you do not understand the trade you're making against Dagster's daemon + asset model.

The five runtime pieces and what each one does.

  • scheduler: long-running Python process that reads the metadata DB, decides which DAG runs to create and which TaskInstances to queue, and hands them to the executor. When it dies, in-flight tasks keep running but no new ones are scheduled.
  • executor: pluggable backend that actually runs tasks. Common ones: LocalExecutor (parallel subprocesses on one machine; dev and small deployments), CeleryExecutor (worker pool + Redis / RabbitMQ broker; classic prod), KubernetesExecutor (pod-per-task; cloud-native), CeleryKubernetesExecutor (hybrid). The executor choice is the single biggest production decision in Airflow.
  • webserver: Flask app that renders the DAG, Graph, Gantt, and TaskInstance views; it can die without stopping execution (purely UI).
  • metadata DB: Postgres or MySQL holding DagRun, TaskInstance, XCom, Variable, and Connection rows. This is the system of record; if it dies, the whole platform stops.
  • worker: only relevant for the Celery / Kubernetes executors; the long-running process (Celery) or per-task pod (Kubernetes) that runs the task code, typically from a Docker image.

The DAG — the developer-facing primitive.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from datetime import datetime, timedelta

with DAG(
    dag_id="daily_etl",
    start_date=datetime(2026, 5, 1),
    schedule="@daily",
    catchup=False,
    default_args={"retries": 3, "retry_delay": timedelta(minutes=5)},
) as dag:
    sense_source = S3KeySensor(
        task_id="sense_source",
        bucket_key="s3://raw/{{ ds }}/_SUCCESS",
        timeout=60 * 60,
    )
    extract   = PythonOperator(task_id="extract",       python_callable=lambda: ...)
    transform = PythonOperator(task_id="transform",     python_callable=lambda: ...)
    quality   = PythonOperator(task_id="quality_check", python_callable=lambda: ...)
    publish   = PythonOperator(task_id="publish",       python_callable=lambda: ...)

    sense_source >> extract >> transform >> quality >> publish

  • DAG: directed acyclic graph; the unit of scheduling.
  • start_date + catchup=False: the canonical "start fresh from now" pattern; without catchup=False, Airflow 2.x (where catchup defaults to on) backfills every missed interval since start_date, which has burned many junior engineers.
  • schedule="@daily": cron alias; @hourly, @weekly, or a raw cron string also work.
  • >> operator: sets dependencies; A >> B means B runs after A (by default, only once A succeeds).
  • S3KeySensor: a sensor operator, the Airflow primitive that waits until an external condition holds (here, until the S3 key exists).
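A minimal sketch of what catchup changes for a @daily DAG, under simplifying assumptions: a brand-new DAG with no prior runs, naive UTC datetimes, and the rule that a run for logical date d is created only once its interval [d, d + 1 day) has fully elapsed. daily_runs_to_create is a hypothetical helper, not an Airflow API.

```python
from datetime import datetime, timedelta


def daily_runs_to_create(start_date: datetime, now: datetime, catchup: bool) -> list[datetime]:
    """Logical dates of @daily runs a fresh DAG would get (simplified model, not Airflow code)."""
    one_day = timedelta(days=1)
    complete = []
    d = start_date
    # A run for logical date d exists only once its interval [d, d + 1 day) has ended.
    while d + one_day <= now:
        complete.append(d)
        d += one_day
    # catchup=True: one run per missed interval; catchup=False: only the latest one.
    return complete if catchup else complete[-1:]


start, now = datetime(2026, 5, 1), datetime(2026, 5, 30, 3, 0)
print(len(daily_runs_to_create(start, now, catchup=True)))   # → 29
print(daily_runs_to_create(start, now, catchup=False))       # → [datetime.datetime(2026, 5, 29, 0, 0)]
```

Deploying the DAG four weeks after its start_date therefore means 29 queued runs with catchup on, versus a single run for the most recent interval with catchup=False.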

Why the executor choice dominates the production decision.

  • LocalExecutor: single machine, no horizontal scaling; fine for dev and small single-node setups, rarely for serious prod.
  • CeleryExecutor: needs Redis or RabbitMQ as a broker plus two or more worker processes; classic Airflow ops; mature but heavyweight (one more cluster to monitor).
  • KubernetesExecutor: one pod per task; no idle workers when nothing is running; great for variable workloads; needs k8s expertise on the team.
  • CeleryKubernetesExecutor: always-on Celery workers for the bulk of tasks, plus a pod-per-task Kubernetes path for tasks routed to the kubernetes queue (typically resource-hungry or isolation-sensitive ones); a common hybrid in large shops.
  • Managed services: MWAA (AWS), Astronomer, and Cloud Composer (GCP) hide or narrow the executor choice; you pick them when you don't want to run the runtime yourself.

Worked example — write a daily Airflow DAG with a sensor, retries, and an SLA

Detailed explanation. Real interviews ask you to write a minimal but production-shaped DAG. The shape every reviewer checks: start_date plus catchup=False, a sensor as the first gate, per-task retries, and an sla on the slowest task.

Question. Write a daily daily_etl DAG with five tasks (sense_source → extract → transform → quality_check → publish), default retries of 3, a 30-minute SLA on transform, and catchup=False. Use the TaskFlow API for clarity.

Input. An S3 bucket where the upstream team drops an s3://raw/<date>/_SUCCESS marker each day around 02:30 UTC; the warehouse target is a Postgres fact_orders table.

Code.

from airflow.decorators import dag, task
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from datetime import datetime, timedelta

@dag(
    dag_id="daily_etl",
    start_date=datetime(2026, 5, 1),
    schedule="@daily",
    catchup=False,
    default_args={"retries": 3, "retry_delay": timedelta(minutes=5)},
    tags=["etl", "daily"],
)
def daily_etl():
    sense = S3KeySensor(
        task_id="sense_source",
        bucket_key="s3://raw/{{ ds }}/_SUCCESS",
        timeout=60 * 60,
        poke_interval=60,
    )

    @task
    def extract(**ctx) -> int:
        # read from S3 prefix s3://raw/{{ ds }}/
        return 1000  # rows pulled

    @task(sla=timedelta(minutes=30))
    def transform(n_rows: int) -> int:
        # validate, normalise, enrich
        return n_rows

    @task
    def quality_check(n_rows: int) -> int:
        assert n_rows > 0, "no rows to publish"
        return n_rows

    @task
    def publish(n_rows: int) -> str:
        # write into warehouse.fact_orders
        return f"published {n_rows} rows"

    rows = extract()
    # Gate extract (the first task) on the sensor; the nested calls wire the rest.
    sense >> rows
    publish(quality_check(transform(rows)))

daily_etl()

Step-by-step explanation.

  1. @dag(...) registers the DAG with dag_id="daily_etl"; catchup=False prevents the dreaded "fill 200 days at once" surprise.
  2. default_args={"retries": 3, "retry_delay": ...} applies retries to every task without repeating yourself.
  3. S3KeySensor is the first gate; it blocks until the _SUCCESS marker is present, capped at one hour.
  4. @task(sla=timedelta(minutes=30)) gives transform an SLA (Airflow 2.x; Airflow 3 removed SLAs). Airflow measures it relative to the start of the DAG run, not from when transform itself begins, so time spent waiting in the sensor counts toward it; misses are recorded in the metadata DB and can fire a DAG-level sla_miss_callback.
  5. rows = extract() returns a reference to extract's output (an XComArg); sense >> rows puts the sensor upstream of extract, and the nested calls chain the remaining TaskFlow tasks. Writing sense >> publish(...) instead would gate only publish, so extract could start before the _SUCCESS marker exists.
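Retry counts are easy to misread: retries=3 means up to four attempts in total (the first try plus three retries). The sketch below is a toy model of that loop, not Airflow's implementation; run_with_retries and flaky_extract are hypothetical names, and sleep is injectable so the example does not actually wait five minutes between attempts.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(fn: Callable[[], T], retries: int = 3, retry_delay_s: float = 300.0,
                     sleep: Callable[[float], None] = time.sleep) -> tuple[T, int]:
    """Toy retry loop: one initial try plus up to `retries` retries. Returns (result, attempts)."""
    attempt = 0
    while True:
        attempt += 1
        try:
            return fn(), attempt
        except Exception:
            if attempt > retries:  # the initial try and all `retries` retries are used up
                raise
            sleep(retry_delay_s)


calls = {"n": 0}

def flaky_extract() -> int:
    """Hypothetical source call that times out twice before succeeding."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("source API timed out")
    return 1000


delays: list[float] = []
result, attempts = run_with_retries(flaky_extract, retries=3, retry_delay_s=300, sleep=delays.append)
print(result, attempts, delays)  # → 1000 3 [300, 300]
```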

Output (the DAG Run row in the metadata DB after a successful run).

| dag_id | run_id | state | start | end | sla_missed |
|---|---|---|---|---|---|
| daily_etl | scheduled__2026-05-29 | success | 02:30 UTC | 02:48 UTC | false |

Rule of thumb: every production DAG ships with start_date + catchup=False + per-task retries + at least one SLA + a sensor as the first gate. Senior reviewers will block the PR if any one is missing.

Airflow alternatives: when to keep Airflow vs when to migrate

  • Keep Airflow when — you have a large operator library you already depend on (S3, BigQuery, Snowflake, dbt, Spark, Databricks, etc.); your team thinks in tasks not assets; you run on MWAA / Astronomer / Composer.
  • Consider Dagster when — your team is a data-product team that thinks in tables / models rather than jobs; you want a built-in asset catalog, freshness checks, and column-level lineage.
  • Consider Prefect when — your team is ML / API-heavy, lives in Python, and needs dynamic flows + sub-flows as first-class primitives.
  • The migration cost — re-modelling 50 DAGs as 50 asset graphs (or 50 flows) is a 1-2 quarter project for a team of 2-3 engineers; do not treat it as a script port.
  • The hybrid pattern — many teams run Airflow for legacy ETL plus Dagster for the data-product graph plus Prefect for ML; one orchestrator does not always have to win.

Solution Using a sensor + TaskFlow + SLA + KubernetesExecutor production pattern

Code.

# Production-shaped Airflow DAG for daily ETL.
from airflow.decorators import dag, task
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.models.baseoperator import chain
from datetime import datetime, timedelta

DEFAULT_ARGS = {
    "owner": "data-platform",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(hours=2),
}

@dag(
    dag_id="fact_orders_daily",
    start_date=datetime(2026, 5, 1),
    schedule="@daily",
    catchup=False,
    default_args=DEFAULT_ARGS,
    tags=["fact_orders", "warehouse"],
)
def fact_orders_daily():
    sense = S3KeySensor(
        task_id="sense_source",
        bucket_key="s3://raw/orders/{{ ds }}/_SUCCESS",
        timeout=60 * 60,
        poke_interval=60,
        mode="reschedule",   # frees the worker slot while waiting
    )

    @task
    def extract(**ctx) -> int:
        return 1_000_000

    @task(sla=timedelta(minutes=30))
    def transform(n: int) -> int:
        return n

    @task
    def quality_check(n: int) -> int:
        assert n > 0
        return n

    @task
    def publish(n: int) -> str:
        return f"published {n} rows to warehouse.fact_orders"

    extracted = extract()
    transformed = transform(extracted)
    checked = quality_check(transformed)
    published = publish(checked)
    # chain() makes every hop explicit, starting with the sensor in front of extract.
    chain(sense, extracted, transformed, checked, published)

fact_orders_daily()

Step-by-step trace.

| component | choice | why |
|---|---|---|
| executor | KubernetesExecutor | pod-per-task; no idle workers |
| metadata DB | Postgres (managed) | system of record |
| sensor mode | reschedule | frees worker slot during long wait |
| retries | 3 | absorbs transient API failures |
| sla | 30 min on transform | gates the slow step |
| catchup | false | avoids 200-day backfill surprise |

  1. The KubernetesExecutor choice (set in the Airflow deployment config, not in the DAG file) means each task spawns its own pod; the scheduler enqueues k8s pod creation, not a Celery task.
  2. S3KeySensor(mode="reschedule") flips the sensor from "hold the worker for an hour" to "wake up every minute and re-check"; the saved worker slot is critical at scale.
  3. default_args apply across every task; no per-task duplication of retries / retry_delay.
  4. The SLA on transform gates the slowest step; SLA misses can fire a DAG-level sla_miss_callback (usually Slack + PagerDuty wiring).
  5. chain(...) is the explicit dependency wiring; >> is equivalent, but chain(...) reads more clearly for multi-step pipelines. The sensor must sit in front of extract, the first task: chaining it only to the final publish(...) result would gate publish alone and let extract start before the _SUCCESS marker lands.
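A back-of-the-envelope sketch of why reschedule mode matters. It assumes a sensor that waits wait_s seconds, checks every poke_interval_s seconds, and in reschedule mode holds a worker slot for about poke_cost_s seconds per check; it ignores scheduler overhead, so treat the numbers as a rough comparison rather than a measurement. worker_slot_seconds is a hypothetical helper.

```python
def worker_slot_seconds(wait_s: int, poke_interval_s: int, poke_cost_s: int, mode: str) -> int:
    """Rough worker-slot time a sensor occupies while waiting `wait_s` seconds."""
    if mode == "poke":
        # The task keeps its slot for the entire wait, sleeping between checks.
        return wait_s
    if mode == "reschedule":
        # The slot is held only while each check runs: one at t=0, then one per interval.
        pokes = wait_s // poke_interval_s + 1
        return pokes * poke_cost_s
    raise ValueError(f"unknown mode: {mode}")


# One-hour wait, a check every 60 s, each check holding a slot for about 1 s.
print(worker_slot_seconds(3600, 60, 1, "poke"))        # → 3600
print(worker_slot_seconds(3600, 60, 1, "reschedule"))  # → 61
```

The trade-off is extra scheduler work for every reschedule, which is why poke mode can still be the better fit for short waits with frequent checks.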

Output.

| dag_id | executor | state | duration | sla_miss |
|---|---|---|---|---|
| fact_orders_daily | KubernetesExecutor | success | 28m | false |

Why this works — concept by concept:

  • Five-piece runtime literacy — naming the scheduler, executor, webserver, metadata DB, and workers separately is the senior signal; juniors blur them into "Airflow".
  • Sensor in reschedule mode — the canonical scale-aware sensor pattern; without it, hour-long sensors block worker slots and pin the cluster.
  • SLA gating — the SLA goes on the slowest step (transform), not the whole DAG; alerting on the bottleneck is the production-safe pattern.
  • catchup=False — the most-burned beginner pitfall; ship every new DAG with it explicit, not implicit.
  • Cost — for a 1M-row daily load, ~$0.10-$1 per run on managed Airflow + warehouse compute; the runtime cost of orchestration is dominated by the work itself, not the scheduler.

3. Dagster anatomy — software-defined assets, IO managers, the data catalog

Dagster: software-defined assets and the asset-first mental model

Dagster turns the orchestrator mental model on its head. Instead of "what jobs do I need to run, and when?", it asks "what data assets do I produce, and what produces them?". Software-defined assets (SDAs) are the core primitive: a Python function decorated with @asset declares both the dataset it produces and the upstream datasets it depends on (inferred from function arguments). Dagster then derives the orchestration graph from the asset graph: schedules, sensors, retries, and partitioning are wired onto the asset, not onto a task. This is the single biggest differentiator in both the Dagster vs Prefect and the Dagster vs Airflow comparisons.

The four runtime pieces.

  • dagster-daemon: the long-running process that runs schedules, sensors, and the run queue; the closest analogue to Airflow's scheduler.
  • dagster-webserver (formerly Dagit): the React UI for the asset graph, asset catalog, lineage, materialisations, asset checks, and run history.
  • run launcher: pluggable; choices include DefaultRunLauncher (a new process on the same node as the code location), K8sRunLauncher (one Kubernetes Job per run), and DockerRunLauncher (one container per run).
  • IO manager: Dagster-specific; a pluggable layer that decides how asset outputs are persisted and how downstream assets load them. Options include the S3 pickle IO manager, warehouse IO managers for Snowflake, BigQuery, or DuckDB, and custom implementations.

The SDA — the developer-facing primitive.

from dagster import asset, AssetExecutionContext, MetadataValue

@asset(
    description="Raw orders pulled from the OLTP source",
    group_name="orders",
)
def raw_orders(context: AssetExecutionContext) -> list[dict]:
    rows = [{"id": i, "amount": 10 * i} for i in range(1, 1001)]
    context.add_output_metadata({
        "row_count": MetadataValue.int(len(rows)),
        "preview":   MetadataValue.json(rows[:5]),
    })
    return rows

@asset(group_name="orders")
def clean_orders(raw_orders: list[dict]) -> list[dict]:
    return [r for r in raw_orders if r["amount"] >= 0]

@asset(group_name="marts")
def daily_sales_mart(clean_orders: list[dict]) -> int:
    return sum(r["amount"] for r in clean_orders)

  • @asset: declares both the dataset and the dependency edges; clean_orders(raw_orders) infers the edge raw_orders → clean_orders.
  • group_name: groups related assets in the UI (not to be confused with partitions); useful for separating orders, customers, and marts.
  • context.add_output_metadata(...): attaches row counts, previews, and quality signals to each materialisation; this is what powers the asset catalog UI.
  • No DAG file: the asset graph is the DAG; you do not maintain a separate task-dependency definition.
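To see how function arguments can define the graph, here is a toy model in plain Python, not Dagster's internals: it reads each function's parameter names with inspect.signature, treats them as upstream asset names, and materialises in dependency order with graphlib. Real Dagster also supports explicit deps, AssetIn, and asset key prefixes, which this sketch ignores.

```python
import inspect
from graphlib import TopologicalSorter


# Plain functions standing in for @asset definitions (no Dagster import).
def raw_orders():
    return [{"id": i, "amount": 10 * i} for i in range(1, 4)]


def clean_orders(raw_orders):
    return [r for r in raw_orders if r["amount"] >= 0]


def daily_sales_mart(clean_orders):
    return sum(r["amount"] for r in clean_orders)


ASSETS = {fn.__name__: fn for fn in (raw_orders, clean_orders, daily_sales_mart)}


def infer_deps(assets):
    """Each asset's upstreams are simply its parameter names."""
    return {name: list(inspect.signature(fn).parameters) for name, fn in assets.items()}


def materialise_all(assets):
    deps = infer_deps(assets)
    values = {}
    # Topological order guarantees every upstream value exists before it is needed.
    for name in TopologicalSorter(deps).static_order():
        values[name] = assets[name](*(values[dep] for dep in deps[name]))
    return values


print(infer_deps(ASSETS))
# → {'raw_orders': [], 'clean_orders': ['raw_orders'], 'daily_sales_mart': ['clean_orders']}
print(materialise_all(ASSETS)["daily_sales_mart"])  # → 60
```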

Why software defined assets change the conversation.

  • The catalog is automatic: every asset becomes an entry in the data catalog; lineage comes straight from the definitions, and freshness, ownership, and column-level metadata attach to the same entry.
  • Lineage is structural: you can click any asset and walk its upstreams and downstreams in the UI. Standalone catalogs such as Atlan or DataHub typically collect lineage through connectors or ingestion jobs; Dagster derives it from the asset definitions themselves.
  • Asset checks are first-class: @asset_check attaches data quality assertions directly to the asset instead of to a separate Airflow-style task; failed checks can fire alerts, and checks declared with blocking=True stop downstream assets from materialising when they fail with ERROR severity.
  • Partitioned assets: @asset(partitions_def=DailyPartitionsDefinition(...)) declares the partition shape; backfills become "materialise these 30 partitions" rather than "trigger this DAG 30 times".
  • Schedules + sensors wrap assets: @schedule and @sensor create runs that materialise named assets, not separate tasks; the asset is the unit, not the job.
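The gating behaviour of blocking checks can be sketched as a toy linear pipeline in plain Python. It assumes a hypothetical CheckResult type and run_chain helper (these are not Dagster classes) and models only one rule: a failed check with ERROR severity stops the assets downstream of the checked one, while a WARN failure lets the run continue. In Dagster itself this gating applies to checks declared with blocking=True.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class CheckResult:
    passed: bool
    severity: str  # "WARN" or "ERROR"


def run_chain(steps: list[tuple[str, Callable[[Any], Any]]],
              checks: dict[str, Callable[[Any], CheckResult]]) -> list[str]:
    """Materialise a linear chain of assets; stop downstream work on a failed ERROR check."""
    materialised: list[str] = []
    value: Any = None
    for name, compute in steps:
        value = compute(value)
        materialised.append(name)
        check = checks.get(name)
        if check is not None:
            result = check(value)
            if not result.passed and result.severity == "ERROR":
                break  # blocking failure: downstream assets are skipped
    return materialised


def mart_non_negative(total: int) -> CheckResult:
    return CheckResult(passed=total >= 0, severity="ERROR")


steps = [
    ("raw_orders", lambda _: [{"amount": -50}, {"amount": 20}]),  # bad upstream data
    ("daily_sales_mart", lambda rows: sum(r["amount"] for r in rows)),
    ("exec_dashboard", lambda total: {"total": total, "status": "ok"}),
]
print(run_chain(steps, {"daily_sales_mart": mart_non_negative}))
# → ['raw_orders', 'daily_sales_mart']
```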

Worked example — write the same daily ETL as a Dagster asset graph with partitions and an IO manager

Detailed explanation. Real interviews ask you to write a daily-partitioned asset graph with one IO manager and one asset check. The shape every reviewer checks: DailyPartitionsDefinition, one asset per stage, an IO manager that persists output, and an @asset_check on the mart.

Question. Write a four-asset daily-partitioned pipeline (raw_orders → clean_orders → daily_sales_mart → exec_dashboard) with a DailyPartitionsDefinition starting 2026-05-01, an @asset_check ensuring daily_sales_mart >= 0, and an IO manager that persists outputs to S3.

Input. A daily window of raw_orders per partition; the pipeline materialises four assets per partition and the asset check fires after daily_sales_mart.

Code.

from dagster import (
    asset, asset_check, AssetCheckResult, AssetCheckSeverity,
    DailyPartitionsDefinition, Definitions, define_asset_job,
    build_schedule_from_partitioned_job, MetadataValue, AssetExecutionContext,
)
from dagster_aws.s3 import s3_pickle_io_manager, s3_resource

daily = DailyPartitionsDefinition(start_date="2026-05-01")

@asset(partitions_def=daily, group_name="orders")
def raw_orders(context: AssetExecutionContext) -> list[dict]:
    # Synthetic stand-in for an extract: 1,000 orders stamped with the partition's day.
    day = context.partition_key
    return [{"id": i, "amount": 10 * i, "day": day} for i in range(1, 1001)]

@asset(partitions_def=daily, group_name="orders")
def clean_orders(raw_orders: list[dict]) -> list[dict]:
    return [r for r in raw_orders if r["amount"] >= 0]

@asset(partitions_def=daily, group_name="marts")
def daily_sales_mart(clean_orders: list[dict]) -> int:
    return sum(r["amount"] for r in clean_orders)

@asset(partitions_def=daily, group_name="marts")
def exec_dashboard(daily_sales_mart: int) -> dict:
    return {"total": daily_sales_mart, "status": "ok"}

@asset_check(asset="daily_sales_mart")
def mart_non_negative(daily_sales_mart: int) -> AssetCheckResult:
    return AssetCheckResult(
        passed=daily_sales_mart >= 0,
        severity=AssetCheckSeverity.ERROR,
        metadata={"total": MetadataValue.int(daily_sales_mart)},
    )

# The job spans partitioned assets, so its schedule must supply a partition key;
# build_schedule_from_partitioned_job derives a daily cadence from the partitions def.
daily_job = define_asset_job("daily_job", selection="*", partitions_def=daily)
daily_sched = build_schedule_from_partitioned_job(daily_job)

defs = Definitions(
    assets=[raw_orders, clean_orders, daily_sales_mart, exec_dashboard],
    asset_checks=[mart_non_negative],
    schedules=[daily_sched],
    resources={
        # s3_pickle_io_manager persists every asset output to S3 through the "s3" resource.
        "io_manager": s3_pickle_io_manager.configured({"s3_bucket": "dagster-io"}),
        "s3": s3_resource,
    },
)

Step-by-step explanation.

  1. DailyPartitionsDefinition(start_date="2026-05-01") declares the partition shape; every asset that uses it has one materialisation per day.
  2. Each asset declares its dependencies via function arguments: clean_orders(raw_orders) implies the edge raw_orders → clean_orders.
  3. @asset_check(asset="daily_sales_mart") attaches a quality assertion to the mart asset; a failed check is reported with its declared severity (WARN or ERROR).
  4. define_asset_job("daily_job", selection="*", partitions_def=daily) defines a job that materialises every asset; build_schedule_from_partitioned_job(daily_job) turns it into a daily schedule whose runs each target the most recently completed day's partition.
  5. Definitions(..., resources={"io_manager": ...}) wires the S3 IO manager so every asset's output is persisted to S3 without per-asset boilerplate.
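Step 5 is the part that surprises Airflow users: asset functions never write to storage themselves. A dependency-free sketch makes that contract visible. InMemoryIOManager is hypothetical (a dict stands in for S3, and the method names only echo Dagster's handle_output / load_input hooks rather than reproducing their signatures); it stores each output under an (asset, partition) path and hands it back to the downstream step.

```python
from typing import Any


class InMemoryIOManager:
    """Hypothetical stand-in for an IO manager: stores outputs keyed by asset and partition."""

    def __init__(self, root: str) -> None:
        self.root = root
        self._store: dict[str, Any] = {}

    def path_for(self, asset: str, partition: str) -> str:
        # One storage location per (asset, partition) pair: <root>/<asset>/<partition>.
        return f"{self.root}/{asset}/{partition}"

    def handle_output(self, asset: str, partition: str, value: Any) -> str:
        path = self.path_for(asset, partition)
        self._store[path] = value
        return path

    def load_input(self, asset: str, partition: str) -> Any:
        return self._store[self.path_for(asset, partition)]


def run_partition(manager: InMemoryIOManager, day: str) -> int:
    # The "asset" steps only compute values; every write and read goes through `manager`.
    raw = [{"id": i, "amount": 10 * i, "day": day} for i in range(1, 1001)]
    manager.handle_output("raw_orders", day, raw)

    clean = [r for r in manager.load_input("raw_orders", day) if r["amount"] >= 0]
    manager.handle_output("clean_orders", day, clean)

    total = sum(r["amount"] for r in manager.load_input("clean_orders", day))
    manager.handle_output("daily_sales_mart", day, total)
    return total


manager = InMemoryIOManager("s3://dagster-io")
print(run_partition(manager, "2026-05-29"))          # 5005000
print(manager.path_for("raw_orders", "2026-05-29"))  # s3://dagster-io/raw_orders/2026-05-29
```

Swapping the dict for a real object store would change only the manager class, which is the same reason a Definitions-level IO manager keeps asset code storage-agnostic.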

Output (materialisation summary in the asset catalog).

| asset | partition | status | row_count | bytes_io |
|---|---|---|---|---|
| raw_orders | 2026-05-29 | materialised | 1000 | 23 KB |
| clean_orders | 2026-05-29 | materialised | 1000 | 23 KB |
| daily_sales_mart | 2026-05-29 | materialised | 1 | 8 B |
| exec_dashboard | 2026-05-29 | materialised | 1 | 24 B |

Rule of thumb: every Dagster pipeline ships with a partitions_def, IO managers bound once at the Definitions level (assets refer to them by resource key rather than building their own), and at least one @asset_check on the key mart. Senior reviewers will block the PR if any one is missing.

software defined assets vs Airflow tasks — the mental-model translation

  • Airflow task = "do this work"; success = the function ran.
  • Dagster asset = "produce this dataset"; success = the dataset exists and is fresh.
  • Airflow XCom = task-to-task value passing; small payloads only.
  • Dagster IO manager = asset-to-asset value passing; persisted to S3 / Snowflake / Postgres; arbitrary size.
  • Airflow DagRun = one run of one DAG; tasks share a Run ID.
  • Dagster materialisation = one production of one asset; per-asset history.
  • Migration heuristic — every Airflow task that produces a table becomes a Dagster asset; every Airflow task that does sensing stays as a Dagster sensor; every Airflow operator that orchestrates without producing data becomes a Dagster op (the lower-level primitive).
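The migration heuristic in the last bullet is mechanical enough to write down. The helper below is a hypothetical sketch: the AirflowTask fields are illustrative inventory attributes, not an Airflow API, and the function simply applies the three rules (sensors stay sensors, table producers become assets, everything else becomes an op).

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AirflowTask:
    # Hypothetical inventory record for one Airflow task.
    task_id: str
    produces_table: bool = False
    is_sensor: bool = False


def dagster_primitive(task: AirflowTask) -> str:
    """Apply the heuristic: sensors -> sensor, table producers -> asset, everything else -> op."""
    if task.is_sensor:
        return "sensor"
    if task.produces_table:
        return "asset"
    return "op"


inventory = [
    AirflowTask("wait_for_s3_file", is_sensor=True),
    AirflowTask("load_fact_orders", produces_table=True),
    AirflowTask("trigger_downstream_refresh"),
]
for t in inventory:
    print(t.task_id, "->", dagster_primitive(t))
```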

airflow vs dagster — the day-90 differences

  • Day 1 — Airflow is faster to spin up if you already know it; Dagster has a steeper learning curve (assets + IO managers + partitions all at once).
  • Day 30 — Dagster's asset catalog is paying for itself; you can answer "is the dashboard fresh?" in one click instead of hopping across three Airflow DAGs.
  • Day 90 — Dagster's asset_check story has replaced a half-dozen BashOperator lines you used to write in Airflow; the asset catalog has become the team's single source of truth on data freshness; lineage in the UI has eliminated a 30-minute weekly "where does this column come from?" exercise.
  • Day 365 — your data team is now thinking in tables, not jobs; new hires onboard via the asset catalog, not via DAG-file walkthroughs; the migration cost has paid off — but only if the team committed to the model shift.


Solution Using a partitioned asset graph + IO manager + asset checks

Code.

# Production-shaped Dagster pipeline: 3 assets, 1 check, 1 schedule, S3 + Snowflake IO.
import pandas as pd
from dagster import (
    asset, asset_check, AssetCheckResult, AssetCheckSeverity,
    DailyPartitionsDefinition, Definitions, define_asset_job,
    build_schedule_from_partitioned_job, MetadataValue, AssetExecutionContext, EnvVar,
)
from dagster_aws.s3 import s3_pickle_io_manager, s3_resource
from dagster_snowflake_pandas import SnowflakePandasIOManager

daily = DailyPartitionsDefinition(start_date="2026-05-01")

@asset(partitions_def=daily, group_name="orders", io_manager_key="s3_io")
def raw_orders(context: AssetExecutionContext) -> list[dict]:
    return [{"id": i, "amount": 10 * i, "day": context.partition_key} for i in range(1, 1001)]

@asset(partitions_def=daily, group_name="orders", io_manager_key="s3_io")
def clean_orders(raw_orders: list[dict]) -> list[dict]:
    return [r for r in raw_orders if r["amount"] >= 0]

# The Snowflake IO manager stores pandas DataFrames, so the mart is a one-row frame;
# partition_expr names the column it uses to scope each daily partition.
@asset(partitions_def=daily, group_name="marts", io_manager_key="sf_io",
       metadata={"partition_expr": "order_date"})
def daily_sales_mart(context: AssetExecutionContext, clean_orders: list[dict]) -> pd.DataFrame:
    total = sum(r["amount"] for r in clean_orders)
    return pd.DataFrame({"order_date": [pd.Timestamp(context.partition_key)], "total": [total]})

# blocking=True: a failed ERROR-severity result stops downstream assets in the same run.
@asset_check(asset="daily_sales_mart", blocking=True)
def mart_non_negative(daily_sales_mart: pd.DataFrame) -> AssetCheckResult:
    return AssetCheckResult(
        passed=bool((daily_sales_mart["total"] >= 0).all()),
        severity=AssetCheckSeverity.ERROR,
        metadata={"rows_checked": MetadataValue.int(len(daily_sales_mart))},
    )

defs = Definitions(
    assets=[raw_orders, clean_orders, daily_sales_mart],
    asset_checks=[mart_non_negative],
    schedules=[build_schedule_from_partitioned_job(
        define_asset_job("daily_job", selection="*", partitions_def=daily),
    )],
    resources={
        "s3_io": s3_pickle_io_manager.configured({"s3_bucket": "dagster-io"}),
        # Credentials come from environment variables; the variable names are placeholders.
        "sf_io": SnowflakePandasIOManager(
            database="ANALYTICS",
            schema="MARTS",
            account=EnvVar("SNOWFLAKE_ACCOUNT"),
            user=EnvVar("SNOWFLAKE_USER"),
            password=EnvVar("SNOWFLAKE_PASSWORD"),
        ),
        "s3": s3_resource,
    },
)

Step-by-step trace.

| asset | partition | io_manager | persisted_to | check |
|---|---|---|---|---|
| raw_orders | 2026-05-29 | s3_io | s3://dagster-io/raw_orders/2026-05-29 | |
| clean_orders | 2026-05-29 | s3_io | s3://dagster-io/clean_orders/2026-05-29 | |
| daily_sales_mart | 2026-05-29 | sf_io | ANALYTICS.MARTS.daily_sales_mart | mart_non_negative (PASS) |

  1. The partition key 2026-05-29 flows through every asset; one materialisation per day, per asset.
  2. io_manager_key="s3_io" on the raw and clean stages persists pickle blobs to S3 (the exact object key also includes the IO manager's s3_prefix); io_manager_key="sf_io" on the mart writes the one-row DataFrame to a Snowflake table, using the order_date partition_expr to scope each day's rows.
  3. The mart_non_negative check runs after daily_sales_mart materialises; because it is declared with blocking=True, a failed ERROR-severity result stops any downstream assets in the same run.
  4. build_schedule_from_partitioned_job(...) derives a daily schedule from the partitions definition; each tick materialises all three assets in dependency order for the most recently completed day's partition.
  5. Definitions(...) is the single registration point: no Variable, no Connection, no dags_folder to manage.

Output.

| run_id | partition | assets_materialised | checks_passed | wall_clock |
|---|---|---|---|---|
| daily_job_2026-05-29 | 2026-05-29 | 3 | 1 | 2m 14s |

Why this works — concept by concept:

  • Software-defined assets — the graph is implied by function arguments; no separate DAG file, no manual edge wiring; the data product is the orchestration unit.
  • IO manager separation — persistence is configured at the Definitions level; rebinding a resource key such as s3_io to a different IO manager retargets every asset that uses that key without touching the asset code, as long as the new IO manager can store the asset's output type.
  • Asset checks — quality assertions live next to the asset; they run after materialisation, and a check declared with blocking=True stops downstream assets in the same run when it fails at ERROR severity.
  • Partitions def — backfills become "materialise this set of partitions"; DailyPartitionsDefinition, HourlyPartitionsDefinition, and StaticPartitionsDefinition cover most real pipelines.
  • Cost — Dagster Cloud Pro for a small team is ~$50-$200 / engineer / month; self-hosted is free but requires running the daemon + webserver yourself; the asset catalog UI is the feature most teams say pays for the migration.
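The first bullet's claim that the graph is implied by function arguments can be demonstrated with the standard library alone. This is a toy registry, not Dagster's implementation: a hypothetical asset decorator records each function, its parameter names are read as upstream edges, and graphlib.TopologicalSorter yields a valid materialisation order.

```python
import inspect
from graphlib import TopologicalSorter
from typing import Callable

REGISTRY: dict[str, Callable] = {}


def asset(fn: Callable) -> Callable:
    # Toy decorator: register the function under its own name.
    REGISTRY[fn.__name__] = fn
    return fn


@asset
def raw_orders() -> list[dict]:
    return [{"id": i, "amount": 10 * i} for i in range(1, 1001)]


@asset
def clean_orders(raw_orders: list[dict]) -> list[dict]:
    return [r for r in raw_orders if r["amount"] >= 0]


@asset
def daily_sales_mart(clean_orders: list[dict]) -> int:
    return sum(r["amount"] for r in clean_orders)


def build_graph() -> dict[str, set[str]]:
    # Each parameter name is read as "depends on the asset with this name".
    return {name: set(inspect.signature(fn).parameters) for name, fn in REGISTRY.items()}


def materialise_all() -> dict[str, object]:
    results: dict[str, object] = {}
    for name in TopologicalSorter(build_graph()).static_order():
        fn = REGISTRY[name]
        kwargs = {p: results[p] for p in inspect.signature(fn).parameters}
        results[name] = fn(**kwargs)
    return results


print(build_graph())
print(materialise_all()["daily_sales_mart"])  # 5005000
```

Nothing in the sketch declares an edge explicitly; renaming a parameter is enough to rewire the graph, which is both the convenience and the main foot-gun of argument-based dependency inference.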

4. Prefect anatomy — flows, tasks, work pools, deployments

prefect — flows, tasks, work pools, and the Pythonic mental model

Prefect is the most "Python-native" of the three orchestrators in 2026: a flow is a function decorated with @flow, a task is a function decorated with @task, and calling a flow is calling an ordinary Python function that the Prefect runtime wraps with retries, state, logging, and observability. The shift from Prefect 1.x to Prefect 2.x / Prefect 3.x introduced the work pool + worker + deployment triad that powers Prefect's hybrid SaaS + on-prem story. Where Airflow makes you build a DAG and Dagster makes you declare assets, Prefect lets you write code that looks like ordinary Python and gain orchestration as a side effect.

The four runtime pieces.

  • Prefect Server / Prefect Cloud — the orchestrator; tracks flow runs, task runs, schedules, and deployments; stores state in a Postgres / SQLite metadata DB.
  • Work Pool — a typed pool that workers pull from; types include process, docker, kubernetes, ecs, cloud-run; the work pool decouples scheduling from execution.
  • Worker — long-running process (or container) that polls a work pool and runs flows; you can run multiple worker types in parallel.
  • Deployment — a versioned, schedule-bound packaging of a flow with its parameters, work pool, and storage; the unit of "this flow runs in production".

The flow + task — the developer-facing primitive.

from prefect import flow, task
from prefect.logging import get_run_logger

@task(retries=3, retry_delay_seconds=30)
def fetch_api() -> dict:
    logger = get_run_logger()
    logger.info("Hitting API")
    return {"rows": [{"id": i} for i in range(1000)]}

@task(retries=3, retry_delay_seconds=30)
def validate(payload: dict) -> list:
    return [r for r in payload["rows"] if r["id"] is not None]

@task
def load_warehouse(rows: list) -> int:
    return len(rows)

@task
def notify(n: int) -> str:
    return f"Loaded {n} rows"

@flow(name="etl_pipeline", retries=1, log_prints=True)
def etl_pipeline() -> str:
    payload = fetch_api()
    clean   = validate(payload)
    n       = load_warehouse(clean)
    return notify(n)

if __name__ == "__main__":
    etl_pipeline()
  • @flow — turns a Python function into a Prefect flow; gets retries, state, and a UI page in Prefect Cloud / Server.
  • @task — turns a Python function into a Prefect task; gets per-call retries, caching, and log streaming.
  • get_run_logger() — pulls a logger that pipes into Prefect's per-run log view.
  • Imperative style — execution flows like normal Python; no >> dependency wiring; the runtime infers the graph from the order of calls and the data flow.
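The retries and retry_delay_seconds arguments are easiest to reason about as "up to retries + 1 attempts, sleeping between them". The decorator below is a stdlib sketch of that behaviour, not Prefect's implementation (Prefect also records each attempt as a state transition and supports per-attempt delay lists); the flaky fetch_api and its two simulated failures are made up for the demo.

```python
import functools
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def with_retries(retries: int, retry_delay_seconds: float = 0.0):
    """Call the wrapped function up to retries + 1 times, sleeping between failed attempts."""
    def decorator(fn: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(fn)
        def wrapper(*args, **kwargs) -> T:
            for attempt in range(retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == retries:
                        raise  # out of retries: surface the last error
                    time.sleep(retry_delay_seconds)
            raise AssertionError("unreachable")
        return wrapper
    return decorator


calls = {"n": 0}


@with_retries(retries=3, retry_delay_seconds=0.01)
def fetch_api() -> dict:
    # Hypothetical flaky API: fails twice, then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return {"rows": [{"id": i} for i in range(1000)]}


print(len(fetch_api()["rows"]), "rows after", calls["n"], "attempts")  # 1000 rows after 3 attempts
```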

Why work pools + deployments matter.

  • Decouples what to run from where to run it — the same flow can deploy to a process pool in dev, a kubernetes pool in prod, and an ecs pool on a cost-optimised account.
  • Workers are stateless — they pull work from the pool, run it, report status; you scale workers independently of the orchestrator.
  • Deployments are versioned — each deployment is a named server-side object with a version field, and re-running prefect deploy updates it in place; you pin schedules, parameters, and storage location per deployment.
  • Hybrid execution — Prefect Cloud is the orchestrator, but the workers run in your VPC, so the code and data never leave your account; this is the architecture most regulated industries pick.
  • Sub-flows — calling a @flow inside another @flow creates a sub-flow run; the parent flow's UI shows it as a nested timeline, and the sub-flow has its own state, retries, and observability.
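The pool/worker split is a pull model: the scheduler only enqueues run requests, and whichever worker is polling that pool executes them. The sketch below models this with queue.Queue and threads; treating a work pool as an in-process FIFO and a run request as a plain dict are simplifying assumptions, not Prefect's protocol (real workers poll the Prefect API over the network).

```python
import queue
import threading


def worker(name: str, pool: queue.Queue, results: list, lock: threading.Lock) -> None:
    # Pull loop: take the next scheduled run, execute it, report back.
    while True:
        run = pool.get()
        if run is None:  # sentinel: shut this worker down
            break
        with lock:
            results.append(f"{run['flow']} ran on {name}")


pool: queue.Queue = queue.Queue()  # a work pool reduced to a FIFO of run requests
results: list[str] = []
lock = threading.Lock()
workers = [threading.Thread(target=worker, args=(f"worker-{i}", pool, results, lock)) for i in range(2)]
for w in workers:
    w.start()

# The "scheduler" only enqueues; it never decides which worker picks a run up.
for flow in ["etl_pipeline", "refresh_marts", "etl_pipeline"]:
    pool.put({"flow": flow})
for _ in workers:
    pool.put(None)
for w in workers:
    w.join()

print(sorted(results))
```

Adding capacity means starting another worker thread against the same queue; the enqueuing side does not change, which mirrors scaling workers independently of the orchestrator.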

Worked example — write a Prefect flow with a sub-flow, retries, and a work-pool deployment

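Detailed explanation. Real interviews ask you to write a parent flow + sub-flow pattern with retries on the inner steps and a deployment to a named work pool. The shape every reviewer checks: a @flow for the orchestrator, a @flow for the inner unit, @task decorators with retries, and a deployment definition. The deployment snippets in this article use Prefect 2.x's Deployment.build_from_flow(...).apply(), which was deprecated during the 2.x series and removed in Prefect 3.x; on 3.x, flow.deploy(...) or the prefect deploy CLI plays the same role.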

Question. Write a parent etl_pipeline flow that (1) fetches from an API, (2) validates, (3) loads the warehouse, (4) invokes a sub-flow refresh_marts to refresh two downstream marts, and (5) notifies Slack. The sub-flow must have its own retries; the parent must deploy to a default-pool work pool with a daily schedule.

Input. An API endpoint, two downstream marts (sales_mart, customer_mart), and a Slack webhook.

Code.

from prefect import flow, task
from prefect.client.schemas.schedules import CronSchedule
from prefect.deployments import Deployment

@task(retries=3, retry_delay_seconds=30)
def fetch_api() -> dict:
    return {"rows": [{"id": i, "amount": 10 * i} for i in range(1, 1001)]}

@task(retries=3)
def validate(payload: dict) -> list:
    return [r for r in payload["rows"] if r["amount"] >= 0]

@task
def load_warehouse(rows: list) -> int:
    return len(rows)

@task
def refresh_mart(name: str, n_rows: int) -> str:
    return f"{name}: {n_rows} rows"

@flow(name="refresh_marts", retries=2)
def refresh_marts(n_rows: int) -> list[str]:
    sales    = refresh_mart("sales_mart",    n_rows)
    customer = refresh_mart("customer_mart", n_rows)
    return [sales, customer]

@task
def notify(message: str) -> str:
    return "ok"

@flow(name="etl_pipeline", retries=1, log_prints=True)
def etl_pipeline() -> str:
    payload = fetch_api()
    clean   = validate(payload)
    n       = load_warehouse(clean)
    marts   = refresh_marts(n)
    return notify(f"Loaded {n} rows, {len(marts)} marts refreshed")

if __name__ == "__main__":
    Deployment.build_from_flow(
        flow=etl_pipeline,
        name="etl_pipeline_daily",
        work_pool_name="default-pool",
        schedules=[CronSchedule(cron="0 2 * * *", timezone="UTC")],
    ).apply()

Step-by-step explanation.

  1. The five @task decorators turn each step into a tracked task run; the retries= arguments on fetch_api and validate add per-call retries, and the runtime captures input, output, and exception state for every task run.
  2. The inner refresh_marts is itself a @flow; calling it from etl_pipeline produces a sub-flow run visible in the UI.
  3. The sub-flow has its own retries=2 independent of the parent's retries=1; this is the canonical "retry the whole sub-tree" pattern.
  4. Deployment.build_from_flow(...) packages the flow with its work pool and schedule; apply() persists the deployment row in Prefect Cloud / Server.
  5. At 02:00 UTC every day, the scheduler creates a flow run; a worker on default-pool picks it up and executes.
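Step 3's retry boundary is the key idea: when the sub-flow retries, it re-runs only its own steps, while the parent's completed work is left alone. The sketch below counts executions to show that; run_with_retries, the counters, and the single simulated sales_mart failure are hypothetical, and real Prefect runs also track state, caching, and persisted results that this ignores.

```python
from collections import Counter
from typing import Callable, TypeVar

T = TypeVar("T")
executions: Counter = Counter()
failures_left = {"sales_mart": 1}  # simulate one transient failure inside the sub-flow


def run_with_retries(fn: Callable[[], T], retries: int) -> T:
    """Run fn, retrying the whole callable up to `retries` extra times."""
    for attempt in range(retries + 1):
        try:
            return fn()
        except RuntimeError:
            if attempt == retries:
                raise
    raise AssertionError("unreachable")


def refresh_mart(name: str, n_rows: int) -> str:
    executions[name] += 1
    if failures_left.get(name, 0) > 0:
        failures_left[name] -= 1
        raise RuntimeError(f"{name} refresh failed")
    return f"{name}: {n_rows} rows"


def refresh_marts(n_rows: int) -> list[str]:
    # Sub-flow body: its retry boundary wraps only these two calls.
    return [refresh_mart("sales_mart", n_rows), refresh_mart("customer_mart", n_rows)]


def etl_pipeline() -> str:
    executions["fetch_api"] += 1  # parent-level work
    n = 1000
    marts = run_with_retries(lambda: refresh_marts(n), retries=2)  # sub-flow retries=2
    return f"Loaded {n} rows, {len(marts)} marts refreshed"


print(run_with_retries(etl_pipeline, retries=1))  # Loaded 1000 rows, 2 marts refreshed
print(dict(executions))  # {'fetch_api': 1, 'sales_mart': 2, 'customer_mart': 1}
```

The failed sales_mart refresh runs twice, but fetch_api runs once: the retry stayed inside the sub-flow and never escalated to the parent's retries=1.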

Output (the flow run summary in the Prefect UI).

| flow_run_id | flow | state | duration | sub_flows |
|---|---|---|---|---|
| 7f3a... | etl_pipeline | Completed | 3m 12s | 1 (refresh_marts) |

Rule of thumb: every production Prefect deployment ships with a named flow, retries on the slow tasks, a sub-flow for any logical group of work that deserves its own retry boundary, and a deployment pinned to a work pool, not ad-hoc local calls to the flow function.

prefect vs airflow — the day-to-day differences

  • Authoring — Prefect feels like Python; Airflow feels like a config-as-code declaration of a DAG.
  • Dynamic flows — Prefect's .map() and sub-flows are first-class; Airflow's dynamic task mapping (.expand(), added in Airflow 2.3) came later and is harder to reason about at scale.
  • Hybrid execution — Prefect Cloud + on-prem workers is the canonical "control plane in cloud, data plane in our VPC" pattern; Airflow's managed services mostly run the whole stack in the vendor's account.
  • Deployments are versioned — Prefect deployments are first-class versioned objects; Airflow's "DAG file in the dags_folder" is older-school.
  • Failure-first design — every Prefect task has retries, caching, state, and timeout as decorator args; Airflow needs more boilerplate per task.

dagster vs prefect — the asset axis vs the flow axis

  • Dagster thinks in tables (assets); Prefect thinks in functions (flows + tasks).
  • Dagster's catalog is the single biggest "I didn't know how much I needed this" feature; Prefect's UI is task-and-flow shaped, not asset-shaped.
  • Prefect's sub-flows + .map are the single biggest "I didn't know how much I needed this" feature on the dynamic-pipeline axis.
  • Choose Dagster when your team is producing data products and the catalog matters.
  • Choose Prefect when your team is producing dynamic workflows (ML training pipelines, customer-by-customer API loops, ad-hoc backfills) and Pythonic ergonomics matter more than lineage.


Solution Using a flow + sub-flow + work-pool deployment pattern

Code.

# Production-shaped Prefect deployment; parent flow + sub-flow + scheduled work pool.
from prefect import flow, task
from prefect.client.schemas.schedules import CronSchedule
from prefect.deployments import Deployment
from prefect.logging import get_run_logger

@task(retries=3, retry_delay_seconds=30, log_prints=True)
def fetch_api() -> dict:
    return {"rows": [{"id": i, "amount": 10 * i} for i in range(1, 10_001)]}

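@task(retries=1, retry_delay_seconds=30)
def validate(payload: dict) -> list:
    rows = [r for r in payload["rows"] if r["amount"] >= 0]
    if not rows:
        # Deterministic failure, so a single retry is enough; raise (not assert) so -O cannot strip it.
        raise ValueError("no rows after validation")
    return rows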

@task(retries=3, retry_delay_seconds=60)
def load_warehouse(rows: list) -> int:
    return len(rows)

@task(retries=2)
def refresh_one_mart(name: str, n_rows: int) -> str:
    return f"{name}: refreshed with {n_rows} rows"

@flow(name="refresh_marts", retries=2, log_prints=True)
def refresh_marts(n_rows: int) -> list[str]:
    return [refresh_one_mart(n, n_rows) for n in ("sales_mart", "customer_mart", "exec_mart")]

@task
def notify_slack(message: str) -> str:
    return "ok"

@flow(name="etl_pipeline", retries=1, log_prints=True, timeout_seconds=60 * 60)
def etl_pipeline() -> str:
    logger = get_run_logger()
    payload = fetch_api()
    clean   = validate(payload)
    n       = load_warehouse(clean)
    marts   = refresh_marts(n)
    logger.info(f"Loaded {n} rows; refreshed {len(marts)} marts")
    return notify_slack(f"etl_pipeline OK: {n} rows, {len(marts)} marts")

if __name__ == "__main__":
    Deployment.build_from_flow(
        flow=etl_pipeline,
        name="etl_pipeline_daily",
        work_pool_name="default-pool",
        schedules=[CronSchedule(cron="0 2 * * *", timezone="UTC")],
        tags=["etl", "daily"],
    ).apply()

Step-by-step trace.

| component | choice | why |
|---|---|---|
| parent flow | etl_pipeline (retries=1) | top-level orchestration unit |
| sub-flow | refresh_marts (retries=2) | independent retry boundary |
| tasks | retries=3 | slow + lossy API and warehouse calls |
| work pool | default-pool | decouples scheduling from execution |
| schedule | cron "0 2 * * *" UTC | nightly batch |
| timeout | 60 min on parent | hard cap |

  1. The five @tasks wrap discrete units of work; each has its own retry policy tuned to its failure mode.
  2. The refresh_marts sub-flow is its own retryable unit; if refresh_one_mart("sales_mart", ...) fails on all three attempts (one run plus retries=2), the sub-flow fails and re-runs as a unit, up to its own retries=2, without restarting the parent.
  3. The parent's timeout_seconds=60*60 is a hard cap; without it, a hanging API call can stall the deployment for hours.
  4. Deployment.build_from_flow(...).apply() writes the deployment to Prefect Cloud / Server; a worker polling default-pool picks up the run scheduled for 02:00 UTC.
  5. The UI shows the parent flow with the sub-flow nested inside; per-task logs are streamed live.
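The "0 2 * * *" schedule in UTC means exactly one run at 02:00 UTC each day. As a sanity check on what that cron string resolves to, the helper below computes the next fire time for this one fixed daily pattern using only datetime; next_daily_run is a hypothetical helper, not a general cron parser (Prefect evaluates full cron syntax itself).

```python
from datetime import datetime, timedelta, timezone


def next_daily_run(now: datetime, hour: int = 2, minute: int = 0) -> datetime:
    """Next occurrence of HH:MM UTC strictly after `now` (covers only the '0 2 * * *' shape)."""
    now_utc = now.astimezone(timezone.utc)
    candidate = now_utc.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now_utc:
        candidate += timedelta(days=1)
    return candidate


print(next_daily_run(datetime(2026, 5, 28, 23, 30, tzinfo=timezone.utc)))
# 2026-05-29 02:00:00+00:00
print(next_daily_run(datetime(2026, 5, 29, 2, 0, tzinfo=timezone.utc)))
# 2026-05-30 02:00:00+00:00
```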

Output.

| deployment | flow_run | sub_flows | state | wall_clock |
|---|---|---|---|---|
| etl_pipeline_daily | 02:00 UTC 2026-05-29 | 1 | Completed | 3m 18s |

Why this works — concept by concept:

  • Flow + sub-flow pattern — the parent owns the timeline; the sub-flow owns its retry boundary; together they make recovery surgical instead of all-or-nothing.
  • Per-task retry tuning — slow API calls get long backoffs; warehouse loads get longer; validation gets fewer retries because failures are usually deterministic.
  • Work pool decoupling — the same flow can deploy to process, docker, kubernetes pools without code change; the deployment row is the per-environment binding.
  • Hybrid execution — Prefect Cloud + on-prem workers means the orchestrator UI is SaaS but your data stays in your VPC; this is the architecture regulated industries pick.
  • Cost — Prefect Cloud's free tier covers small teams; paid tiers run ~$50-$150 / engineer / month; the hybrid model means the data-plane cost is in your account, which lets finance plan budgets per environment.
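Per-task retry tuning is easier to defend with the arithmetic written out. Assuming a constant delay between attempts (a single number passed to retry_delay_seconds), a task with retries=r and delay d spends at most r × d seconds sleeping before its final failure, on top of the attempts' own run time. The sketch adds those sleep budgets for the two slow tasks in the solution flow and compares the sum with the parent's 60-minute timeout; attempt durations and parent-level retries are ignored, so this is only a lower bound on worst-case wall time.

```python
def sleep_budget(retries: int, retry_delay_seconds: float) -> float:
    """Worst-case seconds spent waiting between attempts for a constant retry delay."""
    return retries * retry_delay_seconds


# (retries, retry_delay_seconds) for the two slow tasks in the solution flow.
policies = {
    "fetch_api": (3, 30),
    "load_warehouse": (3, 60),
}

budgets = {name: sleep_budget(r, d) for name, (r, d) in policies.items()}
total = sum(budgets.values())
parent_timeout = 60 * 60

print(budgets)  # {'fetch_api': 90, 'load_warehouse': 180}
print(total, "s of retry sleep vs", parent_timeout, "s timeout")  # 270 s of retry sleep vs 3600 s timeout
```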

5. Decision matrix — pick the right orchestrator (with worked migration examples)

airflow vs dagster vs prefect — the five-dimension decision matrix

After three sections of anatomy, the synthesis is a five-dimension matrix the rest of this section walks through with worked migration examples. The matrix is intentionally short — five rows, three columns, fifteen cells — because senior reviewers want a one-screen artifact they can defend in a design review.

The five dimensions and their winners.

  • Maturity / ecosystem — Airflow wins; 10+ years of operators and three first-class managed services.
  • Asset awareness — Dagster wins; software defined assets are the native primitive, not a bolt-on.
  • Dynamic flows — Prefect wins; sub-flows + .map() make dynamic patterns idiomatic.
  • Hosting options — all three are first-class on managed SaaS and self-hosted; no winner.
  • Best for — depends on team shape; Airflow for cron-style ETL + large teams, Dagster for data-product graphs, Prefect for Pythonic ML / API workflows.

The three pipeline shapes and the canonical tool pick.

  • Shape 1 — "cron-style ETL across hundreds of pipelines" — pick Airflow. The operator library and managed services are unmatched; the task-first mental model fits when you have 100+ pipelines maintained by a large team.
  • Shape 2 — "data-product team with a small number of high-value assets" — pick Dagster. The asset graph, asset catalog, asset checks, partitioned backfills, and lineage are worth the migration cost.
  • Shape 3 — "ML / API / dynamic Python workflows" — pick Prefect. Sub-flows, .map(), retries-as-decorator-args, and the hybrid Cloud + on-prem worker model fit when pipelines are Python-shaped, not SQL-shaped.
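Once the pipeline shape is named, the three picks reduce to a lookup, which is exactly the "name the shape, then the tool" habit. The function below is a hypothetical encoding of this section's recommendations (the shape labels are this article's wording, not an industry taxonomy); an unrecognised shape returns an explicit "needs review" answer instead of a guess.

```python
from enum import Enum


class Shape(Enum):
    CRON_ETL_AT_SCALE = "cron-style ETL across hundreds of pipelines"
    DATA_PRODUCT_GRAPH = "data-product team with a small number of high-value assets"
    DYNAMIC_PYTHON = "ML / API / dynamic Python workflows"


RECOMMENDATION = {
    Shape.CRON_ETL_AT_SCALE: "Airflow",
    Shape.DATA_PRODUCT_GRAPH: "Dagster",
    Shape.DYNAMIC_PYTHON: "Prefect",
}


def pick_orchestrator(shape_label: str) -> str:
    """Map a named pipeline shape to the tool this section recommends."""
    try:
        return RECOMMENDATION[Shape(shape_label)]
    except ValueError:
        return "needs review: shape not in the matrix"


print(pick_orchestrator("ML / API / dynamic Python workflows"))  # Prefect
print(pick_orchestrator("streaming CDC"))  # needs review: shape not in the matrix
```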

The senior signal — name the pipeline shape, then the tool.

  • "For cron-style ETL across 200 pipelines, we run Airflow on Astronomer."
  • "For our data-product graph of 30 marts with lineage and freshness contracts, we run Dagster Cloud."
  • "For our ML training and customer-by-customer API workflows that need dynamic mapping, we run Prefect Cloud with on-prem workers."
  • "One organisation can run all three; the choice is per-pipeline-shape, not company-wide."

Worked example A — port an Airflow DAG to a Dagster asset graph

Detailed explanation. This is the canonical migration. Take a 4-task Airflow DAG that produces a fact_orders table and re-shape it as a Dagster graph of three assets plus one asset check. The shape change matters more than the line-count change.

Question. Re-shape this Airflow DAG as a Dagster asset graph, preserving the daily schedule and the dependency order. Identify which Airflow primitive maps to which Dagster primitive.

Input.

# Airflow — before
from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule="@daily", start_date=datetime(2026, 5, 1), catchup=False)
def fact_orders_daily():
    @task
    def extract_orders() -> list: ...
    @task
    def clean_orders(rows: list) -> list: ...
    @task
    def load_fact_orders(rows: list) -> int: ...
    @task
    def quality_check(n: int) -> int: ...
    quality_check(load_fact_orders(clean_orders(extract_orders())))

fact_orders_daily()

Code.

# Dagster — after (asset graph)
from dagster import (
    asset, asset_check, AssetCheckResult, AssetCheckSeverity,
    DailyPartitionsDefinition, Definitions, define_asset_job,
    build_schedule_from_partitioned_job,
)

daily = DailyPartitionsDefinition(start_date="2026-05-01")

@asset(partitions_def=daily)
def raw_orders() -> list: ...

@asset(partitions_def=daily)
def clean_orders(raw_orders: list) -> list: ...

@asset(partitions_def=daily)
def fact_orders(clean_orders: list) -> int: ...

@asset_check(asset="fact_orders")
def fact_orders_positive(fact_orders: int) -> AssetCheckResult:
    return AssetCheckResult(passed=fact_orders > 0, severity=AssetCheckSeverity.ERROR)

defs = Definitions(
    assets=[raw_orders, clean_orders, fact_orders],
    asset_checks=[fact_orders_positive],
    # Partitioned assets need a partition-aware schedule, derived from the partitions def.
    schedules=[build_schedule_from_partitioned_job(
        define_asset_job("daily_job", selection="*", partitions_def=daily),
    )],
)

Step-by-step explanation.

  1. The four Airflow @tasks become three Dagster @assets plus one @asset_check: the quality check stops being a separate task and becomes an attribute of the asset it guards.
  2. The Airflow DAG-level schedule="@daily" becomes a DailyPartitionsDefinition plus build_schedule_from_partitioned_job(...); the partition shape is now first-class and the schedule is derived from it.
  3. Dependencies are inferred from function arguments: clean_orders(raw_orders) declares the edge with no extra wiring.
  4. The Airflow start_date + catchup=False becomes the partition start_date; Dagster's backfill UI lets you pick which partitions to fill rather than catching up by default.
  5. The total line count is similar; the noticeable shift is the mental model: you stopped thinking in tasks and started thinking in tables.

Output (the mapping table reviewers want to see).

| Airflow primitive | Dagster primitive | shape difference |
|---|---|---|
| @dag(schedule="@daily") | DailyPartitionsDefinition + build_schedule_from_partitioned_job | partition becomes first-class |
| @task extract_orders | @asset raw_orders | task → asset (the table) |
| @task clean_orders(rows) | @asset clean_orders(raw_orders) | dependency inferred from arg |
| @task quality_check(n) | @asset_check(asset="fact_orders") | check attached to asset |
| XCom passing | IO manager (e.g. S3, Snowflake) | persisted, arbitrary-size |
| catchup=False | partitions backfill (UI-driven) | choose partitions explicitly |

Rule of thumb: the migration is a re-modelling, not a port. Reviewers reject ports that keep the task-first mental model and just rename @task to @asset — the model shift is the whole point.
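Step 4's contrast between Airflow catchup and Dagster backfills comes down to which partition keys get run. The sketch below lists the daily keys between a start and end date, then selects only the missing ones; it mimics the YYYY-MM-DD key format of a daily partitions definition, but both helpers and the set of already-materialised days are hypothetical.

```python
from datetime import date, timedelta


def daily_partition_keys(start: date, end_inclusive: date) -> list[str]:
    """All daily keys from start to end_inclusive, formatted YYYY-MM-DD."""
    days = (end_inclusive - start).days
    return [(start + timedelta(days=i)).isoformat() for i in range(days + 1)]


def missing_partitions(all_keys: list[str], materialised: set[str]) -> list[str]:
    # A targeted backfill: run only the gaps, in date order.
    return [k for k in all_keys if k not in materialised]


keys = daily_partition_keys(date(2026, 5, 1), date(2026, 5, 7))
done = {"2026-05-01", "2026-05-02", "2026-05-04", "2026-05-05"}
print(len(keys))                       # 7
print(missing_partitions(keys, done))  # ['2026-05-03', '2026-05-06', '2026-05-07']
```

Catchup-style behaviour would run every key from the start date onward; the explicit list is what "choose partitions explicitly" buys you.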

Worked example B — port a cron-style Airflow loop to a Prefect flow

Detailed explanation. This is the lighter migration; both tools are task-first, so the shape change is smaller and most of the win is in dynamic mapping + sub-flows.

Question. Re-shape this Airflow DAG that processes a list of regions as a Prefect flow that uses .map() for fan-out and a sub-flow for downstream notifications.

Input.

# Airflow — before
from airflow.decorators import dag, task
from datetime import datetime

REGIONS = ["US", "EU", "APAC", "LATAM"]

@dag(schedule="@hourly", start_date=datetime(2026, 5, 1), catchup=False)
def regional_pipeline():
    @task
    def process_region(region: str) -> int:
        return 100  # rows processed

    @task
    def notify(totals: list) -> str:
        return f"processed {sum(totals)} rows"

    totals = process_region.expand(region=REGIONS)
    notify(totals)

regional_pipeline()

Code.

# Prefect — after (flow + .map + sub-flow notify)
from prefect import flow, task
from prefect.client.schemas.schedules import CronSchedule
from prefect.deployments import Deployment

REGIONS = ["US", "EU", "APAC", "LATAM"]

@task(retries=3, retry_delay_seconds=30)
def process_region(region: str) -> int:
    return 100

@task
def send_one_notice(message: str) -> str:
    return "ok"

@flow(name="notify_subflow", retries=2)
def notify_subflow(totals: list[int]) -> list[str]:
    msg = f"processed {sum(totals)} rows"
    return [send_one_notice(msg), send_one_notice("backup channel: " + msg)]

@flow(name="regional_pipeline", retries=1, log_prints=True)
def regional_pipeline():
    totals = process_region.map(REGIONS)
    return notify_subflow([t.result() for t in totals])

if __name__ == "__main__":
    Deployment.build_from_flow(
        flow=regional_pipeline,
        name="regional_pipeline_hourly",
        work_pool_name="default-pool",
        schedules=[CronSchedule(cron="0 * * * *", timezone="UTC")],
    ).apply()

Step-by-step explanation.

  1. Airflow's process_region.expand(region=REGIONS) becomes Prefect's process_region.map(REGIONS); same idea, slightly different ergonomics.
  2. The notification step becomes its own @flow (notify_subflow) so it gets its own retry boundary and its own UI page.
  3. t.result() blocks until each mapped task completes and unwraps its return value; the parent flow waits before invoking the sub-flow.
  4. Deployment.build_from_flow(...) packages the flow with its work pool and CronSchedule; the deployment is the versioned production artifact.
  5. The line count is similar; the win is the cleaner sub-flow boundary and the more Pythonic mapping syntax.
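Structurally, .map() is a fan-out followed by a gather: one task run per element, then the parent collects the results before moving on. A stdlib analogue with concurrent.futures shows the same shape; the thread pool stands in for Prefect's task runner and process_region keeps the article's placeholder return of 100 rows, so this is a sketch of the pattern rather than Prefect's mechanics.

```python
from concurrent.futures import ThreadPoolExecutor

REGIONS = ["US", "EU", "APAC", "LATAM"]


def process_region(region: str) -> int:
    return 100  # placeholder row count, as in the flow above


def notify(totals: list[int]) -> str:
    return f"processed {sum(totals)} rows"


def regional_pipeline() -> str:
    with ThreadPoolExecutor(max_workers=len(REGIONS)) as pool:
        futures = [pool.submit(process_region, r) for r in REGIONS]  # fan-out
        totals = [f.result() for f in futures]                       # gather, like t.result()
    return notify(totals)


print(regional_pipeline())  # processed 400 rows
```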

Output.

| Airflow primitive | Prefect primitive | win |
|---|---|---|
| @dag(schedule="@hourly") | @flow + Deployment(... CronSchedule ...) | versioned deployment |
| @task | @task | similar shape |
| expand(region=...) | .map(REGIONS) | Pythonic mapping |
| notify(totals) | @flow notify_subflow(...) | independent retry boundary |
| metadata DB-driven retries | per-task retries=N | declarative |

Rule of thumb: port to Prefect when the win is Pythonic ergonomics — dynamic mapping, sub-flows, hybrid execution — not when the win is "we have a Python codebase". Both tools are Python.

Worked example C — translate a Dagster asset graph into a Prefect deployment

Detailed explanation. This is the trickiest direction. Dagster's asset-first model loses some structure when translated to Prefect's task-and-flow model; you keep the dependency edges but you lose the catalog + asset checks.

Question. Translate this Dagster asset graph into a Prefect deployment that preserves the dependency order and adds back a manual quality check at the leaf.

Input.

# Dagster — before
from dagster import asset, asset_check, AssetCheckResult, Definitions

@asset
def raw_orders() -> list: ...
@asset
def clean_orders(raw_orders: list) -> list: ...
@asset
def fact_orders(clean_orders: list) -> int: ...
@asset_check(asset="fact_orders")
def fact_check(fact_orders: int) -> AssetCheckResult:
    return AssetCheckResult(passed=fact_orders > 0)

defs = Definitions(assets=[raw_orders, clean_orders, fact_orders], asset_checks=[fact_check])

Code.

# Prefect — after
from prefect import flow, task
from prefect.deployments import Deployment

@task(retries=3)
def raw_orders() -> list: ...

@task(retries=3)
def clean_orders(rows: list) -> list: ...

@task(retries=3)
def fact_orders(rows: list) -> int: ...

@task
def fact_check(n: int) -> int:
    assert n > 0, f"fact_orders must be > 0; got {n}"
    return n

@flow(name="orders_pipeline", retries=1)
def orders_pipeline() -> int:
    raw   = raw_orders()
    clean = clean_orders(raw)
    n     = fact_orders(clean)
    return fact_check(n)

if __name__ == "__main__":
    Deployment.build_from_flow(
        flow=orders_pipeline,
        name="orders_pipeline_daily",
        work_pool_name="default-pool",
    ).apply()

Step-by-step explanation.

  1. Each @asset becomes a @task; the dependency edges still come from function arguments.
  2. The Dagster @asset_check becomes a regular @task (fact_check) that asserts and raises on failure; you lose the structural attachment but keep the assertion.
  3. The Definitions(...) registration becomes a Deployment.build_from_flow(...).apply(); the catalog UI is gone.
  4. Schedules + partitions you had in Dagster become deployment-level CronSchedule + your own partition_key parameter.
  5. You lose: the asset catalog, lineage, partitioned backfills (Prefect handles backfills differently), asset-level freshness alerts.
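Step 4's "your own partition_key parameter" usually means the flow takes a date string and defaults it to the previous day, so scheduled runs process yesterday while backfills pass an explicit key. Below is a plain-Python sketch of that defaulting logic; the orders_pipeline signature and the yesterday-in-UTC default are assumptions, and in Prefect the same body would sit under @flow with partition_key exposed as a flow parameter.

```python
from datetime import date, datetime, timedelta, timezone
from typing import Optional


def resolve_partition_key(partition_key: Optional[str] = None, today: Optional[date] = None) -> str:
    """Return an explicit YYYY-MM-DD key, or default to yesterday (UTC) for scheduled runs."""
    if partition_key is not None:
        date.fromisoformat(partition_key)  # validate the format; raises ValueError if malformed
        return partition_key
    today = today or datetime.now(timezone.utc).date()
    return (today - timedelta(days=1)).isoformat()


def orders_pipeline(partition_key: Optional[str] = None) -> str:
    key = resolve_partition_key(partition_key)
    # Each step would filter or write by `key`, replacing Dagster's partition context.
    return f"processing orders for {key}"


print(resolve_partition_key(today=date(2026, 5, 30)))  # 2026-05-29
print(orders_pipeline("2026-05-03"))                   # processing orders for 2026-05-03
```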

Output (the loss-and-gain table reviewers want).

| Dagster feature | Prefect equivalent | net |
|---|---|---|
| @asset | @task | shape preserved |
| @asset_check | @task that asserts | structural attachment lost |
| asset catalog UI | flow runs UI | catalog UX lost |
| DailyPartitionsDefinition | manual partition_key parameter | manual wiring |
| IO manager | manual S3 / Snowflake writes | more boilerplate |
| Definitions(...) | Deployment.build_from_flow(...) | similar shape |

Rule of thumb: Dagster → Prefect is a lossy translation; only do it when the team specifically needs Prefect's flow + sub-flow ergonomics enough to give up the asset catalog. Most teams that want Pythonic flows pick Prefect first; teams that have already adopted Dagster rarely migrate off.


Solution Using a per-pipeline-shape tool-selection matrix

Code.

-- Materialise the per-pipeline-shape choice as a query you can paste into a design doc.
CREATE TABLE orchestrator_choice AS
SELECT * FROM (VALUES
    ('cron-style ETL, 100+ pipelines', 'Airflow',  'massive operator library + MWAA / Astronomer / Composer'),
    ('data-product graph + lineage',   'Dagster',  'asset graph + catalog + checks + partitioned backfills'),
    ('ML / API / dynamic Python',      'Prefect',  'sub-flows + .map + hybrid Cloud + on-prem workers'),
    ('regulated industry (data plane in VPC)', 'Prefect or Airflow self-host', 'control vs data plane separation'),
    ('small team, fast onboarding',    'Prefect',  'Pythonic; flows look like functions'),
    ('large team, existing operators', 'Airflow',  'operator ecosystem + existing skill base'),
    ('multi-tool org',                 'Hybrid',   'Airflow for ETL + Dagster for marts + Prefect for ML')
) AS t(pipeline_shape, recommended_tool, tie_breaker);

Step-by-step trace.

pipeline_shape | recommended_tool | tie_breaker
cron-style ETL, 100+ pipelines | Airflow | massive operator library + MWAA / Astronomer / Composer
data-product graph + lineage | Dagster | asset graph + catalog + checks + partitioned backfills
ML / API / dynamic Python | Prefect | sub-flows + .map + hybrid Cloud + on-prem workers
regulated industry (data plane in VPC) | Prefect or Airflow self-host | control vs data plane separation
small team, fast onboarding | Prefect | Pythonic; flows look like functions
large team, existing operators | Airflow | operator ecosystem + existing skill base
multi-tool org | Hybrid | Airflow for ETL + Dagster for marts + Prefect for ML

  1. Row 1: Airflow is the right default for cron-style ETL at scale; you do not throw away 200 working DAGs to chase a trend.
  2. Row 2: Dagster is the right default when the data product itself is the unit of work; the catalog UI pays for itself.
  3. Row 3: Prefect is the right default for ML-shaped pipelines that need dynamic mapping and sub-flows as first-class primitives.
  4. Row 4: for regulated industries, a self-hosted or hybrid deployment keeps the data plane in your VPC: Airflow OSS on your own infrastructure, or Prefect Cloud as the control plane with workers running inside your network. Dagster+ (formerly Dagster Cloud) offers a hybrid deployment too.
  5. Rows 5-6: team shape often dominates; Pythonic teams pick Prefect, while large enterprise teams with an existing operator estate stay on Airflow.
  6. Row 7: "run all three" is the senior, contrarian answer; one tool does not have to win at the org level.

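Output (first two columns of orchestrator_choice).

pipeline_shape | recommended_tool
cron-style ETL, 100+ pipelines | Airflow
data-product graph + lineage | Dagster
ML / API / dynamic Python | Prefect
regulated industry (data plane in VPC) | Prefect or Airflow self-host
small team, fast onboarding | Prefect
large team, existing operators | Airflow
multi-tool org | Hybrid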

Why this works — concept by concept:

  • Per-pipeline-shape selection — collapses the vague "best tool" debate into a one-row lookup keyed on the kind of pipeline you are building.
  • Tie-breaker column — surfaces the actual deciding feature on each row, not the marketing-list feature.
  • "Hybrid" is allowed — admits that real organisations often run multiple orchestrators; senior reviewers respect this.
  • Regulated-industry row — explicitly calls out the data-plane / control-plane distinction that compliance teams care about.
  • Cost: O(1) to read; the real spend is the evaluation spike that models two example pipelines in each of your top-two candidates, which takes roughly 1-2 weeks of engineering time.
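If the matrix should live next to pipeline code rather than in the warehouse, the same one-row lookup can be kept as a plain Python mapping. A minimal sketch; ORCHESTRATOR_CHOICE, Choice, and recommend() are hypothetical names, and the rows mirror the SQL VALUES list above:

```python
from typing import NamedTuple


class Choice(NamedTuple):
    recommended_tool: str
    tie_breaker: str


# Same seven rows as the orchestrator_choice table, keyed on pipeline shape.
ORCHESTRATOR_CHOICE: dict[str, Choice] = {
    "cron-style ETL, 100+ pipelines": Choice("Airflow", "massive operator library + MWAA / Astronomer / Composer"),
    "data-product graph + lineage": Choice("Dagster", "asset graph + catalog + checks + partitioned backfills"),
    "ML / API / dynamic Python": Choice("Prefect", "sub-flows + .map + hybrid Cloud + on-prem workers"),
    "regulated industry (data plane in VPC)": Choice("Prefect or Airflow self-host", "control vs data plane separation"),
    "small team, fast onboarding": Choice("Prefect", "Pythonic; flows look like functions"),
    "large team, existing operators": Choice("Airflow", "operator ecosystem + existing skill base"),
    "multi-tool org": Choice("Hybrid", "Airflow for ETL + Dagster for marts + Prefect for ML"),
}


def recommend(pipeline_shape: str) -> Choice:
    """O(1) dict lookup; an unknown shape raises KeyError so gaps in the matrix surface loudly."""
    return ORCHESTRATOR_CHOICE[pipeline_shape]


choice = recommend("data-product graph + lineage")
print(choice.recommended_tool, "|", choice.tie_breaker)
```

Raising on an unknown shape, rather than returning a default tool, is deliberate: a pipeline that fits no row is a signal to extend the matrix in the design review, not to pick silently.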

Choosing the right orchestrator (cheat sheet)

A one-screen cheat sheet for data orchestration — pick the tool that matches your pipeline shape, team mental model, and asset literacy.

Your situation … | Tool | Canonical primitive | Why
Cron-style ETL across 100+ pipelines | Airflow | @dag + @task + operators | massive operator library + MWAA / Astronomer / Composer
Need an asset catalog + lineage | Dagster | @asset + IO manager | software-defined assets are native
Pythonic ML / API workflows | Prefect | @flow + @task + sub-flow | sub-flows + .map + Pythonic ergonomics
Want dynamic task mapping | Airflow 2.3+ or Prefect | .expand() / .map() | both first-class; Prefect feels more natural
Need partitioned backfills | Dagster | DailyPartitionsDefinition | partition shape is structural
Need column-level lineage | Dagster | asset catalog + metadata | asset lineage comes from the asset graph; column lineage is attached as asset metadata
Need the data plane in our VPC | Prefect Cloud + on-prem workers, or Airflow self-host | work pool / executor | hybrid execution
Need 1000+ pre-built operators | Airflow | provider packages | major clouds, warehouses, and SaaS tools already wired
Team thinks in tables, not jobs | Dagster | @asset + asset checks | mental model fit
Team thinks in functions, not configs | Prefect | @flow | flows look like Python functions
Small team, no orchestrator yet | Prefect | @flow | shortest time-to-first-pipeline
Large enterprise, existing Airflow | stay on Airflow | @dag | migration cost rarely justifies churn
ML pipelines with dynamic shapes | Prefect | @flow + .map + sub-flow | dynamic fan-out + nested retries
Multi-team org, multi-shape pipelines | Hybrid (all three) | per-team | one tool does not have to win
Backfilling 90 days of partitions | Dagster | partition UI backfill | first-class UX for partition selection
Migrating off cron + bash | Prefect | @flow | shortest learning curve from "scripts"
Migrating off Luigi | Airflow or Dagster | @dag / @asset | both common Luigi targets
Need EU + US + APAC region pinning | All three | per-deployment / per-pool | every tool supports region binding
Want free + open source only | All three | OSS self-host | every tool ships an OSS path

Frequently asked questions

What is data orchestration and how is it different from cron or a CI system?

Data orchestration is the discipline of turning a set of data jobs into a graph with dependencies, retries, schedules, sensors, backfills, and observability. It differs from cron because cron has no concept of dependencies (it just fires jobs at times), and from a CI system because CI runs on code changes and is not partition-aware, sensor-aware, or backfill-aware. A modern DAG scheduler like Airflow, Dagster, or Prefect knows that B depends on A, knows how to re-run only the failed branch of a graph, knows how to fill 30 daily partitions in order, and knows how to surface lineage and freshness in a UI. Cron and CI cannot do any of those without you re-implementing the orchestrator on top.
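To make "B depends on A" and "re-run only the failed branch" concrete, here is a minimal stdlib sketch built on graphlib.TopologicalSorter (Python 3.9+). The five-node graph and the run_order / rerun_plan helpers are hypothetical; a real orchestrator adds state storage, retries, and scheduling on top of this core:

```python
from graphlib import TopologicalSorter

# Hypothetical pipeline: each key lists the upstream nodes it depends on.
DEPS: dict[str, set[str]] = {
    "raw_orders": set(),
    "raw_customers": set(),
    "clean_orders": {"raw_orders"},
    "fact_orders": {"clean_orders", "raw_customers"},
    "orders_report": {"fact_orders"},
}


def run_order(deps: dict[str, set[str]]) -> list[str]:
    """Dependency order: every node appears after all of its upstreams."""
    return list(TopologicalSorter(deps).static_order())


def rerun_plan(deps: dict[str, set[str]], failed: str) -> list[str]:
    """The failed node plus everything downstream of it, in dependency order.

    Upstream nodes that already succeeded are skipped; cron has no graph,
    so it can only re-fire whole jobs.
    """
    downstream = {failed}
    changed = True
    while changed:  # keep expanding until no new descendants are found
        changed = False
        for node, ups in deps.items():
            if node not in downstream and ups & downstream:
                downstream.add(node)
                changed = True
    return [n for n in run_order(deps) if n in downstream]


print(rerun_plan(DEPS, "clean_orders"))  # ['clean_orders', 'fact_orders', 'orders_report']
```

Note that raw_orders and raw_customers are not re-run: their outputs are still valid, which is exactly the saving a dependency-aware scheduler buys over a cron re-fire.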

Airflow vs Dagster vs Prefect — which one should I pick in 2026?

There is no universal winner — pick the tool that matches your pipeline shape, your team's mental model, and your asset literacy. Airflow wins on cron-style ETL across 100+ pipelines because of the operator library and managed services (MWAA, Astronomer, Cloud Composer). Dagster wins on data-product graphs because software defined assets, the catalog, asset checks, and partitioned backfills are native. Prefect wins on Pythonic ML / API workflows because sub-flows, .map(), retries-as-decorator-args, and the hybrid Cloud + on-prem worker model fit Python-shaped pipelines best. Many modern orgs run all three — one orchestrator does not have to win at the org level; pick per pipeline shape.

What are software-defined assets, and why are they Dagster's killer feature?

Software-defined assets (SDAs) flip the orchestrator mental model from "what jobs do I need to run, and when?" to "what data assets do I produce, and what produces them?". Each @asset declares both the dataset it produces and the upstream datasets it depends on (inferred from function arguments); Dagster derives the orchestration graph and the lineage from those declarations, and attaches the catalog, freshness policies, checks, and partitioning to each node of the resulting asset graph. The killer feature is that the data product itself becomes the unit of work, not the job that produces it. You get a built-in data catalog with lineage, materialisation history, freshness, asset checks, and whatever metadata you emit (row counts, previews) per asset, without bolting on tools like Atlan / DataHub. Teams that adopt Dagster often cite the catalog UI as what pays for the migration; the SDA mental shift is what stays.
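The core trick, dependencies inferred from function argument names, can be illustrated without Dagster. This is a toy sketch only: toy_asset, ASSETS, upstreams, and run_toy_graph are hypothetical stand-ins, not Dagster's API or implementation (the real @asset adds IO managers, metadata, partitions, and checks):

```python
import inspect
from graphlib import TopologicalSorter
from typing import Any, Callable

# Registry of asset name -> producing function (hypothetical, for illustration).
ASSETS: dict[str, Callable[..., Any]] = {}


def toy_asset(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Register fn as the producer of an asset named after the function."""
    ASSETS[fn.__name__] = fn
    return fn


def upstreams(name: str) -> set[str]:
    """Infer dependencies from parameter names: the SDA idea in miniature."""
    deps = set(inspect.signature(ASSETS[name]).parameters)
    unknown = deps - set(ASSETS)
    if unknown:
        raise KeyError(f"{name} depends on unregistered assets: {sorted(unknown)}")
    return deps


def run_toy_graph() -> dict[str, Any]:
    """Materialise every asset in dependency order, passing upstream values by name."""
    graph = {name: upstreams(name) for name in ASSETS}
    values: dict[str, Any] = {}
    for name in TopologicalSorter(graph).static_order():
        values[name] = ASSETS[name](**{dep: values[dep] for dep in graph[name]})
    return values


@toy_asset
def raw_orders() -> list[dict]:
    return [{"id": 1, "amount": 40}, {"id": 2, "amount": -5}]


@toy_asset
def clean_orders(raw_orders: list[dict]) -> list[dict]:
    return [r for r in raw_orders if r["amount"] > 0]


@toy_asset
def fact_orders(clean_orders: list[dict]) -> int:
    return len(clean_orders)


print(run_toy_graph()["fact_orders"])  # 1
```

Nobody wrote an edge list: the graph falls out of the function signatures, which is why the asset graph, lineage, and catalog can all be derived from the same declarations.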

What are Airflow alternatives — and what do you give up by leaving Airflow?

The main airflow alternatives in 2026 are Dagster (asset-first, native catalog, native partitioning, native asset checks) and Prefect (Pythonic flows, sub-flows, dynamic mapping, hybrid Cloud + on-prem). Leaving Airflow costs you: (1) the largest operator library in the industry (provider packages ship 1000+ operators, hooks, and sensors covering the major clouds, warehouses, and SaaS tools); (2) three mature managed services (MWAA, Astronomer, Cloud Composer); (3) the largest installed-base community and StackOverflow corpus; (4) the largest pool of engineers who already know the tool. In return you gain an asset-first mental model (Dagster) or a Python-first ergonomic model (Prefect). The migration cost is non-trivial, roughly 1-2 quarters for a 50-DAG estate, so most large orgs keep Airflow for legacy ETL and adopt Dagster or Prefect for new pipelines rather than rewriting wholesale.

What is the difference between an Airflow operator, a Dagster asset, and a Prefect task?

An Airflow operator is a class that defines one unit of work (e.g. S3KeySensor, PythonOperator, BashOperator, SnowflakeOperator); the developer composes operators into a DAG and Airflow's scheduler runs them in dependency order. A Dagster asset is a Python function decorated with @asset that declares the dataset it produces; dependencies are inferred from function arguments, the asset graph is the DAG, and the asset catalog tracks materialisations and freshness per asset. A Prefect task is a Python function decorated with @task that gains retries, caching, and observability; tasks are called inside a @flow (or a sub-flow), the flow body runs as ordinary Python, and the runtime wraps each task call with state tracking, retries, and logging. The mental shift: operator = "do this work"; asset = "produce this dataset"; task = "this function, plus retries and observability". Each tool's killer feature falls out of its primitive.
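To make the task half of that mental shift concrete, here is a stdlib-only toy decorator. with_retries, CALL_LOG, and flaky_fact_orders are hypothetical names, and this is not how Prefect's @task is implemented; it only borrows the retries and retry_delay_seconds argument names the real decorator exposes, with an in-memory log standing in for observability:

```python
import functools
import time
from typing import Any, Callable

CALL_LOG: list[tuple[str, int, str]] = []  # (task name, attempt, outcome)


def with_retries(retries: int = 0, retry_delay_seconds: float = 0.0):
    """Re-run the wrapped function up to `retries` extra times on any exception."""
    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            for attempt in range(1, retries + 2):  # first try + `retries` retries
                try:
                    result = fn(*args, **kwargs)
                    CALL_LOG.append((fn.__name__, attempt, "completed"))
                    return result
                except Exception:
                    CALL_LOG.append((fn.__name__, attempt, "failed"))
                    if attempt == retries + 1:
                        raise  # out of retries: surface the original error
                    time.sleep(retry_delay_seconds)
        return wrapper
    return decorator


_calls = {"n": 0}


@with_retries(retries=3)
def flaky_fact_orders() -> int:
    """Fails twice, then succeeds, like a transient warehouse timeout."""
    _calls["n"] += 1
    if _calls["n"] < 3:
        raise ConnectionError("warehouse timeout")
    return 42


print(flaky_fact_orders())  # 42
print(CALL_LOG)
```

The function body stays plain Python; everything orchestration-flavoured lives in the wrapper, which is the sense in which a task is "just a function".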

How do I handle backfills in Airflow vs Dagster vs Prefect?

Airflow 2.x backfills via airflow dags backfill -s START -e END dag_id, which creates a DagRun for every schedule interval in the window, in order; Airflow 3 moves backfills into the scheduler (airflow backfill create). The classic gotcha is catchup=True (the Airflow 2.x default) automatically backfilling every missed run since start_date, so always ship new DAGs with catchup=False set explicitly. Dagster treats partitions as first-class: you declare a DailyPartitionsDefinition, then backfill via the UI by selecting partitions to materialise; the partition shape (daily, hourly, static, or multi-dimensional) is structural, so backfills become "materialise these N partitions" rather than "trigger this DAG N times". Prefect handles backfills by re-running deployments with explicit parameters={"partition_key": ...}; partition shape is not as first-class as in Dagster, so you typically wire it as a flow parameter. For pipelines that backfill often, Dagster's partition UI is the most ergonomic; Airflow's backfill is the most battle-tested.
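For the Prefect path, a backfill reduces to an ordered loop over partition keys. A minimal sketch under stated assumptions: the flow takes a hypothetical partition_key parameter, and submit is any callable you supply. With Prefect it would typically wrap run_deployment (from prefect.deployments) called as run_deployment(name="orders_pipeline/orders_pipeline_daily", parameters=params); that call is left out here so the sketch runs without a Prefect server:

```python
from datetime import date, timedelta
from typing import Callable, Iterator


def daily_partition_keys(start: date, end: date) -> Iterator[str]:
    """Yield ISO date keys from start to end inclusive, oldest first."""
    if end < start:
        raise ValueError("end must not be before start")
    day = start
    while day <= end:
        yield day.isoformat()
        day += timedelta(days=1)


def backfill(start: date, end: date, submit: Callable[[dict], None]) -> int:
    """Submit one run per partition, oldest first; return the number of runs submitted.

    `submit` receives the run parameters (hypothetical partition_key contract).
    """
    count = 0
    for key in daily_partition_keys(start, end):
        submit({"partition_key": key})
        count += 1
    return count


submitted: list[dict] = []
n = backfill(date(2026, 1, 30), date(2026, 2, 2), submitted.append)
print(n, submitted[0], submitted[-1])
# 4 {'partition_key': '2026-01-30'} {'partition_key': '2026-02-02'}
```

Keeping the key generation separate from submission makes the window easy to unit-test, and the month boundary in the example shows why date arithmetic beats string manipulation here.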


Practice on PipeCode

PipeCode ships 450+ data-engineering interview problems, including SQL + Python drills keyed to the same data orchestration skill set this guide teaches (DAG shape, dependency graphs, partitioned backfills, asset checks, dynamic flow mapping, sub-flows, sensor-and-schedule wiring). Whether you're prepping for an Airflow vs Dagster design round the night before a screen or shipping a migration to an Airflow alternative over a quarter, the practice library mirrors the same anatomy-first mental model, plus the dbt tests + Great Expectations + warehouse + workflow patterns you'll wire into your production orchestrator of choice.

Kick off via Explore practice →; drill the SQL practice lane →; fan out into the ETL pipeline lane →; rehearse data-validation drills →; reinforce aggregation reconciliation patterns →; widen coverage on the full Python practice library →.

Source: dev.to
