Blog
airflowxcomtaskflowdata-pipeline

Airflow 3 XCom — 태스크 간 데이터 전달의 정석

XCom의 동작 원리부터 TaskFlow 자동 XCom, Custom XCom Backend로 대용량 페이로드를 우회하는 법까지 실전 중심으로 정리합니다.

Data Dynamics2026年7月1日16 min read
This post is not yet translated. The original Korean version is shown below.

이 글은 "Airflow 3 실전 연재"의 7편입니다. 직전 편 스케줄링 & Asset에서는 언제 DAG를 돌릴지를 다뤘다면, 이번에는 DAG 안에서 태스크들이 서로 데이터를 주고받는 방법을 이야기합니다. 다음 편 외부 시스템 연동 & 싱크 호출로 자연스럽게 이어집니다.

태스크는 서로 독립된 프로세스(때로는 독립된 머신)에서 실행됩니다. 그래서 태스크 A가 계산한 값을 태스크 B가 그냥 변수처럼 참조할 수 없습니다. 같은 파이썬 함수 안이라면 return으로 끝날 일이, 태스크 경계를 넘는 순간 "어떻게 전달하지?"라는 문제가 됩니다. 이 다리를 놓아주는 게 바로 XCom(cross-communication) 입니다.

이 글에서 배우는 것

  • XCom이 메타데이터 DB에 key-value로 저장되는 원리와, 그래서 생기는 한계
  • xcom_push/xcom_pull과 TaskFlow API 반환값이 자동으로 XCom이 되는 흐름
  • Custom XCom Backend(S3/GCS 등)로 대용량 페이로드를 우회하는 법
  • Airflow 3에서 XCom이 Task Execution API를 거쳐 처리되는 변화
  • 무엇을 XCom으로 넘기면 안 되는가, 그리고 Asset과의 차이

1. XCom은 결국 메타DB의 작은 메모지

XCom의 실체는 의외로 단순합니다. 메타데이터 DB의 한 테이블(xcom)에 저장되는 key-value 레코드입니다. 하나의 XCom 항목은 대략 이런 좌표로 식별됩니다.

  • dag_id — 어느 DAG인가
  • run_id — 어느 실행(run)인가
  • task_id — 어느 태스크가 남겼나
  • key — 메모의 이름(기본값은 return_value)
  • value — 직렬화된 값

즉 XCom은 "이 실행, 이 태스크가 남긴 메모"를 데이터베이스에 적어두고, 다른 태스크가 같은 좌표로 다시 꺼내 읽는 구조입니다. 태스크 A가 메모를 붙이고(push), 태스크 B가 그 메모를 떼어 읽는(pull) 흐름을 그림으로 보면 이렇습니다.

Loading diagram…

여기서 중요한 두 가지 성질이 드러납니다.

첫째, XCom은 같은 DAG run 범위 안에서만 자연스럽게 통합니다. 다른 실행의 메모를 끌어오려면 명시적으로 좌표를 지정해야 하고, 일반적인 사용 패턴은 아닙니다.

둘째, 값이 DB에 그대로 들어간다는 점이 모든 한계의 출발점입니다. 메타DB는 스케줄러·DAG processor·웹 UI가 끊임없이 두드리는, 클러스터에서 가장 바쁜 자원입니다(메타DB의 역할은 아키텍처 편 참고). 그 테이블에 큰 값을 자주 쓰면 DB 부하와 크기가 빠르게 불어나고, 결국 클러스터 전체가 느려집니다.

XCom은 "메모지"이지 "택배 상자"가 아닙니다. 작은 값(파일 경로, ID, 카운트, 설정 플래그)을 넘기는 용도로 설계됐습니다.

2. push와 pull — 그리고 TaskFlow의 자동 XCom

전통적인 방식은 컨텍스트의 ti(task instance)를 통해 직접 push/pull 하는 것입니다. Airflow 3에서는 임포트 경로가 airflow.sdk로 정리됐습니다.

from airflow.sdk import dag, task
import pendulum
 
 
@dag(schedule=None, start_date=pendulum.datetime(2026, 1, 1), catchup=False)
def manual_xcom():
    @task
    def extract(**context):
        # 명시적으로 key를 지정해 push
        context["ti"].xcom_push(key="row_count", value=1240)
        return "/data/raw/2026-06-30.parquet"  # return_value 로도 자동 push
 
    @task
    def report(**context):
        ti = context["ti"]
        count = ti.xcom_pull(task_ids="extract", key="row_count")
        path = ti.xcom_pull(task_ids="extract")  # key 생략 시 return_value
        print(f"{count} rows at {path}")
 
    extract() >> report()
 
 
