@@ -71,6 +71,7 @@ use crate::protocol;
71
71
72
72
// public re-exports
73
73
pub use self :: builder:: Builder ;
74
+ use self :: state:: TopicPartition ;
74
75
pub use crate :: client:: fetch:: Message ;
75
76
pub use crate :: client:: FetchOffset ;
76
77
pub use crate :: client:: GroupOffsetStorage ;
@@ -179,6 +180,61 @@ impl Consumer {
179
180
& self . config . group
180
181
}
181
182
183
+ /// Convenient method to allow consumer to manually reposition to a set of
184
+ /// topic, partition and offset.
185
+ /// let mut client: KafkaClient = KafkaClient::new(vec!["kafka.test.fio.drw:9092".to_string()]);
186
+ /// # Examples
187
+ ///
188
+ /// ```no_run
189
+ ///
190
+ /// let mut consumer: Consumer = Consumer::from_hosts(vec!["localhost:9092".to_string()])
191
+ /// .with_topic("test-topic".to_string())
192
+ /// .with_fallback_offset(FetchOffset::Latest)
193
+ /// .with_offset_storage(Some(GroupOffsetStorage::Kafka))
194
+ /// .create()
195
+ /// .unwrap();
196
+ ///
197
+ /// let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
198
+ /// client.load_metadata_all().unwrap();
199
+ /// let tpos = client.list_offsets(&s, FetchOffset::ByTime(1698425676797)).unwrap();
200
+ /// let seek_results: Vec<Result<(), Error>> = res
201
+ /// .unwrap()
202
+ /// .into_iter()
203
+ /// .flat_map(|topic, partition_offsets) {
204
+ /// partition_offsets.into_iter()
205
+ /// .map(move |po| topic.clone(), po.partition, po.offset))
206
+ /// })
207
+ /// .map(|topic, partition, offset| consumer.seek(topic.as_str(), partition, offset))
208
+ /// .collect()
209
+ /// ```
210
+ ///
211
+ /// This makes the consumer resets its fetch offsets to the nearest offsets after the
212
+ /// timestamp.
213
+ pub fn seek ( & mut self , topic : & str , partition : i32 , offset : i64 ) ->Result < ( ) > {
214
+ let topic_ref =self . state . topic_ref ( topic) ;
215
+ match topic_ref{
216
+ Some ( topic_ref) =>{
217
+ let tp =TopicPartition {
218
+ topic_ref,
219
+ partition,
220
+ } ;
221
+ let maybe_entry =self . state . fetch_offsets . entry ( tp) ;
222
+ match maybe_entry{
223
+ Entry :: Occupied ( mut e) =>{
224
+ e. get_mut ( ) . offset = offset;
225
+ Ok ( ( ) )
226
+ }
227
+ Entry :: Vacant ( _) =>Err ( Error :: TopicPartitionError {
228
+ topic_name : topic. to_string ( ) ,
229
+ partition_id : partition,
230
+ error_code : KafkaCode :: UnknownTopicOrPartition ,
231
+ } ) ,
232
+ }
233
+ }
234
+ None =>Err ( Error :: Kafka ( KafkaCode :: UnknownTopicOrPartition ) ) ,
235
+ }
236
+ }
237
+
182
238
// ~ returns (number partitions queried, fecth responses)
183
239
fn fetch_messages ( & mut self ) ->( u32 , Result < Vec < fetch:: Response > > ) {
184
240
// ~ if there's a retry partition ... fetch messages just for