[Kafka DR 3] The Hard Part of DR — Offset Translation and Consumer Failover
A DR cluster is an independent log, so the same record carries a different offset there. This post dissects how MirrorMaker 2 bridges that gap with offset-syncs and checkpoints, how to read translated offsets via RemoteClusterUtils, and how sync.group.offsets fails consumers over with zero app changes.
Replication is the easy part. The hard part is deciding exactly where the consumer should resume reading. A consumer that had read up to offset 1,000,000 on the primary cluster fails over to the DR cluster: can it simply resume at offset 1,000,000 there? Absolutely not. The DR cluster is not a byte-for-byte copy of the original; it is a separate log with its own, independently assigned offsets. That one fact creates the hardest problem in all of Kafka DR, and this installment focuses on it alone: offset translation and consumer failover.
What you'll learn in this post
- Why DR offsets differ from the primary (the independent-log problem)
- How MirrorMaker 2 bridges the offset gap with offset-syncs and checkpoints
- Reading translated offsets via RemoteClusterUtils.translateOffsets() (or MirrorClient.remoteConsumerOffsets())
- Failing consumer groups over with zero app changes using sync.group.offsets.enabled
- Why translation is approximate and at-least-once, and what that means for RPO and idempotency
- Tuning checkpoint periodicity and the operational traps to avoid
This is Part 3 of the "Building Kafka DR" series. In Part 1 (DR architecture and RPO/RTO design) and Part 2 (replicating topics and data with MirrorMaker 2), we got as far as "streaming data into the DR cluster." This installment covers the final step — letting consumers actually pick up where they left off.
1. The Core Problem — A DR Cluster Is an Independent Log
Same record, different offset
A Kafka offset is a physical position within a partition log. It is not a global ID; it is a local number each broker assigns sequentially from 0 as it appends records to its own log. The key insight: an offset is the record's position, not its identity.
When MirrorMaker 2 (MM2) replicates a topic from the primary cluster (alias primary) to the DR cluster (alias dr), a new topic named primary.orders appears on the DR side and the records are re-produced into it. The DR broker assigns fresh offsets as it appends them; the original offsets are not carried over.
Why do they diverge? Several causes overlap.
| Cause | Explanation |
|---|---|
| Replication start time | If MM2 begins mid-topic, DR offsets start at 0 while the primary is already in the millions |
| Retention / compaction | If the head of the primary log is purged by retention before replication, the start offset differs |
| Gaps / retries | Producer retries, transaction markers, etc. can make the append counts differ slightly on each side |
| Topic recreation | Recreating the primary topic resets its offsets, but DR keeps incrementing |
What actually goes wrong
A consumer group records how far it has read as a committed offset in __consumer_offsets. That value is a number relative to the primary cluster. If, during failover, you seek to that same number on DR:
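- If the primary offset is ahead of the record's real DR offset (the usual case, since DR offsets typically start lower) → you jump past records that were never processed (data loss, effectively dropped processing)
- If it is behind the real DR offset → you re-read records that were already processed (massive duplicate processing)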
Both are production incidents. That is why we need offset translation — a mapping that says "the record that was at offset X on the primary is at offset Y on DR," so that at failover X is rewritten to Y and the consumer resumes at the correct position.
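The two failure modes above come down to simple arithmetic. The sketch below shows that arithmetic under one assumption: you already know the correct DR offset of the next record, which is exactly what translation provides. NaiveSeek and its methods are hypothetical names and are not part of any Kafka API.

```java
/**
 * Hypothetical helper (not a Kafka API): what happens if a consumer reuses
 * its primary-relative committed offset verbatim on the DR cluster.
 */
final class NaiveSeek {
    /**
     * @param primaryCommitted offset the group committed on the primary (next record to read)
     * @param correctDrOffset  DR offset of that same next record
     * @return positive = records skipped, negative = records re-read, 0 = correct position
     */
    static long misplacement(long primaryCommitted, long correctDrOffset) {
        return primaryCommitted - correctDrOffset;
    }

    static String describe(long primaryCommitted, long correctDrOffset) {
        long diff = misplacement(primaryCommitted, correctDrOffset);
        if (diff > 0) {
            return "skips " + diff + " unprocessed records";
        }
        if (diff < 0) {
            return "re-reads " + (-diff) + " processed records";
        }
        return "resumes at the correct record";
    }
}
```

For example, describe(1_000_000, 750_000) reports 250,000 skipped records. If the primary number lies beyond the DR log end altogether, the consumer instead hits an out-of-range fetch and falls back to its auto.offset.reset policy.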
2. How MM2 Solves It — offset-syncs and checkpoints
MM2 builds this mapping automatically. Two connectors and two internal topics cooperate.
MirrorSourceConnector → the mm2-offset-syncs topic
MirrorSourceConnector, which handles data replication, produces records to DR. As it does so, it records "primary offset ↔ DR offset" pairs in a separate internal topic, mm2-offset-syncs.<target>.internal (commonly shortened to mm2-offset-syncs). It does not write a pair for every record. It leaves anchor pairs sparingly, and offset.lag.max mainly governs how dense they are.
A record in this topic essentially means:
topic-partition  = orders-0    (source partition on the primary)
upstreamOffset   = 1_000_000   (offset on the primary)
downstreamOffset = 742_318     (offset of the same record in DR's primary.orders-0)
That is, it records an anchor stating "the record at primary offset 1,000,000 is at DR offset 742,318."
MirrorCheckpointConnector → the <source>.checkpoints.internal topic
MirrorCheckpointConnector goes one step further. This connector periodically reads how far the primary cluster's consumer groups have committed, uses the offset-syncs mappings to translate those into DR-relative offsets, and writes the result into the <source>.checkpoints.internal topic.
A single checkpoint record carries roughly this information:
| Field | Meaning |
|---|---|
| consumerGroupId | Which consumer group (e.g. order-processor) |
| topicPartition | Which topic-partition (DR-side name, primary.orders-0) |
| upstreamOffset | The offset that group committed on the primary |
| downstreamOffset | The translated DR offset (= the seek position at failover) |
| metadata | Additional info |
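Putting it together, the data flow runs in two stages:
- MirrorSourceConnector copies records from orders on the primary into primary.orders on DR and leaves anchors in mm2-offset-syncs.
- MirrorCheckpointConnector reads the primary groups' committed offsets and translates them against those anchors. It then writes the results to primary.checkpoints.internal on DR, where RemoteClusterUtils or sync.group.offsets can pick them up at failover.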
Configuration example
Adding checkpoint-related options to the MM2 configuration from Part 2 looks like this.
# mm2.properties (excerpt)
clusters = primary, dr
primary->dr.enabled = true
# Data replication (MirrorSourceConnector) — covered in Part 2
primary->dr.topics = orders, payments, .*
# Location of the offset-syncs topic (default: source, i.e. the primary; target puts it on DR)
primary->dr.offset-syncs.topic.location = target
# Enable checkpoints (consumer group offset translation)
primary->dr.emit.checkpoints.enabled = true
primary->dr.groups = order-processor, payment-.*
# Checkpoint emit interval — smaller is more accurate but heavier
primary->dr.emit.checkpoints.interval.seconds = 30
# offset-syncs density: how far a partition may drift from its last anchor before a new one is written (default 100)
primary->dr.offset.lag.max = 100
groups takes a comma-separated list of consumer group names and/or regexes to translate. Its default, .*, matches every group not filtered out by groups.exclude. Naming only the groups you intend to fail over to DR keeps checkpoint volume down and makes the intent explicit.
Walking through it with numbers
The abstract description only goes so far, so let's trace one translation cycle with concrete values. We'll look at a single partition, orders-0.
- On the primary, replication started at offset 250,000, and that record landed at offset 0 in DR's primary.orders-0. So the constant delta is 250,000.
- MirrorSourceConnector leaves anchors as it replicates:

| upstreamOffset (primary) | downstreamOffset (DR) |
|---|---|
| 250,000 | 0 |
| 300,000 | 50,000 |
| 1,000,000 | 750,000 |

- The consumer group order-processor committed up to offset 1,000,000 on the primary.
- MirrorCheckpointConnector reads this commit and finds the matching anchor (1,000,000 → 750,000). It translates the commit to downstreamOffset = 750,000 and records that in a checkpoint.
- At failover, the consumer resumes from offset 750,000 in DR's primary.orders-0. Had it used the primary's 1,000,000 directly, it would have landed about 250,000 records ahead of the correct position, likely past the end of the DR log.
A caveat: the committed offset may not be an anchor itself but fall between two anchors. In that case MM2 uses the closest anchor at or below it and maps only slightly past that anchor's DR offset (in recent Kafka versions, one offset past it). It cannot know how many of the intervening records actually reached DR. In the example above, a commit at 320,000 would be translated conservatively against the 300,000 → 50,000 anchor, well behind the true position of 70,000 implied by the constant delta. The result is reprocessing (duplicates), which is precisely the at-least-once nature discussed in Section 4.
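The rule can be modeled in a few lines with a sorted map of anchors. This is a simplified sketch, not MM2's source: AnchorTranslator is a hypothetical name. The "exact anchor maps directly, otherwise one past the anchor's DR offset" rule is an assumption modeled on recent MM2 behavior. Returning empty for commits older than every anchor is also a simplification.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

/**
 * Simplified, hypothetical model of upstream (primary) -> downstream (DR) offset
 * translation from offset-sync anchors. An exact anchor hit maps directly; a commit
 * past an anchor maps to one offset after that anchor's DR offset, which may cause
 * reprocessing but never skips records.
 */
final class AnchorTranslator {
    // upstream (primary) offset -> downstream (DR) offset, sorted by upstream offset
    private final TreeMap<Long, Long> anchors = new TreeMap<>();

    void addAnchor(long upstreamOffset, long downstreamOffset) {
        anchors.put(upstreamOffset, downstreamOffset);
    }

    OptionalLong translate(long upstreamOffset) {
        // Closest anchor at or below the committed offset
        Map.Entry<Long, Long> anchor = anchors.floorEntry(upstreamOffset);
        if (anchor == null) {
            // Commit is older than every known anchor: no safe translation
            return OptionalLong.empty();
        }
        long step = (anchor.getKey() == upstreamOffset) ? 0 : 1;
        return OptionalLong.of(anchor.getValue() + step);
    }
}
```

With the three anchors from the walkthrough, translate(1_000_000) returns 750,000, while translate(320_000) returns 50,001. That is roughly 20,000 records behind the true position, all of which would be replayed.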
3. Reading Translated Offsets — RemoteClusterUtils and sync.group.offsets
Once the checkpoint topic is populated, there are two ways to consume those values at failover.
Option A — query directly with RemoteClusterUtils
RemoteClusterUtils (which uses MirrorClient under the hood) reads the checkpoint topic and computes "where this group should start reading on DR."
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
// Configuration that connects to the DR cluster (where the checkpoints topic lives).
// translateOffsets() takes a Map<String, Object>, not java.util.Properties.
Map<String, Object> props = new HashMap<>();
props.put("bootstrap.servers", "dr-broker1:9092,dr-broker2:9092");
// Compute the DR-relative resume position for a group that came from "primary".
// Throws InterruptedException / java.util.concurrent.TimeoutException.
Map<TopicPartition, OffsetAndMetadata> translated =
    RemoteClusterUtils.translateOffsets(
        props,
        "primary",          // source cluster alias (DR topic prefix)
        "order-processor",  // consumer group to translate
        Duration.ofSeconds(10));
// Keys in `translated` are DR topic-partitions (e.g. primary.orders-0),
// values are the offsets that group should seek/commit on DR.
Using MirrorClient directly gives finer control.
import org.apache.kafka.connect.mirror.MirrorClient;
// Reuses the DR-pointing `props` map from above
MirrorClient client = new MirrorClient(props);
try {
    // Map a primary topic name to its remote DR topic name via the replication policy
    String remoteTopic = client.replicationPolicy().formatRemoteTopic("primary", "orders"); // -> "primary.orders"
    // Query the group's translated offsets (the same lookup translateOffsets() performs)
    Map<TopicPartition, OffsetAndMetadata> offsets =
        client.remoteConsumerOffsets("order-processor", "primary", Duration.ofSeconds(10));
} finally {
    client.close();
}
In your failover script, commit these offsets into the DR cluster's __consumer_offsets, or feed them into seek() when the consumer starts. For the commit route you can use Admin.alterConsumerGroupOffsets(), which only succeeds while the group has no active members on DR.
Option B — auto-sync with sync.group.offsets (zero app changes)
Option A requires writing code. The smoother approach is to let MM2 write the translated offsets directly into the DR cluster's __consumer_offsets.
# Automatically write translated consumer group offsets into DR's __consumer_offsets
primary->dr.sync.group.offsets.enabled = true
primary->dr.sync.group.offsets.interval.seconds = 30
With this enabled, MirrorCheckpointConnector does not stop at producing checkpoints. It also pre-syncs the translated offsets onto the DR cluster under the same group ID. As a result:
- At failover you need no application changes at all. Point the consumer's bootstrap.servers at DR and restart. The moment it connects with the same group.id, the translated position is already in DR's __consumer_offsets, and it resumes from there. This assumes the consumer also picks up the replicated topic name (for example, it subscribes to a pattern that matches primary.orders), since the default replication policy renames topics on DR.
- However, syncing happens only while that group is not active on DR. If the same group is already running on DR (e.g. active-active), MM2 will not overwrite its offsets, so it never clobbers the progress of a healthy, running consumer.
| Comparison | Option A (translateOffsets) | Option B (sync.group.offsets) |
|---|---|---|
| App changes | Failover logic/script required | None (just change bootstrap) |
| Control | Fine-grained (per group/partition) | Automatic (bulk) |
| When applied | Explicit call at failover moment | Pre-synced periodically |
| Best for | Custom failover orchestration | Simple, standard active-passive DR |
For most active-passive DR, Option B carries the least operational burden.
The difference under active-passive vs active-active
Offset translation is cleanest when you assume a one-directional flow (active-passive). Consumers run only on the primary, DR is on standby, and DR consumers wake up only at the moment of failover. At that point the group on DR is "inactive," so sync.group.offsets can safely pre-fill its offsets.
Under active-active (bidirectional), the story gets complicated.
| Situation | Consideration |
|---|---|
| Same group.id runs on both sides | MM2 will not overwrite an active group's offsets → the sync is intentionally skipped |
| Bidirectionally replicated topics | Each cluster holds its local orders plus the other side's copy (primary.orders on DR, dr.orders on the primary); you must clearly design which ones consumers read |
| Preventing cyclic replication | MM2's topic prefixes (primary., dr.) prevent infinite loops, but group offset sync is only trustworthy one-directionally |
If you run active-active, rather than relying on group offset sync it is more predictable to translate and commit explicitly via Option A at the failover moment. Bidirectional replication and consumer topology design are covered more deeply in a later installment (or in Part 1's topology section).
Sanity-checking against end offsets
Do not blindly trust the translation result. Just before failover you must compare it against the DR partition's end offset (log end offset) as a sanity check.
translated offset > DR end offset   → bad mapping (a future position). Seeking there yields OffsetOutOfRange.
                                      → clamp to the end offset or the nearest valid anchor.
translated offset < DR start offset → points at a range already purged by retention.
                                      → reset to earliest (be ready for heavy reprocessing).
When RemoteClusterUtils works normally these boundary violations are rare. They can still occur under large replication lag or right after a topic is recreated. Putting clamp logic in your failover script adds one more layer of protection against a production incident.
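The two boundary checks can be written as a pure function per partition. This is a hypothetical sketch: OffsetClamp and its methods are illustrative names, and the DR start and end offsets are assumed to be fetched separately, for example with KafkaConsumer.beginningOffsets() and endOffsets() against DR. For simplicity it clamps overshoot to the end offset rather than searching for the nearest valid anchor.

```java
/**
 * Hypothetical pre-failover check (not a Kafka API).
 * drStart = earliest offset still on DR, drEnd = DR log end offset.
 */
final class OffsetClamp {
    enum Verdict { OK, ABOVE_END, BELOW_START }

    static Verdict verdict(long translated, long drStart, long drEnd) {
        if (translated > drEnd) {
            return Verdict.ABOVE_END;   // a "future" position: would trigger OffsetOutOfRange
        }
        if (translated < drStart) {
            return Verdict.BELOW_START; // already purged by retention
        }
        return Verdict.OK;
    }

    /** Clamps into [drStart, drEnd]: end offset for overshoot, earliest offset for purged ranges. */
    static long safeSeekOffset(long translated, long drStart, long drEnd) {
        return Math.max(drStart, Math.min(translated, drEnd));
    }
}
```

Logging every non-OK verdict before seeking makes it easy to spot which partitions had a suspect mapping after the rehearsal.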
4. What You Must Know — Translation Is Approximate and At-Least-Once
Here we need to be honest. Offset translation is not exact. And that is not a bug — it is inherent to the design.
Why it's approximate
offset-syncs records only sparse anchors, not every record (for load reasons). So translation is computed against the nearest known anchor at or below the committed offset, and records between anchors are mapped to a conservative (earlier) position. Checkpoints, likewise, are stamped only once per emit.checkpoints.interval.seconds, so any progress the primary consumer made after the last checkpoint is not reflected.
As a result, the translated position is likely to be slightly behind (in the past of) the consumer's true last-processed point. What that means:
After failover, some records will be reprocessed (duplicates). This is normal, and MM2's semantics are explicitly at-least-once — not exactly-once.
So consumers must be idempotent
To safely absorb post-failover duplicates, your downstream logic must be idempotent.
Bad (not idempotent):
  receive order event → "subtract" amount from balance
  → reprocessing the same event double-subtracts (incident)
Good (idempotent):
  receive order event → UPSERT keyed by idempotency key (order_id)
  → reprocessing the same event yields the same result
Typical idempotency techniques:
| Technique | Description |
|---|---|
| Idempotency key + UPSERT | Ignore/update duplicate INSERTs by business key |
| Processed-events table | Store processed event IDs and skip on reappearance |
| Conditional write | CAS-style "update only if current state is X" |
| Idempotent external API | Payments etc. support an idempotency-key header |
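The Bad/Good contrast above can be reproduced in memory by replaying one order event twice, as a consumer might after failover. This is a hypothetical sketch: the handler classes are illustrative, and a HashMap stands in for a database table with an UPSERT keyed by order_id.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical handlers contrasting the two patterns; not production code. */
final class OrderHandlers {
    /** Not idempotent: each delivery of the same event subtracts again. */
    static final class SubtractingHandler {
        long balance;

        SubtractingHandler(long initialBalance) {
            this.balance = initialBalance;
        }

        void onOrder(String orderId, long amount) {
            balance -= amount;
        }
    }

    /** Idempotent: the charge is upserted by order_id, so a replay writes the same row again. */
    static final class UpsertingHandler {
        private final long initialBalance;
        // Stand-in for a table keyed by the idempotency key
        private final Map<String, Long> chargeByOrderId = new HashMap<>();

        UpsertingHandler(long initialBalance) {
            this.initialBalance = initialBalance;
        }

        void onOrder(String orderId, long amount) {
            chargeByOrderId.put(orderId, amount); // UPSERT keyed by order_id
        }

        long balance() {
            long charged = 0;
            for (long charge : chargeByOrderId.values()) {
                charged += charge;
            }
            return initialBalance - charged;
        }
    }
}
```

Starting from 100 and delivering a 30-unit "order-1" twice, the subtracting handler ends at 40 while the upserting handler stays at 70.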
Un-replicated data at failover = your RPO
Another truth: records that had not yet replicated to DR at the moment of failure cannot be rescued by translation. That volume is exactly your RPO (Recovery Point Objective).
RPO (records at risk) ≈ primary's latest produced offset − last offset fully replicated to DR
                      ≈ produce rate × replication lag at the moment of failure
If replication lag is 2 seconds when the failure hits, you can lose roughly 2 seconds' worth of data (this ties directly to the RPO design in Part 1). What counts is the lag at that moment, not the average. To shrink this loss you must shrink replication lag itself; driving it to zero means leaving asynchronous DR for synchronous replication.
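Both forms of the estimate are easy to compute. The following hypothetical sketch assumes you already collect the inputs from your own monitoring. Those inputs are each partition's primary log end offset, the primary offset up to which records are known to be on DR, or simply the produce rate and the current replication lag. The class and method names are illustrative.

```java
import java.util.Map;

/** Hypothetical RPO arithmetic; the inputs would come from your own monitoring. */
final class RpoEstimate {
    /**
     * Records produced on the primary but not yet on DR, summed over partitions.
     * primaryEndOffsets: partition -> primary log end offset
     * replicatedUpTo:    partition -> primary offset up to which records are on DR (exclusive)
     * A partition missing from replicatedUpTo is treated as not replicated at all (an upper bound).
     */
    static long unreplicatedRecords(Map<String, Long> primaryEndOffsets,
                                    Map<String, Long> replicatedUpTo) {
        long total = 0;
        for (Map.Entry<String, Long> e : primaryEndOffsets.entrySet()) {
            long replicated = replicatedUpTo.getOrDefault(e.getKey(), 0L);
            total += Math.max(0L, e.getValue() - replicated);
        }
        return total;
    }

    /** Rate-based approximation: produce rate (records/s) x replication lag (s). */
    static long recordsAtRisk(double recordsPerSecond, double lagSeconds) {
        return Math.round(recordsPerSecond * lagSeconds);
    }
}
```

At 5,000 records/s and a 2-second lag at the moment of failure, recordsAtRisk gives about 10,000 records.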
Tune accuracy via intervals
Translation accuracy (i.e. how much gets reprocessed) is controlled by the following settings.
| Setting | Effect | Trade-off |
|---|---|---|
| emit.checkpoints.interval.seconds | Stamps checkpoints more often, keeping translation fresh | Too small increases Connect cluster load and internal-topic traffic |
| sync.group.offsets.interval.seconds | Syncs DR __consumer_offsets more often | Same: increased load |
| offset.lag.max (offset-syncs density) | Smaller value → denser anchors → finer translation granularity | More internal-topic writes |
A good starting point is 30 seconds for both intervals. If your duplicate tolerance is tight, lower them toward 10s. If you have a very large number of topics and load is a concern, raise them to 60s and monitor. These settings govern duplicates, not RPO, which depends on replication lag. The core principle: denser checkpoints = more accurate failover = fewer duplicates, but higher load.
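As a back-of-envelope aid, here is one way to bound the replay window these settings imply. It is a hypothetical estimate, not an MM2 formula, and rests on three assumptions. First, a steady consumption rate. Second, the DR position can trail the primary commit by up to one checkpoint interval plus, with sync.group.offsets, one group-sync interval. Third, it ignores the consumer's own commit interval.

```java
/** Hypothetical back-of-envelope bound on post-failover replay; not an MM2 formula. */
final class ReplayBound {
    /**
     * @param consumeRatePerSec   steady records/s the group consumes on the primary
     * @param checkpointIntervalS emit.checkpoints.interval.seconds
     * @param groupSyncIntervalS  sync.group.offsets.interval.seconds (0 if you translate explicitly at failover)
     * @param anchorGapRecords    records between the commit and the anchor it is translated against
     */
    static long worstCaseReplay(long consumeRatePerSec, long checkpointIntervalS,
                                long groupSyncIntervalS, long anchorGapRecords) {
        // Staleness from the periodic tasks, converted to records, plus anchor granularity
        return consumeRatePerSec * (checkpointIntervalS + groupSyncIntervalS) + anchorGapRecords;
    }
}
```

At 1,000 records/s with both intervals at 30 s, the bound is about 60,000 records plus the anchor gap. Dropping both intervals to 10 s cuts it to about 20,000.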
5. Operational Checklist
Verify the following before a failover rehearsal.
| Check | How to verify |
|---|---|
| Are checkpoint topics being populated? | Consume primary.checkpoints.internal on DR and inspect records |
| Are target groups covered by the groups regex? | emit.checkpoints logs / group presence in checkpoints |
| Are offset-syncs flowing? | Confirm mappings land in mm2-offset-syncs.<target>.internal |
| Is sync.group.offsets working? | Confirm the group's offsets exist in DR __consumer_offsets |
| Are translation results sane? | Compare translateOffsets() output against DR end offsets (no negatives/overshoot) |
| Consumer idempotency | Validate no side effects by processing the same event twice |
| Replication lag (RPO) | Dashboard the MirrorSourceConnector lag metric |
A common trap: if you enabled sync.group.offsets but no offsets show up on DR, nine times out of ten the group (1) was not matched by the groups regex, (2) never committed on the primary, or (3) is already active on DR, so MM2 refused to overwrite it.
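Cause (1) is easy to check offline. The sketch below approximates include/exclude filtering with java.util.regex. GroupFilterCheck is a hypothetical name, and treating each comma-separated entry as a regex matched against the whole group id is an assumption. Verify that against the group filter in your MM2 version.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/**
 * Hypothetical pre-flight check: which consumer groups would an include list such as
 * `groups` select once an exclude list such as `groups.exclude` is applied?
 * Assumes each comma-separated entry is a regex matched against the whole group id.
 */
final class GroupFilterCheck {
    private static Pattern compile(String commaSeparated) {
        List<String> parts = new ArrayList<>();
        for (String entry : commaSeparated.split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty()) {
                parts.add("(?:" + trimmed + ")");
            }
        }
        // An empty list compiles to null, meaning "matches nothing"
        return parts.isEmpty() ? null : Pattern.compile(String.join("|", parts));
    }

    static List<String> selected(List<String> groupIds, String include, String exclude) {
        Pattern includePattern = compile(include);
        Pattern excludePattern = compile(exclude);
        List<String> result = new ArrayList<>();
        for (String groupId : groupIds) {
            boolean included = includePattern != null && includePattern.matcher(groupId).matches();
            boolean excluded = excludePattern != null && excludePattern.matcher(groupId).matches();
            if (included && !excluded) {
                result.add(groupId);
            }
        }
        return result;
    }
}
```

Feed it the group ids listed on the primary (for example, from kafka-consumer-groups.sh --list). Any group you expect to fail over that is missing from the result points straight at cause (1).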
Wrapping up
- A DR cluster is not a copy of the primary — it is an independent log. The same record has a different offset, so seeking on DR with a primary offset causes either dropped records or massive duplicates.
- MM2 automates "primary offset → DR offset" translation in two parts. MirrorSourceConnector leaves anchor mappings in mm2-offset-syncs, and MirrorCheckpointConnector produces records in <source>.checkpoints.internal.
- Read the translated values directly with RemoteClusterUtils.translateOffsets(), or pre-write them into DR's __consumer_offsets with sync.group.offsets.enabled=true so you can fail over with zero app changes.
- Translation is approximate and at-least-once. Some reprocessing after failover is normal, so consumers must be idempotent. Whatever has not yet replicated is exactly your RPO.
- Balance accuracy against load with emit.checkpoints.interval.seconds and sync.group.offsets.interval.seconds (start at 30s).
- In Part 4, we put all this translation and syncing to work in an actual failover/failback runbook. It covers failure detection, traffic cutover, the fallback procedure, and rehearsal scenarios, step by step.
References
- KIP-382: MirrorMaker 2.0 — https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0
- Apache Kafka Documentation, Geo-Replication (Cross-Cluster Data Mirroring) — https://kafka.apache.org/documentation/#georeplication
- RemoteClusterUtils / MirrorClient (org.apache.kafka.connect.mirror) — Apache Kafka Connect Mirror API
- Apache Kafka Documentation, Consumer Offset Management — https://kafka.apache.org/documentation/#consumerconfigs
— The Data Dynamics Engineering Team