Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitee37c7f

Browse files
authored
Refactor Consumer Group to use Client (segmentio#947)
* refactor ConsumerGroup to use Client* add test for readers sharing the default transport
1 parentd4b89e7 commitee37c7f

File tree

11 files changed

+565
-704
lines changed

11 files changed

+565
-704
lines changed

‎conn.go‎

Lines changed: 18 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -133,16 +133,15 @@ const (
133133
ReadCommittedIsolationLevel=1
134134
)
135135

136-
var (
137-
// DefaultClientID is the default value used as ClientID of kafka
138-
// connections.
139-
DefaultClientIDstring
140-
)
136+
// DefaultClientID is the default value used as ClientID of kafka
137+
// connections.
138+
varDefaultClientIDstring
141139

142140
funcinit() {
143141
progname:=filepath.Base(os.Args[0])
144142
hostname,_:=os.Hostname()
145143
DefaultClientID=fmt.Sprintf("%s@%s (github.com/segmentio/kafka-go)",progname,hostname)
144+
DefaultTransport.(*Transport).ClientID=DefaultClientID
146145
}
147146

148147
// NewConn returns a new kafka connection for the given topic and partition.
@@ -263,10 +262,12 @@ func (c *Conn) Controller() (broker Broker, err error) {
263262
}
264263
for_,brokerMeta:=rangeres.Brokers {
265264
ifbrokerMeta.NodeID==res.ControllerID {
266-
broker=Broker{ID:int(brokerMeta.NodeID),
265+
broker=Broker{
266+
ID:int(brokerMeta.NodeID),
267267
Port:int(brokerMeta.Port),
268268
Host:brokerMeta.Host,
269-
Rack:brokerMeta.Rack}
269+
Rack:brokerMeta.Rack,
270+
}
270271
break
271272
}
272273
}
@@ -322,7 +323,6 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinato
322323
err:=c.readOperation(
323324
func(deadline time.Time,idint32)error {
324325
returnc.writeRequest(findCoordinator,v0,id,request)
325-
326326
},
327327
func(deadline time.Time,sizeint)error {
328328
returnexpectZeroSize(func() (remainint,errerror) {
@@ -340,32 +340,6 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinato
340340
returnresponse,nil
341341
}
342342

343-
// heartbeat sends a heartbeat message required by consumer groups
344-
//
345-
// See http://kafka.apache.org/protocol.html#The_Messages_Heartbeat
346-
func (c*Conn)heartbeat(requestheartbeatRequestV0) (heartbeatResponseV0,error) {
347-
varresponseheartbeatResponseV0
348-
349-
err:=c.writeOperation(
350-
func(deadline time.Time,idint32)error {
351-
returnc.writeRequest(heartbeat,v0,id,request)
352-
},
353-
func(deadline time.Time,sizeint)error {
354-
returnexpectZeroSize(func() (remainint,errerror) {
355-
return (&response).readFrom(&c.rbuf,size)
356-
}())
357-
},
358-
)
359-
iferr!=nil {
360-
returnheartbeatResponseV0{},err
361-
}
362-
ifresponse.ErrorCode!=0 {
363-
returnheartbeatResponseV0{},Error(response.ErrorCode)
364-
}
365-
366-
returnresponse,nil
367-
}
368-
369343
// joinGroup attempts to join a consumer group
370344
//
371345
// See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup
@@ -752,9 +726,8 @@ func (c *Conn) ReadBatch(minBytes, maxBytes int) *Batch {
752726
// ReadBatchWith in every way is similar to ReadBatch. ReadBatch is configured
753727
// with the default values in ReadBatchConfig except for minBytes and maxBytes.
754728
func (c*Conn)ReadBatchWith(cfgReadBatchConfig)*Batch {
755-
756729
varadjustedDeadline time.Time
757-
varmaxFetch=int(c.fetchMaxBytes)
730+
maxFetch:=int(c.fetchMaxBytes)
758731

759732
ifcfg.MinBytes<0||cfg.MinBytes>maxFetch {
760733
return&Batch{err:fmt.Errorf("kafka.(*Conn).ReadBatch: minBytes of %d out of [1,%d] bounds",cfg.MinBytes,maxFetch)}
@@ -960,7 +933,6 @@ func (c *Conn) readOffset(t int64) (offset int64, err error) {
960933
// connection. If there are none, the method fetches all partitions of the kafka
961934
// cluster.
962935
func (c*Conn)ReadPartitions(topics...string) (partitions []Partition,errerror) {
963-
964936
iflen(topics)==0 {
965937
iflen(c.topic)!=0 {
966938
defaultTopics:= [...]string{c.topic}
@@ -1107,11 +1079,10 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
11071079
deadline=adjustDeadlineForRTT(deadline,now,defaultRTT)
11081080
switchproduceVersion {
11091081
casev7:
1110-
recordBatch,err:=
1111-
newRecordBatch(
1112-
codec,
1113-
msgs...,
1114-
)
1082+
recordBatch,err:=newRecordBatch(
1083+
codec,
1084+
msgs...,
1085+
)
11151086
iferr!=nil {
11161087
returnerr
11171088
}
@@ -1126,11 +1097,10 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
11261097
recordBatch,
11271098
)
11281099
casev3:
1129-
recordBatch,err:=
1130-
newRecordBatch(
1131-
codec,
1132-
msgs...,
1133-
)
1100+
recordBatch,err:=newRecordBatch(
1101+
codec,
1102+
msgs...,
1103+
)
11341104
iferr!=nil {
11351105
returnerr
11361106
}
@@ -1195,7 +1165,6 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
11951165
}
11961166
returnsize,err
11971167
}
1198-
11991168
})
12001169
iferr!=nil {
12011170
returnsize,err
@@ -1555,7 +1524,7 @@ func (c *Conn) saslAuthenticate(data []byte) ([]byte, error) {
15551524
returnnil,err
15561525
}
15571526
ifversion==v1 {
1558-
varrequest=saslAuthenticateRequestV0{Data:data}
1527+
request:=saslAuthenticateRequestV0{Data:data}
15591528
varresponsesaslAuthenticateResponseV0
15601529

15611530
err:=c.writeOperation(

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp