@@ -19,6 +19,12 @@ use crate::protocol::list_offset::ListOffsetVersion;
19
19
pub use crate :: utils:: PartitionOffset ;
20
20
use crate :: utils:: TimestampedPartitionOffset ;
21
21
22
+ #[ cfg( feature ="producer_timestamp" ) ]
23
+ pub use crate :: protocol:: produce:: ProducerTimestamp ;
24
+
25
+ #[ cfg( not( feature ="producer_timestamp" ) ) ]
26
+ use crate :: protocol:: produce:: ProducerTimestamp ;
27
+
22
28
#[ cfg( feature ="security" ) ]
23
29
pub use self :: network:: SecurityConfig ;
24
30
@@ -78,6 +84,9 @@ pub const DEFAULT_RETRY_MAX_ATTEMPTS: u32 = 120_000 / DEFAULT_RETRY_BACKOFF_TIME
78
84
/// The default value for `KafkaClient::set_connection_idle_timeout(..)`
79
85
pub const DEFAULT_CONNECTION_IDLE_TIMEOUT_MILLIS : u64 =540_000 ;
80
86
87
+ /// The default value for `KafkaClient::set_producer_timestamp(..)`
88
+ pub ( crate ) const DEFAULT_PRODUCER_TIMESTAMP : Option < ProducerTimestamp > =None ;
89
+
81
90
/// Client struct keeping track of brokers and topic metadata.
82
91
///
83
92
/// Implements methods described by the [Kafka Protocol](http://kafka.apache.org/protocol.html).
@@ -122,6 +131,9 @@ struct ClientConfig {
122
131
// ~ the number of repeated retry attempts; prevents endless
123
132
// repetition of a retry attempt
124
133
retry_max_attempts : u32 ,
134
+ // ~ producer's message timestamp option CreateTime/LogAppendTime
135
+ #[ allow( unused) ]
136
+ producer_timestamp : Option < ProducerTimestamp > ,
125
137
}
126
138
127
139
// --------------------------------------------------------------------
@@ -408,6 +420,7 @@ impl KafkaClient {
408
420
offset_storage : DEFAULT_GROUP_OFFSET_STORAGE ,
409
421
retry_backoff_time : Duration :: from_millis ( DEFAULT_RETRY_BACKOFF_TIME_MILLIS ) ,
410
422
retry_max_attempts : DEFAULT_RETRY_MAX_ATTEMPTS ,
423
+ producer_timestamp : DEFAULT_PRODUCER_TIMESTAMP ,
411
424
} ,
412
425
conn_pool : network:: Connections :: new (
413
426
default_conn_rw_timeout ( ) ,
@@ -477,6 +490,7 @@ impl KafkaClient {
477
490
offset_storage : DEFAULT_GROUP_OFFSET_STORAGE ,
478
491
retry_backoff_time : Duration :: from_millis ( DEFAULT_RETRY_BACKOFF_TIME_MILLIS ) ,
479
492
retry_max_attempts : DEFAULT_RETRY_MAX_ATTEMPTS ,
493
+ producer_timestamp : DEFAULT_PRODUCER_TIMESTAMP ,
480
494
} ,
481
495
conn_pool : network:: Connections :: new_with_security (
482
496
default_conn_rw_timeout ( ) ,
@@ -722,6 +736,31 @@ impl KafkaClient {
722
736
self . conn_pool . idle_timeout ( )
723
737
}
724
738
739
+ #[ cfg( feature ="producer_timestamp" ) ]
740
+ /// Sets the compression algorithm to use when sending out messages.
741
+ ///
742
+ /// # Example
743
+ ///
744
+ /// ```no_run
745
+ /// use kafka::client::{Compression, KafkaClient};
746
+ ///
747
+ /// let mut client = KafkaClient::new(vec!("localhost:9092".to_owned()));
748
+ /// client.load_metadata_all().unwrap();
749
+ /// client.set_producer_timestamp(Timestamp::CreateTime);
750
+ /// ```
751
+ #[ inline]
752
+ pub fn set_producer_timestamp ( & mut self , producer_timestamp : Option < ProducerTimestamp > ) {
753
+ self . config . producer_timestamp = producer_timestamp;
754
+ }
755
+
756
+ #[ cfg( feature ="producer_timestamp" ) ]
757
+ /// Retrieves the current `KafkaClient::producer_timestamp` setting.
758
+ #[ inline]
759
+ #[ must_use]
760
+ pub fn producer_timestamp ( & self ) ->Option < ProducerTimestamp > {
761
+ self . config . producer_timestamp
762
+ }
763
+
725
764
/// Provides a view onto the currently loaded metadata of known .
726
765
///
727
766
/// # Examples
@@ -1455,6 +1494,8 @@ impl KafkaClientInternals for KafkaClient {
1455
1494
correlation,
1456
1495
& config. client_id ,
1457
1496
config. compression ,
1497
+ #[ cfg( feature ="producer_timestamp" ) ]
1498
+ config. producer_timestamp ,
1458
1499
)
1459
1500
} )
1460
1501
. add ( msg. topic , msg. partition , msg. key , msg. value ) ,