Blog
airflowschedulingassettimetabledata-pipeline

Airflow 3 스케줄링 & Asset — 데이터 인지 파이프라인

cron·Timetable·Asset 기반 스케줄링을 비교하고, 생산자-소비자 Asset으로 DAG를 연결하며, logical_date·catchup·시간대 함정까지 실전 관점에서 풀어봅니다.

Data Dynamics2026년 6월 30일17 min read

DAG를 잘 짜는 것과 "언제 돌릴지"를 잘 정하는 것은 완전히 다른 기술입니다. 많은 팀이 @daily로 시작했다가, "앞 DAG가 끝나면 바로 뒤 DAG를 돌리고 싶은데 시간이 안 맞는다"는 벽에 부딪힙니다. 새벽 2시에 데이터가 들어올 줄 알고 3시에 집계 DAG를 걸어뒀는데, 데이터가 4시에 늦게 도착하면 빈 테이블을 집계하는 식이죠.

Airflow 3는 이 문제에 대한 명확한 답을 내놓았습니다. 시간이 아니라 데이터를 기준으로 스케줄을 거는 것입니다.

이 글은 Airflow 3 실전 연재의 6편입니다. 직전 편 5편: DAG 고급 테크닉에서 파라미터·에러 처리·재실행 같은 DAG 실전 기술을 다뤘다면, 이번 편은 그 DAG를 언제, 무엇을 기준으로 실행할지를 다룹니다. 다음 편은 XCom & 데이터 전달로 이어집니다.

이 글에서 배우는 것

  • cron 표현식 · Timetable · Asset 기반 스케줄링의 차이와 선택 기준
  • Asset(과거 Dataset)으로 생산자-소비자 DAG를 자동 연결하는 법
  • logical_date·data_interval_start/end의 정확한 의미 (execution_date는 사라졌습니다)
  • catchup 기본값 변경과 스케줄러가 관리하는 backfill
  • 시간대(timezone) 때문에 새벽에 깨지 않는 법

1. 스케줄을 거는 세 가지 방식

Airflow 3에서 DAG의 schedule 인자에 줄 수 있는 값은 크게 세 갈래입니다. 각각이 어떤 질문에 답하는지 먼저 감을 잡아 두면 선택이 쉬워집니다.

방식거는 값답하는 질문
cron / 프리셋"0 2 * * *", @daily, timedelta(hours=1)"언제 돌릴까?" (시계 기준)
TimetableCronTriggerTimetable, 커스텀 Timetable 객체"시계 기준인데 규칙이 복잡하다"
Asset 기반schedule=[my_asset]"무엇이 갱신되면 돌릴까?" (데이터 기준)

cron은 가장 익숙하지만, "월~금 오전 9시, 단 공휴일 제외" 같은 규칙은 cron 한 줄로 표현하기 어렵습니다. 이럴 때 Timetable이 시계 기반 스케줄의 유연한 상위 도구가 됩니다. 그리고 시계가 아니라 데이터의 도착을 트리거로 삼고 싶을 때 Asset이 등장합니다.

from datetime import timedelta
from airflow.sdk import dag, task
from airflow.timetables.trigger import CronTriggerTimetable
 
# 1) cron 문자열 — 가장 단순
@dag(schedule="0 2 * * *", catchup=False)
def daily_cron():
    ...
 
# 2) 프리셋 / 인터벌
@dag(schedule="@daily", catchup=False)
def daily_preset():
    ...
 
# 3) Timetable — cron보다 정밀한 제어 (예: 실행 시각과 데이터 구간 분리)
@dag(
    schedule=CronTriggerTimetable("0 2 * * *", timezone="Asia/Seoul"),
    catchup=False,
)
def with_timetable():
    ...

airflow.sdk에서 임포트하는 점에 주목하세요. Airflow 3에서는 DAG 작성 API가 Task SDK(airflow.sdk)로 정리되었습니다.

시계로 충분하면 cron, 규칙이 복잡하면 Timetable, 데이터가 기준이면 Asset.

다음 다이어그램은 시간 기반과 Asset 기반이 어떻게 갈라지는지 보여줍니다.

Loading diagram…

2. Asset — 데이터 인지 스케줄링의 핵심

Airflow 3의 Asset은 2.x의 Dataset을 이름과 기능 모두 확장한 개념입니다(이제 "Dataset"이라는 용어는 쓰지 않습니다). Asset은 파이프라인이 만들어 내는 데이터의 논리적 단위 — 테이블, S3 경로, 파일 등 — 를 가리키는 이름표입니다.

핵심 아이디어는 단순합니다. 어떤 DAG가 특정 Asset을 "생산"하고, 다른 DAG가 그 Asset을 "소비"하면, 생산이 끝나는 순간 소비 DAG가 자동으로 트리거됩니다. 시간을 맞출 필요가 없습니다. 데이터가 준비되면 다음 단계가 돌아갑니다.

from airflow.sdk import dag, task, Asset
 
# 데이터의 논리적 단위를 Asset으로 선언
sales_raw = Asset("s3://warehouse/sales/raw")
 
# (A) 생산자 DAG: outlets에 Asset을 적으면, 성공 시 이 Asset을 "갱신"으로 표시
@dag(schedule="0 1 * * *", catchup=False)
def ingest_sales():
    @task(outlets=[sales_raw])
    def load():
        # ... 원천에서 적재 ...
        return "done"
    load()
 
ingest_sales()
 
# (B) 소비자 DAG: schedule에 Asset을 넣으면, 그 Asset이 갱신될 때마다 실행
@dag(schedule=[sales_raw], catchup=False)
def aggregate_sales():
    @task
    def aggregate():
        # ... sales_raw가 막 갱신됐으니 안심하고 집계 ...
        ...
    aggregate()
 
aggregate_sales()

여기서 aggregate_sales에는 cron이 전혀 없습니다. ingest_salesload 태스크가 성공해 sales_raw를 갱신할 때만 깨어납니다. 데이터가 늦게 도착하면 소비 DAG도 늦게 돌고, 데이터가 안 오면 소비 DAG도 안 돕니다. "빈 테이블을 집계하는" 사고가 구조적으로 사라지는 셈입니다.

함수 자체를 Asset의 생산자로 선언하는 더 간결한 @asset 데코레이터도 있습니다.

from airflow.sdk import asset
 
# 이 함수가 곧 하나의 Asset을 생산하는 DAG가 된다
@asset(schedule="@daily")
def sales_curated():
    # 반환/사이드이펙트가 sales_curated Asset의 갱신을 의미
    ...

Asset 기반으로 묶으면, 여러 DAG가 사슬처럼 자동으로 이어집니다. 아래는 생산자 DAG가 Asset을 갱신하고, 스케줄러가 그 신호를 받아 소비자 DAG를 깨우는 흐름입니다.

Loading diagram…

여러 Asset을 동시에 구독할 수도 있습니다. schedule=[asset_a, asset_b]로 적으면 기본적으로 둘 다 갱신되어야 소비 DAG가 트리거되며, 더 복잡한 조건은 Asset 논리식(예: asset_a & asset_b, asset_a | asset_b)으로 표현합니다.

3. logical_date와 data_interval — execution_date는 사라졌다

Airflow 2.x를 써 본 분이라면 execution_date라는 헷갈리는 이름에 데어 본 적이 있을 겁니다. "실행 날짜"라는데 실제 실행 시각이 아니라 데이터 구간의 시작점을 가리켜서 모두를 혼란에 빠뜨렸죠. Airflow 3는 execution_date를 제거했습니다. 대신 의미가 명확한 이름을 씁니다.

변수의미
logical_date이 실행이 논리적으로 대표하는 시점. cron 스케줄에서는 데이터 구간의 기준점
data_interval_start이번 실행이 처리하는 데이터 구간의 시작
data_interval_end데이터 구간의 끝 (보통 실제 트리거가 일어나는 시각에 가까움)
from airflow.sdk import dag, task
 
@dag(schedule="0 2 * * *", catchup=False)
def windowed():
    @task
    def process(**context):
        start = context["data_interval_start"]
        end = context["data_interval_end"]
        # "이 구간의 데이터만" 처리 — 멱등성의 기반
        print(f"처리 구간: {start} ~ {end}")
    process()
 
windowed()

여기서 가장 중요한 변화 하나. Asset 트리거나 수동 트리거로 실행되면 logical_dateNone일 수 있습니다. 데이터가 갱신돼서 도는 실행에는 "이 실행이 대표하는 시계상의 시점"이라는 개념 자체가 모호하기 때문입니다. 그러니 코드에서 logical_date가 항상 존재한다고 가정하면 안 됩니다.

시계 기반 구간 처리는 data_interval_start/end로, "이 실행의 대표 시각"이 꼭 필요할 때만 logical_date를 — 그것도 None 가능성을 염두에 두고 — 사용하세요.

데이터 구간을 기준으로 처리하도록 짜 두면, 같은 구간을 몇 번 다시 돌려도 결과가 같은 멱등(idempotent) 파이프라인이 됩니다. backfill과 재실행이 안전해지는 토대입니다.

4. catchup과 backfill — 3.x에서 달라진 기본값

Airflow 2.x에서 신규 사용자를 가장 자주 놀라게 한 동작이 catchup이었습니다. start_date를 과거로 잡고 DAG를 켜면, 그 사이의 모든 인터벌에 대해 실행이 우르르 생성돼 스케줄러가 폭주하곤 했죠.

Airflow 3에서는 catchup의 기본값이 False로 바뀌었습니다. 즉 별도 설정 없이 DAG를 켜면, 과거를 메우지 않고 다음 스케줄부터 돕니다. 이게 대부분의 팀이 실제로 원하는 동작입니다.

@dag(
    schedule="0 2 * * *",
    start_date=datetime(2025, 1, 1),  # 과거여도
    catchup=False,                     # 3.x 기본값 — 과거를 메우지 않음
)
def no_backlog():
    ...

그렇다면 과거 구간을 의도적으로 다시 돌리고 싶을 때(backfill)는 어떻게 할까요? 여기도 큰 변화가 있습니다. 2.x에서는 CLI(airflow dags backfill)가 별도 프로세스로 실행을 만들었지만, Airflow 3에서는 backfill을 스케줄러가 관리합니다. UI나 REST API로 "이 기간을 backfill해줘"라고 요청하면, 일반 스케줄과 동일한 경로로 스케줄러가 해당 구간의 run을 생성·관리합니다. 그래서 backfill 실행도 UI에서 일반 run과 똑같이 추적·재시도할 수 있습니다.

Loading diagram…

일상 운영은 catchup=False로 두고, 과거 재처리가 필요하면 그때 UI/API로 backfill을 요청하세요. "켜자마자 과거가 우르르" 사고를 막는 안전한 기본값입니다.

5. 시간대(timezone) 함정

스케줄링에서 새벽에 호출받게 만드는 단골 원인이 시간대입니다. 몇 가지만 기억하면 됩니다.

  • Airflow는 내부적으로 UTC를 기준으로 시각을 저장하고 다룹니다. UI 표시나 cron 해석에서 로컬 시간대를 다룰 수는 있지만, 저장은 UTC입니다.
  • cron 문자열만으로는 "어느 시간대 기준 오전 2시인가"가 모호합니다. 시간대를 명시하려면 CronTriggerTimetabletimezone을 지정하는 것이 안전합니다.
from airflow.timetables.trigger import CronTriggerTimetable
 
# "서울 기준" 매일 오전 2시로 명확히 못 박는다
@dag(
    schedule=CronTriggerTimetable("0 2 * * *", timezone="Asia/Seoul"),
    catchup=False,
)
def seoul_daily():
    ...
  • **DST(서머타임)**가 있는 시간대(예: 미국/유럽)에서는 cron 스케줄이 시계 변경일에 한 번 건너뛰거나 두 번 도는 미묘한 동작을 합니다. 한국(Asia/Seoul)은 DST가 없어 이 문제에서 비교적 자유롭지만, 글로벌 데이터를 다룬다면 원천 시스템의 시간대와 Airflow 스케줄 시간대를 명시적으로 일치시켜 두는 것이 안전합니다.
  • 태스크 코드 안에서는 datetime.now() 같은 naive datetime 대신, 컨텍스트가 주는 data_interval_start/end(시간대 정보가 붙은 aware datetime)를 기준으로 삼으세요.

6. 무엇을 언제 쓸까 — 정리

마지막으로 실무 선택 기준을 정리합니다.

상황권장
매일/매시 정해진 시각, 외부 의존 없음cron 문자열 (@daily, 0 2 * * *)
시계 기반인데 규칙이 복잡(공휴일·영업일)CronTriggerTimetable 또는 커스텀 Timetable
앞 단계 데이터가 준비되면 뒤 단계 실행Asset (schedule=[asset])
여러 원천이 모두 갱신돼야 진행다중 Asset 구독 / Asset 논리식
외부 이벤트로만 돌고 자동 스케줄 없음schedule=None + 수동/API 트리거

데이터 파이프라인의 정합성은 결국 "필요한 데이터가 준비된 뒤에만 다음을 돈다"는 보장에서 나옵니다. Airflow 3의 Asset은 이 보장을 cron 시각 추측이 아니라 데이터 이벤트로 만들어 줍니다. 시간을 맞추느라 새벽에 깨던 운영에서, 데이터가 흐르면 파이프라인이 스스로 따라 흐르는 운영으로 넘어가는 것이 이번 편의 핵심입니다.

다음 편 XCom & 데이터 전달에서는 이렇게 연결된 태스크들 사이에서 실제 데이터를 어떻게 주고받는지를 다룹니다. 공식 문서는 Airflow Authoring & Scheduling을 참고하세요.