Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1f3b162

Browse files
Update
1 parent576e866 commit1f3b162

File tree

7 files changed

+195
-348
lines changed

7 files changed

+195
-348
lines changed

‎Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ thiserror = "1.0.31"
2525
tracing ="0.1.34"
2626
integer-encoding ="3.0.3"
2727
uuid = {version ="1.1.0",features = ["v4"] }
28+
num_enum ="0.5.7"
2829

2930
[dev-dependencies]
3031
anyhow ="1.0.57"

‎src/protocol.rs

Lines changed: 19 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,41 @@
1-
use std::io::{Read,Write};
1+
use std::borrow::Cow;
22
use std::mem;
33
use std::time::Duration;
44

5-
usecrate::codecs::{FromByte,ToByte};
65
usecrate::error::{Error,KafkaCode,Result};
76

7+
mod api_key;
88
pubmod consumer;
9-
pubmod decoder;
9+
mod decoder;
10+
mod error;
1011
pubmod metadata;
1112
pubmod offset;
1213
pubmod produce;
1314

1415
pubmod fetch;
15-
mod zreader;
1616

17+
useself::api_key::ApiKey;
1718
// ~ convenient re-exports for request/response types defined in the
1819
// submodules
1920
pubuseself::consumer::{
2021
GroupCoordinatorRequest,GroupCoordinatorResponse,OffsetCommitRequest,OffsetCommitResponse,
2122
OffsetCommitVersion,OffsetFetchRequest,OffsetFetchResponse,OffsetFetchVersion,
2223
};
24+
useself::decoder::Decoder;
2325
pubuseself::fetch::FetchRequest;
2426
pubuseself::metadata::{MetadataRequest,MetadataResponse};
2527
pubuseself::offset::{OffsetRequest,OffsetResponse};
2628
pubuseself::produce::{ProduceRequest,ProduceResponse};
2729

28-
// --------------------------------------------------------------------
29-
30-
constAPI_KEY_PRODUCE:i16 =0;
31-
constAPI_KEY_FETCH:i16 =1;
32-
constAPI_KEY_OFFSET:i16 =2;
33-
constAPI_KEY_METADATA:i16 =3;
34-
// 4-7 reserved for non-public kafka api services
35-
constAPI_KEY_OFFSET_COMMIT:i16 =8;
36-
constAPI_KEY_OFFSET_FETCH:i16 =9;
37-
constAPI_KEY_GROUP_COORDINATOR:i16 =10;
38-
3930
// the default version of Kafka API we are requesting
4031
constAPI_VERSION:i16 =0;
4132

4233
// --------------------------------------------------------------------
4334

35+
pubtraitDecode<'de>{
36+
fndecode(decoder:&mutDecoder<'de>) ->Result<Self, decoder::Error>;
37+
}
38+
4439
/// Provides a way to parse the full raw response data into a
4540
/// particular response structure.
4641
pubtraitResponseParser{
@@ -50,93 +45,22 @@ pub trait ResponseParser {
5045

5146
// --------------------------------------------------------------------
5247

53-
implKafkaCode{
54-
fnfrom_protocol(n:i16) ->Option<KafkaCode>{
55-
if n ==0{
56-
returnNone;
57-
}
58-
if n >=KafkaCode::OffsetOutOfRangeasi16 && n <=KafkaCode::UnsupportedVersionasi16{
59-
returnSome(unsafe{ mem::transmute(nasi8)});
60-
}
61-
Some(KafkaCode::Unknown)
62-
}
63-
}
64-
65-
#[test]
66-
fntest_kafka_code_from_protocol(){
67-
use std::i16;
68-
69-
macro_rules! assert_kafka_code{
70-
($kcode:path, $n:expr) =>{
71-
assert!(ifletSome($kcode) =KafkaCode::from_protocol($n){
72-
true
73-
} else{
74-
false
75-
})
76-
};
77-
}
78-
79-
assert!(KafkaCode::from_protocol(0).is_none());
80-
assert_kafka_code!(
81-
KafkaCode::OffsetOutOfRange,
82-
KafkaCode::OffsetOutOfRangeasi16
83-
);
84-
assert_kafka_code!(
85-
KafkaCode::IllegalGeneration,
86-
KafkaCode::IllegalGenerationasi16
87-
);
88-
assert_kafka_code!(
89-
KafkaCode::UnsupportedVersion,
90-
KafkaCode::UnsupportedVersionasi16
91-
);
92-
assert_kafka_code!(KafkaCode::Unknown,KafkaCode::Unknownasi16);
93-
// ~ test some un mapped non-zero codes; should all map to "unknown"
94-
assert_kafka_code!(KafkaCode::Unknown,i16::MAX);
95-
assert_kafka_code!(KafkaCode::Unknown,i16::MIN);
96-
assert_kafka_code!(KafkaCode::Unknown, -100);
97-
assert_kafka_code!(KafkaCode::Unknown,100);
98-
}
99-
100-
// a (sub-) module private method for error
101-
implError{
102-
fnfrom_protocol(n:i16) ->Option<Error>{
103-
KafkaCode::from_protocol(n).map(Error::Kafka)
104-
}
105-
}
106-
107-
// --------------------------------------------------------------------
108-
10948
#[derive(Debug)]
11049
pubstructHeaderRequest<'a>{
111-
pubapi_key:i16,
50+
pubapi_key:ApiKey,
11251
pubapi_version:i16,
11352
pubcorrelation_id:i32,
11453
pubclient_id:&'astr,
11554
}
11655

117-
impl<'a>HeaderRequest<'a>{
118-
fnnew(
119-
api_key:i16,
120-
api_version:i16,
121-
correlation_id:i32,
122-
client_id:&'astr,
123-
) ->HeaderRequest<'_>{
124-
HeaderRequest{
125-
api_key,
126-
api_version,
127-
correlation_id,
128-
client_id,
129-
}
130-
}
131-
}
132-
133-
impl<'a>ToByteforHeaderRequest<'a>{
134-
fnencode<W:Write>(&self,buffer:&mutW) ->Result<()>{
135-
self.api_key.encode(buffer)?;
136-
self.api_version.encode(buffer)?;
137-
self.correlation_id.encode(buffer)?;
138-
self.client_id.encode(buffer)?;
139-
Ok(())
56+
impl<'de>Decode<'de>forHeaderRequest<'de>{
57+
fndecode(decoder:&mutDecoder<'de>) ->Result<Self>{
58+
Ok(Self{
59+
api_key: decoder.decode_api_key()?,
60+
api_version: decoder.decode_int16()?,
61+
correlation_id: decoder.decode_int32()?,
62+
client_id: decoder.decode_string()?,
63+
})
14064
}
14165
}
14266

@@ -147,15 +71,6 @@ pub struct HeaderResponse {
14771
pubcorrelation:i32,
14872
}
14973

150-
implFromByteforHeaderResponse{
151-
typeR =HeaderResponse;
152-
153-
#[allow(unused_must_use)]
154-
fndecode<T:Read>(&mutself,buffer:&mutT) ->Result<()>{
155-
self.correlation.decode(buffer)
156-
}
157-
}
158-
15974
// --------------------------------------------------------------------
16075

16176
pubfnto_crc(data:&[u8]) ->u32{

‎src/protocol/api_key.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
use num_enum::{IntoPrimitive,TryFromPrimitive};
2+
3+
#[repr(i16)]
4+
#[derive(TryFromPrimitive,IntoPrimitive)]
5+
pubenumApiKey{
6+
Produce =0,
7+
Fetch,
8+
ListOffsets,
9+
Metadata,
10+
LeaderAndIsr,
11+
StopReplica,
12+
UpdateMetadata,
13+
ControlledShutdown,
14+
OffsetCommit,
15+
OffsetFetch,
16+
FindCoordinator,
17+
JoinGroup,
18+
Heartbeat,
19+
LeaveGroup,
20+
SyncGroup,
21+
DescribeGroups,
22+
ListGroups,
23+
SaslHandshake,
24+
ApiVersions,
25+
CreateTopics,
26+
DeleteTopics,
27+
DeleteRecords,
28+
InitProducerId,
29+
OffsetForLeaderEpoch,
30+
AddPartitionsToTxn,
31+
AddOffsetsToTxn,
32+
EndTxn,
33+
WriteTxnMarkers,
34+
TxnOffsetCommit,
35+
DescribeAcls,
36+
CreateAcls,
37+
DeleteAcls,
38+
DescribeConfigs,
39+
AlterConfigs,
40+
AlterReplicaLogDirs,
41+
DescribeLogDirs,
42+
SaslAuthenticate,
43+
CreatePartitions,
44+
CreateDelegationToken,
45+
RenewDelegationToken,
46+
ExpireDelegationToken,
47+
DescribeDelegationToken,
48+
DeleteGroups,
49+
ElectLeaders,
50+
IncrementalAlterConfigs,
51+
AlterPartitionReassignments,
52+
ListPartitionReassignments,
53+
OffsetDelete,
54+
DescribeClientQuotas,
55+
AlterClientQuotas,
56+
DescribeUserScramCredentials,
57+
AlterUserScramCredentials,
58+
AlterIsr,
59+
UpdateFeatures,
60+
DescribeCluster,
61+
DescribeProducers,
62+
DescribeTransactions,
63+
ListTransactions,
64+
AllocateProducerIds,
65+
}

‎src/protocol/consumer.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use std::io::{Read,Write};
22

3-
usecrate::codecs::{self,FromByte,ToByte};
43
usecrate::error::{self,Error,KafkaCode,Result};
54
usecrate::utils::PartitionOffset;
65

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp