fix(coderd): ensure agent WebSocket conn is cleaned up #19711
Changes from all commits: 35a7440, 5997a47, c881ccb, 1c4204c
@@ -59,10 +59,145 @@ func (fakeAgentProvider) Close() error {
	return nil
}
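
// channelCloser adapts a close callback to io.Closer so the test can observe
// when coderd releases the container watch stream.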
type channelCloser struct {
	closeFn func()
}

func (c *channelCloser) Close() error {
	c.closeFn()
	return nil
}
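
// TestWatchAgentContainers exercises the agent containers `/watch` endpoint,
// verifying that coderd tears the stream down cleanly regardless of which
// side closes the WebSocket first.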
func TestWatchAgentContainers(t *testing.T) {
	t.Parallel()

	t.Run("CoderdWebSocketCanHandleClientClosing", func(t *testing.T) {
		t.Parallel()

		// This test ensures that the agent containers `/watch` websocket can gracefully
		// handle the client websocket closing. This test was created in
		// response to this issue: https://github.com/coder/coder/issues/19449
		var (
			ctx = testutil.Context(t, testutil.WaitLong)
Review discussion on this line:

> Unfortunately this test uses …
>
> That's unfortunate 😔. Perhaps we should have a …
>
> I definitely agree, although I think that is out-of-scope for this PR. Should we create an issue to track that?
>
> We could do something like:
>
>     type HeartbeatOption func(...)
>
>     func WithHeartbeatInterval(d time.Duration) HeartbeatOption
>
>     func Heartbeat(ctx context.Context, conn *websocket.Conn, opts ...HeartbeatOption) { ... }
>
> It might be worthwhile also unexporting …
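
An expanded sketch of that proposal, for illustration only: the `Heartbeat` and `WithHeartbeatInterval` signatures come from the comment above, while the options struct, the default interval, the package name, and the ping loop body are assumptions rather than anything in this PR.

// Hypothetical shared heartbeat helper sketched from the review thread.
// Assumes the coder/websocket package, whose Conn.Ping blocks until the peer
// answers with a pong or ctx expires.
package wsutil // hypothetical home for the helper

import (
	"context"
	"time"

	"github.com/coder/websocket"
)

type HeartbeatOption func(*heartbeatOptions)

type heartbeatOptions struct {
	interval time.Duration
}

// WithHeartbeatInterval overrides the default ping interval, e.g. so tests
// can use a much shorter one.
func WithHeartbeatInterval(d time.Duration) HeartbeatOption {
	return func(o *heartbeatOptions) { o.interval = d }
}

// Heartbeat pings conn on a ticker until ctx is canceled or a ping fails,
// closing the connection on failure so readers unblock promptly.
func Heartbeat(ctx context.Context, conn *websocket.Conn, opts ...HeartbeatOption) {
	o := heartbeatOptions{interval: 15 * time.Second} // assumed default
	for _, opt := range opts {
		opt(&o)
	}
	ticker := time.NewTicker(o.interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
		if err := conn.Ping(ctx); err != nil {
			_ = conn.Close(websocket.StatusGoingAway, "ping failure")
			return
		}
	}
}

The thread treated this as follow-up work out of scope for this PR, so the test below proceeds without it.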
			logger         = slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug).Named("coderd")
			mCtrl          = gomock.NewController(t)
			mDB            = dbmock.NewMockStore(mCtrl)
			mCoordinator   = tailnettest.NewMockCoordinator(mCtrl)
			mAgentConn     = agentconnmock.NewMockAgentConn(mCtrl)
			fAgentProvider = fakeAgentProvider{
				agentConn: func(ctx context.Context, agentID uuid.UUID) (_ workspacesdk.AgentConn, release func(), _ error) {
					return mAgentConn, func() {}, nil
				},
			}

			workspaceID  = uuid.New()
			agentID      = uuid.New()
			resourceID   = uuid.New()
			jobID        = uuid.New()
			buildID      = uuid.New()
			containersCh = make(chan codersdk.WorkspaceAgentListContainersResponse)

			r   = chi.NewMux()
			api = API{
				ctx: ctx,
				Options: &Options{
					AgentInactiveDisconnectTimeout: testutil.WaitShort,
					Database:                       mDB,
					Logger:                         logger,
					DeploymentValues:               &codersdk.DeploymentValues{},
					TailnetCoordinator:             tailnettest.NewFakeCoordinator(),
				},
			}
		)

		var tailnetCoordinator tailnet.Coordinator = mCoordinator
		api.TailnetCoordinator.Store(&tailnetCoordinator)
		api.agentProvider = fAgentProvider

		// Setup: Allow `ExtractWorkspaceAgentParams` to complete.
		mDB.EXPECT().GetWorkspaceAgentByID(gomock.Any(), agentID).Return(database.WorkspaceAgent{
			ID:               agentID,
			ResourceID:       resourceID,
			LifecycleState:   database.WorkspaceAgentLifecycleStateReady,
			FirstConnectedAt: sql.NullTime{Valid: true, Time: dbtime.Now()},
			LastConnectedAt:  sql.NullTime{Valid: true, Time: dbtime.Now()},
		}, nil)
		mDB.EXPECT().GetWorkspaceResourceByID(gomock.Any(), resourceID).Return(database.WorkspaceResource{
			ID:    resourceID,
			JobID: jobID,
		}, nil)
		mDB.EXPECT().GetProvisionerJobByID(gomock.Any(), jobID).Return(database.ProvisionerJob{
			ID:   jobID,
			Type: database.ProvisionerJobTypeWorkspaceBuild,
		}, nil)
		mDB.EXPECT().GetWorkspaceBuildByJobID(gomock.Any(), jobID).Return(database.WorkspaceBuild{
			WorkspaceID: workspaceID,
			ID:          buildID,
		}, nil)
		// And: Allow `db2sdk.WorkspaceAgent` to complete.
		mCoordinator.EXPECT().Node(gomock.Any()).Return(nil)

		// And: Allow `WatchContainers` to be called, returning our `containersCh` channel.
		mAgentConn.EXPECT().WatchContainers(gomock.Any(), gomock.Any()).
			DoAndReturn(func(_ context.Context, _ slog.Logger) (<-chan codersdk.WorkspaceAgentListContainersResponse, io.Closer, error) {
				return containersCh, &channelCloser{closeFn: func() {
					close(containersCh)
				}}, nil
			})

		// And: We mount the HTTP Handler
		r.With(httpmw.ExtractWorkspaceAgentParam(mDB)).
			Get("/workspaceagents/{workspaceagent}/containers/watch", api.watchWorkspaceAgentContainers)

		// Given: We create the HTTP server
		srv := httptest.NewServer(r)
		defer srv.Close()

		// And: Dial the WebSocket
		wsURL := strings.Replace(srv.URL, "http://", "ws://", 1)
		conn, resp, err := websocket.Dial(ctx, fmt.Sprintf("%s/workspaceagents/%s/containers/watch", wsURL, agentID), nil)
		require.NoError(t, err)
		if resp.Body != nil {
			defer resp.Body.Close()
		}

		// And: Create a streaming decoder
		decoder := wsjson.NewDecoder[codersdk.WorkspaceAgentListContainersResponse](conn, websocket.MessageText, logger)
		defer decoder.Close()
		decodeCh := decoder.Chan()

		// And: We can successfully send through the channel.
		testutil.RequireSend(ctx, t, containersCh, codersdk.WorkspaceAgentListContainersResponse{
			Containers: []codersdk.WorkspaceAgentContainer{{
				ID: "test-container-id",
			}},
		})

		// And: Receive the data.
		containerResp := testutil.RequireReceive(ctx, t, decodeCh)
		require.Len(t, containerResp.Containers, 1)
		require.Equal(t, "test-container-id", containerResp.Containers[0].ID)

		// When: We close the WebSocket
		conn.Close(websocket.StatusNormalClosure, "test closing connection")
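
		// Closing the client side causes coderd's read loop to observe the
		// closure and release the agent connection, which in turn invokes the
		// channelCloser above and closes containersCh.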
		// Then: We expect `containersCh` to be closed.
		select {
		case <-ctx.Done():
			t.Fail()
		case _, ok := <-containersCh:
			require.False(t, ok, "channel is expected to be closed")
		}
	})

	t.Run("CoderdWebSocketCanHandleAgentClosing", func(t *testing.T) {
		t.Parallel()
		// This test ensures that the agent containers `/watch` websocket can gracefully
		// handle the agent closing its end of the connection.