Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitdf4f362

Browse files
authored
fix(blackhole sink): implement end-to-end acknowledgements (#24283)
Addresses#24281The blackhole sink was consuming events without updating finalizer status,causing sources that depend on acknowledgements (like aws_s3 with SQS) tonever receive delivery confirmation. This resulted in SQS messages neverbeing deleted and queue depth growing indefinitely.Added proper finalizer handling by taking finalizers from events andmarking them as delivered after processing, matching the pattern usedby other sinks like console.Co-authored-by: sanjams2 <sanjams2@users.noreply.github.com>
1 parentb9ad9b3 commitdf4f362

File tree

3 files changed

+11
-4
lines changed

3 files changed

+11
-4
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Fixed the blackhole sink to properly implement end-to-end acknowledgements. Previously, the sink consumed events without updating finalizer status, causing sources that depend on acknowledgements (like`aws_s3` with SQS) to never delete processed messages from the queue.
2+
3+
authors: sanjams2

‎src/sinks/blackhole/mod.rs‎

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ mod tests {
1414
blackhole::{config::BlackholeConfig, sink::BlackholeSink},
1515
},
1616
test_util::{
17-
components::run_and_assert_nonsending_sink_compliance, random_events_with_stream,
17+
components::{SINK_TAGS, run_and_assert_sink_compliance},
18+
random_events_with_stream,
1819
},
1920
};
2021

@@ -29,6 +30,6 @@ mod tests {
2930
let sink =VectorSink::Stream(Box::new(sink));
3031

3132
let(_input_lines, events) =random_events_with_stream(100,10,None);
32-
run_and_assert_nonsending_sink_compliance(sink, events,&[]).await;
33+
run_and_assert_sink_compliance(sink, events,&SINK_TAGS).await;
3334
}
3435
}

‎src/sinks/blackhole/sink.rs‎

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use vector_lib::{
2121
};
2222

2323
usecrate::{
24-
event::{EventArray,EventContainer},
24+
event::{EventArray,EventContainer,EventStatus,Finalizable},
2525
sinks::{blackhole::config::BlackholeConfig, util::StreamSink},
2626
};
2727

@@ -82,7 +82,7 @@ impl StreamSink<EventArray> for BlackholeSink {
8282
});
8383
}
8484

85-
whileletSome(events) = input.next().await{
85+
whileletSome(mutevents) = input.next().await{
8686
ifletSome(rate) =self.config.rate{
8787
let factor:f32 =1.0 / rateasf32;
8888
let secs:f32 = factor*(events.len()asf32);
@@ -98,6 +98,9 @@ impl StreamSink<EventArray> for BlackholeSink {
9898
.total_raw_bytes
9999
.fetch_add(message_len.get(),Ordering::AcqRel);
100100

101+
let finalizers = events.take_finalizers();
102+
finalizers.update_status(EventStatus::Delivered);
103+
101104
events_sent.emit(CountByteSize(events.len(), message_len));
102105
bytes_sent.emit(ByteSize(message_len.get()));
103106
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp