Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit dc3d39b

Browse files
authored
fix: agent disconnects from coordinator (#7430)
* work around websocket deadline bug
  Signed-off-by: Spike Curtis <spike@coder.com>
* Use test context to hold websocket open
  Signed-off-by: Spike Curtis <spike@coder.com>
* Fix race creating test websocket
  Signed-off-by: Spike Curtis <spike@coder.com>
* set write deadline to time.Time zero
  Signed-off-by: Spike Curtis <spike@coder.com>
---------
Signed-off-by: Spike Curtis <spike@coder.com>
1 parent 5ffa6da commit dc3d39b

File tree

2 files changed

+67
-7
lines changed

2 files changed

+67
-7
lines changed

‎tailnet/coordinator.go

Lines changed: 18 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -206,6 +206,10 @@ func (t *TrackedConn) Close() error {
206206
return t.conn.Close()
207207
}
208208

209+
// WriteTimeout is the amount of time we wait to write a node update to a connection before we declare it hung.
210+
// It is exported so that tests can use it.
211+
const WriteTimeout=time.Second*5
212+
209213
// SendUpdates reads node updates and writes them to the connection. Ends when writes hit an error or context is
210214
// canceled.
211215
func (t*TrackedConn)SendUpdates() {
@@ -223,7 +227,7 @@ func (t *TrackedConn) SendUpdates() {
223227

224228
// Set a deadline so that hung connections don't put back pressure on the system.
225229
// Node updates are tiny, so even the dinkiest connection can handle them if it's not hung.
226-
err=t.conn.SetWriteDeadline(time.Now().Add(5*time.Second))
230+
err=t.conn.SetWriteDeadline(time.Now().Add(WriteTimeout))
227231
if err!=nil {
228232
// often, this is just because the connection is closed/broken, so only log at debug.
229233
t.logger.Debug(t.ctx,"unable to set write deadline",slog.Error(err))
@@ -238,6 +242,19 @@ func (t *TrackedConn) SendUpdates() {
238242
return
239243
}
240244
t.logger.Debug(t.ctx,"wrote nodes",slog.F("nodes",nodes))
245+
246+
// nhooyr.io/websocket has a bugged implementation of deadlines on a websocket net.Conn. What they are
247+
// *supposed* to do is set a deadline for any subsequent writes to complete, otherwise the call to Write()
248+
// fails. What nhooyr.io/websocket does is set a timer, after which it expires the websocket write context.
249+
// If this timer fires, then the next write will fail *even if we set a new write deadline*. So, after
250+
// our successful write, it is important that we reset the deadline before it fires.
251+
err=t.conn.SetWriteDeadline(time.Time{})
252+
if err!=nil {
253+
// often, this is just because the connection is closed/broken, so only log at debug.
254+
t.logger.Debug(t.ctx,"unable to extend write deadline",slog.Error(err))
255+
_=t.Close()
256+
return
257+
}
241258
}
242259
}
243260
}

‎tailnet/coordinator_test.go

Lines changed: 49 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -1,11 +1,16 @@
11
package tailnet_test
22

33
import (
4+
"context"
45
"encoding/json"
56
"net"
7+
"net/http"
8+
"net/http/httptest"
69
"testing"
710
"time"
811

12+
"nhooyr.io/websocket"
13+
914
"cdr.dev/slog"
1015
"cdr.dev/slog/sloggers/slogtest"
1116

@@ -74,7 +79,10 @@ func TestCoordinator(t *testing.T) {
7479
logger:=slogtest.Make(t,nil).Leveled(slog.LevelDebug)
7580
coordinator:=tailnet.NewCoordinator(logger)
7681

77-
agentWS,agentServerWS:=net.Pipe()
82+
// in this test we use real websockets to test use of deadlines
83+
ctx,cancel:=context.WithTimeout(context.Background(),testutil.WaitSuperLong)
84+
defer cancel()
85+
agentWS,agentServerWS:=websocketConn(ctx,t)
7886
defer agentWS.Close()
7987
agentNodeChan:=make(chan []*tailnet.Node)
8088
sendAgentNode,agentErrChan:=tailnet.ServeCoordinator(agentWS,func(nodes []*tailnet.Node)error {
@@ -93,7 +101,7 @@ func TestCoordinator(t *testing.T) {
93101
return coordinator.Node(agentID)!=nil
94102
},testutil.WaitShort,testutil.IntervalFast)
95103

96-
clientWS,clientServerWS:=net.Pipe()
104+
clientWS,clientServerWS:=websocketConn(ctx,t)
97105
defer clientWS.Close()
98106
defer clientServerWS.Close()
99107
clientNodeChan:=make(chan []*tailnet.Node)
@@ -108,16 +116,28 @@ func TestCoordinator(t *testing.T) {
108116
assert.NoError(t,err)
109117
close(closeClientChan)
110118
}()
111-
agentNodes:=<-clientNodeChan
112-
require.Len(t,agentNodes,1)
119+
select {
120+
case agentNodes:=<-clientNodeChan:
121+
require.Len(t,agentNodes,1)
122+
case<-ctx.Done():
123+
t.Fatal("timed out")
124+
}
113125
sendClientNode(&tailnet.Node{})
114126
clientNodes:=<-agentNodeChan
115127
require.Len(t,clientNodes,1)
116128

129+
// wait longer than the internal wait timeout.
130+
// this tests for regression of https://github.com/coder/coder/issues/7428
131+
time.Sleep(tailnet.WriteTimeout*3/2)
132+
117133
// Ensure an update to the agent node reaches the client!
118134
sendAgentNode(&tailnet.Node{})
119-
agentNodes=<-clientNodeChan
120-
require.Len(t,agentNodes,1)
135+
select {
136+
case agentNodes:=<-clientNodeChan:
137+
require.Len(t,agentNodes,1)
138+
case<-ctx.Done():
139+
t.Fatal("timed out")
140+
}
121141

122142
// Close the agent WebSocket so a new one can connect.
123143
err:=agentWS.Close()
@@ -334,3 +354,26 @@ func TestCoordinator_AgentUpdateWhileClientConnects(t *testing.T) {
334354
require.Len(t,cNodes,1)
335355
require.Equal(t,1,cNodes[0].PreferredDERP)
336356
}
357+
358+
func websocketConn(ctx context.Context,t*testing.T) (client net.Conn,server net.Conn) {
359+
t.Helper()
360+
sc:=make(chan net.Conn,1)
361+
s:=httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter,r*http.Request) {
362+
wss,err:=websocket.Accept(rw,r,nil)
363+
require.NoError(t,err)
364+
conn:=websocket.NetConn(r.Context(),wss,websocket.MessageBinary)
365+
sc<-conn
366+
close(sc)// there can be only one
367+
368+
// hold open until context canceled
369+
<-ctx.Done()
370+
}))
371+
t.Cleanup(s.Close)
372+
// nolint: bodyclose
373+
wsc,_,err:=websocket.Dial(ctx,s.URL,nil)
374+
require.NoError(t,err)
375+
client=websocket.NetConn(ctx,wsc,websocket.MessageBinary)
376+
server,ok:=<-sc
377+
require.True(t,ok)
378+
return client,server
379+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp