[Kafka DR ③] DR의 핵심 난제 — Offset 변환과 컨슈머 페일오버
DR 클러스터는 독립된 로그라 같은 레코드라도 offset이 다릅니다. MirrorMaker 2가 offset-syncs와 checkpoint로 이 격차를 어떻게 메우는지, RemoteClusterUtils로 변환된 offset을 읽고 sync.group.offsets로 컨슈머를 무중단 페일오버시키는 방법을 정밀하게 정리합니다.
복제는 쉽습니다. 어려운 건 "그래서 컨슈머를 어디서부터 다시 읽게 할 것인가"입니다. 1번 클러스터에서 offset 1,000,000까지 읽던 컨슈머가 장애를 만나 DR 클러스터로 옮겨갔을 때, 똑같이 offset 1,000,000부터 읽으면 될까요? 절대 안 됩니다. DR 클러스터는 원본의 복사본이 아니라 독립적으로 채번된 별개의 로그이기 때문입니다. 이 한 줄이 Kafka DR 전체에서 가장 까다로운 문제를 만들어 냅니다. 이번 편은 그 문제 — offset 변환과 컨슈머 페일오버 — 만 집중해서 파헤칩니다.
이 글에서 배우는 것
- 왜 DR 클러스터의 offset은 원본과 다른가 (독립 로그 문제)
- MirrorMaker 2가 offset-syncs와 checkpoint로 offset 격차를 메우는 원리
RemoteClusterUtils/MirrorClient.translateOffsets()로 변환된 offset을 읽는 법sync.group.offsets.enabled로 앱 수정 없이 컨슈머 그룹을 페일오버시키는 법- 변환은 근사값이고 at-least-once라는 것 — RPO와 멱등성의 의미
- checkpoint 주기 튜닝과 운영상의 함정
이 글은 "Kafka DR 구축" 시리즈의 3편입니다. 1편(DR 아키텍처와 RPO/RTO 설계)과 2편(MirrorMaker 2로 토픽·데이터를 복제하기)에서 우리는 "데이터를 DR 클러스터로 흘려보내는" 데까지 왔습니다. 이번 편은 그 데이터를 컨슈머가 실제로 이어받는 마지막 한 걸음을 다룹니다.
1. 핵심 문제 — DR 클러스터는 독립된 로그다
같은 레코드, 다른 offset
Kafka의 offset은 파티션 로그 안에서의 물리적 위치입니다. 글로벌 ID가 아니라, 각 브로커가 자기 로그에 레코드를 append하면서 0부터 순차적으로 매기는 로컬 번호입니다. 핵심은 이것입니다: offset은 레코드의 신원(identity)이 아니라 위치(position)다.
MirrorMaker 2(이하 MM2)가 원본(primary)의 토픽을 DR 클러스터(dr)로 복제하면, DR 쪽에는 primary.orders라는 토픽이 새로 생기고 거기에 레코드가 다시 produce됩니다. 이 과정에서 DR 토픽의 offset은 DR 브로커가 새로 채번합니다. 원본 offset을 그대로 가져오는 게 아닙니다.
왜 어긋날까요? 여러 이유가 겹칩니다.
| 원인 | 설명 |
|---|---|
| 복제 시작 시점 | MM2가 토픽 중간부터 복제를 시작하면 DR offset은 0부터, 원본은 이미 수백만 |
| 로그 보존/압축 | 원본에서 retention으로 앞부분이 지워진 뒤 복제되면 시작 offset이 다름 |
| 메시지 누락/재시도 | producer 재시도, 트랜잭션 마커 등으로 양쪽 append 카운트가 미세하게 다를 수 있음 |
| 토픽 재생성 | 원본 토픽이 재생성되면 offset이 리셋되지만 DR은 이어서 증가 |
그래서 무슨 일이 벌어지나
컨슈머 그룹은 자기가 어디까지 읽었는지를 committed offset으로 __consumer_offsets에 기록합니다. 이 값은 원본 클러스터 기준의 숫자입니다. 장애가 나서 컨슈머를 DR로 옮길 때, 이 숫자를 그대로 DR에서 seek하면:
- DR offset이 더 크면 → 아직 처리 안 한 레코드를 건너뜀 (데이터 유실, 사실상 처리 누락)
- DR offset이 더 작으면 → 이미 처리한 레코드를 다시 읽음 (대량 중복 처리)
둘 다 운영 사고입니다. 그래서 필요한 게 **offset 변환(offset translation)**입니다. "원본에서 offset X였던 그 레코드는, DR에서는 offset Y다"라는 매핑을 만들어 두고, 페일오버 시 X를 Y로 바꿔서 컨슈머가 올바른 위치에서 이어받게 해야 합니다.
2. MM2는 어떻게 푸는가 — offset-syncs와 checkpoint
MM2는 이 매핑을 자동으로 만들어 줍니다. 두 개의 커넥터와 두 개의 내부 토픽이 협업합니다.
MirrorSourceConnector → mm2-offset-syncs 토픽
데이터 복제를 담당하는 MirrorSourceConnector는 레코드를 DR로 produce하면서, "원본 offset ↔ DR offset" 매핑 쌍을 별도의 내부 토픽 mm2-offset-syncs.<target>.internal(흔히 mm2-offset-syncs로 줄여 부름)에 주기적으로 기록합니다. 모든 레코드마다 기록하는 게 아니라, 일정 간격마다 앵커(anchor) 역할을 하는 매핑을 남깁니다.
이 토픽의 레코드는 본질적으로 이런 의미입니다:
topic-partition = primary.orders-0
upstreamOffset = 1_000_000 (원본에서의 offset)
downstreamOffset = 742_318 (DR에서의 offset)즉 "원본 offset 1,000,000인 레코드가 DR에서는 offset 742,318이다"라는 닻을 박아 둡니다.
MirrorCheckpointConnector → <source>.checkpoints.internal 토픽
MirrorCheckpointConnector는 한 단계 더 나아갑니다. 이 커넥터는 원본 클러스터의 컨슈머 그룹들이 어디까지 commit했는지를 주기적으로 읽어서, offset-syncs의 매핑을 이용해 DR 기준 offset으로 변환한 결과를 <source>.checkpoints.internal 토픽에 기록합니다.
checkpoint 레코드 하나는 대략 이런 정보를 담습니다:
| 필드 | 의미 |
|---|---|
| consumerGroupId | 어떤 컨슈머 그룹인지 (예: order-processor) |
| topicPartition | 어떤 토픽-파티션인지 (DR 쪽 이름, primary.orders-0) |
| upstreamOffset | 원본에서 그 그룹이 commit한 offset |
| downstreamOffset | 변환된 DR offset (= 페일오버 시 seek할 위치) |
| metadata | 부가 정보 |
정리하면 데이터 흐름은 이렇습니다.
설정 예시
2편에서 띄운 MM2 설정에 checkpoint 관련 옵션을 더하면 다음과 같습니다.
# mm2.properties (발췌)
clusters = primary, dr
primary->dr.enabled = true
# 데이터 복제 (MirrorSourceConnector) — 2편에서 다룸
primary->dr.topics = orders, payments, .*
# offset-syncs 토픽 위치 (기본은 target=dr)
primary->dr.offset-syncs.topic.location = target
# checkpoint (컨슈머 그룹 offset 변환) 활성화
primary->dr.emit.checkpoints.enabled = true
primary->dr.groups = order-processor, payment-.*
# checkpoint 발행 주기 — 작을수록 변환 정확도가 올라가지만 부하 증가
primary->dr.emit.checkpoints.interval.seconds = 30
# offset-syncs 발행 주기
primary->dr.sync.offsets.interval.seconds = 30
groups는 변환 대상 컨슈머 그룹을 정규식으로 지정합니다. 비워 두면 checkpoint가 만들어지지 않으니, DR로 페일오버시킬 그룹을 반드시 명시해야 합니다.
숫자로 따라가 보기
추상적인 설명만으로는 감이 잘 안 오니, 구체적인 수치로 변환 한 사이클을 따라가 봅시다. 파티션 orders-0 하나만 보겠습니다.
-
원본에서 복제는 원본 offset 250,000부터 시작됐고, 그 레코드가 DR
primary.orders-0의 offset 0에 들어갔습니다. 즉 상수 차이(delta) = 250,000입니다. -
MirrorSourceConnector가 주기적으로 앵커를 남깁니다.upstreamOffset(원본) downstreamOffset(DR) 250,000 0 300,000 50,000 1,000,000 750,000 -
컨슈머 그룹
order-processor가 원본에서 offset 1,000,000까지 commit했습니다. -
MirrorCheckpointConnector가 이 commit을 읽고, 가장 가까운 앵커(1,000,000 → 750,000)를 이용해 downstreamOffset = 750,000으로 변환해 checkpoint에 기록합니다. -
페일오버 시 컨슈머는 DR
primary.orders-0의 offset 750,000부터 재개합니다. 원본의 1,000,000을 그대로 썼다면 DR 로그를 한참 넘어가 버렸을 것입니다.
여기서 주의: 만약 commit 위치(예: 1,000,000)가 정확한 앵커가 아니라 앵커들 사이라면, MM2는 더 작은(과거) 앵커를 골라 약간 앞쪽으로 매핑합니다. 위 예에서 그룹이 320,000을 commit했다면, MM2는 300,000→50,000 앵커를 기준으로 보수적으로 변환합니다. 결과적으로 약간의 **재처리(중복)**가 생기는데, 이것이 바로 뒤(4장)에서 다룰 at-least-once의 본질입니다.
3. 변환된 offset 읽기 — RemoteClusterUtils와 sync.group.offsets
checkpoint 토픽이 채워졌다면, 페일오버 시 그 값을 꺼내 쓰는 방법은 두 가지입니다.
방법 A — RemoteClusterUtils로 직접 조회
RemoteClusterUtils(내부적으로 MirrorClient 사용)는 checkpoint 토픽을 읽어 "이 그룹이 DR에서 어디부터 읽어야 하는가"를 계산해 줍니다.
import org.apache.kafka.connect.mirror.RemoteClusterUtils;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.TopicPartition;
// DR 클러스터에 접속하는 설정
Properties props = new Properties();
props.put("bootstrap.servers", "dr-broker1:9092,dr-broker2:9092");
// "primary"에서 넘어온 컨슈머 그룹의 DR 기준 재개 위치를 계산
Map<TopicPartition, OffsetAndMetadata> translated =
RemoteClusterUtils.translateOffsets(
props,
"primary", // source 클러스터 별칭 (DR 토픽 prefix)
"order-processor", // 변환할 컨슈머 그룹
java.time.Duration.ofSeconds(10));
// translated 의 key 는 DR 토픽-파티션(primary.orders-0 등),
// value 는 그 그룹이 DR에서 seek/commit 해야 할 offset.MirrorClient를 직접 쓰면 더 세밀하게 제어할 수 있습니다.
import org.apache.kafka.connect.mirror.MirrorClient;
import org.apache.kafka.connect.mirror.MirrorClientConfig;
MirrorClient client = new MirrorClient(new MirrorClientConfig(/* configs */));
// 원본 토픽명을 DR 쪽 remote 토픽명으로
String remoteTopic = client.remoteTopic("orders", "primary"); // -> "primary.orders"
// 그룹의 변환된 offset 조회
Map<TopicPartition, OffsetAndMetadata> offsets =
client.translateOffsets(props, "primary", "order-processor", Duration.ofSeconds(10));이렇게 얻은 offset을 페일오버 스크립트에서 DR 클러스터의 __consumer_offsets에 commit하거나, 컨슈머가 시작할 때 seek()에 넣어 주면 됩니다.
방법 B — sync.group.offsets로 자동 동기화 (앱 수정 0)
방법 A는 코드를 짜야 합니다. 더 매끄러운 방법은 MM2가 변환된 offset을 DR 클러스터의 __consumer_offsets에 직접 써 주게 하는 것입니다.
# 변환된 컨슈머 그룹 offset을 DR의 __consumer_offsets에 자동 기록
primary->dr.sync.group.offsets.enabled = true
primary->dr.sync.group.offsets.interval.seconds = 30이 옵션을 켜면 MirrorCheckpointConnector가 checkpoint를 만드는 데서 멈추지 않고, 같은 그룹 ID로 DR 클러스터에 committed offset을 미리 동기화해 둡니다. 그 결과:
- 페일오버 시 애플리케이션을 전혀 고치지 않아도 됩니다. 컨슈머의
bootstrap.servers만 DR로 바꿔 재시작하면, 같은group.id로 붙는 순간 DR의__consumer_offsets에 이미 변환된 위치가 들어 있어 거기서부터 이어집니다. - 단, 그 그룹이 DR에서 활성 상태가 아닐 때만 동기화됩니다. 이미 DR에서 같은 그룹이 돌고 있으면(예: active-active) MM2는 덮어쓰지 않습니다 — 정상 동작 중인 컨슈머의 진행 상황을 망가뜨리지 않기 위함입니다.
| 비교 | 방법 A (translateOffsets) | 방법 B (sync.group.offsets) |
|---|---|---|
| 앱 수정 | 페일오버 로직/스크립트 필요 | 불필요 (bootstrap만 변경) |
| 제어 수준 | 세밀 (그룹·파티션별 결정) | 자동 (전체 일괄) |
| 적용 시점 | 페일오버 순간 명시적 호출 | 주기적으로 미리 동기화 |
| 추천 상황 | 커스텀 페일오버 오케스트레이션 | 단순·표준 active-passive DR |
대부분의 active-passive DR에서는 방법 B가 운영 부담이 가장 적습니다.
active-passive vs active-active에서의 차이
offset 변환은 **단방향 흐름(active-passive)**을 가정할 때 가장 깔끔합니다. 원본에서만 컨슈머가 돌고, DR은 대기 상태이며, 페일오버 순간에만 DR 컨슈머가 깨어납니다. 이때 DR의 그룹은 "비활성"이므로 sync.group.offsets가 안전하게 offset을 미리 채워 둘 수 있습니다.
**active-active(양방향)**에서는 이야기가 복잡해집니다.
| 상황 | 고려 사항 |
|---|---|
| 양쪽에서 같은 group.id가 동작 | MM2는 활성 그룹의 offset을 덮어쓰지 않음 → 동기화가 의도적으로 스킵됨 |
| 양방향 복제 토픽 | primary.orders와 dr.orders가 동시에 존재, 컨슈머가 어느 쪽을 읽는지 명확히 설계 필요 |
| 순환 복제 방지 | MM2의 토픽 prefix(primary., dr.)가 무한 루프를 막지만, 그룹 offset 동기화는 단방향으로만 신뢰 |
active-active를 운영한다면, 그룹 offset 동기화에 의존하기보다 페일오버 시점에 방법 A로 명시적으로 변환·commit하는 편이 예측 가능성이 높습니다. 양방향 복제와 컨슈머 토폴로지 설계는 시리즈 후속 편(또는 1편의 토폴로지 절)에서 더 깊이 다룹니다.
end offset과의 정합성 점검
변환 결과를 맹신하지 마세요. 페일오버 직전에 반드시 DR 파티션의 end offset(log end offset)과 비교해 sanity check를 해야 합니다.
변환 offset > DR end offset → 잘못된 매핑(미래 위치). 그대로 seek하면 OffsetOutOfRange.
→ end offset 또는 가장 가까운 유효 앵커로 clamp.
변환 offset < DR start offset → retention으로 이미 지워진 구간을 가리킴.
→ earliest로 reset (대량 재처리 각오).RemoteClusterUtils가 정상적으로 동작하면 이런 경계 위반은 드물지만, 복제 지연이 크거나 토픽이 재생성된 직후에는 충분히 발생할 수 있습니다. 페일오버 스크립트에 clamp 로직을 넣어 두면 운영 사고를 한 단계 막을 수 있습니다.
4. 반드시 알아야 할 한계 — 변환은 근사값이고 at-least-once다
여기서 솔직해질 필요가 있습니다. offset 변환은 정확하지 않습니다. 그리고 그것은 버그가 아니라 설계상의 본질입니다.
왜 근사값인가
offset-syncs는 모든 레코드가 아니라 주기적인 앵커만 기록합니다(부하 때문에). 따라서 변환은 "가장 가까운 알려진 앵커"를 기준으로 계산되고, 앵커와 앵커 사이의 레코드에 대해서는 약간 보수적인(앞쪽) 위치로 매핑됩니다. checkpoint 역시 emit.checkpoints.interval.seconds마다 한 번씩 찍히므로, 마지막 checkpoint 이후에 원본 컨슈머가 더 진행한 부분은 반영되지 않습니다.
그 결과 변환된 위치는 컨슈머의 **실제 마지막 처리 지점보다 약간 뒤(과거)**일 가능성이 높습니다. 이게 의미하는 바는:
페일오버 후 일부 레코드는 다시 처리된다 (중복). 이것은 정상이며, MM2의 의미론은 명시적으로 **at-least-once(최소 한 번)**입니다. exactly-once가 아닙니다.
그래서 컨슈머는 멱등(idempotent)해야 한다
페일오버 후 중복 처리를 안전하게 흡수하려면, 다운스트림 로직이 멱등해야 합니다.
나쁜 예 (멱등 X):
주문 이벤트 수신 → 잔고에서 금액 "차감"
→ 같은 이벤트 재처리 시 이중 차감 (사고)
좋은 예 (멱등 O):
주문 이벤트 수신 → 멱등 키(order_id)로 UPSERT
→ 같은 이벤트 재처리해도 결과 동일전형적인 멱등화 기법:
| 기법 | 설명 |
|---|---|
| 멱등 키 + UPSERT | 비즈니스 키로 중복 INSERT를 무시/갱신 |
| 처리 기록 테이블 | 처리한 이벤트 ID를 저장하고 재등장 시 skip |
| 조건부 쓰기 | "현재 상태가 X일 때만" 갱신하는 CAS류 연산 |
| 외부 시스템의 멱등 API | 결제 등은 idempotency-key 헤더 지원 |
손실 구간(un-replicated data) = 당신의 RPO
또 하나의 진실: 장애 순간에 아직 DR로 복제되지 못한 레코드는 변환으로도 구제할 수 없습니다. 그 양이 곧 **RPO(Recovery Point Objective)**입니다.
RPO ≈ 원본의 최신 produce 위치 − DR로 복제 완료된 위치
≈ 복제 지연(replication lag) × 장애 발생률복제 지연이 평균 2초라면, 최악의 경우 약 2초어치 데이터를 잃을 수 있다는 뜻입니다(1편의 RPO 설계와 직결). 이 손실을 줄이려면 복제 지연 자체를 줄여야 하고, 손실을 0으로 만들고 싶다면 그건 더 이상 비동기 DR이 아니라 동기 복제 영역의 이야기입니다.
정확도는 주기로 튜닝한다
변환 정확도(= 중복 처리량)는 두 주기로 조절합니다.
| 설정 | 효과 | 트레이드오프 |
|---|---|---|
emit.checkpoints.interval.seconds | checkpoint를 더 자주 찍어 변환을 최신화 | 너무 작으면 connect 클러스터 부하·내부 토픽 트래픽 증가 |
sync.group.offsets.interval.seconds | DR __consumer_offsets 동기화를 더 자주 | 동일하게 부하 증가 |
offset-syncs(sync.offsets.interval.seconds) | 앵커를 촘촘히 → 변환 입자 향상 | 내부 토픽 쓰기량 증가 |
권장 출발점은 30초입니다. RPO/중복 허용치가 빡빡하면 10초까지 낮추고, 토픽 수가 매우 많아 부하가 부담되면 60초로 올리며 모니터링하세요. 핵심 원칙: 더 촘촘한 checkpoint = 더 정확한 페일오버 = 더 적은 중복, 그러나 더 높은 부하.
5. 운영 체크리스트
페일오버 리허설 전에 아래를 점검하세요.
| 점검 항목 | 확인 방법 |
|---|---|
| checkpoint 토픽이 채워지는가 | DR에서 primary.checkpoints.internal 컨슘해 레코드 확인 |
대상 그룹이 groups 정규식에 포함되는가 | emit.checkpoints 로그 / checkpoint에 그룹 등장 여부 |
| offset-syncs가 흐르는가 | mm2-offset-syncs.<target>.internal 에 매핑 적재 확인 |
sync.group.offsets 동작 | DR __consumer_offsets에 해당 그룹 offset 존재 확인 |
| 변환 결과가 합리적인가 | translateOffsets() 결과를 DR end offset과 비교 (음수/초과 없음) |
| 컨슈머 멱등성 | 동일 이벤트 2회 처리 테스트로 부작용 없음 검증 |
| 복제 지연(RPO) | MirrorSourceConnector lag 메트릭 대시보드화 |
자주 겪는 함정:
sync.group.offsets를 켰는데 DR에 offset이 안 보인다면, 십중팔구 그 그룹이 (1)groups정규식에 안 잡혔거나, (2) 원본에서 한 번도 commit한 적이 없거나, (3) 이미 DR에서 활성 상태라 MM2가 덮어쓰기를 거부한 경우입니다.
마치며
- DR 클러스터는 원본의 복사본이 아니라 독립된 로그입니다. 같은 레코드라도 offset이 다르므로, 원본 offset으로 DR에서 seek하면 누락 또는 대량 중복이 발생합니다.
- MM2는
MirrorSourceConnector가 남기는mm2-offset-syncs매핑과MirrorCheckpointConnector가 만드는<source>.checkpoints.internal을 통해 "원본 offset → DR offset" 변환을 자동화합니다. - 변환 값은
RemoteClusterUtils.translateOffsets()로 직접 읽거나, **sync.group.offsets.enabled=true**로 DR의__consumer_offsets에 미리 써 두어 앱 수정 없이 페일오버할 수 있습니다. - 변환은 근사값이며 at-least-once입니다. 페일오버 후 약간의 중복 처리는 정상이므로, 컨슈머는 반드시 멱등해야 합니다. 아직 복제되지 못한 구간이 곧 당신의 RPO입니다.
- 정확도와 부하는
emit.checkpoints.interval.seconds·sync.group.offsets.interval.seconds로 균형을 잡습니다(권장 출발점 30초). - 다음 4편에서는 지금까지 준비한 변환·동기화를 실제로 발동시키는 페일오버/페일백 런북 — 장애 감지, 트래픽 전환, 원복 절차, 리허설 시나리오 — 을 단계별로 다룹니다.
참고 자료
- KIP-382: MirrorMaker 2.0 — https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0
- Apache Kafka 문서, Geo-Replication (Cross-Cluster Data Mirroring) — https://kafka.apache.org/documentation/#georeplication
RemoteClusterUtils/MirrorClient(org.apache.kafka.connect.mirror) — Apache Kafka Connect Mirror API- Apache Kafka 문서, Consumer Offset Management — https://kafka.apache.org/documentation/#consumerconfigs
— Data Dynamics 엔지니어링 팀