Blog
kafkadisaster-recoverymirrormaker2offsetsfailover

[Kafka DR ③] DR의 핵심 난제 — Offset 변환과 컨슈머 페일오버

DR 클러스터는 독립된 로그라 같은 레코드라도 offset이 다릅니다. MirrorMaker 2가 offset-syncs와 checkpoint로 이 격차를 어떻게 메우는지, RemoteClusterUtils로 변환된 offset을 읽고 sync.group.offsets로 컨슈머를 무중단 페일오버시키는 방법을 정밀하게 정리합니다.

Data Dynamics2026年6月15日25 min read
This post is not yet translated. The original Korean version is shown below.

복제는 쉽습니다. 어려운 건 "그래서 컨슈머를 어디서부터 다시 읽게 할 것인가"입니다. 1번 클러스터에서 offset 1,000,000까지 읽던 컨슈머가 장애를 만나 DR 클러스터로 옮겨갔을 때, 똑같이 offset 1,000,000부터 읽으면 될까요? 절대 안 됩니다. DR 클러스터는 원본의 복사본이 아니라 독립적으로 채번된 별개의 로그이기 때문입니다. 이 한 줄이 Kafka DR 전체에서 가장 까다로운 문제를 만들어 냅니다. 이번 편은 그 문제 — offset 변환과 컨슈머 페일오버 — 만 집중해서 파헤칩니다.

이 글에서 배우는 것

  • 왜 DR 클러스터의 offset은 원본과 다른가 (독립 로그 문제)
  • MirrorMaker 2가 offset-syncs와 checkpoint로 offset 격차를 메우는 원리
  • RemoteClusterUtils / MirrorClient.translateOffsets()로 변환된 offset을 읽는 법
  • sync.group.offsets.enabled로 앱 수정 없이 컨슈머 그룹을 페일오버시키는 법
  • 변환은 근사값이고 at-least-once라는 것 — RPO와 멱등성의 의미
  • checkpoint 주기 튜닝과 운영상의 함정

이 글은 "Kafka DR 구축" 시리즈의 3편입니다. 1편(DR 아키텍처와 RPO/RTO 설계)과 2편(MirrorMaker 2로 토픽·데이터를 복제하기)에서 우리는 "데이터를 DR 클러스터로 흘려보내는" 데까지 왔습니다. 이번 편은 그 데이터를 컨슈머가 실제로 이어받는 마지막 한 걸음을 다룹니다.


1. 핵심 문제 — DR 클러스터는 독립된 로그다

같은 레코드, 다른 offset

Kafka의 offset은 파티션 로그 안에서의 물리적 위치입니다. 글로벌 ID가 아니라, 각 브로커가 자기 로그에 레코드를 append하면서 0부터 순차적으로 매기는 로컬 번호입니다. 핵심은 이것입니다: offset은 레코드의 신원(identity)이 아니라 위치(position)다.

MirrorMaker 2(이하 MM2)가 원본(primary)의 토픽을 DR 클러스터(dr)로 복제하면, DR 쪽에는 primary.orders라는 토픽이 새로 생기고 거기에 레코드가 다시 produce됩니다. 이 과정에서 DR 토픽의 offset은 DR 브로커가 새로 채번합니다. 원본 offset을 그대로 가져오는 게 아닙니다.

Loading diagram…

왜 어긋날까요? 여러 이유가 겹칩니다.

원인설명
복제 시작 시점MM2가 토픽 중간부터 복제를 시작하면 DR offset은 0부터, 원본은 이미 수백만
로그 보존/압축원본에서 retention으로 앞부분이 지워진 뒤 복제되면 시작 offset이 다름
메시지 누락/재시도producer 재시도, 트랜잭션 마커 등으로 양쪽 append 카운트가 미세하게 다를 수 있음
토픽 재생성원본 토픽이 재생성되면 offset이 리셋되지만 DR은 이어서 증가

그래서 무슨 일이 벌어지나

컨슈머 그룹은 자기가 어디까지 읽었는지를 committed offset으로 __consumer_offsets에 기록합니다. 이 값은 원본 클러스터 기준의 숫자입니다. 장애가 나서 컨슈머를 DR로 옮길 때, 이 숫자를 그대로 DR에서 seek하면:

  • DR offset이 더 크면 → 아직 처리 안 한 레코드를 건너뜀 (데이터 유실, 사실상 처리 누락)
  • DR offset이 더 작으면 → 이미 처리한 레코드를 다시 읽음 (대량 중복 처리)

둘 다 운영 사고입니다. 그래서 필요한 게 **offset 변환(offset translation)**입니다. "원본에서 offset X였던 그 레코드는, DR에서는 offset Y다"라는 매핑을 만들어 두고, 페일오버 시 X를 Y로 바꿔서 컨슈머가 올바른 위치에서 이어받게 해야 합니다.


2. MM2는 어떻게 푸는가 — offset-syncs와 checkpoint

MM2는 이 매핑을 자동으로 만들어 줍니다. 두 개의 커넥터와 두 개의 내부 토픽이 협업합니다.

MirrorSourceConnector → mm2-offset-syncs 토픽

데이터 복제를 담당하는 MirrorSourceConnector는 레코드를 DR로 produce하면서, "원본 offset ↔ DR offset" 매핑 쌍을 별도의 내부 토픽 mm2-offset-syncs.<target>.internal(흔히 mm2-offset-syncs로 줄여 부름)에 주기적으로 기록합니다. 모든 레코드마다 기록하는 게 아니라, 일정 간격마다 앵커(anchor) 역할을 하는 매핑을 남깁니다.

이 토픽의 레코드는 본질적으로 이런 의미입니다:

topic-partition = primary.orders-0
upstreamOffset  = 1_000_000   (원본에서의 offset)
downstreamOffset =   742_318  (DR에서의 offset)

즉 "원본 offset 1,000,000인 레코드가 DR에서는 offset 742,318이다"라는 닻을 박아 둡니다.

MirrorCheckpointConnector → <source>.checkpoints.internal 토픽

MirrorCheckpointConnector는 한 단계 더 나아갑니다. 이 커넥터는 원본 클러스터의 컨슈머 그룹들이 어디까지 commit했는지를 주기적으로 읽어서, offset-syncs의 매핑을 이용해 DR 기준 offset으로 변환한 결과를 <source>.checkpoints.internal 토픽에 기록합니다.

checkpoint 레코드 하나는 대략 이런 정보를 담습니다:

필드의미
consumerGroupId어떤 컨슈머 그룹인지 (예: order-processor)
topicPartition어떤 토픽-파티션인지 (DR 쪽 이름, primary.orders-0)
upstreamOffset원본에서 그 그룹이 commit한 offset
downstreamOffset변환된 DR offset (= 페일오버 시 seek할 위치)
metadata부가 정보

정리하면 데이터 흐름은 이렇습니다.

Loading diagram…

설정 예시

2편에서 띄운 MM2 설정에 checkpoint 관련 옵션을 더하면 다음과 같습니다.

# mm2.properties (발췌)
clusters = primary, dr
primary->dr.enabled = true
 
# 데이터 복제 (MirrorSourceConnector) — 2편에서 다룸
primary->dr.topics = orders, payments, .*
 
# offset-syncs 토픽 위치 (기본은 target=dr)
primary->dr.offset-syncs.topic.location = target
 
# checkpoint (컨슈머 그룹 offset 변환) 활성화
primary->dr.emit.checkpoints.enabled = true
primary->dr.groups = order-processor, payment-.*
 
# checkpoint 발행 주기 — 작을수록 변환 정확도가 올라가지만 부하 증가
primary->dr.emit.checkpoints.interval.seconds = 30
 
# offset-syncs 발행 주기
primary->dr.sync.offsets.interval.seconds = 30

groups는 변환 대상 컨슈머 그룹을 정규식으로 지정합니다. 비워 두면 checkpoint가 만들어지지 않으니, DR로 페일오버시킬 그룹을 반드시 명시해야 합니다.

숫자로 따라가 보기

추상적인 설명만으로는 감이 잘 안 오니, 구체적인 수치로 변환 한 사이클을 따라가 봅시다. 파티션 orders-0 하나만 보겠습니다.

  1. 원본에서 복제는 원본 offset 250,000부터 시작됐고, 그 레코드가 DR primary.orders-0의 offset 0에 들어갔습니다. 즉 상수 차이(delta) = 250,000입니다.

  2. MirrorSourceConnector가 주기적으로 앵커를 남깁니다.

    upstreamOffset(원본)downstreamOffset(DR)
    250,0000
    300,00050,000
    1,000,000750,000
  3. 컨슈머 그룹 order-processor가 원본에서 offset 1,000,000까지 commit했습니다.

  4. MirrorCheckpointConnector가 이 commit을 읽고, 가장 가까운 앵커(1,000,000 → 750,000)를 이용해 downstreamOffset = 750,000으로 변환해 checkpoint에 기록합니다.

  5. 페일오버 시 컨슈머는 DR primary.orders-0의 offset 750,000부터 재개합니다. 원본의 1,000,000을 그대로 썼다면 DR 로그를 한참 넘어가 버렸을 것입니다.

여기서 주의: 만약 commit 위치(예: 1,000,000)가 정확한 앵커가 아니라 앵커들 사이라면, MM2는 더 작은(과거) 앵커를 골라 약간 앞쪽으로 매핑합니다. 위 예에서 그룹이 320,000을 commit했다면, MM2는 300,000→50,000 앵커를 기준으로 보수적으로 변환합니다. 결과적으로 약간의 **재처리(중복)**가 생기는데, 이것이 바로 뒤(4장)에서 다룰 at-least-once의 본질입니다.


3. 변환된 offset 읽기 — RemoteClusterUtils와 sync.group.offsets

checkpoint 토픽이 채워졌다면, 페일오버 시 그 값을 꺼내 쓰는 방법은 두 가지입니다.

방법 A — RemoteClusterUtils로 직접 조회

RemoteClusterUtils(내부적으로 MirrorClient 사용)는 checkpoint 토픽을 읽어 "이 그룹이 DR에서 어디부터 읽어야 하는가"를 계산해 줍니다.

import org.apache.kafka.connect.mirror.RemoteClusterUtils;
 
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.TopicPartition;
 
// DR 클러스터에 접속하는 설정
Properties props = new Properties();
props.put("bootstrap.servers", "dr-broker1:9092,dr-broker2:9092");
 
// "primary"에서 넘어온 컨슈머 그룹의 DR 기준 재개 위치를 계산
Map<TopicPartition, OffsetAndMetadata> translated =
    RemoteClusterUtils.translateOffsets(
        props,
        "primary",          // source 클러스터 별칭 (DR 토픽 prefix)
        "order-processor",  // 변환할 컨슈머 그룹
        java.time.Duration.ofSeconds(10));
 
// translated 의 key 는 DR 토픽-파티션(primary.orders-0 등),
// value 는 그 그룹이 DR에서 seek/commit 해야 할 offset.

MirrorClient를 직접 쓰면 더 세밀하게 제어할 수 있습니다.

import org.apache.kafka.connect.mirror.MirrorClient;
import org.apache.kafka.connect.mirror.MirrorClientConfig;
 
MirrorClient client = new MirrorClient(new MirrorClientConfig(/* configs */));
 
// 원본 토픽명을 DR 쪽 remote 토픽명으로
String remoteTopic = client.remoteTopic("orders", "primary"); // -> "primary.orders"
 
// 그룹의 변환된 offset 조회
Map<TopicPartition, OffsetAndMetadata> offsets =
    client.translateOffsets(props, "primary", "order-processor", Duration.ofSeconds(10));

이렇게 얻은 offset을 페일오버 스크립트에서 DR 클러스터의 __consumer_offsets에 commit하거나, 컨슈머가 시작할 때 seek()에 넣어 주면 됩니다.

방법 B — sync.group.offsets로 자동 동기화 (앱 수정 0)

방법 A는 코드를 짜야 합니다. 더 매끄러운 방법은 MM2가 변환된 offset을 DR 클러스터의 __consumer_offsets에 직접 써 주게 하는 것입니다.

# 변환된 컨슈머 그룹 offset을 DR의 __consumer_offsets에 자동 기록
primary->dr.sync.group.offsets.enabled = true
primary->dr.sync.group.offsets.interval.seconds = 30

이 옵션을 켜면 MirrorCheckpointConnector가 checkpoint를 만드는 데서 멈추지 않고, 같은 그룹 ID로 DR 클러스터에 committed offset을 미리 동기화해 둡니다. 그 결과:

  • 페일오버 시 애플리케이션을 전혀 고치지 않아도 됩니다. 컨슈머의 bootstrap.servers만 DR로 바꿔 재시작하면, 같은 group.id로 붙는 순간 DR의 __consumer_offsets에 이미 변환된 위치가 들어 있어 거기서부터 이어집니다.
  • 단, 그 그룹이 DR에서 활성 상태가 아닐 때만 동기화됩니다. 이미 DR에서 같은 그룹이 돌고 있으면(예: active-active) MM2는 덮어쓰지 않습니다 — 정상 동작 중인 컨슈머의 진행 상황을 망가뜨리지 않기 위함입니다.
비교방법 A (translateOffsets)방법 B (sync.group.offsets)
앱 수정페일오버 로직/스크립트 필요불필요 (bootstrap만 변경)
제어 수준세밀 (그룹·파티션별 결정)자동 (전체 일괄)
적용 시점페일오버 순간 명시적 호출주기적으로 미리 동기화
추천 상황커스텀 페일오버 오케스트레이션단순·표준 active-passive DR

대부분의 active-passive DR에서는 방법 B가 운영 부담이 가장 적습니다.

active-passive vs active-active에서의 차이

offset 변환은 **단방향 흐름(active-passive)**을 가정할 때 가장 깔끔합니다. 원본에서만 컨슈머가 돌고, DR은 대기 상태이며, 페일오버 순간에만 DR 컨슈머가 깨어납니다. 이때 DR의 그룹은 "비활성"이므로 sync.group.offsets가 안전하게 offset을 미리 채워 둘 수 있습니다.

**active-active(양방향)**에서는 이야기가 복잡해집니다.

상황고려 사항
양쪽에서 같은 group.id가 동작MM2는 활성 그룹의 offset을 덮어쓰지 않음 → 동기화가 의도적으로 스킵됨
양방향 복제 토픽primary.ordersdr.orders가 동시에 존재, 컨슈머가 어느 쪽을 읽는지 명확히 설계 필요
순환 복제 방지MM2의 토픽 prefix(primary., dr.)가 무한 루프를 막지만, 그룹 offset 동기화는 단방향으로만 신뢰

active-active를 운영한다면, 그룹 offset 동기화에 의존하기보다 페일오버 시점에 방법 A로 명시적으로 변환·commit하는 편이 예측 가능성이 높습니다. 양방향 복제와 컨슈머 토폴로지 설계는 시리즈 후속 편(또는 1편의 토폴로지 절)에서 더 깊이 다룹니다.

end offset과의 정합성 점검

변환 결과를 맹신하지 마세요. 페일오버 직전에 반드시 DR 파티션의 end offset(log end offset)과 비교해 sanity check를 해야 합니다.

변환 offset > DR end offset  → 잘못된 매핑(미래 위치). 그대로 seek하면 OffsetOutOfRange.
                              → end offset 또는 가장 가까운 유효 앵커로 clamp.
변환 offset < DR start offset → retention으로 이미 지워진 구간을 가리킴.
                              → earliest로 reset (대량 재처리 각오).

RemoteClusterUtils가 정상적으로 동작하면 이런 경계 위반은 드물지만, 복제 지연이 크거나 토픽이 재생성된 직후에는 충분히 발생할 수 있습니다. 페일오버 스크립트에 clamp 로직을 넣어 두면 운영 사고를 한 단계 막을 수 있습니다.


4. 반드시 알아야 할 한계 — 변환은 근사값이고 at-least-once다

여기서 솔직해질 필요가 있습니다. offset 변환은 정확하지 않습니다. 그리고 그것은 버그가 아니라 설계상의 본질입니다.

왜 근사값인가

offset-syncs는 모든 레코드가 아니라 주기적인 앵커만 기록합니다(부하 때문에). 따라서 변환은 "가장 가까운 알려진 앵커"를 기준으로 계산되고, 앵커와 앵커 사이의 레코드에 대해서는 약간 보수적인(앞쪽) 위치로 매핑됩니다. checkpoint 역시 emit.checkpoints.interval.seconds마다 한 번씩 찍히므로, 마지막 checkpoint 이후에 원본 컨슈머가 더 진행한 부분은 반영되지 않습니다.

그 결과 변환된 위치는 컨슈머의 **실제 마지막 처리 지점보다 약간 뒤(과거)**일 가능성이 높습니다. 이게 의미하는 바는:

페일오버 후 일부 레코드는 다시 처리된다 (중복). 이것은 정상이며, MM2의 의미론은 명시적으로 **at-least-once(최소 한 번)**입니다. exactly-once가 아닙니다.

그래서 컨슈머는 멱등(idempotent)해야 한다

페일오버 후 중복 처리를 안전하게 흡수하려면, 다운스트림 로직이 멱등해야 합니다.

나쁜 예 (멱등 X):
  주문 이벤트 수신 → 잔고에서 금액 "차감"
  → 같은 이벤트 재처리 시 이중 차감 (사고)
 
좋은 예 (멱등 O):
  주문 이벤트 수신 → 멱등 키(order_id)로 UPSERT
  → 같은 이벤트 재처리해도 결과 동일

전형적인 멱등화 기법:

기법설명
멱등 키 + UPSERT비즈니스 키로 중복 INSERT를 무시/갱신
처리 기록 테이블처리한 이벤트 ID를 저장하고 재등장 시 skip
조건부 쓰기"현재 상태가 X일 때만" 갱신하는 CAS류 연산
외부 시스템의 멱등 API결제 등은 idempotency-key 헤더 지원

손실 구간(un-replicated data) = 당신의 RPO

또 하나의 진실: 장애 순간에 아직 DR로 복제되지 못한 레코드는 변환으로도 구제할 수 없습니다. 그 양이 곧 **RPO(Recovery Point Objective)**입니다.

RPO ≈ 원본의 최신 produce 위치 − DR로 복제 완료된 위치
    ≈ 복제 지연(replication lag) × 장애 발생률

복제 지연이 평균 2초라면, 최악의 경우 약 2초어치 데이터를 잃을 수 있다는 뜻입니다(1편의 RPO 설계와 직결). 이 손실을 줄이려면 복제 지연 자체를 줄여야 하고, 손실을 0으로 만들고 싶다면 그건 더 이상 비동기 DR이 아니라 동기 복제 영역의 이야기입니다.

정확도는 주기로 튜닝한다

변환 정확도(= 중복 처리량)는 두 주기로 조절합니다.

설정효과트레이드오프
emit.checkpoints.interval.secondscheckpoint를 더 자주 찍어 변환을 최신화너무 작으면 connect 클러스터 부하·내부 토픽 트래픽 증가
sync.group.offsets.interval.secondsDR __consumer_offsets 동기화를 더 자주동일하게 부하 증가
offset-syncs(sync.offsets.interval.seconds)앵커를 촘촘히 → 변환 입자 향상내부 토픽 쓰기량 증가

권장 출발점은 30초입니다. RPO/중복 허용치가 빡빡하면 10초까지 낮추고, 토픽 수가 매우 많아 부하가 부담되면 60초로 올리며 모니터링하세요. 핵심 원칙: 더 촘촘한 checkpoint = 더 정확한 페일오버 = 더 적은 중복, 그러나 더 높은 부하.


5. 운영 체크리스트

페일오버 리허설 전에 아래를 점검하세요.

점검 항목확인 방법
checkpoint 토픽이 채워지는가DR에서 primary.checkpoints.internal 컨슘해 레코드 확인
대상 그룹이 groups 정규식에 포함되는가emit.checkpoints 로그 / checkpoint에 그룹 등장 여부
offset-syncs가 흐르는가mm2-offset-syncs.<target>.internal 에 매핑 적재 확인
sync.group.offsets 동작DR __consumer_offsets에 해당 그룹 offset 존재 확인
변환 결과가 합리적인가translateOffsets() 결과를 DR end offset과 비교 (음수/초과 없음)
컨슈머 멱등성동일 이벤트 2회 처리 테스트로 부작용 없음 검증
복제 지연(RPO)MirrorSourceConnector lag 메트릭 대시보드화

자주 겪는 함정: sync.group.offsets를 켰는데 DR에 offset이 안 보인다면, 십중팔구 그 그룹이 (1) groups 정규식에 안 잡혔거나, (2) 원본에서 한 번도 commit한 적이 없거나, (3) 이미 DR에서 활성 상태라 MM2가 덮어쓰기를 거부한 경우입니다.


마치며

  • DR 클러스터는 원본의 복사본이 아니라 독립된 로그입니다. 같은 레코드라도 offset이 다르므로, 원본 offset으로 DR에서 seek하면 누락 또는 대량 중복이 발생합니다.
  • MM2는 MirrorSourceConnector가 남기는 mm2-offset-syncs 매핑과 MirrorCheckpointConnector가 만드는 <source>.checkpoints.internal 을 통해 "원본 offset → DR offset" 변환을 자동화합니다.
  • 변환 값은 RemoteClusterUtils.translateOffsets()로 직접 읽거나, **sync.group.offsets.enabled=true**로 DR의 __consumer_offsets에 미리 써 두어 앱 수정 없이 페일오버할 수 있습니다.
  • 변환은 근사값이며 at-least-once입니다. 페일오버 후 약간의 중복 처리는 정상이므로, 컨슈머는 반드시 멱등해야 합니다. 아직 복제되지 못한 구간이 곧 당신의 RPO입니다.
  • 정확도와 부하는 emit.checkpoints.interval.seconds·sync.group.offsets.interval.seconds로 균형을 잡습니다(권장 출발점 30초).
  • 다음 4편에서는 지금까지 준비한 변환·동기화를 실제로 발동시키는 페일오버/페일백 런북 — 장애 감지, 트래픽 전환, 원복 절차, 리허설 시나리오 — 을 단계별로 다룹니다.

참고 자료


— Data Dynamics 엔지니어링 팀