Open
Description
Calling `pause()` during graceful shutdown causes offset corruption when `enable.auto.commit: true`. The consumer seeks to stale cached offsets and commits them, overwriting correct auto-committed offsets. This results in already-processed messages being reprocessed after restart or rebalance.
Environment Information
- OS: macOS (reproducible on Linux)
- Node Version: 23.x
- NPM Version: 9.x / 10.x
- C++ Toolchain: clang/g++
- confluent-kafka-javascript version: 1.6.0
Steps to Reproduce
- Create a consumer group with multiple workers (`enable.auto.commit: true`)
- Consume messages actively (e.g., 1000 msg/sec)
- During consumption, trigger a rebalance and initiate a graceful shutdown that calls `pause()`:
  ```javascript
  await consumer.pause([{ topic: 'my-topic' }]);
  await waitForInflightMessages(); // Wait for handlers to complete
  await consumer.disconnect();
  ```
- Trigger sequential shutdowns (e.g., rolling deployment) causing rebalances
- Remaining/restarted consumers reprocess already-consumed messages
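The shutdown sequence in step 3 relies on a `waitForInflightMessages()` helper that is not shown in this report. For anyone trying to reproduce, a minimal sketch of such a helper might look like the following. The `InflightTracker` class and its `track()` method are hypothetical names invented for illustration; they are not part of confluent-kafka-javascript.

```javascript
// Hypothetical helper (not part of the library): tracks message handlers
// that are still running so shutdown can wait for them.
class InflightTracker {
  #pending = new Set();

  // Register a handler promise; it is forgotten once it settles,
  // whether it resolved or rejected.
  track(promise) {
    this.#pending.add(promise);
    const done = () => this.#pending.delete(promise);
    promise.then(done, done);
    return promise;
  }

  // Number of handlers that have not settled yet.
  get size() {
    return this.#pending.size;
  }

  // Resolves once every tracked handler has settled.
  async waitForInflightMessages() {
    while (this.#pending.size > 0) {
      await Promise.allSettled([...this.#pending]);
    }
  }
}
```

In an `eachMessage` handler, each processing promise would be passed through `tracker.track(...)`, and the shutdown path would await `tracker.waitForInflightMessages()` between `pause()` and `disconnect()`.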
Root Cause
In `lib/kafkajs/_consumer.js`:
- `#pauseInternal()` (lines 1838-1862): Reads from the `#lastConsumedOffsets` cache
- Stale cache: `#lastConsumedOffsets` is never cleaned during rebalances (lines 359-369), so it contains outdated offsets
- `#seekInternal()` commits (lines 1821-1822): Seeks to the stale offset and commits it immediately when `enable.auto.commit: true`
- Race with auto-commit: Overwrites correct offsets stored via `_offsetsStoreSingle()` but not yet auto-committed
```javascript
// Line 1848-1853: Uses stale cached offset
if (this.#lastConsumedOffsets.has(key)) {
  const seekOffset = this.#lastConsumedOffsets.get(key);
  // ...seeks to seekOffset.offset + 1
}

// Line 1821-1822: Commits the stale offset
if (offsetsToCommit.length !== 0 && this.#internalConfig['enable.auto.commit']) {
  await this.#commitOffsetsUntilNoStateErr(offsetsToCommit); // ← COMMITS STALE OFFSET
}
```
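To make the failure mode concrete, here is a toy model of one plausible interleaving, using plain Maps in place of the broker and the consumer cache. It does not call the library. The partition key, the offsets (99, 500), and the function name `simulateStaleCommit` are all made up for illustration.

```javascript
// Toy simulation only: no Kafka involved. All names and numbers are illustrative.
function simulateStaleCommit() {
  // Stand-in for the group's committed offsets on the broker.
  const committed = new Map();
  // Stand-in for the consumer-side #lastConsumedOffsets cache.
  const lastConsumedOffsets = new Map();
  const key = 'my-topic:0';

  // 1. This worker owns the partition and processes up to offset 99.
  lastConsumedOffsets.set(key, { offset: 99 });
  committed.set(key, 100); // auto-commit stores the next offset to read

  // 2. Rebalance: another worker takes the partition and advances it.
  //    The cache entry on this worker is not cleared.
  committed.set(key, 500);
  const committedBeforePause = committed.get(key);

  // 3. The partition is assigned back and shutdown calls pause() before
  //    any new message refreshes the cache: seek to cached offset + 1
  //    and commit that position immediately.
  if (lastConsumedOffsets.has(key)) {
    const seekOffset = lastConsumedOffsets.get(key).offset + 1;
    committed.set(key, seekOffset);
  }

  return { committedBeforePause, committedAfterPause: committed.get(key) };
}

console.log(simulateStaleCommit());
```

In this model the committed offset moves backwards, so the next owner of the partition would re-read every message between the stale position and the position that had already been committed.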
Expected Behavior
Graceful shutdown should not regress committed offsets. Auto-commit should handle offset management based on `_offsetsStoreSingle()` calls.
Actual Behavior
`pause()` immediately commits stale cached offsets, overwriting correct offsets, causing message reprocessing after restart.
confluent-kafka-javascript Configuration Settings
```javascript
{
  'group.id': 'my-consumer-group',
  'enable.auto.commit': true,
  'auto.commit.interval.ms': 5000,
  // ... other settings
}
```
Additional context
- Issue occurs most frequently during rolling deployments with sequential shutdowns
- Affects random partitions unpredictably (depends on which cached offsets are stale)
- The `#lastConsumedOffsets` Map has no lifecycle management: entries never expire or get cleaned during rebalances
- The `pause()` method's seek-and-commit behavior conflicts with graceful shutdown scenarios
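One possible direction, offered only as a sketch and not as the library's actual design, is to give the cache an explicit lifecycle so that entries for revoked partitions are dropped during a rebalance. The `ConsumedOffsetCache` class and its method names below are hypothetical.

```javascript
// Hypothetical stand-in for the consumer's offset cache, with cleanup on revoke.
class ConsumedOffsetCache {
  #offsets = new Map();

  static #key(topic, partition) {
    return `${topic}:${partition}`;
  }

  // Remember the last offset handed to the application for a partition.
  record(topic, partition, offset) {
    this.#offsets.set(ConsumedOffsetCache.#key(topic, partition), offset);
  }

  // Returns the cached offset, or undefined if nothing valid is cached.
  get(topic, partition) {
    return this.#offsets.get(ConsumedOffsetCache.#key(topic, partition));
  }

  // Drop entries for partitions this consumer no longer owns, so a later
  // pause() cannot seek to (and commit) a position from a previous assignment.
  onPartitionsRevoked(revoked) {
    for (const { topic, partition } of revoked) {
      this.#offsets.delete(ConsumedOffsetCache.#key(topic, partition));
    }
  }
}
```

With this shape, a pause path that finds no cached entry for a partition would simply skip the seek for it and leave offset management to auto-commit.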