Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commite1d1c51

Browse files
authored
Merge pull request#206 from r3h0/master
Added Kafka 3.1.0 to integration tests and changed misc error handling
2 parentsaf3b346 +03612c0 commite1d1c51

22 files changed

+186
-170
lines changed

‎Cargo.toml

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,12 @@ thiserror = "1.0.31"
2626
tracing ="0.1.34"
2727

2828
[dev-dependencies]
29-
anyhow ="1.0.57"
30-
env_logger ="0.9.0"
3129
getopts ="0.2.21"
32-
lazy_static ="1.4.0"
33-
log ="0.4.17"
30+
tracing-subscriber ="0.3"
31+
time ="0.3.7"
3432
rand ="0.8.5"
35-
time ="0.3.9"
33+
lazy_static ="1.4.0"
34+
anyhow ="1.0.55"
3635

3736
[features]
3837
default = ["snappy","gzip","security"]

‎README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ See kafka-rust's `Cargo.toml` and [cargo'sdocumentation](http://doc.crates.io/ma
5959

6060
##Supported Kafka version
6161

62-
`kafka-rust` is tested for compatibility with Kafka 0.8.2and newer. However,
62+
`kafka-rust` is tested for compatibility witha select fewKafkaversions from0.8.2to 3.1.0. However,
6363
not all features from Kafka 0.9 and newer are supported yet.
6464

6565
##Examples

‎examples/console-consumer.rs

Lines changed: 51 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -3,27 +3,33 @@ use std::time::Duration;
33
use std::{env, process};
44
//use std::ascii::AsciiExt;
55

6+
use anyhow::{bail,Result};
67
use kafka::consumer::{Consumer,FetchOffset,GroupOffsetStorage};
78

9+
use tracing::{error, info};
10+
use tracing_subscriber;
11+
812
/// This is a very simple command line application reading from a
913
/// specific kafka topic and dumping the messages to standard output.
1014
fnmain(){
11-
env_logger::init();
15+
tracing_subscriber::fmt::init();
1216

1317
let cfg =matchConfig::from_cmdline(){
1418
Ok(cfg) => cfg,
1519
Err(e) =>{
16-
println!("{}", e);
20+
error!("{}", e);
1721
process::exit(1);
1822
}
1923
};
24+
info!("Starting consumer with the following configuration: {:?}", cfg);
25+
2026
ifletErr(e) =process(cfg){
21-
println!("{}", e);
27+
error!("{}", e);
2228
process::exit(1);
2329
}
2430
}
2531

26-
fnprocess(cfg:Config) ->Result<(),&'staticstr>{
32+
fnprocess(cfg:Config) ->Result<()>{
2733
letmut c ={
2834
letmut cb =Consumer::from_hosts(cfg.brokers)
2935
.with_group(cfg.group)
@@ -44,7 +50,6 @@ fn process(cfg: Config) -> Result<(), &'static str> {
4450
letmut stdout = stdout.lock();
4551
letmut buf =Vec::with_capacity(1024);
4652

47-
let do_commit = !cfg.no_commit;
4853
loop{
4954
for msin c.poll().unwrap().iter(){
5055
for min ms.messages(){
@@ -55,38 +60,32 @@ fn process(cfg: Config) -> Result<(), &'static str> {
5560
buf.extend_from_slice(m.value);
5661
buf.push(b'\n');
5762
// ~ write to output channel
58-
stdout.write_all(&buf);
63+
stdout.write_all(&buf)?;
5964
}
6065
let _ = c.consume_messageset(ms);
6166
}
62-
if do_commit{
63-
c.commit_consumed();
67+
if cfg.commit{
68+
c.commit_consumed()?;
69+
}
70+
if !cfg.follow{
71+
returnOk(());
6472
}
6573
}
6674
}
6775

68-
// --------------------------------------------------------------------
69-
// error_chain! {
70-
// foreign_links {
71-
// Kafka(kafka::error::Error);
72-
// Io(io::Error);
73-
// Opt(getopts::Fail);
74-
// }
75-
// }
76-
77-
// --------------------------------------------------------------------
78-
76+
#[derive(Debug)]
7977
structConfig{
8078
brokers:Vec<String>,
8179
group:String,
8280
topics:Vec<String>,
83-
no_commit:bool,
84-
offset_storage:GroupOffsetStorage,
81+
commit:bool,
82+
offset_storage:Option<GroupOffsetStorage>,
8583
fallback_offset:FetchOffset,
84+
follow:bool,
8685
}
8786

8887
implConfig{
89-
fnfrom_cmdline() ->Result<Config,&'staticstr>{
88+
fnfrom_cmdline() ->Result<Config>{
9089
let args:Vec<_> = env::args().collect();
9190
letmut opts = getopts::Options::new();
9291
opts.optflag("h","help","Print this help screen");
@@ -98,26 +97,36 @@ impl Config {
9897
);
9998
opts.optopt("","topics","Specify topics (comma separated)","NAMES");
10099
opts.optopt("","group","Specify the consumer group","NAME");
101-
opts.optflag("","no-commit","Do not commit group offsets");
102-
opts.optopt(
103-
"",
104-
"storage",
105-
"Specify the offset store [zookeeper, kafka]",
106-
"STORE",
107-
);
100+
101+
opts.optflag("","commit","Commit group offsets");
102+
opts.optopt("","storage","Specify the offset store [zookeeper, kafka]","STORE");
103+
108104
opts.optflag(
109105
"",
110106
"earliest",
111107
"Fall back to the earliest offset (when no group offset available)",
112108
);
109+
opts.optflag("","follow","Continue reading from the topic indefinitely");
110+
111+
macro_rules! on_error{
112+
($name:expr) =>{{
113+
let brief = format!("{} [options]", args[0]);
114+
println!("{}", opts.usage(&brief));
115+
bail!($name);
116+
}};
117+
}
118+
if args.len() ==1{
119+
on_error!("no arguments provided");
120+
}
113121

114122
let m =match opts.parse(&args[1..]){
115123
Ok(m) => m,
116-
Err(e) => std::panic::panic_any(e.to_string()),
124+
Err(e) =>{
125+
on_error!(format!("argument parsing encountered an error: {}", e.to_string()))
126+
}
117127
};
118128
if m.opt_present("help"){
119-
let brief =format!("{} [options]", args[0]);
120-
opts.usage(&brief);
129+
on_error!("help requested");
121130
}
122131

123132
macro_rules! required_list{
@@ -132,7 +141,7 @@ impl Config {
132141
.collect(),
133142
};
134143
if xs.is_empty(){
135-
format!("Invalid--{} specified!", opt);
144+
on_error!(format!("missing argument--{}", opt));
136145
}
137146
xs
138147
}};
@@ -141,27 +150,29 @@ impl Config {
141150
let brokers =required_list!("brokers");
142151
let topics =required_list!("topics");
143152

144-
letmut offset_storage =GroupOffsetStorage::Zookeeper;
145-
ifletSome(s) = m.opt_str("storage"){
153+
let offset_storage =ifletSome(s) = m.opt_str("storage"){
146154
if s.eq_ignore_ascii_case("zookeeper"){
147-
offset_storage =GroupOffsetStorage::Zookeeper;
155+
Some(GroupOffsetStorage::Zookeeper)
148156
}elseif s.eq_ignore_ascii_case("kafka"){
149-
offset_storage =GroupOffsetStorage::Kafka;
157+
Some(GroupOffsetStorage::Kafka)
150158
}else{
151-
format!("unknown offset store: {}", s);
159+
on_error!(format!("unknown offset store: {}", s));
152160
}
153-
}
161+
}else{
162+
None
163+
};
154164
Ok(Config{
155165
brokers,
156166
group: m.opt_str("group").unwrap_or_default(),
157167
topics,
158-
no_commit: m.opt_present("no-commit"),
168+
commit: m.opt_present("commit"),
159169
offset_storage,
160170
fallback_offset:if m.opt_present("earliest"){
161171
FetchOffset::Earliest
162172
}else{
163173
FetchOffset::Latest
164174
},
175+
follow: m.opt_present("follow"),
165176
})
166177
}
167178
}

‎examples/console-producer.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
1-
use anyhow::{ensure,Result};
1+
use anyhow::{bail,ensure,Result};
22
use std::fs::File;
3-
use std::io::{self,stderr, stdin,BufRead,BufReader,Write};
3+
use std::io::{stderr, stdin,BufRead,BufReader,Write};
44
use std::ops::{Deref,DerefMut};
55
use std::str::FromStr;
66
use std::time::Duration;
77
use std::{env, process};
88

99
use anyhow::anyhow;
10-
use kafka::Error;
1110

1211
use kafka::client::{
1312
Compression,KafkaClient,RequiredAcks,DEFAULT_CONNECTION_IDLE_TIMEOUT_MILLIS,
@@ -21,7 +20,7 @@ use kafka::producer::{AsBytes, Producer, Record, DEFAULT_ACK_TIMEOUT_MILLIS};
2120
/// Alternatively, messages can be read from an input file and sent do
2221
/// kafka in batches (the typical use-case).
2322
fnmain(){
24-
env_logger::init();
23+
tracing_subscriber::fmt::init();
2524

2625
let cfg =matchConfig::from_cmdline(){
2726
Ok(cfg) => cfg,
@@ -216,13 +215,20 @@ impl Config {
216215
"MILLIS",
217216
);
218217

218+
macro_rules! on_error{
219+
($name:expr) =>{{
220+
let brief = format!("{} [options]", args[0]);
221+
println!("{}", opts.usage(&brief));
222+
bail!($name);
223+
}};
224+
}
225+
219226
let m =match opts.parse(&args[1..]){
220227
Ok(m) => m,
221-
Err(e) =>returnErr(anyhow!("Error {:?}", e)),
228+
Err(e) =>on_error!(format!("Error {:?}", e)),
222229
};
223230
if m.opt_present("help"){
224-
let brief =format!("{} [options]", args[0]);
225-
returnErr(anyhow!("opts usage: {:?}", opts.usage(&brief)));
231+
on_error!("help")
226232
}
227233
Ok(Config{
228234
brokers: m

‎examples/example-consume.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use kafka::error::Error as KafkaError;
66
/// that messages must be marked and committed as consumed to ensure
77
/// only once delivery.
88
fnmain(){
9-
env_logger::init();
9+
tracing_subscriber::fmt::init();
1010

1111
let broker ="localhost:9092".to_owned();
1212
let topic ="my-topic".to_owned();
@@ -22,7 +22,7 @@ fn consume_messages(group: String, topic: String, brokers: Vec<String>) -> Resul
2222
.with_topic(topic)
2323
.with_group(group)
2424
.with_fallback_offset(FetchOffset::Earliest)
25-
.with_offset_storage(GroupOffsetStorage::Kafka)
25+
.with_offset_storage(Some(GroupOffsetStorage::Kafka))
2626
.create()?;
2727

2828
loop{

‎examples/example-fetch.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use kafka::client::{FetchPartition, KafkaClient};
33
/// This program demonstrates the low level api for fetching messages.
44
/// Please look at examles/consume.rs for an easier to use API.
55
fnmain(){
6-
env_logger::init();
6+
tracing_subscriber::fmt::init();
77

88
let broker ="localhost:9092";
99
let topic ="my-topic";

‎examples/example-produce.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use kafka::producer::{Producer, Record, RequiredAcks};
77
/// `Producer`. This is a convenient higher-level client that will
88
/// fit most use cases.
99
fnmain(){
10-
env_logger::init();
10+
tracing_subscriber::fmt::init();
1111

1212
let broker ="localhost:9092";
1313
let topic ="my-topic";

‎examples/example-ssl.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
1-
#[macro_use]
2-
externcrate log;
3-
41
fnmain(){
52
example::main();
63
}
@@ -9,6 +6,7 @@ fn main() {
96
mod example{
107
use kafka;
118
use openssl;
9+
use tracing::info;
1210

1311
use std::env;
1412
use std::process;
@@ -18,7 +16,7 @@ mod example {
1816
useself::openssl::ssl::{SslConnector,SslFiletype,SslMethod,SslVerifyMode};
1917

2018
pubfnmain(){
21-
env_logger::init();
19+
tracing_subscriber::fmt::init();
2220

2321
// ~ parse the command line arguments
2422
let cfg =matchConfig::from_cmdline(){

‎examples/offset-monitor.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
#[macro_use]
21
use std::cmp;
32
use std::env;
4-
use std::io::{self,stderr, stdout,BufWriter,Write};
3+
use std::io::{stderr, stdout,BufWriter,Write};
54
use std::process;
65
use std::thread;
76
//use std::time as stdtime;
@@ -15,7 +14,7 @@ use kafka::client::{FetchOffset, GroupOffsetStorage, KafkaClient};
1514
/// the lag for a particular consumer group. Dumps the offset/lag of
1615
/// the monitored topic/group to stdout every few seconds.
1716
fnmain(){
18-
env_logger::init();
17+
tracing_subscriber::fmt::init();
1918

2019
macro_rules! abort{
2120
($e:expr) =>{{
@@ -38,7 +37,7 @@ fn main() {
3837

3938
fnrun(cfg:Config) ->Result<()>{
4039
letmut client =KafkaClient::new(cfg.brokers.clone());
41-
client.set_group_offset_storage(cfg.offset_storage);
40+
client.set_group_offset_storage(Some(cfg.offset_storage));
4241
client.load_metadata_all()?;
4342

4443
// ~ if no topic specified, print all available and be done.

‎examples/topic-metadata.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use kafka::client::{FetchOffset, KafkaClient};
88

99
/// Dumps available topic metadata to stdout.
1010
fnmain(){
11-
env_logger::init();
11+
tracing_subscriber::fmt::init();
1212

1313
let cfg =matchConfig::from_cmdline(){
1414
Ok(cfg) => cfg,

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp