Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf4ca0b4

Browse files
Support describeacls (segmentio#1166)
* Support describeacls* gofmt -s -w createacl_test.go* make test diff smaller and fix protocl api key* fix another protocol api key* improve test name* protocol fixes* add missing patterntype* fix createacls protocol* fix tags and add tagged fields back in* bump createacls version to v3* wip* just one filter, not a list of filters* add missing patterntype in test* fix patterntype location* add prototests* createacl_test.go -> createacls_test.go* seperate createacls_test and describeacls_test* fix describeaclstest* add comment for ResourcePatternTypeFilter
1 parent6193fa9 commitf4ca0b4

File tree

7 files changed

+560
-18
lines changed

7 files changed

+560
-18
lines changed

‎createacl_test.go‎renamed to ‎createacls_test.go‎

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,18 @@ func TestClientCreateACLs(t *testing.T) {
1515
client,shutdown:=newLocalClient()
1616
defershutdown()
1717

18-
res,err:=client.CreateACLs(context.Background(),&CreateACLsRequest{
18+
topic:=makeTopic()
19+
group:=makeGroupID()
20+
21+
createRes,err:=client.CreateACLs(context.Background(),&CreateACLsRequest{
1922
ACLs: []ACLEntry{
2023
{
2124
Principal:"User:alice",
2225
PermissionType:ACLPermissionTypeAllow,
2326
Operation:ACLOperationTypeRead,
2427
ResourceType:ResourceTypeTopic,
2528
ResourcePatternType:PatternTypeLiteral,
26-
ResourceName:"fake-topic-for-alice",
29+
ResourceName:topic,
2730
Host:"*",
2831
},
2932
{
@@ -32,7 +35,7 @@ func TestClientCreateACLs(t *testing.T) {
3235
Operation:ACLOperationTypeRead,
3336
ResourceType:ResourceTypeGroup,
3437
ResourcePatternType:PatternTypeLiteral,
35-
ResourceName:"fake-group-for-bob",
38+
ResourceName:group,
3639
Host:"*",
3740
},
3841
},
@@ -41,7 +44,7 @@ func TestClientCreateACLs(t *testing.T) {
4144
t.Fatal(err)
4245
}
4346

44-
for_,err:=rangeres.Errors {
47+
for_,err:=rangecreateRes.Errors {
4548
iferr!=nil {
4649
t.Error(err)
4750
}

‎describeacls.go‎

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net"
7+
"time"
8+
9+
"github.com/segmentio/kafka-go/protocol/describeacls"
10+
)
11+
12+
// DescribeACLsRequest represents a request sent to a kafka broker to describe
13+
// existing ACLs.
14+
typeDescribeACLsRequeststruct {
15+
// Address of the kafka broker to send the request to.
16+
Addr net.Addr
17+
18+
// Filter to filter ACLs on.
19+
FilterACLFilter
20+
}
21+
22+
typeACLFilterstruct {
23+
ResourceTypeFilterResourceType
24+
ResourceNameFilterstring
25+
// ResourcePatternTypeFilter was added in v1 and is not available prior to that.
26+
ResourcePatternTypeFilterPatternType
27+
PrincipalFilterstring
28+
HostFilterstring
29+
OperationACLOperationType
30+
PermissionTypeACLPermissionType
31+
}
32+
33+
// DescribeACLsResponse represents a response from a kafka broker to an ACL
34+
// describe request.
35+
typeDescribeACLsResponsestruct {
36+
// The amount of time that the broker throttled the request.
37+
Throttle time.Duration
38+
39+
// Error that occurred while attempting to describe
40+
// the ACLs.
41+
Errorerror
42+
43+
// ACL resources returned from the describe request.
44+
Resources []ACLResource
45+
}
46+
47+
typeACLResourcestruct {
48+
ResourceTypeResourceType
49+
ResourceNamestring
50+
PatternTypePatternType
51+
ACLs []ACLDescription
52+
}
53+
54+
typeACLDescriptionstruct {
55+
Principalstring
56+
Hoststring
57+
OperationACLOperationType
58+
PermissionTypeACLPermissionType
59+
}
60+
61+
func (c*Client)DescribeACLs(ctx context.Context,req*DescribeACLsRequest) (*DescribeACLsResponse,error) {
62+
m,err:=c.roundTrip(ctx,req.Addr,&describeacls.Request{
63+
Filter: describeacls.ACLFilter{
64+
ResourceTypeFilter:int8(req.Filter.ResourceTypeFilter),
65+
ResourceNameFilter:req.Filter.ResourceNameFilter,
66+
ResourcePatternTypeFilter:int8(req.Filter.ResourcePatternTypeFilter),
67+
PrincipalFilter:req.Filter.PrincipalFilter,
68+
HostFilter:req.Filter.HostFilter,
69+
Operation:int8(req.Filter.Operation),
70+
PermissionType:int8(req.Filter.PermissionType),
71+
},
72+
})
73+
iferr!=nil {
74+
returnnil,fmt.Errorf("kafka.(*Client).DescribeACLs: %w",err)
75+
}
76+
77+
res:=m.(*describeacls.Response)
78+
resources:=make([]ACLResource,len(res.Resources))
79+
80+
forresourceIdx,respResource:=rangeres.Resources {
81+
descriptions:=make([]ACLDescription,len(respResource.ACLs))
82+
83+
fordescriptionIdx,respDescription:=rangerespResource.ACLs {
84+
descriptions[descriptionIdx]=ACLDescription{
85+
Principal:respDescription.Principal,
86+
Host:respDescription.Host,
87+
Operation:ACLOperationType(respDescription.Operation),
88+
PermissionType:ACLPermissionType(respDescription.PermissionType),
89+
}
90+
}
91+
92+
resources[resourceIdx]=ACLResource{
93+
ResourceType:ResourceType(respResource.ResourceType),
94+
ResourceName:respResource.ResourceName,
95+
PatternType:PatternType(respResource.PatternType),
96+
ACLs:descriptions,
97+
}
98+
}
99+
100+
ret:=&DescribeACLsResponse{
101+
Throttle:makeDuration(res.ThrottleTimeMs),
102+
Error:makeError(res.ErrorCode,res.ErrorMessage),
103+
Resources:resources,
104+
}
105+
106+
returnret,nil
107+
}

‎describeacls_test.go‎

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
ktesting"github.com/segmentio/kafka-go/testing"
8+
"github.com/stretchr/testify/assert"
9+
)
10+
11+
funcTestClientDescribeACLs(t*testing.T) {
12+
if!ktesting.KafkaIsAtLeast("2.0.1") {
13+
return
14+
}
15+
16+
client,shutdown:=newLocalClient()
17+
defershutdown()
18+
19+
topic:=makeTopic()
20+
group:=makeGroupID()
21+
22+
createRes,err:=client.CreateACLs(context.Background(),&CreateACLsRequest{
23+
ACLs: []ACLEntry{
24+
{
25+
Principal:"User:alice",
26+
PermissionType:ACLPermissionTypeAllow,
27+
Operation:ACLOperationTypeRead,
28+
ResourceType:ResourceTypeTopic,
29+
ResourcePatternType:PatternTypeLiteral,
30+
ResourceName:topic,
31+
Host:"*",
32+
},
33+
{
34+
Principal:"User:bob",
35+
PermissionType:ACLPermissionTypeAllow,
36+
Operation:ACLOperationTypeRead,
37+
ResourceType:ResourceTypeGroup,
38+
ResourcePatternType:PatternTypeLiteral,
39+
ResourceName:group,
40+
Host:"*",
41+
},
42+
},
43+
})
44+
iferr!=nil {
45+
t.Fatal(err)
46+
}
47+
48+
for_,err:=rangecreateRes.Errors {
49+
iferr!=nil {
50+
t.Error(err)
51+
}
52+
}
53+
54+
describeResp,err:=client.DescribeACLs(context.Background(),&DescribeACLsRequest{
55+
Filter:ACLFilter{
56+
ResourceTypeFilter:ResourceTypeTopic,
57+
ResourceNameFilter:topic,
58+
ResourcePatternTypeFilter:PatternTypeLiteral,
59+
Operation:ACLOperationTypeRead,
60+
PermissionType:ACLPermissionTypeAllow,
61+
},
62+
})
63+
iferr!=nil {
64+
t.Fatal(err)
65+
}
66+
67+
expectedDescribeResp:=DescribeACLsResponse{
68+
Throttle:0,
69+
Error:makeError(0,""),
70+
Resources: []ACLResource{
71+
{
72+
ResourceType:ResourceTypeTopic,
73+
ResourceName:topic,
74+
PatternType:PatternTypeLiteral,
75+
ACLs: []ACLDescription{
76+
{
77+
Principal:"User:alice",
78+
Host:"*",
79+
Operation:ACLOperationTypeRead,
80+
PermissionType:ACLPermissionTypeAllow,
81+
},
82+
},
83+
},
84+
},
85+
}
86+
87+
assert.Equal(t,expectedDescribeResp,*describeResp)
88+
}

‎protocol/createacls/createacls.go‎

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ func init() {
99
typeRequeststruct {
1010
// We need at least one tagged field to indicate that v2+ uses "flexible"
1111
// messages.
12-
_struct{}`kafka:"min=v2,max=v2,tag"`
12+
_struct{}`kafka:"min=v2,max=v3,tag"`
1313

14-
Creations []RequestACLs`kafka:"min=v0,max=v2"`
14+
Creations []RequestACLs`kafka:"min=v0,max=v3"`
1515
}
1616

1717
func (r*Request)ApiKey() protocol.ApiKey {returnprotocol.CreateAcls }
@@ -21,29 +21,37 @@ func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) {
2121
}
2222

2323
typeRequestACLsstruct {
24-
ResourceTypeint8`kafka:"min=v0,max=v2"`
25-
ResourceNamestring`kafka:"min=v0,max=v2"`
26-
ResourcePatternTypeint8`kafka:"min=v0,max=v2"`
27-
Principalstring`kafka:"min=v0,max=v2"`
28-
Hoststring`kafka:"min=v0,max=v2"`
29-
Operationint8`kafka:"min=v0,max=v2"`
30-
PermissionTypeint8`kafka:"min=v0,max=v2"`
24+
// We need at least one tagged field to indicate that v2+ uses "flexible"
25+
// messages.
26+
_struct{}`kafka:"min=v2,max=v3,tag"`
27+
28+
ResourceTypeint8`kafka:"min=v0,max=v3"`
29+
ResourceNamestring`kafka:"min=v0,max=v1|min=v2,max=v3,compact"`
30+
ResourcePatternTypeint8`kafka:"min=v1,max=v3"`
31+
Principalstring`kafka:"min=v0,max=v1|min=v2,max=v3,compact"`
32+
Hoststring`kafka:"min=v0,max=v1|min=v2,max=v3,compact"`
33+
Operationint8`kafka:"min=v0,max=v3"`
34+
PermissionTypeint8`kafka:"min=v0,max=v3"`
3135
}
3236

3337
typeResponsestruct {
3438
// We need at least one tagged field to indicate that v2+ uses "flexible"
3539
// messages.
36-
_struct{}`kafka:"min=v2,max=v2,tag"`
40+
_struct{}`kafka:"min=v2,max=v3,tag"`
3741

38-
ThrottleTimeMsint32`kafka:"min=v0,max=v2"`
39-
Results []ResponseACLs`kafka:"min=v0,max=v2"`
42+
ThrottleTimeMsint32`kafka:"min=v0,max=v3"`
43+
Results []ResponseACLs`kafka:"min=v0,max=v3"`
4044
}
4145

4246
func (r*Response)ApiKey() protocol.ApiKey {returnprotocol.CreateAcls }
4347

4448
typeResponseACLsstruct {
45-
ErrorCodeint16`kafka:"min=v0,max=v2"`
46-
ErrorMessagestring`kafka:"min=v0,max=v2,nullable"`
49+
// We need at least one tagged field to indicate that v2+ uses "flexible"
50+
// messages.
51+
_struct{}`kafka:"min=v2,max=v3,tag"`
52+
53+
ErrorCodeint16`kafka:"min=v0,max=v3"`
54+
ErrorMessagestring`kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"`
4755
}
4856

4957
var_ protocol.BrokerMessage= (*Request)(nil)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp