1
- use std:: io :: { Read , Write } ;
1
+ use std:: borrow :: Cow ;
2
2
use std:: mem;
3
3
use std:: time:: Duration ;
4
4
5
- use crate :: codecs:: { FromByte , ToByte } ;
6
5
use crate :: error:: { Error , KafkaCode , Result } ;
7
6
7
+ mod api_key;
8
8
pub mod consumer;
9
- pub mod decoder;
9
+ mod decoder;
10
+ mod error;
10
11
pub mod metadata;
11
12
pub mod offset;
12
13
pub mod produce;
13
14
14
15
pub mod fetch;
15
- mod zreader;
16
16
17
+ use self :: api_key:: ApiKey ;
17
18
// ~ convenient re-exports for request/response types defined in the
18
19
// submodules
19
20
pub use self :: consumer:: {
20
21
GroupCoordinatorRequest , GroupCoordinatorResponse , OffsetCommitRequest , OffsetCommitResponse ,
21
22
OffsetCommitVersion , OffsetFetchRequest , OffsetFetchResponse , OffsetFetchVersion ,
22
23
} ;
24
+ use self :: decoder:: Decoder ;
23
25
pub use self :: fetch:: FetchRequest ;
24
26
pub use self :: metadata:: { MetadataRequest , MetadataResponse } ;
25
27
pub use self :: offset:: { OffsetRequest , OffsetResponse } ;
26
28
pub use self :: produce:: { ProduceRequest , ProduceResponse } ;
27
29
28
- // --------------------------------------------------------------------
29
-
30
- const API_KEY_PRODUCE : i16 =0 ;
31
- const API_KEY_FETCH : i16 =1 ;
32
- const API_KEY_OFFSET : i16 =2 ;
33
- const API_KEY_METADATA : i16 =3 ;
34
- // 4-7 reserved for non-public kafka api services
35
- const API_KEY_OFFSET_COMMIT : i16 =8 ;
36
- const API_KEY_OFFSET_FETCH : i16 =9 ;
37
- const API_KEY_GROUP_COORDINATOR : i16 =10 ;
38
-
39
30
// the default version of Kafka API we are requesting
40
31
const API_VERSION : i16 =0 ;
41
32
42
33
// --------------------------------------------------------------------
43
34
35
+ pub trait Decode < ' de > {
36
+ fn decode ( decoder : & mut Decoder < ' de > ) ->Result < Self , decoder:: Error > ;
37
+ }
38
+
44
39
/// Provides a way to parse the full raw response data into a
45
40
/// particular response structure.
46
41
pub trait ResponseParser {
@@ -50,93 +45,22 @@ pub trait ResponseParser {
50
45
51
46
// --------------------------------------------------------------------
52
47
53
- impl KafkaCode {
54
- fn from_protocol ( n : i16 ) ->Option < KafkaCode > {
55
- if n ==0 {
56
- return None ;
57
- }
58
- if n >=KafkaCode :: OffsetOutOfRange as i16 && n <=KafkaCode :: UnsupportedVersion as i16 {
59
- return Some ( unsafe { mem:: transmute ( nas i8 ) } ) ;
60
- }
61
- Some ( KafkaCode :: Unknown )
62
- }
63
- }
64
-
65
- #[ test]
66
- fn test_kafka_code_from_protocol ( ) {
67
- use std:: i16;
68
-
69
- macro_rules! assert_kafka_code{
70
- ( $kcode: path, $n: expr) =>{
71
- assert!( if let Some ( $kcode) =KafkaCode :: from_protocol( $n) {
72
- true
73
- } else{
74
- false
75
- } )
76
- } ;
77
- }
78
-
79
- assert ! ( KafkaCode :: from_protocol( 0 ) . is_none( ) ) ;
80
- assert_kafka_code ! (
81
- KafkaCode :: OffsetOutOfRange ,
82
- KafkaCode :: OffsetOutOfRange as i16
83
- ) ;
84
- assert_kafka_code ! (
85
- KafkaCode :: IllegalGeneration ,
86
- KafkaCode :: IllegalGeneration as i16
87
- ) ;
88
- assert_kafka_code ! (
89
- KafkaCode :: UnsupportedVersion ,
90
- KafkaCode :: UnsupportedVersion as i16
91
- ) ;
92
- assert_kafka_code ! ( KafkaCode :: Unknown , KafkaCode :: Unknown as i16 ) ;
93
- // ~ test some un mapped non-zero codes; should all map to "unknown"
94
- assert_kafka_code ! ( KafkaCode :: Unknown , i16 :: MAX ) ;
95
- assert_kafka_code ! ( KafkaCode :: Unknown , i16 :: MIN ) ;
96
- assert_kafka_code ! ( KafkaCode :: Unknown , -100 ) ;
97
- assert_kafka_code ! ( KafkaCode :: Unknown , 100 ) ;
98
- }
99
-
100
- // a (sub-) module private method for error
101
- impl Error {
102
- fn from_protocol ( n : i16 ) ->Option < Error > {
103
- KafkaCode :: from_protocol ( n) . map ( Error :: Kafka )
104
- }
105
- }
106
-
107
- // --------------------------------------------------------------------
108
-
109
48
#[ derive( Debug ) ]
110
49
pub struct HeaderRequest < ' a > {
111
- pub api_key : i16 ,
50
+ pub api_key : ApiKey ,
112
51
pub api_version : i16 ,
113
52
pub correlation_id : i32 ,
114
53
pub client_id : & ' a str ,
115
54
}
116
55
117
- impl < ' a > HeaderRequest < ' a > {
118
- fn new (
119
- api_key : i16 ,
120
- api_version : i16 ,
121
- correlation_id : i32 ,
122
- client_id : & ' a str ,
123
- ) ->HeaderRequest < ' _ > {
124
- HeaderRequest {
125
- api_key,
126
- api_version,
127
- correlation_id,
128
- client_id,
129
- }
130
- }
131
- }
132
-
133
- impl < ' a > ToByte for HeaderRequest < ' a > {
134
- fn encode < W : Write > ( & self , buffer : & mut W ) ->Result < ( ) > {
135
- self . api_key . encode ( buffer) ?;
136
- self . api_version . encode ( buffer) ?;
137
- self . correlation_id . encode ( buffer) ?;
138
- self . client_id . encode ( buffer) ?;
139
- Ok ( ( ) )
56
+ impl < ' de > Decode < ' de > for HeaderRequest < ' de > {
57
+ fn decode ( decoder : & mut Decoder < ' de > ) ->Result < Self > {
58
+ Ok ( Self {
59
+ api_key : decoder. decode_api_key ( ) ?,
60
+ api_version : decoder. decode_int16 ( ) ?,
61
+ correlation_id : decoder. decode_int32 ( ) ?,
62
+ client_id : decoder. decode_string ( ) ?,
63
+ } )
140
64
}
141
65
}
142
66
@@ -147,15 +71,6 @@ pub struct HeaderResponse {
147
71
pub correlation : i32 ,
148
72
}
149
73
150
- impl FromByte for HeaderResponse {
151
- type R =HeaderResponse ;
152
-
153
- #[ allow( unused_must_use) ]
154
- fn decode < T : Read > ( & mut self , buffer : & mut T ) ->Result < ( ) > {
155
- self . correlation . decode ( buffer)
156
- }
157
- }
158
-
159
74
// --------------------------------------------------------------------
160
75
161
76
pub fn to_crc ( data : & [ u8 ] ) ->u32 {