- Notifications
You must be signed in to change notification settings - Fork1.1k
fix(coderd): ensure agent WebSocket conn is cleaned up#19711
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Changes fromall commits
35a74405997a47c881ccb1c4204cFile filter
Filter by extension
Conversations
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -59,10 +59,145 @@ func (fakeAgentProvider) Close() error { | ||
| return nil | ||
| } | ||
| type channelCloser struct { | ||
| closeFn func() | ||
| } | ||
| func (c *channelCloser) Close() error { | ||
| c.closeFn() | ||
| return nil | ||
| } | ||
| func TestWatchAgentContainers(t *testing.T) { | ||
| t.Parallel() | ||
| t.Run("CoderdWebSocketCanHandleClientClosing", func(t *testing.T) { | ||
| t.Parallel() | ||
| // This test ensures that the agent containers `/watch` websocket can gracefully | ||
| // handle the client websocket closing. This test was created in | ||
| // response to this issue: https://github.com/coder/coder/issues/19449 | ||
| var ( | ||
| ctx = testutil.Context(t, testutil.WaitLong) | ||
ContributorAuthor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. Unfortunately this test uses Member There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. That's unfortunate 😔. Perhaps we should have a ContributorAuthor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. I definitely agree although I think that is out-of-scope for this PR. Should we create an issue to track that? Member There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. We could do something like typeHeartbeatOptionfunc(...)funcWithHeartbeatInterval(d time.Duration)HeartbeatOptionfuncHeartbeat(ctx context.Context,conn*websocket.Conn,opts...HeartbeatOption) {...} It might be worthwhile also unexporting | ||
| logger = slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug).Named("coderd") | ||
| mCtrl = gomock.NewController(t) | ||
| mDB = dbmock.NewMockStore(mCtrl) | ||
| mCoordinator = tailnettest.NewMockCoordinator(mCtrl) | ||
| mAgentConn = agentconnmock.NewMockAgentConn(mCtrl) | ||
| fAgentProvider = fakeAgentProvider{ | ||
| agentConn: func(ctx context.Context, agentID uuid.UUID) (_ workspacesdk.AgentConn, release func(), _ error) { | ||
| return mAgentConn, func() {}, nil | ||
| }, | ||
| } | ||
| workspaceID = uuid.New() | ||
| agentID = uuid.New() | ||
| resourceID = uuid.New() | ||
| jobID = uuid.New() | ||
| buildID = uuid.New() | ||
| containersCh = make(chan codersdk.WorkspaceAgentListContainersResponse) | ||
| r = chi.NewMux() | ||
| api = API{ | ||
| ctx: ctx, | ||
| Options: &Options{ | ||
| AgentInactiveDisconnectTimeout: testutil.WaitShort, | ||
| Database: mDB, | ||
| Logger: logger, | ||
| DeploymentValues: &codersdk.DeploymentValues{}, | ||
| TailnetCoordinator: tailnettest.NewFakeCoordinator(), | ||
| }, | ||
| } | ||
| ) | ||
| var tailnetCoordinator tailnet.Coordinator = mCoordinator | ||
| api.TailnetCoordinator.Store(&tailnetCoordinator) | ||
| api.agentProvider = fAgentProvider | ||
| // Setup: Allow `ExtractWorkspaceAgentParams` to complete. | ||
| mDB.EXPECT().GetWorkspaceAgentByID(gomock.Any(), agentID).Return(database.WorkspaceAgent{ | ||
| ID: agentID, | ||
| ResourceID: resourceID, | ||
| LifecycleState: database.WorkspaceAgentLifecycleStateReady, | ||
| FirstConnectedAt: sql.NullTime{Valid: true, Time: dbtime.Now()}, | ||
| LastConnectedAt: sql.NullTime{Valid: true, Time: dbtime.Now()}, | ||
| }, nil) | ||
| mDB.EXPECT().GetWorkspaceResourceByID(gomock.Any(), resourceID).Return(database.WorkspaceResource{ | ||
| ID: resourceID, | ||
| JobID: jobID, | ||
| }, nil) | ||
| mDB.EXPECT().GetProvisionerJobByID(gomock.Any(), jobID).Return(database.ProvisionerJob{ | ||
| ID: jobID, | ||
| Type: database.ProvisionerJobTypeWorkspaceBuild, | ||
| }, nil) | ||
| mDB.EXPECT().GetWorkspaceBuildByJobID(gomock.Any(), jobID).Return(database.WorkspaceBuild{ | ||
| WorkspaceID: workspaceID, | ||
| ID: buildID, | ||
| }, nil) | ||
| // And: Allow `db2dsk.WorkspaceAgent` to complete. | ||
| mCoordinator.EXPECT().Node(gomock.Any()).Return(nil) | ||
| // And: Allow `WatchContainers` to be called, returing our `containersCh` channel. | ||
| mAgentConn.EXPECT().WatchContainers(gomock.Any(), gomock.Any()). | ||
| DoAndReturn(func(_ context.Context, _ slog.Logger) (<-chan codersdk.WorkspaceAgentListContainersResponse, io.Closer, error) { | ||
| return containersCh, &channelCloser{closeFn: func() { | ||
| close(containersCh) | ||
| }}, nil | ||
| }) | ||
| // And: We mount the HTTP Handler | ||
| r.With(httpmw.ExtractWorkspaceAgentParam(mDB)). | ||
| Get("/workspaceagents/{workspaceagent}/containers/watch", api.watchWorkspaceAgentContainers) | ||
| // Given: We create the HTTP server | ||
| srv := httptest.NewServer(r) | ||
| defer srv.Close() | ||
| // And: Dial the WebSocket | ||
| wsURL := strings.Replace(srv.URL, "http://", "ws://", 1) | ||
| conn, resp, err := websocket.Dial(ctx, fmt.Sprintf("%s/workspaceagents/%s/containers/watch", wsURL, agentID), nil) | ||
| require.NoError(t, err) | ||
| if resp.Body != nil { | ||
| defer resp.Body.Close() | ||
| } | ||
| // And: Create a streaming decoder | ||
| decoder := wsjson.NewDecoder[codersdk.WorkspaceAgentListContainersResponse](conn, websocket.MessageText, logger) | ||
| defer decoder.Close() | ||
| decodeCh := decoder.Chan() | ||
| // And: We can successfully send through the channel. | ||
| testutil.RequireSend(ctx, t, containersCh, codersdk.WorkspaceAgentListContainersResponse{ | ||
| Containers: []codersdk.WorkspaceAgentContainer{{ | ||
| ID: "test-container-id", | ||
| }}, | ||
| }) | ||
| // And: Receive the data. | ||
| containerResp := testutil.RequireReceive(ctx, t, decodeCh) | ||
| require.Len(t, containerResp.Containers, 1) | ||
| require.Equal(t, "test-container-id", containerResp.Containers[0].ID) | ||
| // When: We close the WebSocket | ||
| conn.Close(websocket.StatusNormalClosure, "test closing connection") | ||
| // Then: We expect `containersCh` to be closed. | ||
| select { | ||
| case <-ctx.Done(): | ||
| t.Fail() | ||
| case _, ok := <-containersCh: | ||
| require.False(t, ok, "channel is expected to be closed") | ||
| } | ||
| }) | ||
| t.Run("CoderdWebSocketCanHandleAgentClosing", func(t *testing.T) { | ||
| t.Parallel() | ||
| // This test ensures that the agent containers `/watch` websocket can gracefully | ||
Uh oh!
There was an error while loading.Please reload this page.