Airflow 3 DAG 작성의 정석
TaskFlow API, 의존성·브랜칭, Dynamic Task Mapping, TaskGroup까지 — Airflow 3에서 DAG를 제대로 쓰는 법.
이 글은 "Airflow 3 실전 연재"의 4편입니다. 직전 편 환경설정 & 최적화에서 클러스터를 다듬었으니, 이제 그 위에서 돌릴 DAG를 어떻게 쓰는가를 다룹니다. 다음 편은 5편: DAG 고급 테크닉입니다.
DAG(Directed Acyclic Graph)는 "무엇을, 어떤 순서로 실행할지"를 코드로 적은 파이프라인 정의입니다. 같은 일을 하는 DAG라도 어떻게 쓰느냐에 따라 읽기 쉬운 코드가 되기도 하고, 6개월 뒤 아무도 못 건드리는 코드가 되기도 합니다. 이 글은 Airflow 3 기준으로 "이렇게 쓰면 됩니다"의 표준을 정리합니다.
Airflow 3에서 DAG 작성의 출발점은 단 하나입니다. 임포트는
from airflow.sdk import .... 워커가 메타데이터 DB에 직접 붙지 않고 Task SDK로 통신하는 구조라, SDK 경로를 쓰는 것이 정석입니다.
두 가지 작성 방식: 전통 Operator vs TaskFlow API
먼저 같은 파이프라인을 두 방식으로 나란히 보겠습니다. "추출 → 변환 → 적재"라는 가장 흔한 ETL입니다.
전통적인 Operator 방식 — 태스크를 객체로 만들고, XCom으로 값을 주고받습니다.
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
import pendulum
def extract(**context):
return {"rows": [1, 2, 3, 4]}
def transform(**context):
ti = context["ti"]
data = ti.xcom_pull(task_ids="extract")
return [r * 10 for r in data["rows"]]
def load(**context):
ti = context["ti"]
rows = ti.xcom_pull(task_ids="transform")
print(f"적재 완료: {rows}")
with DAG(
dag_id="etl_classic",
start_date=pendulum.datetime(2026, 1, 1, tz="Asia/Seoul"),
schedule="@daily",
catchup=False,
) as dag:
t_extract = PythonOperator(task_id="extract", python_callable=extract)
t_transform = PythonOperator(task_id="transform", python_callable=transform)
t_load = PythonOperator(task_id="load", python_callable=load)
t_extract >> t_transform >> t_loadTaskFlow API 방식 — 함수에 @task를 붙이면 그 함수가 곧 태스크입니다. 반환값을 다음 함수 인자로 넘기면 XCom 배선이 자동으로 됩니다.
from airflow.sdk import dag, task
import pendulum
@dag(
schedule="@daily",
start_date=pendulum.datetime(2026, 1, 1, tz="Asia/Seoul"),
catchup=False,
)
def etl_taskflow():
@task
def extract():
return {"rows": [1, 2, 3, 4]}
@task
def transform(payload: dict):
return [r * 10 for r in payload["rows"]]
@task
def load(rows: list):
print(f"적재 완료: {rows}")
load(transform(extract()))
etl_taskflow()차이가 한눈에 보입니다. TaskFlow에서는 xcom_pull/xcom_push를 직접 부르지 않고, 함수 반환값과 인자가 그대로 의존성과 데이터 전달이 됩니다. load(transform(extract()))라는 한 줄이 곧 extract >> transform >> load 순서를 만들어 줍니다.
| 관점 | 전통 Operator | TaskFlow API |
|---|---|---|
| 의존성 표현 | >>로 수동 연결 | 함수 호출 관계로 자동 |
| 데이터 전달 | xcom_pull/push 명시 | 반환값·인자로 암묵 |
| 가독성 | 객체·태스크 ID 위주 | 평범한 파이썬 함수처럼 |
| 적합한 곳 | 기성 Operator(Sensor·Provider 등) | 순수 파이썬 로직 |
둘은 양자택일이 아닙니다. 실무 DAG는 보통 혼합합니다. 순수 파이썬 로직은
@task로, 외부 시스템을 다루는 부분은 기성 Operator로 쓰고, 그 사이를>>로 잇습니다.
Operator · Sensor · Hook의 관계
@task만으로 모든 걸 직접 짜면 외부 시스템 연동 코드를 매번 다시 쓰게 됩니다. Airflow가 제공하는 세 가지 빌딩 블록을 알면 그럴 필요가 없습니다.
- Operator: "한 가지 작업을 한 번 수행"하는 단위입니다.
PythonOperator,BashOperator, Provider가 주는S3...Operator등. - Sensor: 특별한 종류의 Operator로, "어떤 조건이 참이 될 때까지 기다리는" 일을 합니다. 파일 도착, 외부 작업 완료 같은 대기. Airflow 3에서는 deferrable 모드로 두면 Triggerer가 폴링을 대신해 워커 슬롯을 점유하지 않습니다(자세한 내용은 아키텍처 편 참고).
- Hook: 외부 시스템에 붙는 저수준 래퍼입니다. Operator·Sensor 내부에서 Hook을 쓰고, Hook은 Airflow의 Connection(접속 정보)을 읽어 실제 연결을 맺습니다.
정리하면 Hook은 "어떻게 연결하나", Operator·Sensor는 "그 연결로 무엇을 하나" 를 담당합니다. 외부 시스템 연동 자체는 8편: 외부 시스템 연동에서 깊게 다룹니다.
의존성, trigger_rule, 그리고 브랜칭
의존성은 >>(다운스트림)·<<(업스트림)으로 적습니다. 태스크가 많아지면 chain/cross_downstream 헬퍼가 읽기 편합니다.
from airflow.sdk import chain
# a >> b >> c >> d 와 동일
chain(a, b, c, d)기본적으로 태스크는 모든 부모가 성공해야 실행됩니다(all_success). 이걸 바꾸는 것이 trigger_rule입니다. 예를 들어 "앞 단계들이 성공하든 실패하든 무조건 정리 작업을 돌려라"는 all_done을 씁니다.
trigger_rule | 실행 조건 |
|---|---|
all_success (기본) | 모든 부모 성공 |
all_done | 모든 부모 종료(성공·실패 무관) — 정리 태스크에 유용 |
one_success | 부모 중 하나라도 성공 |
none_failed_min_one_success | 실패 없음 + 최소 하나 성공 — 브랜칭 합류 지점에 유용 |
브랜칭은 "조건에 따라 갈래 중 하나만 실행"하는 패턴입니다. @task.branch로 적은 함수가 실행할 다운스트림 태스크의 id를 반환하면, 나머지 갈래는 skip 됩니다.
from airflow.sdk import dag, task
import pendulum
@dag(schedule="@daily",
start_date=pendulum.datetime(2026, 1, 1, tz="Asia/Seoul"),
catchup=False)
def etl_with_branch():
@task
def extract():
return {"rows": [1, 2, 3, 4]}
@task.branch
def check_quality(payload: dict):
# 행이 있으면 적재 갈래로, 비어 있으면 알림 갈래로
return "load" if payload["rows"] else "notify_empty"
@task
def load():
print("적재")
@task
def notify_empty():
print("데이터 없음 — 알림")
# 두 갈래가 합류하는 마무리 태스크: 한쪽이 skip돼도 돌도록 trigger_rule 조정
@task(trigger_rule="none_failed_min_one_success")
def finalize():
print("마무리")
data = extract()
branch = check_quality(data)
branch >> [load(), notify_empty()] >> finalize()
etl_with_branch()여기서 핵심은 **합류 지점의 trigger_rule**입니다. 브랜칭으로 한쪽이 skip되면, 합류 태스크가 기본값 all_success라면 skip된 부모 때문에 함께 skip됩니다. 그래서 none_failed_min_one_success로 바꿔야 정상 실행됩니다. 이 흐름을 그림으로 보면:
Dynamic Task Mapping: 런타임에 팬아웃하기
DAG를 쓰다 보면 "처리할 항목 개수를 미리 알 수 없는" 경우가 생깁니다. 오늘은 파일이 3개, 내일은 50개일 수 있습니다. 이때 항목 수만큼 태스크를 런타임에 자동으로 펼치는 것이 Dynamic Task Mapping입니다. .expand()를 쓰면 됩니다.
from airflow.sdk import dag, task
import pendulum
@dag(schedule="@daily",
start_date=pendulum.datetime(2026, 1, 1, tz="Asia/Seoul"),
catchup=False)
def fanout_pipeline():
@task
def list_files() -> list[str]:
# 처리할 파일 목록을 런타임에 결정 (개수는 매번 달라질 수 있음)
return ["a.csv", "b.csv", "c.csv"]
@task
def process(file_name: str) -> int:
print(f"처리 중: {file_name}")
return len(file_name) # 예시: 각 파일 처리 결과(가짜 값)
@task
def summarize(results: list[int]):
# 모든 매핑된 태스크의 반환값이 리스트로 모여 들어온다
print(f"총 {len(results)}개 처리, 합계 {sum(results)}")
files = list_files()
counts = process.expand(file_name=files) # files 개수만큼 process가 펼쳐짐
summarize(counts) # 결과를 한 태스크로 집계(fan-in)
fanout_pipeline()process.expand(file_name=files) 한 줄이, 업스트림이 만든 리스트의 길이만큼 process 태스크 인스턴스를 생성합니다. 그리고 그 결과들을 summarize가 하나의 리스트로 받아 집계합니다. 1개의 입력 태스크가 N개로 펼쳐졌다가 다시 1개로 모이는 fan-out / fan-in 패턴입니다.
.expand()로 매핑되는 태스크 개수가 폭증하면 스케줄러·메타DB에 부담이 됩니다. 동시 실행은 3편의 동시성 3계층(parallelism,max_active_tasks_per_dag, Pool)으로 제어하세요.
TaskGroup으로 구조화하기 (SubDAG는 사라졌습니다)
태스크가 수십 개가 되면 그래프가 한눈에 안 들어옵니다. 관련 태스크를 묶어 시각적·논리적 그룹으로 접을 수 있는 것이 TaskGroup입니다.
from airflow.sdk import dag, task, task_group
import pendulum
@dag(schedule="@daily",
start_date=pendulum.datetime(2026, 1, 1, tz="Asia/Seoul"),
catchup=False)
def grouped_pipeline():
@task
def extract():
return [1, 2, 3]
@task_group(group_id="cleanse")
def cleanse(data):
@task
def dedup(rows):
return list(set(rows))
@task
def validate(rows):
return rows
return validate(dedup(data))
@task
def load(rows):
print(f"적재: {rows}")
load(cleanse(extract()))
grouped_pipeline()UI에서는 cleanse 그룹이 하나의 박스로 접혀 보이고, 펼치면 내부 dedup·validate가 드러납니다. 그룹 안 태스크 id에는 cleanse.dedup처럼 그룹 prefix가 붙습니다.
중요: Airflow 3에서 SubDAG는 제거되었습니다. 과거 SubDAG로 묶던 패턴은 모두 TaskGroup으로 대체하세요. 더 큰 단위로 DAG를 나누고 싶다면 6편의 Asset 기반 스케줄링으로 DAG 간 의존성을 데이터 인지 방식으로 연결하는 편이 낫습니다.
피해야 할 DAG 안티패턴
DAG 파일은 DAG processor가 주기적으로 파싱합니다(Airflow 3에서 스케줄러와 분리된 독립 프로세스). 즉 파일 안의 top-level 코드는 파싱할 때마다, 모든 DAG에 대해 반복 실행된다는 점이 핵심입니다.
# 안티패턴: top-level에서 무거운 작업을 하면 매 파싱마다 실행된다
import pandas as pd
df = pd.read_parquet("s3://big-bucket/huge.parquet") # ❌ 파싱 때마다 다운로드
# 정석: 무거운 작업은 반드시 태스크 함수 "안"으로
@task
def load_data():
import pandas as pd
return pd.read_parquet("s3://big-bucket/huge.parquet") # ✅ 실행될 때만자주 보는 함정들:
- top-level의 무거운 코드/import: DB 조회, 파일 다운로드, 무거운 라이브러리 import를 DAG 파일 최상단에 두지 마세요. 파싱이 느려지고 스케줄러가 밀립니다. 무거운 import는 태스크 함수 안으로 넣습니다.
- 비결정적 DAG:
datetime.now()나 난수로 DAG 구조(태스크 개수·id) 를 매번 다르게 만들지 마세요. 같은 파일은 항상 같은 그래프를 만들어야 추적·재실행이 안정적입니다. 시간 기반 로직이 필요하면 구조가 아니라 태스크 실행 시점에logical_date나data_interval_start/end를 쓰세요(Airflow 3에서execution_date는 제거됐습니다). - 거대 XCom: 태스크 반환값(XCom)에 큰 DataFrame이나 수십 MB 페이로드를 그대로 담지 마세요. XCom은 메타데이터 DB에 저장되므로 DB가 비대해집니다. 큰 데이터는 외부 스토리지에 두고 경로·키만 XCom으로 넘기는 것이 정석입니다. 이 주제는 7편: XCom & 데이터 전달에서 본격적으로 다룹니다.
이 세 가지만 피해도 "왜 스케줄러가 느리지", "왜 매번 그래프가 달라지지", "왜 메타DB가 터지지" 같은 운영 사고의 상당수를 예방할 수 있습니다.
정리
- Airflow 3 DAG의 출발점은
from airflow.sdk import dag, task. 순수 파이썬은 TaskFlow API로, 외부 연동은 기성 Operator로 쓰고 둘을 섞습니다. - Operator/Sensor가 "무엇을 하나", Hook이 "어떻게 연결하나"를 담당합니다.
- 분기는
@task.branch, 합류 지점은trigger_rule="none_failed_min_one_success"로. 항목 수가 가변이면.expand()로 런타임 팬아웃. - 구조화는 TaskGroup(SubDAG는 제거됨). top-level 무거운 코드·비결정적 구조·거대 XCom은 피합니다.
다음 편 5편: DAG 고급 테크닉에서는 스크립트 실행·파라미터 전달·에러 처리·다른 DAG 호출·PostgreSQL 쿼리 실행·실패 재실행·날짜 기준 같은 한 단계 위의 실전 기술을 다룹니다. 공식 문서는 Airflow Authoring DAGs를 참고하세요.