Blog
kafkadisaster-recoverymirrormaker2offsetsfailover

[Kafka DR ③] DR의 핵심 난제 — Offset 변환과 컨슈머 페일오버

DR 클러스터는 독립된 로그라 같은 레코드라도 offset이 다릅니다. MirrorMaker 2가 offset-syncs와 checkpoint로 이 격차를 어떻게 메우는지, RemoteClusterUtils로 변환된 offset을 읽고 sync.group.offsets로 컨슈머를 무중단 페일오버시키는 방법을 정밀하게 정리합니다.

Data Dynamics2026년 6월 15일25 min read

복제는 쉽습니다. 어려운 건 "그래서 컨슈머를 어디서부터 다시 읽게 할 것인가"입니다. 1번 클러스터에서 offset 1,000,000까지 읽던 컨슈머가 장애를 만나 DR 클러스터로 옮겨갔을 때, 똑같이 offset 1,000,000부터 읽으면 될까요? 절대 안 됩니다. DR 클러스터는 원본의 복사본이 아니라 독립적으로 채번된 별개의 로그이기 때문입니다. 이 한 줄이 Kafka DR 전체에서 가장 까다로운 문제를 만들어 냅니다. 이번 편은 그 문제 — offset 변환과 컨슈머 페일오버 — 만 집중해서 파헤칩니다.

이 글에서 배우는 것

  • 왜 DR 클러스터의 offset은 원본과 다른가 (독립 로그 문제)
  • MirrorMaker 2가 offset-syncs와 checkpoint로 offset 격차를 메우는 원리
  • RemoteClusterUtils / MirrorClient.translateOffsets()로 변환된 offset을 읽는 법
  • sync.group.offsets.enabled로 앱 수정 없이 컨슈머 그룹을 페일오버시키는 법
  • 변환은 근사값이고 at-least-once라는 것 — RPO와 멱등성의 의미
  • checkpoint 주기 튜닝과 운영상의 함정

이 글은 "Kafka DR 구축" 시리즈의 3편입니다. 1편(DR 아키텍처와 RPO/RTO 설계)과 2편(MirrorMaker 2로 토픽·데이터를 복제하기)에서 우리는 "데이터를 DR 클러스터로 흘려보내는" 데까지 왔습니다. 이번 편은 그 데이터를 컨슈머가 실제로 이어받는 마지막 한 걸음을 다룹니다.


1. 핵심 문제 — DR 클러스터는 독립된 로그다

같은 레코드, 다른 offset

Kafka의 offset은 파티션 로그 안에서의 물리적 위치입니다. 글로벌 ID가 아니라, 각 브로커가 자기 로그에 레코드를 append하면서 0부터 순차적으로 매기는 로컬 번호입니다. 핵심은 이것입니다: offset은 레코드의 신원(identity)이 아니라 위치(position)다.

MirrorMaker 2(이하 MM2)가 원본(primary)의 토픽을 DR 클러스터(dr)로 복제하면, DR 쪽에는 primary.orders라는 토픽이 새로 생기고 거기에 레코드가 다시 produce됩니다. 이 과정에서 DR 토픽의 offset은 DR 브로커가 새로 채번합니다. 원본 offset을 그대로 가져오는 게 아닙니다.

Loading diagram…

왜 어긋날까요? 여러 이유가 겹칩니다.

원인설명
복제 시작 시점MM2가 토픽 중간부터 복제를 시작하면 DR offset은 0부터, 원본은 이미 수백만
로그 보존/압축원본에서 retention으로 앞부분이 지워진 뒤 복제되면 시작 offset이 다름
메시지 누락/재시도producer 재시도, 트랜잭션 마커 등으로 양쪽 append 카운트가 미세하게 다를 수 있음
토픽 재생성원본 토픽이 재생성되면 offset이 리셋되지만 DR은 이어서 증가

그래서 무슨 일이 벌어지나

컨슈머 그룹은 자기가 어디까지 읽었는지를 committed offset으로 __consumer_offsets에 기록합니다. 이 값은 원본 클러스터 기준의 숫자입니다. 장애가 나서 컨슈머를 DR로 옮길 때, 이 숫자를 그대로 DR에서 seek하면:

  • DR offset이 더 크면 → 아직 처리 안 한 레코드를 건너뜀 (데이터 유실, 사실상 처리 누락)
  • DR offset이 더 작으면 → 이미 처리한 레코드를 다시 읽음 (대량 중복 처리)

둘 다 운영 사고입니다. 그래서 필요한 게 **offset 변환(offset translation)**입니다. "원본에서 offset X였던 그 레코드는, DR에서는 offset Y다"라는 매핑을 만들어 두고, 페일오버 시 X를 Y로 바꿔서 컨슈머가 올바른 위치에서 이어받게 해야 합니다.


2. MM2는 어떻게 푸는가 — offset-syncs와 checkpoint

MM2는 이 매핑을 자동으로 만들어 줍니다. 두 개의 커넥터와 두 개의 내부 토픽이 협업합니다.

MirrorSourceConnector → mm2-offset-syncs 토픽

데이터 복제를 담당하는 MirrorSourceConnector는 레코드를 DR로 produce하면서, "원본 offset ↔ DR offset" 매핑 쌍을 별도의 내부 토픽 mm2-offset-syncs.<target>.internal(흔히 mm2-offset-syncs로 줄여 부름)에 주기적으로 기록합니다. 모든 레코드마다 기록하는 게 아니라, 일정 간격마다 앵커(anchor) 역할을 하는 매핑을 남깁니다.

이 토픽의 레코드는 본질적으로 이런 의미입니다:

topic-partition = primary.orders-0
upstreamOffset  = 1_000_000   (원본에서의 offset)
downstreamOffset =   742_318  (DR에서의 offset)

즉 "원본 offset 1,000,000인 레코드가 DR에서는 offset 742,318이다"라는 닻을 박아 둡니다.

MirrorCheckpointConnector → <source>.checkpoints.internal 토픽

MirrorCheckpointConnector는 한 단계 더 나아갑니다. 이 커넥터는 원본 클러스터의 컨슈머 그룹들이 어디까지 commit했는지를 주기적으로 읽어서, offset-syncs의 매핑을 이용해 DR 기준 offset으로 변환한 결과를 <source>.checkpoints.internal 토픽에 기록합니다.

checkpoint 레코드 하나는 대략 이런 정보를 담습니다:

필드의미
consumerGroupId어떤 컨슈머 그룹인지 (예: order-processor)
topicPartition어떤 토픽-파티션인지 (DR 쪽 이름, primary.orders-0)
upstreamOffset원본에서 그 그룹이 commit한 offset
downstreamOffset변환된 DR offset (= 페일오버 시 seek할 위치)
metadata부가 정보

정리하면 데이터 흐름은 이렇습니다.

Loading diagram…

설정 예시

2편에서 띄운 MM2 설정에 checkpoint 관련 옵션을 더하면 다음과 같습니다.

# mm2.properties (발췌)
clusters = primary, dr
primary->dr.enabled = true
 
# 데이터 복제 (MirrorSourceConnector) — 2편에서 다룸
primary->dr.topics = orders, payments, .*
 
# offset-syncs 토픽 위치 (기본은 target=dr)
primary->dr.offset-syncs.topic.location = target
 
# checkpoint (컨슈머 그룹 offset 변환) 활성화
primary->dr.emit.checkpoints.enabled = true
primary->dr.groups = order-processor, payment-.*
 
# checkpoint 발행 주기 — 작을수록 변환 정확도가 올라가지만 부하 증가
primary->dr.emit.checkpoints.interval.seconds = 30
 
# offset-syncs 발행 주기
primary->dr.sync.offsets.interval.seconds = 30

groups는 변환 대상 컨슈머 그룹을 정규식으로 지정합니다. 비워 두면 checkpoint가 만들어지지 않으니, DR로 페일오버시킬 그룹을 반드시 명시해야 합니다.

숫자로 따라가 보기

추상적인 설명만으로는 감이 잘 안 오니, 구체적인 수치로 변환 한 사이클을 따라가 봅시다. 파티션 orders-0 하나만 보겠습니다.

  1. 원본에서 복제는 원본 offset 250,000부터 시작됐고, 그 레코드가 DR primary.orders-0의 offset 0에 들어갔습니다. 즉 상수 차이(delta) = 250,000입니다.

  2. MirrorSourceConnector가 주기적으로 앵커를 남깁니다.

    upstreamOffset(원본)downstreamOffset(DR)
    250,0000
    300,00050,000
    1,000,000750,000
  3. 컨슈머 그룹 order-processor가 원본에서 offset 1,000,000까지 commit했습니다.

  4. MirrorCheckpointConnector가 이 commit을 읽고, 가장 가까운 앵커(1,000,000 → 750,000)를 이용해 downstreamOffset = 750,000으로 변환해 checkpoint에 기록합니다.

  5. 페일오버 시 컨슈머는 DR primary.orders-0의 offset 750,000부터 재개합니다. 원본의 1,000,000을 그대로 썼다면 DR 로그를 한참 넘어가 버렸을 것입니다.

여기서 주의: 만약 commit 위치(예: 1,000,000)가 정확한 앵커가 아니라 앵커들 사이라면, MM2는 더 작은(과거) 앵커를 골라 약간 앞쪽으로 매핑합니다. 위 예에서 그룹이 320,000을 commit했다면, MM2는 300,000→50,000 앵커를 기준으로 보수적으로 변환합니다. 결과적으로 약간의 **재처리(중복)**가 생기는데, 이것이 바로 뒤(4장)에서 다룰 at-least-once의 본질입니다.


3. 변환된 offset 읽기 — RemoteClusterUtils와 sync.group.offsets

checkpoint 토픽이 채워졌다면, 페일오버 시 그 값을 꺼내 쓰는 방법은 두 가지입니다.

방법 A — RemoteClusterUtils로 직접 조회

RemoteClusterUtils(내부적으로 MirrorClient 사용)는 checkpoint 토픽을 읽어 "이 그룹이 DR에서 어디부터 읽어야 하는가"를 계산해 줍니다.

import org.apache.kafka.connect.mirror.RemoteClusterUtils;
 
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.TopicPartition;
 
// DR 클러스터에 접속하는 설정
Properties props = new Properties();
props.put("bootstrap.servers", "dr-broker1:9092,dr-broker2:9092");
 
// "primary"에서 넘어온 컨슈머 그룹의 DR 기준 재개 위치를 계산
Map<TopicPartition, OffsetAndMetadata> translated =
    RemoteClusterUtils.translateOffsets(
        props,
        "primary",          // source 클러스터 별칭 (DR 토픽 prefix)
        "order-processor",  // 변환할 컨슈머 그룹
        java.time.Duration.ofSeconds(10));
 
// translated 의 key 는 DR 토픽-파티션(primary.orders-0 등),
// value 는 그 그룹이 DR에서 seek/commit 해야 할 offset.

MirrorClient를 직접 쓰면 더 세밀하게 제어할 수 있습니다.

import org.apache.kafka.connect.mirror.MirrorClient;
import org.apache.kafka.connect.mirror.MirrorClientConfig;
 
MirrorClient client = new MirrorClient(new MirrorClientConfig(/* configs */));
 
// 원본 토픽명을 DR 쪽 remote 토픽명으로
String remoteTopic = client.remoteTopic("orders", "primary"); // -> "primary.orders"
 
// 그룹의 변환된 offset 조회
Map<TopicPartition, OffsetAndMetadata> offsets =
    client.translateOffsets(props, "primary", "order-processor", Duration.ofSeconds(10));

이렇게 얻은 offset을 페일오버 스크립트에서 DR 클러스터의 __consumer_offsets에 commit하거나, 컨슈머가 시작할 때 seek()에 넣어 주면 됩니다.

방법 B — sync.group.offsets로 자동 동기화 (앱 수정 0)

방법 A는 코드를 짜야 합니다. 더 매끄러운 방법은 MM2가 변환된 offset을 DR 클러스터의 __consumer_offsets에 직접 써 주게 하는 것입니다.

# 변환된 컨슈머 그룹 offset을 DR의 __consumer_offsets에 자동 기록
primary->dr.sync.group.offsets.enabled = true
primary->dr.sync.group.offsets.interval.seconds = 30

이 옵션을 켜면 MirrorCheckpointConnector가 checkpoint를 만드는 데서 멈추지 않고, 같은 그룹 ID로 DR 클러스터에 committed offset을 미리 동기화해 둡니다. 그 결과:

  • 페일오버 시 애플리케이션을 전혀 고치지 않아도 됩니다. 컨슈머의 bootstrap.servers만 DR로 바꿔 재시작하면, 같은 group.id로 붙는 순간 DR의 __consumer_offsets에 이미 변환된 위치가 들어 있어 거기서부터 이어집니다.
  • 단, 그 그룹이 DR에서 활성 상태가 아닐 때만 동기화됩니다. 이미 DR에서 같은 그룹이 돌고 있으면(예: active-active) MM2는 덮어쓰지 않습니다 — 정상 동작 중인 컨슈머의 진행 상황을 망가뜨리지 않기 위함입니다.
비교방법 A (translateOffsets)방법 B (sync.group.offsets)
앱 수정페일오버 로직/스크립트 필요불필요 (bootstrap만 변경)
제어 수준세밀 (그룹·파티션별 결정)자동 (전체 일괄)
적용 시점페일오버 순간 명시적 호출주기적으로 미리 동기화
추천 상황커스텀 페일오버 오케스트레이션단순·표준 active-passive DR

대부분의 active-passive DR에서는 방법 B가 운영 부담이 가장 적습니다.

active-passive vs active-active에서의 차이

offset 변환은 **단방향 흐름(active-passive)**을 가정할 때 가장 깔끔합니다. 원본에서만 컨슈머가 돌고, DR은 대기 상태이며, 페일오버 순간에만 DR 컨슈머가 깨어납니다. 이때 DR의 그룹은 "비활성"이므로 sync.group.offsets가 안전하게 offset을 미리 채워 둘 수 있습니다.

**active-active(양방향)**에서는 이야기가 복잡해집니다.

상황고려 사항
양쪽에서 같은 group.id가 동작MM2는 활성 그룹의 offset을 덮어쓰지 않음 → 동기화가 의도적으로 스킵됨
양방향 복제 토픽primary.ordersdr.orders가 동시에 존재, 컨슈머가 어느 쪽을 읽는지 명확히 설계 필요
순환 복제 방지MM2의 토픽 prefix(primary., dr.)가 무한 루프를 막지만, 그룹 offset 동기화는 단방향으로만 신뢰

active-active를 운영한다면, 그룹 offset 동기화에 의존하기보다 페일오버 시점에 방법 A로 명시적으로 변환·commit하는 편이 예측 가능성이 높습니다. 양방향 복제와 컨슈머 토폴로지 설계는 시리즈 후속 편(또는 1편의 토폴로지 절)에서 더 깊이 다룹니다.

end offset과의 정합성 점검

변환 결과를 맹신하지 마세요. 페일오버 직전에 반드시 DR 파티션의 end offset(log end offset)과 비교해 sanity check를 해야 합니다.

변환 offset > DR end offset  → 잘못된 매핑(미래 위치). 그대로 seek하면 OffsetOutOfRange.
                              → end offset 또는 가장 가까운 유효 앵커로 clamp.
변환 offset < DR start offset → retention으로 이미 지워진 구간을 가리킴.
                              → earliest로 reset (대량 재처리 각오).

RemoteClusterUtils가 정상적으로 동작하면 이런 경계 위반은 드물지만, 복제 지연이 크거나 토픽이 재생성된 직후에는 충분히 발생할 수 있습니다. 페일오버 스크립트에 clamp 로직을 넣어 두면 운영 사고를 한 단계 막을 수 있습니다.


4. 반드시 알아야 할 한계 — 변환은 근사값이고 at-least-once다

여기서 솔직해질 필요가 있습니다. offset 변환은 정확하지 않습니다. 그리고 그것은 버그가 아니라 설계상의 본질입니다.

왜 근사값인가

offset-syncs는 모든 레코드가 아니라 주기적인 앵커만 기록합니다(부하 때문에). 따라서 변환은 "가장 가까운 알려진 앵커"를 기준으로 계산되고, 앵커와 앵커 사이의 레코드에 대해서는 약간 보수적인(앞쪽) 위치로 매핑됩니다. checkpoint 역시 emit.checkpoints.interval.seconds마다 한 번씩 찍히므로, 마지막 checkpoint 이후에 원본 컨슈머가 더 진행한 부분은 반영되지 않습니다.

그 결과 변환된 위치는 컨슈머의 **실제 마지막 처리 지점보다 약간 뒤(과거)**일 가능성이 높습니다. 이게 의미하는 바는:

페일오버 후 일부 레코드는 다시 처리된다 (중복). 이것은 정상이며, MM2의 의미론은 명시적으로 **at-least-once(최소 한 번)**입니다. exactly-once가 아닙니다.

그래서 컨슈머는 멱등(idempotent)해야 한다

페일오버 후 중복 처리를 안전하게 흡수하려면, 다운스트림 로직이 멱등해야 합니다.

나쁜 예 (멱등 X):
  주문 이벤트 수신 → 잔고에서 금액 "차감"
  → 같은 이벤트 재처리 시 이중 차감 (사고)
 
좋은 예 (멱등 O):
  주문 이벤트 수신 → 멱등 키(order_id)로 UPSERT
  → 같은 이벤트 재처리해도 결과 동일

전형적인 멱등화 기법:

기법설명
멱등 키 + UPSERT비즈니스 키로 중복 INSERT를 무시/갱신
처리 기록 테이블처리한 이벤트 ID를 저장하고 재등장 시 skip
조건부 쓰기"현재 상태가 X일 때만" 갱신하는 CAS류 연산
외부 시스템의 멱등 API결제 등은 idempotency-key 헤더 지원

손실 구간(un-replicated data) = 당신의 RPO

또 하나의 진실: 장애 순간에 아직 DR로 복제되지 못한 레코드는 변환으로도 구제할 수 없습니다. 그 양이 곧 **RPO(Recovery Point Objective)**입니다.

RPO ≈ 원본의 최신 produce 위치 − DR로 복제 완료된 위치
    ≈ 복제 지연(replication lag) × 장애 발생률

복제 지연이 평균 2초라면, 최악의 경우 약 2초어치 데이터를 잃을 수 있다는 뜻입니다(1편의 RPO 설계와 직결). 이 손실을 줄이려면 복제 지연 자체를 줄여야 하고, 손실을 0으로 만들고 싶다면 그건 더 이상 비동기 DR이 아니라 동기 복제 영역의 이야기입니다.

정확도는 주기로 튜닝한다

변환 정확도(= 중복 처리량)는 두 주기로 조절합니다.

설정효과트레이드오프
emit.checkpoints.interval.secondscheckpoint를 더 자주 찍어 변환을 최신화너무 작으면 connect 클러스터 부하·내부 토픽 트래픽 증가
sync.group.offsets.interval.secondsDR __consumer_offsets 동기화를 더 자주동일하게 부하 증가
offset-syncs(sync.offsets.interval.seconds)앵커를 촘촘히 → 변환 입자 향상내부 토픽 쓰기량 증가

권장 출발점은 30초입니다. RPO/중복 허용치가 빡빡하면 10초까지 낮추고, 토픽 수가 매우 많아 부하가 부담되면 60초로 올리며 모니터링하세요. 핵심 원칙: 더 촘촘한 checkpoint = 더 정확한 페일오버 = 더 적은 중복, 그러나 더 높은 부하.


5. 운영 체크리스트

페일오버 리허설 전에 아래를 점검하세요.

점검 항목확인 방법
checkpoint 토픽이 채워지는가DR에서 primary.checkpoints.internal 컨슘해 레코드 확인
대상 그룹이 groups 정규식에 포함되는가emit.checkpoints 로그 / checkpoint에 그룹 등장 여부
offset-syncs가 흐르는가mm2-offset-syncs.<target>.internal 에 매핑 적재 확인
sync.group.offsets 동작DR __consumer_offsets에 해당 그룹 offset 존재 확인
변환 결과가 합리적인가translateOffsets() 결과를 DR end offset과 비교 (음수/초과 없음)
컨슈머 멱등성동일 이벤트 2회 처리 테스트로 부작용 없음 검증
복제 지연(RPO)MirrorSourceConnector lag 메트릭 대시보드화

자주 겪는 함정: sync.group.offsets를 켰는데 DR에 offset이 안 보인다면, 십중팔구 그 그룹이 (1) groups 정규식에 안 잡혔거나, (2) 원본에서 한 번도 commit한 적이 없거나, (3) 이미 DR에서 활성 상태라 MM2가 덮어쓰기를 거부한 경우입니다.


마치며

  • DR 클러스터는 원본의 복사본이 아니라 독립된 로그입니다. 같은 레코드라도 offset이 다르므로, 원본 offset으로 DR에서 seek하면 누락 또는 대량 중복이 발생합니다.
  • MM2는 MirrorSourceConnector가 남기는 mm2-offset-syncs 매핑과 MirrorCheckpointConnector가 만드는 <source>.checkpoints.internal 을 통해 "원본 offset → DR offset" 변환을 자동화합니다.
  • 변환 값은 RemoteClusterUtils.translateOffsets()로 직접 읽거나, **sync.group.offsets.enabled=true**로 DR의 __consumer_offsets에 미리 써 두어 앱 수정 없이 페일오버할 수 있습니다.
  • 변환은 근사값이며 at-least-once입니다. 페일오버 후 약간의 중복 처리는 정상이므로, 컨슈머는 반드시 멱등해야 합니다. 아직 복제되지 못한 구간이 곧 당신의 RPO입니다.
  • 정확도와 부하는 emit.checkpoints.interval.seconds·sync.group.offsets.interval.seconds로 균형을 잡습니다(권장 출발점 30초).
  • 다음 4편에서는 지금까지 준비한 변환·동기화를 실제로 발동시키는 페일오버/페일백 런북 — 장애 감지, 트래픽 전환, 원복 절차, 리허설 시나리오 — 을 단계별로 다룹니다.

참고 자료


— Data Dynamics 엔지니어링 팀