manual_xcom()

xcom_push로 임의의 key에 값을 남기고, xcom_pulltask_idskey를 지정해 꺼냅니다. key를 생략하면 기본 key인 return_value를 읽습니다.

그런데 실무에서 대부분의 데이터 전달은 이렇게 손으로 push/pull 하지 않습니다. TaskFlow API를 쓰면 함수의 반환값이 자동으로 XCom으로 push되고, 다른 태스크 함수의 인자로 그 결과를 넘기면 자동으로 pull됩니다. 코드가 평범한 파이썬 함수 호출처럼 읽혀서 직관적입니다.

from airflow.sdk import dag, task
import pendulum
 
 
@dag(schedule=None, start_date=pendulum.datetime(2026, 1, 1), catchup=False)
def taskflow_xcom():
    @task
    def extract() -> str:
        # 반환값이 자동으로 XCom(return_value)에 push 된다
        return "/data/raw/2026-06-30.parquet"
 
    @task
    def transform(path: str) -> dict:
        # 인자로 받은 값은 자동으로 XCom pull 된 결과
        return {"path": path, "rows": 1240}
 
    @task
    def load(summary: dict) -> None:
        print(f"loading {summary['rows']} rows from {summary['path']}")
 
    raw = extract()
    summary = transform(raw)
    load(summary)
 
 
taskflow_xcom()

extract()가 돌려준 값은 자동으로 XCom에 저장되고, transform(raw)에 넘기는 순간 Airflow가 의존성을 만들고 실행 시점에 값을 꺼내 채워 넣습니다. 명시적인 >> 연결도, xcom_pull 호출도 필요 없습니다. 다만 눈에 안 보일 뿐, 그 값은 여전히 메타DB를 거칩니다. 자동 XCom이라고 해서 한계가 사라지지는 않는다는 점을 기억하세요.

3. Airflow 3에서 달라진 점 — 워커는 메타DB를 직접 만지지 않는다

Airflow 2.x 시절 워커는 XCom을 읽고 쓰기 위해 메타DB에 직접 SQL을 날렸습니다. 워커 수가 수백 개로 늘면 DB 커넥션 폭주와 자격증명 노출이 골칫거리였죠.

Airflow 3에서는 이 구조가 바뀌었습니다. 워커(태스크)는 더 이상 메타DB에 직접 접속하지 않고, API server의 Task Execution API를 통해 XCom을 push/pull 합니다. 즉 xcom_push/xcom_pull이 내부적으로 API 호출이 되고, 실제 DB 쓰기는 API server가 대행합니다.

Loading diagram…

이 덕분에 원격·엣지 워커나 비(非)파이썬 태스크에서도 동일한 방식으로 XCom을 다룰 수 있습니다(Task SDK / EdgeExecutor 배경은 아키텍처 편 참고). 작성자 입장에서 코드는 그대로지만, 데이터 경로가 한 단계 추상화됐다는 점만 알아두면 충분합니다.

4. 무엇을 XCom으로 넘기면 안 되는가

XCom 사고의 8할은 "여기에 너무 큰 걸 넣었다"에서 나옵니다. 다음은 넘기면 안 되는 것과, 대신 넘겨야 할 것입니다.

넘기지 말 것대신 넘길 것
큰 DataFrame, 전체 데이터셋메타DB 비대화, 직렬화 비용 폭증저장 위치(예: s3://.../part-0001.parquet)
이미지·파일 바이트DB는 바이너리 보관소가 아님오브젝트 스토리지 경로
수천 건의 레코드 리스트한 XCom 항목이 수 MB로테이블/파티션 식별자
비밀번호·토큰DB·UI·로그에 노출Connection/Variable, 시크릿 매니저

핵심 원칙은 하나입니다.

데이터 자체가 아니라 "데이터를 가리키는 포인터"를 넘긴다. 태스크 A는 큰 결과를 스토리지에 쓰고 그 경로만 XCom으로 전달하고, 태스크 B는 그 경로를 받아 직접 읽는다.

@task
def extract() -> str:
    df = fetch_large_dataset()          # 큰 데이터
    path = "s3://warehouse/staging/2026-06-30/data.parquet"
    df.to_parquet(path)                 # 스토리지에 쓰고
    return path                          # XCom 으로는 경로만 (포인터)
 
 
@task
def transform(path: str) -> str:
    df = read_parquet(path)             # 경로를 받아 직접 읽고
    out = "s3://warehouse/curated/2026-06-30/data.parquet"
    process(df).to_parquet(out)
    return out

XCom과 Asset은 무엇이 다른가

이쯤에서 Asset이 떠오를 수 있습니다. "데이터 위치를 가리킨다"는 점은 비슷하지만 목적이 다릅니다.

  • XCom: 같은 DAG run 안에서 태스크들끼리 작은 값을 주고받는 통신 수단. 스케줄링에는 관여하지 않습니다.
  • Asset: DAG와 DAG 사이의 데이터 의존성을 표현하고, 한 DAG가 데이터를 갱신하면 그것을 소비하는 다른 DAG가 트리거되도록 하는 스케줄링 신호입니다.

정리하면, 한 파이프라인 내부의 손발 맞추기는 XCom, 파이프라인끼리의 바통 터치는 Asset입니다.

5. Custom XCom Backend — 대용량 페이로드 우회하기

"포인터를 넘겨라"는 원칙을 따르더라도, 여전히 중간 결과를 통째로 넘기고 싶은 경우가 있습니다. 매번 손으로 스토리지에 쓰고 경로를 돌려주는 보일러플레이트가 번거롭기도 하고요. 이때 등장하는 게 Custom XCom Backend입니다.

아이디어는 이렇습니다. XCom 값을 메타DB가 아니라 오브젝트 스토리지(S3/GCS 등)에 저장하고, 메타DB에는 그 위치를 가리키는 작은 참조만 남깁니다. 작성자는 평소처럼 return df만 하면 되고, 직렬화·업로드·다운로드는 backend가 알아서 처리합니다.

Loading diagram…

가장 손쉬운 방법은 공식 Common IO 제공자의 backend를 쓰는 것입니다. 이 backend는 설정한 임계치를 넘는 값만 스토리지로 보내고, 작은 값은 그대로 메타DB에 둡니다.

# airflow.cfg (또는 환경변수)
[core]
# XCom 직렬화/저장을 담당할 backend 클래스 지정
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend
 
[common.io]
# 값을 저장할 오브젝트 스토리지 경로 (Connection으로 자격증명 구성)
xcom_objectstorage_path = s3://my-airflow-xcom/xcom
# 이 바이트 수를 넘는 값만 스토리지로, 그보다 작으면 메타DB에 그대로
xcom_objectstorage_threshold = 1048576

환경변수로 줄 때는 AIRFLOW__CORE__XCOM_BACKEND, AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_PATH 같은 형태가 됩니다(설정을 환경변수로 주입하는 패턴은 환경설정 & 최적화 편 참고). 직접 backend를 구현하고 싶다면 BaseXCom을 상속해 serialize_value/deserialize_value를 오버라이드하면 됩니다.

from airflow.models.xcom import BaseXCom
 
 
class MyS3XComBackend(BaseXCom):
    @staticmethod
    def serialize_value(value, **kwargs):
        # 큰 값이면 S3에 올리고, 메타DB에는 s3 경로(참조)만 직렬화해 반환
        ...
 
    @staticmethod
    def deserialize_value(result):
        # 메타DB의 참조를 보고 S3에서 실제 값을 내려받아 복원
        ...

다만 Custom Backend도 만능은 아닙니다. 임계치를 너무 낮게 잡으면 작은 값까지 매번 스토리지를 왕복해 오히려 느려지고, 스토리지에 쌓인 XCom 객체의 수명 관리(만료·정리) 도 직접 신경 써야 합니다. 임계치 위에서만 스토리지로 보내는 하이브리드 구성이 보통 합리적입니다.

마무리

XCom은 "태스크 사이에 붙이는 작은 메모"라는 본질만 잡으면 헷갈릴 게 없습니다.

  • XCom 값은 메타DB에 저장되므로, 작은 값(포인터)만 넘긴다.
  • TaskFlow API를 쓰면 반환값이 자동으로 XCom이 되어 코드가 깔끔해진다 — 단, 한계는 그대로다.
  • 대용량은 스토리지에 두고 경로만 넘기거나, Custom XCom Backend로 backend 차원에서 우회한다.
  • Airflow 3에서는 워커가 메타DB를 직접 만지지 않고 Task Execution API를 거친다.
  • 같은 run 내부 통신은 XCom, DAG 간 데이터 의존성은 Asset으로 구분한다.

다음 편 외부 시스템 연동 & 싱크 호출에서는 이렇게 다듬은 데이터를 데이터베이스·API·메시지 큐 같은 바깥 세계와 안전하게 주고받는 법을 다룹니다.

더 깊이 보려면 공식 문서 XComsObject Storage XCom backend를 참고하세요.