pause() during graceful shutdown corrupts committed offsets causing message reprocessing #404

Open
@C0mp4ct

Description

Calling `pause()` during graceful shutdown causes offset corruption when `enable.auto.commit` is `true`. The consumer seeks to stale cached offsets and commits them, overwriting the correct auto-committed offsets. As a result, already-processed messages are reprocessed after a restart or rebalance.

Environment Information

  • OS: macOS (reproducible on Linux)
  • Node Version: 23.x
  • NPM Version: 9.x / 10.x
  • C++ Toolchain: clang/g++
  • confluent-kafka-javascript version: 1.6.0

Steps to Reproduce

  1. Create a consumer group with multiple workers (`enable.auto.commit: true`)
  2. Consume messages actively (e.g., 1000 msg/sec)
  3. During consumption, trigger a rebalance and initiate a graceful shutdown that calls `pause()`:
     ```javascript
     await consumer.pause([{ topic: 'my-topic' }]);
     await waitForInflightMessages(); // Wait for handlers to complete
     await consumer.disconnect();
     ```
  4. Trigger sequential shutdowns (e.g., a rolling deployment), causing rebalances
  5. Remaining/restarted consumers reprocess already-consumed messages
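Step 3 relies on an application-side helper, `waitForInflightMessages()`, that the issue does not show. The following is a minimal sketch of how such a helper could work, assuming a hypothetical `InflightTracker` class that wraps each handler call. It does not touch the Kafka client at all; it only counts handler promises that have started but not yet settled.

```javascript
// Hypothetical helper (not part of confluent-kafka-javascript): counts handler
// invocations that are still running and lets shutdown code wait for zero.
class InflightTracker {
  constructor() {
    this.count = 0;     // handlers started but not yet settled
    this.waiters = [];  // resolvers waiting for count to reach zero
  }

  // Run fn and keep it counted until its promise settles (success or error).
  async track(fn) {
    this.count += 1;
    try {
      return await fn();
    } finally {
      this.count -= 1;
      if (this.count === 0) {
        // Release everyone who was waiting for the tracker to become idle.
        const waiters = this.waiters;
        this.waiters = [];
        waiters.forEach((resolve) => resolve());
      }
    }
  }

  // Resolves immediately when idle, otherwise when the last handler settles.
  waitForInflightMessages() {
    if (this.count === 0) return Promise.resolve();
    return new Promise((resolve) => this.waiters.push(resolve));
  }
}
```

In an application this would be wired in as something like `consumer.run({ eachMessage: (payload) => tracker.track(() => handleMessage(payload)) })`, where `handleMessage` is a hypothetical application handler. Note that the tracker only answers "have handlers finished?"; it does nothing about which offsets get committed, so it does not avoid the bug described here.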

Root Cause

In `lib/kafkajs/_consumer.js`:

  1. `#pauseInternal()` (lines 1838-1862): reads from the `#lastConsumedOffsets` cache.
  2. Stale cache: `#lastConsumedOffsets` is never cleaned during rebalances (lines 359-369), so it can contain outdated offsets.
  3. `#seekInternal()` commits (lines 1821-1822): it seeks to the stale offset and commits it immediately when `enable.auto.commit` is `true`.
  4. Race with auto-commit: this overwrites correct offsets that were stored via `_offsetsStoreSingle()` but not yet auto-committed.

```javascript
// Lines 1848-1853: uses the stale cached offset
if (this.#lastConsumedOffsets.has(key)) {
  const seekOffset = this.#lastConsumedOffsets.get(key);
  // ...seeks to seekOffset.offset + 1
}

// Lines 1821-1822: commits the stale offset
if (offsetsToCommit.length !== 0 && this.#internalConfig['enable.auto.commit']) {
  await this.#commitOffsetsUntilNoStateErr(offsetsToCommit); // ← COMMITS STALE OFFSET
}
```
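To make the ordering concrete, here is a deterministic model of a single partition's offsets. It is a hypothetical simplification, not the library's implementation: it treats the group's committed offset as last-write-wins, uses made-up offsets (a stale cache entry of 400, processing finished at message 1000), and assumes the consumer disconnects before the next auto-commit interval fires.

```javascript
// Hypothetical model of one partition (not library code). Offsets follow the
// Kafka convention: the committed offset is the NEXT message to read.
function replay(events) {
  let stored = null;    // offset stored locally, awaiting the next auto-commit
  let committed = null; // offset held by the group coordinator (last write wins)
  for (const ev of events) {
    switch (ev.type) {
      case 'store':       // a handler finished: store lastProcessed + 1
        stored = ev.offset;
        break;
      case 'autoCommit':  // the periodic auto-commit commits whatever is stored
        if (stored !== null) committed = stored;
        break;
      case 'pauseCommit': // pause() seeks to cachedOffset + 1 and commits it
        committed = ev.offset;
        break;
      default:
        throw new Error(`unknown event ${ev.type}`);
    }
  }
  return committed;
}

const events = [
  { type: 'store', offset: 901 },
  { type: 'autoCommit' },               // 901 reaches the coordinator
  { type: 'store', offset: 1001 },      // correct, but not yet auto-committed
  { type: 'pauseCommit', offset: 401 }, // stale cache entry (400) + 1
  // assumed: disconnect happens here, before the next auto-commit fires
];

const committed = replay(events);
console.log(committed);        // 401
console.log(1001 - committed); // 600 (messages 401..1000 are reprocessed)
```

In this model the committed position moves backwards from 901 to 401, even though the application had already finished message 1000.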

Expected Behavior

Graceful shutdown should not regress committed offsets. Auto-commit should handle offset management based on `_offsetsStoreSingle()` calls.
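One way to state the expected invariant in code is a monotonicity check applied to any commit issued outside the normal auto-commit path. This is a hypothetical sketch, not an existing library function: `filterRegressingCommits` and the `highWater` map are assumptions, and keeping `highWater` up to date (for example from stored offsets) is not shown.

```javascript
// Hypothetical guard (not part of confluent-kafka-javascript): drop any
// proposed commit that is lower than the highest known-good offset for its
// partition, so a shutdown path cannot move a partition backwards.
function filterRegressingCommits(proposed, highWater) {
  // highWater: Map of "topic|partition" -> highest committed or stored offset.
  return proposed.filter(({ topic, partition, offset }) => {
    const known = highWater.get(`${topic}|${partition}`);
    // Design choice for this sketch: unknown partitions are let through.
    if (known === undefined) return true;
    // Offsets may be strings in the KafkaJS-style API, so compare as BigInt.
    return BigInt(offset) >= BigInt(known);
  });
}

// Usage with illustrative values:
const highWater = new Map([['my-topic|0', '1001'], ['my-topic|1', '250']]);
const proposed = [
  { topic: 'my-topic', partition: 0, offset: '401' }, // stale: would regress
  { topic: 'my-topic', partition: 1, offset: '300' }, // moves forward
  { topic: 'my-topic', partition: 2, offset: '10' },  // no known offset
];
console.log(filterRegressingCommits(proposed, highWater).map((c) => c.partition)); // [ 1, 2 ]
```

Whether unknown partitions should pass or be blocked is a judgment call; blocking them would be stricter but could suppress legitimate first commits after an assignment.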

Actual Behavior

`pause()` immediately commits stale cached offsets, overwriting the correct ones and causing message reprocessing after restart.

confluent-kafka-javascript Configuration Settings

```javascript
{
  'group.id': 'my-consumer-group',
  'enable.auto.commit': true,
  'auto.commit.interval.ms': 5000,
  // ... other settings
}
```

Additional context

  • The issue occurs most frequently during rolling deployments with sequential shutdowns.
  • It affects seemingly random partitions, depending on which cached offsets are stale.
  • The `#lastConsumedOffsets` Map has no lifecycle management: entries never expire and are not cleaned up during rebalances.
  • The `pause()` method's seek-and-commit behavior conflicts with graceful shutdown scenarios.
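The missing lifecycle management could look roughly like this: evict cached entries for partitions taken away by a rebalance, and clear everything on disconnect. This is a hypothetical sketch; `LastConsumedCache`, its methods, and the `"topic|partition"` key format are assumptions, and wiring `onRevoke` into the consumer's actual rebalance handling is not shown.

```javascript
// Hypothetical cache with lifecycle management, modeled loosely on the role
// the issue describes for #lastConsumedOffsets (not the library's code).
class LastConsumedCache {
  constructor() {
    this.entries = new Map(); // "topic|partition" -> last consumed offset
  }

  static key(topic, partition) {
    return `${topic}|${partition}`;
  }

  record(topic, partition, offset) {
    this.entries.set(LastConsumedCache.key(topic, partition), offset);
  }

  get(topic, partition) {
    return this.entries.get(LastConsumedCache.key(topic, partition));
  }

  // Drop entries for partitions this consumer no longer owns, so a later
  // pause()/seek cannot reuse an offset recorded under an old assignment.
  onRevoke(partitions) {
    for (const { topic, partition } of partitions) {
      this.entries.delete(LastConsumedCache.key(topic, partition));
    }
  }

  // Full reset, e.g. when the consumer disconnects.
  clear() {
    this.entries.clear();
  }
}

const cache = new LastConsumedCache();
cache.record('my-topic', 0, 400);
cache.record('my-topic', 1, 77);
cache.onRevoke([{ topic: 'my-topic', partition: 0 }]);
console.log(cache.get('my-topic', 0)); // undefined
console.log(cache.get('my-topic', 1)); // 77
```

Evicting on revoke means a partition that comes back in a later assignment starts with no cached offset, so any seek-on-pause logic would have to fall back to the committed or stored position instead of an old value.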

Metadata

Assignees

No one assigned

    Labels

    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
