Blog
airflowdagtaskflowdynamic-task-mappingdata-pipeline

Airflow 3 DAG 작성의 정석

TaskFlow API, 의존성·브랜칭, Dynamic Task Mapping, TaskGroup까지 — Airflow 3에서 DAG를 제대로 쓰는 법.

Data Dynamics2026년 6월 28일15 min read

이 글은 "Airflow 3 실전 연재"의 4편입니다. 직전 편 환경설정 & 최적화에서 클러스터를 다듬었으니, 이제 그 위에서 돌릴 DAG를 어떻게 쓰는가를 다룹니다. 다음 편은 5편: DAG 고급 테크닉입니다.

DAG(Directed Acyclic Graph)는 "무엇을, 어떤 순서로 실행할지"를 코드로 적은 파이프라인 정의입니다. 같은 일을 하는 DAG라도 어떻게 쓰느냐에 따라 읽기 쉬운 코드가 되기도 하고, 6개월 뒤 아무도 못 건드리는 코드가 되기도 합니다. 이 글은 Airflow 3 기준으로 "이렇게 쓰면 됩니다"의 표준을 정리합니다.

Airflow 3에서 DAG 작성의 출발점은 단 하나입니다. 임포트는 from airflow.sdk import .... 워커가 메타데이터 DB에 직접 붙지 않고 Task SDK로 통신하는 구조라, SDK 경로를 쓰는 것이 정석입니다.

두 가지 작성 방식: 전통 Operator vs TaskFlow API

먼저 같은 파이프라인을 두 방식으로 나란히 보겠습니다. "추출 → 변환 → 적재"라는 가장 흔한 ETL입니다.

전통적인 Operator 방식 — 태스크를 객체로 만들고, XCom으로 값을 주고받습니다.

from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
import pendulum
 
 
def extract(**context):
    return {"rows": [1, 2, 3, 4]}
 
 
def transform(**context):
    ti = context["ti"]
    data = ti.xcom_pull(task_ids="extract")
    return [r * 10 for r in data["rows"]]
 
 
def load(**context):
    ti = context["ti"]
    rows = ti.xcom_pull(task_ids="transform")
    print(f"적재 완료: {rows}")
 
 
with DAG(
    dag_id="etl_classic",
    start_date=pendulum.datetime(2026, 1, 1, tz="Asia/Seoul"),
    schedule="@daily",
    catchup=False,
) as dag:
    t_extract = PythonOperator(task_id="extract", python_callable=extract)
    t_transform = PythonOperator(task_id="transform", python_callable=transform)
    t_load = PythonOperator(task_id="load", python_callable=load)
 
    t_extract >> t_transform >> t_load

TaskFlow API 방식 — 함수에 @task를 붙이면 그 함수가 곧 태스크입니다. 반환값을 다음 함수 인자로 넘기면 XCom 배선이 자동으로 됩니다.

from airflow.sdk import dag, task
import pendulum
 
 
@dag(
    schedule="@daily",
    start_date=pendulum.datetime(2026, 1, 1, tz="Asia/Seoul"),
    catchup=False,
)
def etl_taskflow():
    @task
    def extract():
        return {"rows": [1, 2, 3, 4]}
 
    @task
    def transform(payload: dict):
        return [r * 10 for r in payload["rows"]]
 
    @task
    def load(rows: list):
        print(f"적재 완료: {rows}")
 
    load(transform(extract()))
 
 
etl_taskflow()

차이가 한눈에 보입니다. TaskFlow에서는 xcom_pull/xcom_push를 직접 부르지 않고, 함수 반환값과 인자가 그대로 의존성과 데이터 전달이 됩니다. load(transform(extract()))라는 한 줄이 곧 extract >> transform >> load 순서를 만들어 줍니다.

관점전통 OperatorTaskFlow API
의존성 표현>>로 수동 연결함수 호출 관계로 자동
데이터 전달xcom_pull/push 명시반환값·인자로 암묵
가독성객체·태스크 ID 위주평범한 파이썬 함수처럼
적합한 곳기성 Operator(Sensor·Provider 등)순수 파이썬 로직

둘은 양자택일이 아닙니다. 실무 DAG는 보통 혼합합니다. 순수 파이썬 로직은 @task로, 외부 시스템을 다루는 부분은 기성 Operator로 쓰고, 그 사이를 >>로 잇습니다.

Operator · Sensor · Hook의 관계

@task만으로 모든 걸 직접 짜면 외부 시스템 연동 코드를 매번 다시 쓰게 됩니다. Airflow가 제공하는 세 가지 빌딩 블록을 알면 그럴 필요가 없습니다.

Loading diagram…
  • Operator: "한 가지 작업을 한 번 수행"하는 단위입니다. PythonOperator, BashOperator, Provider가 주는 S3...Operator 등.
  • Sensor: 특별한 종류의 Operator로, "어떤 조건이 참이 될 때까지 기다리는" 일을 합니다. 파일 도착, 외부 작업 완료 같은 대기. Airflow 3에서는 deferrable 모드로 두면 Triggerer가 폴링을 대신해 워커 슬롯을 점유하지 않습니다(자세한 내용은 아키텍처 편 참고).
  • Hook: 외부 시스템에 붙는 저수준 래퍼입니다. Operator·Sensor 내부에서 Hook을 쓰고, Hook은 Airflow의 Connection(접속 정보)을 읽어 실제 연결을 맺습니다.

정리하면 Hook은 "어떻게 연결하나", Operator·Sensor는 "그 연결로 무엇을 하나" 를 담당합니다. 외부 시스템 연동 자체는 8편: 외부 시스템 연동에서 깊게 다룹니다.

의존성, trigger_rule, 그리고 브랜칭

의존성은 >>(다운스트림)·<<(업스트림)으로 적습니다. 태스크가 많아지면 chain/cross_downstream 헬퍼가 읽기 편합니다.

from airflow.sdk import chain
 
# a >> b >> c >> d 와 동일
chain(a, b, c, d)

기본적으로 태스크는 모든 부모가 성공해야 실행됩니다(all_success). 이걸 바꾸는 것이 trigger_rule입니다. 예를 들어 "앞 단계들이 성공하든 실패하든 무조건 정리 작업을 돌려라"는 all_done을 씁니다.

trigger_rule실행 조건
all_success (기본)모든 부모 성공
all_done모든 부모 종료(성공·실패 무관) — 정리 태스크에 유용
one_success부모 중 하나라도 성공
none_failed_min_one_success실패 없음 + 최소 하나 성공 — 브랜칭 합류 지점에 유용

브랜칭은 "조건에 따라 갈래 중 하나만 실행"하는 패턴입니다. @task.branch로 적은 함수가 실행할 다운스트림 태스크의 id를 반환하면, 나머지 갈래는 skip 됩니다.

from airflow.sdk import dag, task
import pendulum
 
 
@dag(schedule="@daily",
     start_date=pendulum.datetime(2026, 1, 1, tz="Asia/Seoul"),
     catchup=False)
def etl_with_branch():
    @task
    def extract():
        return {"rows": [1, 2, 3, 4]}
 
    @task.branch
    def check_quality(payload: dict):
        # 행이 있으면 적재 갈래로, 비어 있으면 알림 갈래로
        return "load" if payload["rows"] else "notify_empty"
 
    @task
    def load():
        print("적재")
 
    @task
    def notify_empty():
        print("데이터 없음 — 알림")
 
    # 두 갈래가 합류하는 마무리 태스크: 한쪽이 skip돼도 돌도록 trigger_rule 조정
    @task(trigger_rule="none_failed_min_one_success")
    def finalize():
        print("마무리")
 
    data = extract()
    branch = check_quality(data)
    branch >> [load(), notify_empty()] >> finalize()
 
 
etl_with_branch()

여기서 핵심은 **합류 지점의 trigger_rule**입니다. 브랜칭으로 한쪽이 skip되면, 합류 태스크가 기본값 all_success라면 skip된 부모 때문에 함께 skip됩니다. 그래서 none_failed_min_one_success로 바꿔야 정상 실행됩니다. 이 흐름을 그림으로 보면:

Loading diagram…

Dynamic Task Mapping: 런타임에 팬아웃하기

DAG를 쓰다 보면 "처리할 항목 개수를 미리 알 수 없는" 경우가 생깁니다. 오늘은 파일이 3개, 내일은 50개일 수 있습니다. 이때 항목 수만큼 태스크를 런타임에 자동으로 펼치는 것이 Dynamic Task Mapping입니다. .expand()를 쓰면 됩니다.

from airflow.sdk import dag, task
import pendulum
 
 
@dag(schedule="@daily",
     start_date=pendulum.datetime(2026, 1, 1, tz="Asia/Seoul"),
     catchup=False)
def fanout_pipeline():
    @task
    def list_files() -> list[str]:
        # 처리할 파일 목록을 런타임에 결정 (개수는 매번 달라질 수 있음)
        return ["a.csv", "b.csv", "c.csv"]
 
    @task
    def process(file_name: str) -> int:
        print(f"처리 중: {file_name}")
        return len(file_name)  # 예시: 각 파일 처리 결과(가짜 값)
 
    @task
    def summarize(results: list[int]):
        # 모든 매핑된 태스크의 반환값이 리스트로 모여 들어온다
        print(f"총 {len(results)}개 처리, 합계 {sum(results)}")
 
    files = list_files()
    counts = process.expand(file_name=files)  # files 개수만큼 process가 펼쳐짐
    summarize(counts)                          # 결과를 한 태스크로 집계(fan-in)
 
 
fanout_pipeline()

process.expand(file_name=files) 한 줄이, 업스트림이 만든 리스트의 길이만큼 process 태스크 인스턴스를 생성합니다. 그리고 그 결과들을 summarize가 하나의 리스트로 받아 집계합니다. 1개의 입력 태스크가 N개로 펼쳐졌다가 다시 1개로 모이는 fan-out / fan-in 패턴입니다.

Loading diagram…

.expand()로 매핑되는 태스크 개수가 폭증하면 스케줄러·메타DB에 부담이 됩니다. 동시 실행은 3편의 동시성 3계층(parallelism, max_active_tasks_per_dag, Pool)으로 제어하세요.

TaskGroup으로 구조화하기 (SubDAG는 사라졌습니다)

태스크가 수십 개가 되면 그래프가 한눈에 안 들어옵니다. 관련 태스크를 묶어 시각적·논리적 그룹으로 접을 수 있는 것이 TaskGroup입니다.

from airflow.sdk import dag, task, task_group
import pendulum
 
 
@dag(schedule="@daily",
     start_date=pendulum.datetime(2026, 1, 1, tz="Asia/Seoul"),
     catchup=False)
def grouped_pipeline():
    @task
    def extract():
        return [1, 2, 3]
 
    @task_group(group_id="cleanse")
    def cleanse(data):
        @task
        def dedup(rows):
            return list(set(rows))
 
        @task
        def validate(rows):
            return rows
 
        return validate(dedup(data))
 
    @task
    def load(rows):
        print(f"적재: {rows}")
 
    load(cleanse(extract()))
 
 
grouped_pipeline()

UI에서는 cleanse 그룹이 하나의 박스로 접혀 보이고, 펼치면 내부 dedup·validate가 드러납니다. 그룹 안 태스크 id에는 cleanse.dedup처럼 그룹 prefix가 붙습니다.

중요: Airflow 3에서 SubDAG는 제거되었습니다. 과거 SubDAG로 묶던 패턴은 모두 TaskGroup으로 대체하세요. 더 큰 단위로 DAG를 나누고 싶다면 6편의 Asset 기반 스케줄링으로 DAG 간 의존성을 데이터 인지 방식으로 연결하는 편이 낫습니다.

피해야 할 DAG 안티패턴

DAG 파일은 DAG processor가 주기적으로 파싱합니다(Airflow 3에서 스케줄러와 분리된 독립 프로세스). 즉 파일 안의 top-level 코드는 파싱할 때마다, 모든 DAG에 대해 반복 실행된다는 점이 핵심입니다.

# 안티패턴: top-level에서 무거운 작업을 하면 매 파싱마다 실행된다
import pandas as pd
df = pd.read_parquet("s3://big-bucket/huge.parquet")  # ❌ 파싱 때마다 다운로드
 
 
# 정석: 무거운 작업은 반드시 태스크 함수 "안"으로
@task
def load_data():
    import pandas as pd
    return pd.read_parquet("s3://big-bucket/huge.parquet")  # ✅ 실행될 때만

자주 보는 함정들:

  • top-level의 무거운 코드/import: DB 조회, 파일 다운로드, 무거운 라이브러리 import를 DAG 파일 최상단에 두지 마세요. 파싱이 느려지고 스케줄러가 밀립니다. 무거운 import는 태스크 함수 안으로 넣습니다.
  • 비결정적 DAG: datetime.now()나 난수로 DAG 구조(태스크 개수·id) 를 매번 다르게 만들지 마세요. 같은 파일은 항상 같은 그래프를 만들어야 추적·재실행이 안정적입니다. 시간 기반 로직이 필요하면 구조가 아니라 태스크 실행 시점logical_datedata_interval_start/end를 쓰세요(Airflow 3에서 execution_date는 제거됐습니다).
  • 거대 XCom: 태스크 반환값(XCom)에 큰 DataFrame이나 수십 MB 페이로드를 그대로 담지 마세요. XCom은 메타데이터 DB에 저장되므로 DB가 비대해집니다. 큰 데이터는 외부 스토리지에 두고 경로·키만 XCom으로 넘기는 것이 정석입니다. 이 주제는 7편: XCom & 데이터 전달에서 본격적으로 다룹니다.

이 세 가지만 피해도 "왜 스케줄러가 느리지", "왜 매번 그래프가 달라지지", "왜 메타DB가 터지지" 같은 운영 사고의 상당수를 예방할 수 있습니다.

정리

  • Airflow 3 DAG의 출발점은 from airflow.sdk import dag, task. 순수 파이썬은 TaskFlow API로, 외부 연동은 기성 Operator로 쓰고 둘을 섞습니다.
  • Operator/Sensor가 "무엇을 하나", Hook이 "어떻게 연결하나"를 담당합니다.
  • 분기는 @task.branch, 합류 지점은 trigger_rule="none_failed_min_one_success"로. 항목 수가 가변이면 .expand()런타임 팬아웃.
  • 구조화는 TaskGroup(SubDAG는 제거됨). top-level 무거운 코드·비결정적 구조·거대 XCom은 피합니다.

다음 편 5편: DAG 고급 테크닉에서는 스크립트 실행·파라미터 전달·에러 처리·다른 DAG 호출·PostgreSQL 쿼리 실행·실패 재실행·날짜 기준 같은 한 단계 위의 실전 기술을 다룹니다. 공식 문서는 Airflow Authoring DAGs를 참고하세요.