Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitb79785c

Browse files
authored
feat: move agent v2 API connection monitoring to yamux layer (#11910)
Moves monitoring of the agent v2 API connection to the yamux layer.Present behavior monitors this at the websocket layer, and closes the websocket on completion. This can cause yamux to hit unexpected errors since the connection is closed underneath it.This might be the cause of yamux errors that some customers are seeing![image.png](https://graphite-user-uploaded-assets-prod.s3.amazonaws.com/tCz4CxRU9jhAJ7zH8RTi/53b8b5ef-e9e5-44a5-b559-99c37c136071.png)In any case, it's more graceful to close yamux first and let yamux close the underlying websocket. That should limit yamux error logging to truly unexpected/error cases.The only downside is that the yamux `Close()` doesn't accept a reason, so if the agent becomes outdated and we close the API connection, the agent just sees the connection close without a reason. I'm not sure we log this at the agent anyway, but it would be nice. I think more accurate logging on Coderd are more important.I've also added some logging when the monitor disconnects for reasons other than the context being canceled (e.g. agent outdated, failed pings).
1 parent13e214f commitb79785c

File tree

2 files changed

+71
-23
lines changed

2 files changed

+71
-23
lines changed

‎coderd/workspaceagentsrpc.go‎

Lines changed: 61 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -113,11 +113,9 @@ func (api *API) workspaceAgentRPC(rw http.ResponseWriter, r *http.Request) {
113113
)
114114
api.Logger.Debug(ctx,"accepting agent details",slog.F("agent",workspaceAgent))
115115

116-
deferconn.Close(websocket.StatusNormalClosure,"")
117-
118116
closeCtx,closeCtxCancel:=context.WithCancel(ctx)
119117
defercloseCtxCancel()
120-
monitor:=api.startAgentWebsocketMonitor(closeCtx,workspaceAgent,build,conn)
118+
monitor:=api.startAgentYamuxMonitor(closeCtx,workspaceAgent,build,mux)
121119
defermonitor.close()
122120

123121
agentAPI:=agentapi.New(agentapi.Options{
@@ -214,8 +212,8 @@ func checkBuildIsLatest(ctx context.Context, db database.Store, build database.W
214212
func (api*API)startAgentWebsocketMonitor(ctx context.Context,
215213
workspaceAgent database.WorkspaceAgent,workspaceBuild database.WorkspaceBuild,
216214
conn*websocket.Conn,
217-
)*agentWebsocketMonitor {
218-
monitor:=&agentWebsocketMonitor{
215+
)*agentConnectionMonitor {
216+
monitor:=&agentConnectionMonitor{
219217
apiCtx:api.ctx,
220218
workspaceAgent:workspaceAgent,
221219
workspaceBuild:workspaceBuild,
@@ -236,6 +234,53 @@ func (api *API) startAgentWebsocketMonitor(ctx context.Context,
236234
returnmonitor
237235
}
238236

237+
typeyamuxPingerCloserstruct {
238+
mux*yamux.Session
239+
}
240+
241+
func (y*yamuxPingerCloser)Close(websocket.StatusCode,string)error {
242+
returny.mux.Close()
243+
}
244+
245+
func (y*yamuxPingerCloser)Ping(ctx context.Context)error {
246+
errCh:=make(chanerror,1)
247+
gofunc() {
248+
_,err:=y.mux.Ping()
249+
errCh<-err
250+
}()
251+
select {
252+
case<-ctx.Done():
253+
returnctx.Err()
254+
caseerr:=<-errCh:
255+
returnerr
256+
}
257+
}
258+
259+
func (api*API)startAgentYamuxMonitor(ctx context.Context,
260+
workspaceAgent database.WorkspaceAgent,workspaceBuild database.WorkspaceBuild,
261+
mux*yamux.Session,
262+
)*agentConnectionMonitor {
263+
monitor:=&agentConnectionMonitor{
264+
apiCtx:api.ctx,
265+
workspaceAgent:workspaceAgent,
266+
workspaceBuild:workspaceBuild,
267+
conn:&yamuxPingerCloser{mux:mux},
268+
pingPeriod:api.AgentConnectionUpdateFrequency,
269+
db:api.Database,
270+
replicaID:api.ID,
271+
updater:api,
272+
disconnectTimeout:api.AgentInactiveDisconnectTimeout,
273+
logger:api.Logger.With(
274+
slog.F("workspace_id",workspaceBuild.WorkspaceID),
275+
slog.F("agent_id",workspaceAgent.ID),
276+
),
277+
}
278+
monitor.init()
279+
monitor.start(ctx)
280+
281+
returnmonitor
282+
}
283+
239284
typeworkspaceUpdaterinterface {
240285
publishWorkspaceUpdate(ctx context.Context,workspaceID uuid.UUID)
241286
}
@@ -245,7 +290,7 @@ type pingerCloser interface {
245290
Close(code websocket.StatusCode,reasonstring)error
246291
}
247292

248-
typeagentWebsocketMonitorstruct {
293+
typeagentConnectionMonitorstruct {
249294
apiCtx context.Context
250295
cancel context.CancelFunc
251296
wg sync.WaitGroup
@@ -272,7 +317,7 @@ type agentWebsocketMonitor struct {
272317
//
273318
// We use a custom heartbeat routine here instead of `httpapi.Heartbeat`
274319
// because we want to log the agent's last ping time.
275-
func (m*agentWebsocketMonitor)sendPings(ctx context.Context) {
320+
func (m*agentConnectionMonitor)sendPings(ctx context.Context) {
276321
t:=time.NewTicker(m.pingPeriod)
277322
defert.Stop()
278323

@@ -295,7 +340,7 @@ func (m *agentWebsocketMonitor) sendPings(ctx context.Context) {
295340
}
296341
}
297342

298-
func (m*agentWebsocketMonitor)updateConnectionTimes(ctx context.Context)error {
343+
func (m*agentConnectionMonitor)updateConnectionTimes(ctx context.Context)error {
299344
//nolint:gocritic // We only update the agent we are minding.
300345
err:=m.db.UpdateWorkspaceAgentConnectionByID(dbauthz.AsSystemRestricted(ctx), database.UpdateWorkspaceAgentConnectionByIDParams{
301346
ID:m.workspaceAgent.ID,
@@ -314,7 +359,7 @@ func (m *agentWebsocketMonitor) updateConnectionTimes(ctx context.Context) error
314359
returnnil
315360
}
316361

317-
func (m*agentWebsocketMonitor)init() {
362+
func (m*agentConnectionMonitor)init() {
318363
now:=dbtime.Now()
319364
m.firstConnectedAt=m.workspaceAgent.FirstConnectedAt
320365
if!m.firstConnectedAt.Valid {
@@ -331,7 +376,7 @@ func (m *agentWebsocketMonitor) init() {
331376
m.lastPing.Store(ptr.Ref(time.Now()))// Since the agent initiated the request, assume it's alive.
332377
}
333378

334-
func (m*agentWebsocketMonitor)start(ctx context.Context) {
379+
func (m*agentConnectionMonitor)start(ctx context.Context) {
335380
ctx,m.cancel=context.WithCancel(ctx)
336381
m.wg.Add(2)
337382
gopprof.Do(ctx,pprof.Labels("agent",m.workspaceAgent.ID.String()),
@@ -346,7 +391,7 @@ func (m *agentWebsocketMonitor) start(ctx context.Context) {
346391
})
347392
}
348393

349-
func (m*agentWebsocketMonitor)monitor(ctx context.Context) {
394+
func (m*agentConnectionMonitor)monitor(ctx context.Context) {
350395
deferfunc() {
351396
// If connection closed then context will be canceled, try to
352397
// ensure our final update is sent. By waiting at most the agent
@@ -384,7 +429,7 @@ func (m *agentWebsocketMonitor) monitor(ctx context.Context) {
384429
}()
385430
reason:="disconnect"
386431
deferfunc() {
387-
m.logger.Debug(ctx,"agentwebsocket monitor is closing connection",
432+
m.logger.Debug(ctx,"agentconnection monitor is closing connection",
388433
slog.F("reason",reason))
389434
_=m.conn.Close(websocket.StatusGoingAway,reason)
390435
}()
@@ -409,6 +454,7 @@ func (m *agentWebsocketMonitor) monitor(ctx context.Context) {
409454
lastPing:=*m.lastPing.Load()
410455
iftime.Since(lastPing)>m.disconnectTimeout {
411456
reason="ping timeout"
457+
m.logger.Warn(ctx,"connection to agent timed out")
412458
return
413459
}
414460
connectionStatusChanged:=m.disconnectedAt.Valid
@@ -421,6 +467,7 @@ func (m *agentWebsocketMonitor) monitor(ctx context.Context) {
421467
err=m.updateConnectionTimes(ctx)
422468
iferr!=nil {
423469
reason=err.Error()
470+
m.logger.Error(ctx,"failed to update agent connection times",slog.Error(err))
424471
return
425472
}
426473
ifconnectionStatusChanged {
@@ -429,12 +476,13 @@ func (m *agentWebsocketMonitor) monitor(ctx context.Context) {
429476
err=checkBuildIsLatest(ctx,m.db,m.workspaceBuild)
430477
iferr!=nil {
431478
reason=err.Error()
479+
m.logger.Info(ctx,"disconnected possibly outdated agent",slog.Error(err))
432480
return
433481
}
434482
}
435483
}
436484

437-
func (m*agentWebsocketMonitor)close() {
485+
func (m*agentConnectionMonitor)close() {
438486
m.cancel()
439487
m.wg.Wait()
440488
}

‎coderd/workspaceagentsrpc_internal_test.go‎

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323
"github.com/coder/coder/v2/testutil"
2424
)
2525

26-
funcTestAgentWebsocketMonitor_ContextCancel(t*testing.T) {
26+
funcTestAgentConnectionMonitor_ContextCancel(t*testing.T) {
2727
t.Parallel()
2828
ctx:=testutil.Context(t,testutil.WaitShort)
2929
now:=dbtime.Now()
@@ -45,7 +45,7 @@ func TestAgentWebsocketMonitor_ContextCancel(t *testing.T) {
4545
}
4646
replicaID:=uuid.New()
4747

48-
uut:=&agentWebsocketMonitor{
48+
uut:=&agentConnectionMonitor{
4949
apiCtx:ctx,
5050
workspaceAgent:agent,
5151
workspaceBuild:build,
@@ -97,7 +97,7 @@ func TestAgentWebsocketMonitor_ContextCancel(t *testing.T) {
9797
require.Greater(t,m,n)
9898
}
9999

100-
funcTestAgentWebsocketMonitor_PingTimeout(t*testing.T) {
100+
funcTestAgentConnectionMonitor_PingTimeout(t*testing.T) {
101101
t.Parallel()
102102
ctx:=testutil.Context(t,testutil.WaitShort)
103103
now:=dbtime.Now()
@@ -119,7 +119,7 @@ func TestAgentWebsocketMonitor_PingTimeout(t *testing.T) {
119119
}
120120
replicaID:=uuid.New()
121121

122-
uut:=&agentWebsocketMonitor{
122+
uut:=&agentConnectionMonitor{
123123
apiCtx:ctx,
124124
workspaceAgent:agent,
125125
workspaceBuild:build,
@@ -157,7 +157,7 @@ func TestAgentWebsocketMonitor_PingTimeout(t *testing.T) {
157157
fUpdater.requireEventuallySomeUpdates(t,build.WorkspaceID)
158158
}
159159

160-
funcTestAgentWebsocketMonitor_BuildOutdated(t*testing.T) {
160+
funcTestAgentConnectionMonitor_BuildOutdated(t*testing.T) {
161161
t.Parallel()
162162
ctx:=testutil.Context(t,testutil.WaitShort)
163163
now:=dbtime.Now()
@@ -179,7 +179,7 @@ func TestAgentWebsocketMonitor_BuildOutdated(t *testing.T) {
179179
}
180180
replicaID:=uuid.New()
181181

182-
uut:=&agentWebsocketMonitor{
182+
uut:=&agentConnectionMonitor{
183183
apiCtx:ctx,
184184
workspaceAgent:agent,
185185
workspaceBuild:build,
@@ -217,12 +217,12 @@ func TestAgentWebsocketMonitor_BuildOutdated(t *testing.T) {
217217
fUpdater.requireEventuallySomeUpdates(t,build.WorkspaceID)
218218
}
219219

220-
funcTestAgentWebsocketMonitor_SendPings(t*testing.T) {
220+
funcTestAgentConnectionMonitor_SendPings(t*testing.T) {
221221
t.Parallel()
222222
ctx,cancel:=context.WithTimeout(context.Background(),testutil.WaitShort)
223223
t.Cleanup(cancel)
224224
fConn:=&fakePingerCloser{}
225-
uut:=&agentWebsocketMonitor{
225+
uut:=&agentConnectionMonitor{
226226
pingPeriod:testutil.IntervalFast,
227227
conn:fConn,
228228
}
@@ -238,7 +238,7 @@ func TestAgentWebsocketMonitor_SendPings(t *testing.T) {
238238
require.NotNil(t,lastPing)
239239
}
240240

241-
funcTestAgentWebsocketMonitor_StartClose(t*testing.T) {
241+
funcTestAgentConnectionMonitor_StartClose(t*testing.T) {
242242
t.Parallel()
243243
ctx:=testutil.Context(t,testutil.WaitShort)
244244
fConn:=&fakePingerCloser{}
@@ -259,7 +259,7 @@ func TestAgentWebsocketMonitor_StartClose(t *testing.T) {
259259
WorkspaceID:uuid.New(),
260260
}
261261
replicaID:=uuid.New()
262-
uut:=&agentWebsocketMonitor{
262+
uut:=&agentConnectionMonitor{
263263
apiCtx:ctx,
264264
workspaceAgent:agent,
265265
workspaceBuild:build,

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp