@@ -133,16 +133,15 @@ const (
133133ReadCommitted IsolationLevel = 1
134134)
135135
136- var (
137- // DefaultClientID is the default value used as ClientID of kafka
138- // connections.
139- DefaultClientID string
140- )
136+ // DefaultClientID is the default value used as ClientID of kafka
137+ // connections.
138+ var DefaultClientID string
141139
142140func init () {
143141progname := filepath .Base (os .Args [0 ])
144142hostname ,_ := os .Hostname ()
145143DefaultClientID = fmt .Sprintf ("%s@%s (github.com/segmentio/kafka-go)" ,progname ,hostname )
144+ DefaultTransport .(* Transport ).ClientID = DefaultClientID
146145}
147146
148147// NewConn returns a new kafka connection for the given topic and partition.
@@ -263,10 +262,12 @@ func (c *Conn) Controller() (broker Broker, err error) {
263262}
264263for _ ,brokerMeta := range res .Brokers {
265264if brokerMeta .NodeID == res .ControllerID {
266- broker = Broker {ID :int (brokerMeta .NodeID ),
265+ broker = Broker {
266+ ID :int (brokerMeta .NodeID ),
267267Port :int (brokerMeta .Port ),
268268Host :brokerMeta .Host ,
269- Rack :brokerMeta .Rack }
269+ Rack :brokerMeta .Rack ,
270+ }
270271break
271272}
272273}
@@ -322,7 +323,6 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinato
322323err := c .readOperation (
323324func (deadline time.Time ,id int32 )error {
324325return c .writeRequest (findCoordinator ,v0 ,id ,request )
325-
326326},
327327func (deadline time.Time ,size int )error {
328328return expectZeroSize (func () (remain int ,err error ) {
@@ -340,32 +340,6 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinato
340340return response ,nil
341341}
342342
343- // heartbeat sends a heartbeat message required by consumer groups
344- //
345- // See http://kafka.apache.org/protocol.html#The_Messages_Heartbeat
346- func (c * Conn )heartbeat (request heartbeatRequestV0 ) (heartbeatResponseV0 ,error ) {
347- var response heartbeatResponseV0
348-
349- err := c .writeOperation (
350- func (deadline time.Time ,id int32 )error {
351- return c .writeRequest (heartbeat ,v0 ,id ,request )
352- },
353- func (deadline time.Time ,size int )error {
354- return expectZeroSize (func () (remain int ,err error ) {
355- return (& response ).readFrom (& c .rbuf ,size )
356- }())
357- },
358- )
359- if err != nil {
360- return heartbeatResponseV0 {},err
361- }
362- if response .ErrorCode != 0 {
363- return heartbeatResponseV0 {},Error (response .ErrorCode )
364- }
365-
366- return response ,nil
367- }
368-
369343// joinGroup attempts to join a consumer group
370344//
371345// See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup
@@ -752,9 +726,8 @@ func (c *Conn) ReadBatch(minBytes, maxBytes int) *Batch {
752726// ReadBatchWith in every way is similar to ReadBatch. ReadBatch is configured
753727// with the default values in ReadBatchConfig except for minBytes and maxBytes.
754728func (c * Conn )ReadBatchWith (cfg ReadBatchConfig )* Batch {
755-
756729var adjustedDeadline time.Time
757- var maxFetch = int (c .fetchMaxBytes )
730+ maxFetch := int (c .fetchMaxBytes )
758731
759732if cfg .MinBytes < 0 || cfg .MinBytes > maxFetch {
760733return & Batch {err :fmt .Errorf ("kafka.(*Conn).ReadBatch: minBytes of %d out of [1,%d] bounds" ,cfg .MinBytes ,maxFetch )}
@@ -960,7 +933,6 @@ func (c *Conn) readOffset(t int64) (offset int64, err error) {
960933// connection. If there are none, the method fetches all partitions of the kafka
961934// cluster.
962935func (c * Conn )ReadPartitions (topics ... string ) (partitions []Partition ,err error ) {
963-
964936if len (topics )== 0 {
965937if len (c .topic )!= 0 {
966938defaultTopics := [... ]string {c .topic }
@@ -1107,11 +1079,10 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
11071079deadline = adjustDeadlineForRTT (deadline ,now ,defaultRTT )
11081080switch produceVersion {
11091081case v7 :
1110- recordBatch ,err :=
1111- newRecordBatch (
1112- codec ,
1113- msgs ... ,
1114- )
1082+ recordBatch ,err := newRecordBatch (
1083+ codec ,
1084+ msgs ... ,
1085+ )
11151086if err != nil {
11161087return err
11171088}
@@ -1126,11 +1097,10 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
11261097recordBatch ,
11271098)
11281099case v3 :
1129- recordBatch ,err :=
1130- newRecordBatch (
1131- codec ,
1132- msgs ... ,
1133- )
1100+ recordBatch ,err := newRecordBatch (
1101+ codec ,
1102+ msgs ... ,
1103+ )
11341104if err != nil {
11351105return err
11361106}
@@ -1195,7 +1165,6 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
11951165}
11961166return size ,err
11971167}
1198-
11991168})
12001169if err != nil {
12011170return size ,err
@@ -1555,7 +1524,7 @@ func (c *Conn) saslAuthenticate(data []byte) ([]byte, error) {
15551524return nil ,err
15561525}
15571526if version == v1 {
1558- var request = saslAuthenticateRequestV0 {Data :data }
1527+ request := saslAuthenticateRequestV0 {Data :data }
15591528var response saslAuthenticateResponseV0
15601529
15611530err := c .writeOperation (