Airflow 3 ⑧ 외부 시스템 연동 & 싱크 호출
Connections·Secrets Backend로 자격증명을 안전하게 관리하고, HttpOperator 동기 호출의 재시도·타임아웃·멱등성, Sensor의 poke vs reschedule, 그리고 Deferrable Operator로 외부 작업 완료를 자원 낭비 없이 기다리는 법까지 실전 패턴으로 정리합니다.
파이프라인은 혼자 돌지 않습니다. 결제 게이트웨이에 정산을 요청하고, 사내 ML 서비스에 추론 잡을 던지고, S3에 파일이 도착하기를 기다립니다. Airflow의 태스크 대부분은 결국 "바깥 세상을 호출하고 그 결과를 기다리는" 일입니다. 이 글은 그 "호출하고 기다리는" 패턴을 안전하고 자원 효율적으로 만드는 방법을 다룹니다.
이 글에서 배우는 것
- Connections·Variables·Secrets Backend(Vault, AWS Secrets Manager)로 자격증명을 코드 밖에서 관리하는 법
HttpOperator/HttpHook로 외부 REST를 동기 호출할 때의 재시도·타임아웃·멱등성 함정- Sensor로 외부 상태를 기다릴 때 poke vs reschedule 모드의 차이
- Deferrable Operator + Triggerer로 워커 슬롯을 점유하지 않고 긴 외부 작업을 기다리는 법
- "외부 잡 제출 → 상태 폴링 → 완료" 패턴을 한 DAG로 엮는 법
이 글은 「Airflow 3 실전」 연재의 8편입니다. 직전 ⑦ XCom & 데이터 전달에서 태스크 사이의 작은 데이터 교환을 다뤘다면, 이번엔 그 데이터를 Airflow 바깥으로 보내고 받아옵니다. 다음 ⑨ REST API & 원격 스케줄 변경에서는 반대로 Airflow 자신을 외부에서 호출하는 쪽을 다룹니다.
1. "싱크(동기) 호출"이란 무엇인가
용어부터 맞추고 시작합니다. 이 글에서 싱크(동기) 호출은 외부 시스템을 호출한 뒤 그 결과 또는 완료를 기다리는 패턴을 통칭합니다. 의도에 따라 두 갈래로 나뉩니다.
- (a) 외부 API 동기 호출 — REST 엔드포인트를 때리고 그 요청-응답 한 번 안에서 결과를 받는다. 보통 수 초 안에 끝난다. (예: 환율 조회, 정산 요청)
- (b) 외부 작업 완료 대기 — 외부에 잡을 제출하면 즉시 끝나지 않고, 상태가 완료로 바뀔 때까지 폴링하며 기다린다. 수 분~수 시간 걸린다. (예: 빅쿼리 잡, 스파크 클러스터, 외부 ETL)
(a)는 Operator/Hook으로, (b)는 Sensor 또는 Deferrable Operator로 풉니다. 어느 쪽이든 공통 전제가 하나 있습니다 — 자격증명을 안전하게 다뤄야 한다는 것. 거기서 출발합니다.
2. 자격증명: Connection / Variable / Secrets Backend
외부 호출에는 호스트·토큰·키가 필요합니다. Airflow는 이를 세 가지로 관리합니다.
| 개념 | 용도 | 예시 |
|---|---|---|
| Connection | 외부 시스템 접속 정보(호스트·포트·로그인·비밀번호·extra JSON) | payment_api, s3_default |
| Variable | 런타임 설정값(상수·플래그). 비밀이 아닌 값 | batch_size, target_env |
| Secrets Backend | Connection/Variable을 외부 비밀 저장소에서 조회 | Vault, AWS Secrets Manager |
코드에서는 Conn ID만 참조하고, 실제 값은 환경 밖에 둡니다. 메타데이터 DB에 비밀번호를 평문으로 박아두지 않는 것이 핵심입니다.
from airflow.sdk import dag, task
from airflow.providers.http.hooks.http import HttpHook
@task
def call_external():
# Conn ID만 코드에 두고, 실제 호스트/토큰은 Connection에 저장
hook = HttpHook(method="POST", http_conn_id="payment_api")
resp = hook.run(endpoint="/v1/settlements", json={"order_id": 42})
return resp.json()기본적으로 Airflow는 환경변수 → Secrets Backend → 메타데이터 DB 순서로 Connection/Variable을 찾습니다. Secrets Backend를 붙이면, Conn ID를 조회할 때 Airflow가 Vault나 Secrets Manager에 먼저 물어봅니다.
Secrets Backend는 airflow.cfg의 [secrets] 섹션 또는 환경변수로 지정합니다. 아래는 AWS Secrets Manager 예시입니다.
# airflow.cfg
[secrets]
backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
backend_kwargs = {"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}이렇게 두면 payment_api라는 Conn ID는 airflow/connections/payment_api 시크릿에서 조회됩니다. 회전(rotation)·감사(audit)는 비밀 저장소가 담당하고, Airflow는 사용 시점에만 읽어옵니다.
자격증명은 DAG 코드에도, 메타DB 평문에도 두지 않는다. 코드엔 Conn ID만, 값은 Secrets Backend에.
3. 동기 REST 호출: 재시도 · 타임아웃 · 멱등성
이제 (a) 패턴입니다. 외부 REST를 한 번 때리고 응답을 받는 일은 단순해 보이지만, 분산 환경의 세 가지 함정을 항상 깔고 갑니다.
타임아웃 — 무한 대기를 막는다
HttpHook/HttpOperator는 timeout 인자를 받습니다. 외부가 응답하지 않을 때 워커가 영원히 매달리는 일을 막습니다. 타임아웃을 안 걸면 워커 슬롯 하나가 좀비처럼 잠깁니다.
재시도 — 일시 장애를 흡수한다
네트워크는 가끔 끊깁니다. Airflow 태스크 레벨의 retries/retry_delay로 일시적 실패를 재시도합니다. HTTP 5xx·타임아웃 같은 재시도해도 되는 실패에만 거는 게 원칙입니다(4xx는 보통 재시도 무의미).
from datetime import timedelta
from airflow.providers.http.operators.http import HttpOperator
submit = HttpOperator(
task_id="submit_settlement",
http_conn_id="payment_api",
method="POST",
endpoint="/v1/settlements",
headers={"Idempotency-Key": "{{ run_id }}"}, # 멱등성 키
data='{"order_id": 42}',
# 동기 호출의 안전벨트 3종
retries=3,
retry_delay=timedelta(seconds=30),
retry_exponential_backoff=True,
)멱등성 — 재시도의 대가를 막는다
여기가 가장 자주 사고 나는 지점입니다. 위 시퀀스 다이어그램처럼, 요청은 도달했는데 응답만 유실되면 Airflow는 실패로 보고 재시도합니다. 멱등성을 보장하지 않으면 결제가 두 번 일어납니다.
해법은 요청마다 멱등성 키(Idempotency-Key)를 부여하고, 재시도 시 같은 키를 보내는 것입니다. 위 예시처럼 {{ run_id }}(같은 태스크 실행 내내 고정값)를 키로 쓰면, 외부 서버가 중복을 걸러냅니다. 외부 API가 멱등성 키를 지원하지 않으면, 호출 전에 "이미 처리됐는지" 조회하는 단계를 둬야 합니다.
재시도는 멱등성과 세트로만 안전하다. 둘 중 하나만 있으면 중복 부작용이 난다.
4. 외부 상태 기다리기 ①: Sensor의 poke vs reschedule
이제 (b) 패턴입니다. "S3에 파일이 도착할 때까지", "외부 DB에 행이 생길 때까지" 기다려야 한다면 Sensor를 씁니다. Sensor는 조건이 참이 될 때까지 주기적으로 확인(poke)합니다. 문제는 기다리는 동안 무엇을 점유하느냐입니다.
mode="poke"(기본) — 워커 슬롯을 잡은 채poke_interval마다 확인. 구현은 단순하지만, 몇 시간을 기다리면 그 시간 내내 슬롯 하나가 묶인다.mode="reschedule"— 확인 후 조건이 거짓이면 태스크를 풀어주고(슬롯 반납) 다음 확인 시각에 다시 스케줄. 슬롯을 비우므로 긴 대기에 유리. 단, 매번 태스크를 재기동하는 오버헤드가 있어 poke 간격이 짧으면(수 초) 부적합.
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
wait_file = S3KeySensor(
task_id="wait_for_input",
bucket_key="incoming/{{ ds }}/data.parquet",
bucket_name="my-bucket",
aws_conn_id="s3_default",
mode="reschedule", # 긴 대기는 슬롯을 반납
poke_interval=300, # 5분마다 확인
timeout=60 * 60 * 6, # 6시간 안에 안 오면 실패
)경험칙: poke 간격이 분 단위 이상이면 reschedule, 초 단위로 자주 확인해야 하면 poke. 그리고 어떤 모드든 timeout을 반드시 건다 — 외부가 영영 안 오는 경우를 대비해서입니다.
5. 외부 상태 기다리기 ②: Deferrable Operator + Triggerer
reschedule도 슬롯을 비우지만, 확인하는 순간에는 여전히 워커를 깨워야 하고 매번 재기동 비용이 듭니다. Airflow 3가 권장하는 더 나은 방식은 Deferrable Operator입니다.
핵심 아이디어: 태스크가 "기다림"에 들어가면 **자신을 defer(연기)**하고 워커에서 완전히 빠집니다. 폴링은 Triggerer라는 별도 컴포넌트가 asyncio 이벤트 루프로 수행합니다. Triggerer 하나가 수천 개의 대기를 동시에 감시할 수 있어, 워커 슬롯을 전혀 쓰지 않습니다. 조건이 충족되면 태스크가 다시 워커로 깨어나 다음 단계를 잇습니다.
poke 센서와 deferrable의 자원 차이를 한눈에 비교하면 이렇습니다.
대기가 길고 동시에 많은 외부 잡을 기다린다면 deferrable이 압도적으로 유리합니다. 많은 프로바이더 오퍼레이터/센서가 deferrable=True 인자를 지원합니다.
from airflow.providers.http.sensors.http import HttpSensor
wait_job = HttpSensor(
task_id="wait_external_job",
http_conn_id="batch_api",
endpoint="/v1/jobs/{{ ti.xcom_pull(task_ids='submit_job') }}/status",
response_check=lambda r: r.json().get("state") == "SUCCEEDED",
deferrable=True, # ← Triggerer로 비동기 대기 (워커 슬롯 반납)
poke_interval=60,
timeout=60 * 60 * 4,
)Deferrable은 마법이 아니다. Triggerer 프로세스가 떠 있어야 동작한다(클러스터 구성은 ② 클러스터로 구성하기 참고).
6. 종합: "제출 → 폴링 → 완료" 패턴 한 DAG로 엮기
(a)와 (b)를 합치면 가장 흔한 실전 패턴이 됩니다 — 외부에 잡을 제출하고(동기 호출), 완료까지 기다린 뒤(deferrable 대기), 결과를 수확합니다.
from airflow.sdk import dag, task
from datetime import datetime, timedelta
from airflow.providers.http.operators.http import HttpOperator
from airflow.providers.http.sensors.http import HttpSensor
@dag(schedule="@daily", start_date=datetime(2026, 7, 1), catchup=False)
def external_job_flow():
submit = HttpOperator(
task_id="submit_job",
http_conn_id="batch_api",
method="POST",
endpoint="/v1/jobs",
headers={"Idempotency-Key": "{{ run_id }}"},
data='{"date": "{{ ds }}"}',
response_filter=lambda r: r.json()["job_id"], # XCom으로 job_id 전달
retries=3,
retry_delay=timedelta(seconds=30),
)
wait = HttpSensor(
task_id="wait_job",
http_conn_id="batch_api",
endpoint="/v1/jobs/{{ ti.xcom_pull(task_ids='submit_job') }}/status",
response_check=lambda r: r.json().get("state") == "SUCCEEDED",
deferrable=True,
poke_interval=60,
timeout=60 * 60 * 4,
)
@task
def fetch_result(job_id: str):
# 완료된 잡의 산출물을 가져와 다운스트림으로
return {"job_id": job_id, "ok": True}
submit >> wait >> fetch_result(submit.output)
external_job_flow()이 한 DAG 안에 이 글의 모든 원칙이 들어 있습니다: Conn ID만 코드에 두기, 제출에 멱등성 키·재시도 걸기, 긴 대기는 deferrable로 슬롯 반납하기, 그리고 모든 대기에 timeout 걸기.
7. 정리 체크리스트
외부 시스템을 붙이기 전에 이 목록을 훑어보세요.
- 자격증명을 코드/메타DB 평문이 아니라 Connection + Secrets Backend에 둔다
- 동기 호출에 **
timeout**을 건다(무한 대기 방지) - **
retries/retry_delay**는 재시도 가능한 실패(5xx·타임아웃)에만 - 부작용이 있는 호출(결제·생성)에는 멱등성 키를 반드시 함께
- 짧은 대기는
poke, 긴 대기는reschedule또는 deferrable - 긴 대기·다수 동시 대기는 deferrable=True (Triggerer 가동 확인)
- 모든 Sensor/대기에 **
timeout**을 걸어 영구 대기 방지
다음 편 ⑨ REST API & 원격 스케줄 변경에서는 이번 글의 반대 방향 — 외부에서 Airflow를 호출해 DAG를 트리거하고 스케줄을 바꾸는 법을 다룹니다.
외부 연동의 본질은 "호출하고 기다리기"다. 기다리는 동안 무엇을 점유하느냐가 클러스터 자원 효율을 가른다.