Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit03a853b

Browse files
committed
replacing command key exit with Notify
1 parent423f0f0 commit03a853b

File tree

2 files changed

+8
-6
lines changed

2 files changed

+8
-6
lines changed

‎rust-stream/src/bin/receive_offset_tracking.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use futures::StreamExt;
22
use rabbitmq_stream_client::error::StreamCreateError;
33
use rabbitmq_stream_client::types::{ByteCapacity,OffsetSpecification,ResponseCode};
4-
use std::io::stdin;
54
use std::sync::atomic::{AtomicI64,Ordering};
65
use std::sync::Arc;
6+
use tokio::sync::Notify;
77
use tokio::task;
88

99
#[tokio::main]
@@ -14,6 +14,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
1414
let received_messages =Arc::new(AtomicI64::new(-1));
1515
let first_offset =Arc::new(AtomicI64::new(-1));
1616
let last_offset =Arc::new(AtomicI64::new(-1));
17+
let notify_on_close =Arc::new(Notify::new());
1718
let create_response = environment
1819
.stream_creator()
1920
.max_length(ByteCapacity::GB(2))
@@ -41,8 +42,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4142
.await
4243
.unwrap();
4344

44-
println!("Starting consuming");
45-
println!("Press any key to close the consumer");
45+
println!("Started consuming");
4646

4747
letmut stored_offset:u64 = consumer.query_offset().await.unwrap_or_else(|_|0);
4848

@@ -59,13 +59,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
5959

6060
let first_cloned_offset = first_offset.clone();
6161
let last_cloned_offset = last_offset.clone();
62+
let notify_on_close_cloned = notify_on_close.clone();
6263

6364
task::spawn(asyncmove{
6465
whileletSome(delivery) = consumer.next().await{
6566
let d = delivery.unwrap();
6667

6768
if first_offset.load(Ordering::Relaxed) == -1{
68-
println!("consuming firstmessage");
69+
println!("Firstmessage received");
6970
_ = first_offset.compare_exchange(
7071
first_offset.load(Ordering::Relaxed),
7172
d.offset()asi64,
@@ -85,13 +86,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
8586
last_offset.store(d.offset()asi64,Ordering::Relaxed);
8687
let handle = consumer.handle();
8788
_ = handle.close().await;
89+
notify_on_close_cloned.notify_one();
8890

8991
}
9092
}
9193
}
9294
});
9395

94-
_ =stdin().read_line(&mut"".to_string());
96+
notify_on_close.notified().await;
9597

9698
if first_cloned_offset.load(Ordering::Relaxed) != -1{
9799
println!(

‎rust-stream/src/bin/send_offset_tracking.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
6060
}
6161

6262
notify_on_send.notified().await;
63-
println!("Messages confirmed.");
63+
println!("Messages confirmed: True");
6464
producer.close().await?;
6565
Ok(())
6666
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp