Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit6c9c1f2

Browse files
authored
Merge pull request#246 from yra1029/Add_producer_timestamp
feat: Added the support for the new format of producer message with timestamp support
2 parents16d964a +a8889fc commit6c9c1f2

File tree

4 files changed

+216
-2
lines changed

4 files changed

+216
-2
lines changed

‎Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ flate2 = { version = "1.0", optional = true }
2222
openssl = {version ="0.10",optional =true }
2323
openssl-sys = {version ="0.9",optional =true }
2424
snap = {version ="1.1",optional =true }
25+
chrono = {version ="0.4",optional =true}
2526
thiserror ="1.0"
2627
tracing ="0.1"
2728

@@ -38,5 +39,6 @@ default = ["snappy", "gzip", "security"]
3839
snappy = ["snap"]
3940
gzip = ["flate2"]
4041
security = ["openssl","openssl-sys"]
42+
producer_timestamp = ["chrono"]
4143
nightly = []
4244
integration_tests = []

‎src/client/mod.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ use crate::protocol::list_offset::ListOffsetVersion;
1919
pubusecrate::utils::PartitionOffset;
2020
usecrate::utils::TimestampedPartitionOffset;
2121

22+
#[cfg(feature ="producer_timestamp")]
23+
pubusecrate::protocol::produce::ProducerTimestamp;
24+
25+
#[cfg(not(feature ="producer_timestamp"))]
26+
usecrate::protocol::produce::ProducerTimestamp;
27+
2228
#[cfg(feature ="security")]
2329
pubuseself::network::SecurityConfig;
2430

@@ -78,6 +84,9 @@ pub const DEFAULT_RETRY_MAX_ATTEMPTS: u32 = 120_000 / DEFAULT_RETRY_BACKOFF_TIME
7884
/// The default value for `KafkaClient::set_connection_idle_timeout(..)`
7985
pubconstDEFAULT_CONNECTION_IDLE_TIMEOUT_MILLIS:u64 =540_000;
8086

87+
/// The default value for `KafkaClient::set_producer_timestamp(..)`
88+
pub(crate)constDEFAULT_PRODUCER_TIMESTAMP:Option<ProducerTimestamp> =None;
89+
8190
/// Client struct keeping track of brokers and topic metadata.
8291
///
8392
/// Implements methods described by the [Kafka Protocol](http://kafka.apache.org/protocol.html).
@@ -122,6 +131,9 @@ struct ClientConfig {
122131
// ~ the number of repeated retry attempts; prevents endless
123132
// repetition of a retry attempt
124133
retry_max_attempts:u32,
134+
// ~ producer's message timestamp option CreateTime/LogAppendTime
135+
#[allow(unused)]
136+
producer_timestamp:Option<ProducerTimestamp>,
125137
}
126138

127139
// --------------------------------------------------------------------
@@ -408,6 +420,7 @@ impl KafkaClient {
408420
offset_storage:DEFAULT_GROUP_OFFSET_STORAGE,
409421
retry_backoff_time:Duration::from_millis(DEFAULT_RETRY_BACKOFF_TIME_MILLIS),
410422
retry_max_attempts:DEFAULT_RETRY_MAX_ATTEMPTS,
423+
producer_timestamp:DEFAULT_PRODUCER_TIMESTAMP,
411424
},
412425
conn_pool: network::Connections::new(
413426
default_conn_rw_timeout(),
@@ -477,6 +490,7 @@ impl KafkaClient {
477490
offset_storage:DEFAULT_GROUP_OFFSET_STORAGE,
478491
retry_backoff_time:Duration::from_millis(DEFAULT_RETRY_BACKOFF_TIME_MILLIS),
479492
retry_max_attempts:DEFAULT_RETRY_MAX_ATTEMPTS,
493+
producer_timestamp:DEFAULT_PRODUCER_TIMESTAMP,
480494
},
481495
conn_pool: network::Connections::new_with_security(
482496
default_conn_rw_timeout(),
@@ -722,6 +736,31 @@ impl KafkaClient {
722736
self.conn_pool.idle_timeout()
723737
}
724738

739+
#[cfg(feature ="producer_timestamp")]
740+
/// Sets the compression algorithm to use when sending out messages.
741+
///
742+
/// # Example
743+
///
744+
/// ```no_run
745+
/// use kafka::client::{Compression, KafkaClient};
746+
///
747+
/// let mut client = KafkaClient::new(vec!("localhost:9092".to_owned()));
748+
/// client.load_metadata_all().unwrap();
749+
/// client.set_producer_timestamp(Timestamp::CreateTime);
750+
/// ```
751+
#[inline]
752+
pubfnset_producer_timestamp(&mutself,producer_timestamp:Option<ProducerTimestamp>){
753+
self.config.producer_timestamp = producer_timestamp;
754+
}
755+
756+
#[cfg(feature ="producer_timestamp")]
757+
/// Retrieves the current `KafkaClient::producer_timestamp` setting.
758+
#[inline]
759+
#[must_use]
760+
pubfnproducer_timestamp(&self) ->Option<ProducerTimestamp>{
761+
self.config.producer_timestamp
762+
}
763+
725764
/// Provides a view onto the currently loaded metadata of known .
726765
///
727766
/// # Examples
@@ -1455,6 +1494,8 @@ impl KafkaClientInternals for KafkaClient {
14551494
correlation,
14561495
&config.client_id,
14571496
config.compression,
1497+
#[cfg(feature ="producer_timestamp")]
1498+
config.producer_timestamp,
14581499
)
14591500
})
14601501
.add(msg.topic, msg.partition, msg.key, msg.value),

‎src/producer.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ use std::slice::from_ref;
6868
use std::time::Duration;
6969
use twox_hash::XxHash32;
7070

71+
#[cfg(feature ="producer_timestamp")]
72+
usecrate::protocol::produce::ProducerTimestamp;
73+
7174
#[cfg(feature ="security")]
7275
usecrate::client::SecurityConfig;
7376

@@ -360,6 +363,8 @@ pub struct Builder<P = DefaultPartitioner> {
360363
partitioner:P,
361364
security_config:Option<SecurityConfig>,
362365
client_id:Option<String>,
366+
#[cfg(feature ="producer_timestamp")]
367+
producer_timestamp:Option<ProducerTimestamp>,
363368
}
364369

365370
implBuilder{
@@ -376,6 +381,8 @@ impl Builder {
376381
partitioner:DefaultPartitioner::default(),
377382
security_config:None,
378383
client_id:None,
384+
#[cfg(feature ="producer_timestamp")]
385+
producer_timestamp:None,
379386
};
380387
ifletSome(ref c) = b.client{
381388
b.compression = c.compression();
@@ -437,6 +444,16 @@ impl Builder {
437444
self.client_id =Some(client_id);
438445
self
439446
}
447+
448+
#[cfg(feature ="producer_timestamp")]
449+
/// Sets the compression algorithm to use when sending out data.
450+
///
451+
/// See `KafkaClient::set_producer_timestamp`.
452+
#[must_use]
453+
pubfnwith_timestamp(mutself,timestamp:ProducerTimestamp) ->Self{
454+
self.producer_timestamp =Some(timestamp);
455+
self
456+
}
440457
}
441458

442459
impl<P>Builder<P>{
@@ -453,6 +470,8 @@ impl<P> Builder<P> {
453470
partitioner,
454471
security_config:None,
455472
client_id:None,
473+
#[cfg(feature ="producer_timestamp")]
474+
producer_timestamp:None,
456475
}
457476
}
458477

@@ -484,6 +503,8 @@ impl<P> Builder<P> {
484503
// ~ apply configuration settings
485504
client.set_compression(self.compression);
486505
client.set_connection_idle_timeout(self.conn_idle_timeout);
506+
#[cfg(feature ="producer_timestamp")]
507+
client.set_producer_timestamp(self.producer_timestamp);
487508
ifletSome(client_id) =self.client_id{
488509
client.set_client_id(client_id);
489510
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp