The Right Way to Author Airflow 3 DAGs

TaskFlow API, dependencies & branching, Dynamic Task Mapping, and TaskGroups — how to write DAGs properly in Airflow 3.

Data Dynamics · June 28, 2026 · 10 min read

This is Part 4 of the Airflow 3 in Practice series. In the previous part, Configuration & Optimization, we tuned the cluster; now we look at how to write the DAGs that run on top of it. The next part is Part 5: Advanced DAG Techniques.

A DAG (Directed Acyclic Graph) is a pipeline definition that spells out "what to run, and in what order" as code. Two DAGs can do exactly the same thing, yet depending on how they are written, one stays easy to read and the other becomes code no one dares to touch six months later. This article sets out a standard way to write them on Airflow 3.

In Airflow 3, authoring a DAG starts from a single point: imports come from airflow.sdk, in the form from airflow.sdk import .... Because workers no longer connect directly to the metadata DB but instead communicate via the Task SDK, using the SDK import path is the canonical approach.

Two ways to author: classic Operators vs the TaskFlow API

Let's first look at the same pipeline written both ways, side by side. We'll use the most common ETL: "extract → transform → load."

The classic Operator approach — you create tasks as objects and pass values around via XCom.

from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
import pendulum
 
 
def extract(**context):
    return {"rows": [1, 2, 3, 4]}
 
 
def transform(**context):
    ti = context["ti"]
    data = ti.xcom_pull(task_ids="extract")
    return [r * 10 for r in data["rows"]]
 
 
def load(**context):
    ti = context["ti"]
    rows = ti.xcom_pull(task_ids="transform")
    print(f"Load complete: {rows}")
 
 
with DAG(
    dag_id="etl_classic",
    start_date=pendulum.datetime(2026, 1, 1, tz="Asia/Seoul"),
    schedule="@daily",
    catchup=False,
) as dag:
    t_extract = PythonOperator(task_id="extract", python_callable=extract)
    t_transform = PythonOperator(task_id="transform", python_callable=transform)
    t_load = PythonOperator(task_id="load", python_callable=load)
 
    t_extract >> t_transform >> t_load

The TaskFlow API approach — decorate a function with @task and that function is the task. Pass a function's return value as the argument to the next function and the XCom wiring happens automatically.

from airflow.sdk import dag, task
import pendulum
 
 
@dag(
    schedule="@daily",
    start_date=pendulum.datetime(2026, 1, 1, tz="Asia/Seoul"),
    catchup=False,
)
def etl_taskflow():
    @task
    def extract():
        return {"rows": [1, 2, 3, 4]}
 
    @task
    def transform(payload: dict):
        return [r * 10 for r in payload["rows"]]
 
    @task
    def load(rows: list):
        print(f"Load complete: {rows}")
 
    load(transform(extract()))
 
 
etl_taskflow()

The difference is obvious at a glance. With TaskFlow you never call xcom_pull/xcom_push directly; instead the function return values and arguments themselves become the dependencies and the data hand-off. The single line load(transform(extract())) is what creates the extract >> transform >> load order.

| Aspect | Classic Operators | TaskFlow API |
| --- | --- | --- |
| Dependency expression | Wired manually with >> | Inferred from function call relationships |
| Data passing | Explicit xcom_pull/push | Implicit via return values and arguments |
| Readability | Centered on objects and task IDs | Reads like plain Python functions |
| Best fit | Off-the-shelf Operators (Sensors, Providers, etc.) | Pure Python logic |
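
To build some intuition for "inferred from function call relationships," here is a toy model in plain Python. It is not how Airflow implements @task; toy_task, Node, and edges are hypothetical names made up for this sketch. The point is only that calling a decorated function can record a graph edge without running the function body.

```python
# Toy model only: this is NOT Airflow's implementation.
# Calling a decorated function records edges instead of running the body.

edges: list[tuple[str, str]] = []  # (upstream, downstream) pairs


class Node:
    """Placeholder returned at definition time instead of a real value."""

    def __init__(self, name: str):
        self.name = name


def toy_task(fn):
    """Hypothetical stand-in for @task."""

    def wrapper(*args):
        node = Node(fn.__name__)
        for arg in args:
            if isinstance(arg, Node):  # the argument came from another task
                edges.append((arg.name, node.name))
        return node

    return wrapper


@toy_task
def extract():
    return {"rows": [1, 2, 3, 4]}


@toy_task
def transform(payload):
    return [r * 10 for r in payload["rows"]]


@toy_task
def load(rows):
    print(rows)


load(transform(extract()))
print(edges)  # [('extract', 'transform'), ('transform', 'load')]
```

None of the three function bodies ran here. The nested call only described the graph, which is the same mental model to keep when reading load(transform(extract())) in a real DAG.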

The two are not an either/or choice. Real-world DAGs usually mix them. Write pure Python logic with @task, handle the parts that touch external systems with off-the-shelf Operators, and stitch them together with >>.

How Operator, Sensor, and Hook relate

If you write everything by hand with only @task, you end up rewriting external-system integration code every time. Knowing the three building blocks Airflow provides means you don't have to.

  • Operator: a unit that "performs one piece of work, once." PythonOperator, BashOperator, the S3...Operator that a Provider supplies, and so on.
  • Sensor: a special kind of Operator whose job is to "wait until some condition becomes true" — waiting for a file to arrive, an external job to finish, and the like. In Airflow 3, putting a Sensor in deferrable mode lets the Triggerer handle the polling, so it doesn't occupy a worker slot (for details see the Architecture part).
  • Hook: a low-level wrapper that attaches to an external system. Operators and Sensors use Hooks internally, and a Hook reads Airflow's Connection (connection details) to establish the actual connection.

In short, a Hook handles "how to connect," while Operators and Sensors handle "what to do over that connection." External system integration itself is covered in depth in Part 8: External System Integration.
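
The division of labor can be sketched in plain Python. Everything below is hypothetical (ToyHook, ToyOperator, the CONNECTIONS dict, and the host name); real Hooks and Operators come from Airflow provider packages and have richer interfaces. The sketch only shows who owns which responsibility.

```python
# Toy model with hypothetical names, not Airflow classes.

# Stand-in for Airflow Connections, keyed by connection id.
CONNECTIONS = {"my_db": {"host": "db.example.internal", "port": 5432}}


class ToyHook:
    """Knows HOW to connect: resolves a connection id into details."""

    def __init__(self, conn_id: str):
        self.conn_id = conn_id

    def get_uri(self) -> str:
        conn = CONNECTIONS[self.conn_id]
        return f"{conn['host']}:{conn['port']}"


class ToyOperator:
    """Knows WHAT to do: one unit of work over the hook's connection."""

    def __init__(self, task_id: str, conn_id: str, sql: str):
        self.task_id = task_id
        self.conn_id = conn_id
        self.sql = sql

    def execute(self) -> str:
        hook = ToyHook(self.conn_id)  # the operator delegates connecting
        return f"ran {self.sql!r} on {hook.get_uri()}"


result = ToyOperator("query", "my_db", "SELECT 1").execute()
print(result)  # ran 'SELECT 1' on db.example.internal:5432
```

Because the connection details live behind the connection id, pointing the same operator at another database means changing the Connection, not the DAG code.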

Dependencies, trigger_rule, and branching

Dependencies are expressed with >> (downstream) and << (upstream). When you have many tasks, the chain/cross_downstream helpers are easier to read.

from airflow.sdk import chain
 
# a, b, c, d are tasks already defined inside the DAG
# Equivalent to a >> b >> c >> d
chain(a, b, c, d)
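
To make the difference between the two helpers concrete, here is a plain-Python sketch of the edges each one is meant to create. chain_edges and cross_downstream_edges are hypothetical helper names for illustration, and the sketch covers only the simple case of flat task lists (the real chain also accepts lists as arguments).

```python
def chain_edges(*tasks):
    """Edges of a linear chain: each task feeds the next one."""
    return list(zip(tasks, tasks[1:]))


def cross_downstream_edges(upstream, downstream):
    """Every upstream task feeds every downstream task."""
    return [(u, d) for u in upstream for d in downstream]


print(chain_edges("a", "b", "c", "d"))
# [('a', 'b'), ('b', 'c'), ('c', 'd')]

print(cross_downstream_edges(["a", "b"], ["c", "d"]))
# [('a', 'c'), ('a', 'd'), ('b', 'c'), ('b', 'd')]
```

So chain is for a sequence, while cross_downstream is for "all of these must finish before any of those."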

By default, a task runs only when all of its parents succeed (all_success). What changes this is the trigger_rule. For example, "run a cleanup task no matter what, whether the preceding steps succeed or fail" uses all_done.

| trigger_rule | Run condition |
| --- | --- |
| all_success (default) | All parents succeeded |
| all_done | All parents finished (regardless of success/failure); useful for cleanup tasks |
| one_success | At least one parent succeeded |
| none_failed_min_one_success | No failures + at least one success; useful at branch join points |
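
The table can be read as a small decision function. The sketch below is a simplified plain-Python model, not Airflow's scheduler logic: should_run is a hypothetical name, and it assumes every parent has already reached a final state.

```python
FAILED_STATES = {"failed", "upstream_failed"}


def should_run(trigger_rule: str, parent_states: list[str]) -> bool:
    """Simplified check; assumes every parent has already finished."""
    succeeded = parent_states.count("success")
    failed = sum(state in FAILED_STATES for state in parent_states)

    if trigger_rule == "all_success":
        return succeeded == len(parent_states)
    if trigger_rule == "all_done":
        return True  # all parents are finished, by assumption
    if trigger_rule == "one_success":
        return succeeded >= 1
    if trigger_rule == "none_failed_min_one_success":
        return failed == 0 and succeeded >= 1
    raise ValueError(f"unsupported trigger rule: {trigger_rule}")


# One parent succeeded and the other was skipped.
parents = ["success", "skipped"]
print(should_run("all_success", parents))                  # False
print(should_run("none_failed_min_one_success", parents))  # True

# A cleanup task should still run after a failure.
print(should_run("all_done", ["success", "failed"]))       # True
```

The ["success", "skipped"] case is the one to remember: a skipped parent is enough to stop a task that uses the default rule.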

Branching is the pattern of "running only one of several branches depending on a condition." A function decorated with @task.branch returns the id of the downstream task to run, and the remaining branches are skipped.

from airflow.sdk import dag, task
import pendulum
 
 
@dag(schedule="@daily",
     start_date=pendulum.datetime(2026, 1, 1, tz="Asia/Seoul"),
     catchup=False)
def etl_with_branch():
    @task
    def extract():
        return {"rows": [1, 2, 3, 4]}
 
    @task.branch
    def check_quality(payload: dict):
        # If there are rows, take the load branch; if empty, take the notify branch
        return "load" if payload["rows"] else "notify_empty"
 
    @task
    def load():
        print("load")
 
    @task
    def notify_empty():
        print("No data — notifying")
 
    # Final task where the two branches join: adjust trigger_rule so it runs even if one side is skipped
    @task(trigger_rule="none_failed_min_one_success")
    def finalize():
        print("finalize")
 
    data = extract()
    branch = check_quality(data)
    branch >> [load(), notify_empty()] >> finalize()
 
 
etl_with_branch()

The key here is the trigger_rule at the join point. When branching skips one side, if the join task keeps the default all_success, it gets skipped too because of the skipped parent. So you must switch it to none_failed_min_one_success for it to run correctly. Visualized, the flow looks like this:

extract → check_quality → (load or notify_empty) → finalize

Dynamic Task Mapping: fanning out at runtime

As you work with DAGs, you run into cases where "you can't know the number of items to process ahead of time." Today there might be 3 files, tomorrow 50. Dynamic Task Mapping is what automatically expands tasks at runtime to match the number of items. You use .expand().

from airflow.sdk import dag, task
import pendulum
 
 
@dag(schedule="@daily",
     start_date=pendulum.datetime(2026, 1, 1, tz="Asia/Seoul"),
     catchup=False)
def fanout_pipeline():
    @task
    def list_files() -> list[str]:
        # Decide the list of files to process at runtime (the count can vary each time)
        return ["a.csv", "b.csv", "c.csv"]
 
    @task
    def process(file_name: str) -> int:
        print(f"Processing: {file_name}")
        return len(file_name)  # Example: the result of processing each file (a fake value)
 
    @task
    def summarize(results: list[int]):
        # The return values of all mapped tasks arrive collected into a list
        print(f"Processed {len(results)} items, total {sum(results)}")
 
    files = list_files()
    counts = process.expand(file_name=files)  # process is expanded to match the number of files
    summarize(counts)                          # Aggregate the results into a single task (fan-in)
 
 
fanout_pipeline()

The single line process.expand(file_name=files) creates as many process task instances as the length of the list produced upstream. Then summarize receives those results as a single list and aggregates them. This is the fan-out / fan-in pattern: one input task expands into N and then collapses back into one.
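
Stripped of Airflow, the data flow of this pattern is just "map, then reduce." The plain-Python sketch below mirrors the three functions from the DAG above; the enumerate index stands in for the map index that identifies each mapped task instance. It runs everything in one process, so it shows the shape of the data only, not scheduling.

```python
def list_files() -> list[str]:
    return ["a.csv", "b.csv", "c.csv"]


def process(file_name: str) -> int:
    return len(file_name)  # fake "result of processing", as in the DAG


def summarize(results: list[int]) -> dict:
    return {"count": len(results), "total": sum(results)}


files = list_files()

# Fan-out: one call per item, each identified by its index.
mapped = {index: process(name) for index, name in enumerate(files)}

# Fan-in: the downstream step receives all results as one list.
summary = summarize(list(mapped.values()))

print(mapped)   # {0: 5, 1: 5, 2: 5}
print(summary)  # {'count': 3, 'total': 15}
```

If list_files returned 50 names tomorrow, nothing else in the sketch would change, which is exactly the property .expand() gives you in a DAG.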

If the number of tasks mapped by .expand() explodes, it burdens the scheduler and the metadata DB. Control concurrent execution with the three concurrency layers from Part 3 (parallelism, max_active_tasks_per_dag, Pools).

Structuring with TaskGroups (SubDAGs are gone)

When you have dozens of tasks, the graph stops being readable at a glance. A TaskGroup lets you bundle related tasks into a visual and logical group that can be collapsed.

from airflow.sdk import dag, task, task_group
import pendulum
 
 
@dag(schedule="@daily",
     start_date=pendulum.datetime(2026, 1, 1, tz="Asia/Seoul"),
     catchup=False)
def grouped_pipeline():
    @task
    def extract():
        return [1, 2, 3]
 
    @task_group(group_id="cleanse")
    def cleanse(data):
        @task
        def dedup(rows):
            return list(set(rows))
 
        @task
        def validate(rows):
            return rows
 
        return validate(dedup(data))
 
    @task
    def load(rows):
        print(f"Load: {rows}")
 
    load(cleanse(extract()))
 
 
grouped_pipeline()

In the UI the cleanse group shows up collapsed as a single box, and expanding it reveals the inner dedup and validate. Task ids inside the group get a group prefix, like cleanse.dedup.
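
The prefix matters whenever you refer to a task by id, for example in the value returned from a @task.branch function. A small sketch of the naming rule, assuming the default prefixing behavior; qualified_task_id is a hypothetical helper, not an Airflow function.

```python
def qualified_task_id(group_ids: list[str], task_id: str) -> str:
    """Join the enclosing group ids and the task id with dots."""
    return ".".join([*group_ids, task_id])


print(qualified_task_id([], "extract"))           # extract
print(qualified_task_id(["cleanse"], "dedup"))    # cleanse.dedup
print(qualified_task_id(["cleanse"], "validate")) # cleanse.validate
```

So a branch that wants to jump to dedup inside the group would need to return "cleanse.dedup", not "dedup".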

Important: SubDAGs have been removed in Airflow 3. Replace every pattern you used to bundle with a SubDAG with a TaskGroup instead. If you want to split DAGs into larger units, you're better off connecting cross-DAG dependencies in a data-aware way with Asset-based scheduling from Part 6.

DAG anti-patterns to avoid

DAG files are parsed periodically by the DAG processor (in Airflow 3, an independent process separate from the scheduler). The crucial point is that top-level code in a DAG file runs again on every parse, whether or not any task in that file is due to run.

from airflow.sdk import task
 
# Anti-pattern: doing heavy work at the top level runs it on every parse
import pandas as pd
df = pd.read_parquet("s3://big-bucket/huge.parquet")  # ❌ downloaded on every parse
 
 
# The right way: heavy work must go "inside" a task function
@task
def load_data():
    import pandas as pd  # heavy import deferred until the task runs
    df = pd.read_parquet("s3://big-bucket/huge.parquet")  # ✅ only when it runs
    return len(df)  # return a small summary, not the DataFrame itself
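
You can see the effect without Airflow at all. The sketch below is a simulation only: it uses exec to "parse" a small source string three times and counts what runs. The real DAG processor imports files in its own processes, but the principle that module-level statements run on every import is the same.

```python
# Simulation only: exec() stands in for the DAG processor importing a file.
DAG_FILE_SOURCE = '''
calls["top_level"] += 1        # stands in for a heavy query or download

def load_data():
    calls["inside_task"] += 1  # heavy work deferred to run time
'''

calls = {"top_level": 0, "inside_task": 0}

for _ in range(3):  # three parse cycles
    namespace = {"calls": calls}
    exec(DAG_FILE_SOURCE, namespace)

print(calls)  # {'top_level': 3, 'inside_task': 0}

namespace["load_data"]()  # the task body runs only when it is executed
print(calls)  # {'top_level': 3, 'inside_task': 1}
```

Three parses cost three top-level executions and zero task-body executions. Scale that up to a parse every few tens of seconds and a multi-gigabyte download, and the anti-pattern becomes expensive quickly.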

Common pitfalls you'll see:

  • Heavy top-level code/imports: don't put DB queries, file downloads, or heavy library imports at the very top of a DAG file. Every parse pays that cost, so parsing slows down and DAG changes take longer to reach the scheduler. Move heavy imports inside task functions.
  • Non-deterministic DAGs: don't use datetime.now() or random numbers to make the DAG structure (number of tasks, ids) different every time. The same file should always produce the same graph, so that tracking and re-runs stay stable. If you need time-based logic, use logical_date or data_interval_start/end at the task's execution time, not in the structure (execution_date was removed in Airflow 3).
  • Giant XComs: don't stuff a large DataFrame or a payload of tens of MB straight into a task's return value (XCom). XCom is stored in the metadata DB by default, so the DB balloons. The canonical approach is to keep large data in external storage and pass only the path or key through XCom. This topic is covered in earnest in Part 7: XCom & Data Passing.
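
The "pass only the path or key" idea from the last bullet can be sketched in plain Python. This is a minimal illustration under stated assumptions: a temporary directory stands in for external storage such as an object store, the file name is made up, and extract and load are ordinary functions rather than Airflow tasks.

```python
import json
import tempfile
from pathlib import Path

# Stand-in for external storage; in practice this would be an object store.
STORAGE_DIR = Path(tempfile.mkdtemp())


def extract() -> str:
    """Write the large payload to storage and return only its location."""
    rows = [{"id": i, "value": i * 10} for i in range(1000)]
    path = STORAGE_DIR / "extract_output.json"  # hypothetical file name
    path.write_text(json.dumps(rows))
    return str(path)  # this short string is all that would go through XCom


def load(path: str) -> int:
    """Read the payload back from storage using the reference."""
    rows = json.loads(Path(path).read_text())
    return len(rows)


reference = extract()
print(load(reference))  # 1000
```

The hand-off between the two steps is a short string regardless of whether the payload has a thousand rows or a hundred million.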

Avoiding just these three prevents a good share of operational incidents like "why is the scheduler slow," "why is the graph different every time," and "why is the metadata DB blowing up."

Wrap-up

  • The starting point for an Airflow 3 DAG is from airflow.sdk import dag, task. Write pure Python with the TaskFlow API, external integrations with off-the-shelf Operators, and mix the two.
  • Operators/Sensors handle "what to do," and Hooks handle "how to connect."
  • Use @task.branch for branching and trigger_rule="none_failed_min_one_success" at the join point. When the item count is variable, use .expand() for runtime fan-out.
  • For structuring, use TaskGroups (SubDAGs are removed). Avoid heavy top-level code, non-deterministic structure, and giant XComs.

In the next part, Part 5: Advanced DAG Techniques, we move up a level to more practical techniques: running scripts, passing parameters, error handling, calling other DAGs, executing PostgreSQL queries, re-running failures, and date-based logic. For the official documentation, see Airflow Authoring DAGs.