- Notifications
You must be signed in to change notification settings - Fork1k
fix(coderd): ensure agent WebSocket conn is cleaned up#19711
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Changes fromall commits
35a7440
5997a47
c881ccb
1c4204c
File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -59,10 +59,145 @@ func (fakeAgentProvider) Close() error { | ||
returnnil | ||
} | ||
typechannelCloserstruct { | ||
closeFnfunc() | ||
} | ||
func (c*channelCloser)Close()error { | ||
c.closeFn() | ||
returnnil | ||
} | ||
funcTestWatchAgentContainers(t*testing.T) { | ||
t.Parallel() | ||
t.Run("CoderdWebSocketCanHandleClientClosing",func(t*testing.T) { | ||
t.Parallel() | ||
// This test ensures that the agent containers `/watch` websocket can gracefully | ||
// handle the client websocket closing. This test was created in | ||
// response to this issue: https://github.com/coder/coder/issues/19449 | ||
var ( | ||
ctx=testutil.Context(t,testutil.WaitLong) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. Unfortunately this test uses There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. That's unfortunate 😔. Perhaps we should have a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. I definitely agree although I think that is out-of-scope for this PR. Should we create an issue to track that? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. We could do something like typeHeartbeatOptionfunc(...)funcWithHeartbeatInterval(d time.Duration)HeartbeatOptionfuncHeartbeat(ctx context.Context,conn*websocket.Conn,opts...HeartbeatOption) {...} It might be worthwhile also unexporting | ||
logger=slogtest.Make(t,&slogtest.Options{IgnoreErrors:true}).Leveled(slog.LevelDebug).Named("coderd") | ||
mCtrl=gomock.NewController(t) | ||
mDB=dbmock.NewMockStore(mCtrl) | ||
mCoordinator=tailnettest.NewMockCoordinator(mCtrl) | ||
mAgentConn=agentconnmock.NewMockAgentConn(mCtrl) | ||
fAgentProvider=fakeAgentProvider{ | ||
agentConn:func(ctx context.Context,agentID uuid.UUID) (_ workspacesdk.AgentConn,releasefunc(),_error) { | ||
returnmAgentConn,func() {},nil | ||
}, | ||
} | ||
workspaceID=uuid.New() | ||
agentID=uuid.New() | ||
resourceID=uuid.New() | ||
jobID=uuid.New() | ||
buildID=uuid.New() | ||
containersCh=make(chan codersdk.WorkspaceAgentListContainersResponse) | ||
r=chi.NewMux() | ||
api=API{ | ||
ctx:ctx, | ||
Options:&Options{ | ||
AgentInactiveDisconnectTimeout:testutil.WaitShort, | ||
Database:mDB, | ||
Logger:logger, | ||
DeploymentValues:&codersdk.DeploymentValues{}, | ||
TailnetCoordinator:tailnettest.NewFakeCoordinator(), | ||
}, | ||
} | ||
) | ||
vartailnetCoordinator tailnet.Coordinator=mCoordinator | ||
api.TailnetCoordinator.Store(&tailnetCoordinator) | ||
api.agentProvider=fAgentProvider | ||
// Setup: Allow `ExtractWorkspaceAgentParams` to complete. | ||
mDB.EXPECT().GetWorkspaceAgentByID(gomock.Any(),agentID).Return(database.WorkspaceAgent{ | ||
ID:agentID, | ||
ResourceID:resourceID, | ||
LifecycleState:database.WorkspaceAgentLifecycleStateReady, | ||
FirstConnectedAt: sql.NullTime{Valid:true,Time:dbtime.Now()}, | ||
LastConnectedAt: sql.NullTime{Valid:true,Time:dbtime.Now()}, | ||
},nil) | ||
mDB.EXPECT().GetWorkspaceResourceByID(gomock.Any(),resourceID).Return(database.WorkspaceResource{ | ||
ID:resourceID, | ||
JobID:jobID, | ||
},nil) | ||
mDB.EXPECT().GetProvisionerJobByID(gomock.Any(),jobID).Return(database.ProvisionerJob{ | ||
ID:jobID, | ||
Type:database.ProvisionerJobTypeWorkspaceBuild, | ||
},nil) | ||
mDB.EXPECT().GetWorkspaceBuildByJobID(gomock.Any(),jobID).Return(database.WorkspaceBuild{ | ||
WorkspaceID:workspaceID, | ||
ID:buildID, | ||
},nil) | ||
// And: Allow `db2dsk.WorkspaceAgent` to complete. | ||
mCoordinator.EXPECT().Node(gomock.Any()).Return(nil) | ||
// And: Allow `WatchContainers` to be called, returing our `containersCh` channel. | ||
mAgentConn.EXPECT().WatchContainers(gomock.Any(),gomock.Any()). | ||
DoAndReturn(func(_ context.Context,_ slog.Logger) (<-chan codersdk.WorkspaceAgentListContainersResponse, io.Closer,error) { | ||
returncontainersCh,&channelCloser{closeFn:func() { | ||
close(containersCh) | ||
}},nil | ||
}) | ||
// And: We mount the HTTP Handler | ||
r.With(httpmw.ExtractWorkspaceAgentParam(mDB)). | ||
Get("/workspaceagents/{workspaceagent}/containers/watch",api.watchWorkspaceAgentContainers) | ||
// Given: We create the HTTP server | ||
srv:=httptest.NewServer(r) | ||
defersrv.Close() | ||
// And: Dial the WebSocket | ||
wsURL:=strings.Replace(srv.URL,"http://","ws://",1) | ||
conn,resp,err:=websocket.Dial(ctx,fmt.Sprintf("%s/workspaceagents/%s/containers/watch",wsURL,agentID),nil) | ||
require.NoError(t,err) | ||
ifresp.Body!=nil { | ||
deferresp.Body.Close() | ||
} | ||
// And: Create a streaming decoder | ||
decoder:=wsjson.NewDecoder[codersdk.WorkspaceAgentListContainersResponse](conn,websocket.MessageText,logger) | ||
deferdecoder.Close() | ||
decodeCh:=decoder.Chan() | ||
// And: We can successfully send through the channel. | ||
testutil.RequireSend(ctx,t,containersCh, codersdk.WorkspaceAgentListContainersResponse{ | ||
Containers: []codersdk.WorkspaceAgentContainer{{ | ||
ID:"test-container-id", | ||
}}, | ||
}) | ||
// And: Receive the data. | ||
containerResp:=testutil.RequireReceive(ctx,t,decodeCh) | ||
require.Len(t,containerResp.Containers,1) | ||
require.Equal(t,"test-container-id",containerResp.Containers[0].ID) | ||
// When: We close the WebSocket | ||
conn.Close(websocket.StatusNormalClosure,"test closing connection") | ||
// Then: We expect `containersCh` to be closed. | ||
select { | ||
case<-ctx.Done(): | ||
t.Fail() | ||
case_,ok:=<-containersCh: | ||
require.False(t,ok,"channel is expected to be closed") | ||
} | ||
}) | ||
t.Run("CoderdWebSocketCanHandleAgentClosing",func(t*testing.T) { | ||
t.Parallel() | ||
// This test ensures that the agent containers `/watch` websocket can gracefully | ||
Uh oh!
There was an error while loading.Please reload this page.