Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit35a7440

Browse files
fix(coderd): ensure agent WebSocket conn is cleaned up
1 parent8f72538 commit35a7440

File tree

2 files changed

+143
-6
lines changed

2 files changed

+143
-6
lines changed

‎coderd/workspaceagents.go‎

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -815,10 +815,13 @@ func (api *API) workspaceAgentListeningPorts(rw http.ResponseWriter, r *http.Req
815815
// @Router /workspaceagents/{workspaceagent}/containers/watch [get]
816816
func (api*API)watchWorkspaceAgentContainers(rw http.ResponseWriter,r*http.Request) {
817817
var (
818-
ctx=r.Context()
819818
workspaceAgent=httpmw.WorkspaceAgentParam(r)
819+
logger=api.Logger.Named("agent_container_watcher").With(slog.F("agent_id",workspaceAgent.ID))
820820
)
821821

822+
ctx,cancelCtx:=context.WithCancel(r.Context())
823+
defercancelCtx()
824+
822825
// If the agent is unreachable, the request will hang. Assume that if we
823826
// don't get a response after 30s that the agent is unreachable.
824827
dialCtx,cancel:=context.WithTimeout(ctx,30*time.Second)
@@ -857,8 +860,7 @@ func (api *API) watchWorkspaceAgentContainers(rw http.ResponseWriter, r *http.Re
857860
}
858861
deferrelease()
859862

860-
watcherLogger:=api.Logger.Named("agent_container_watcher").With(slog.F("agent_id",workspaceAgent.ID))
861-
containersCh,closer,err:=agentConn.WatchContainers(ctx,watcherLogger)
863+
containersCh,closer,err:=agentConn.WatchContainers(ctx,logger)
862864
iferr!=nil {
863865
httpapi.Write(ctx,rw,http.StatusInternalServerError, codersdk.Response{
864866
Message:"Internal error watching agent's containers.",
@@ -881,11 +883,11 @@ func (api *API) watchWorkspaceAgentContainers(rw http.ResponseWriter, r *http.Re
881883
// close frames.
882884
_=conn.CloseRead(context.Background())
883885

886+
gohttpapi.HeartbeatClose(ctx,logger,cancelCtx,conn)
887+
884888
ctx,wsNetConn:=codersdk.WebsocketNetConn(ctx,conn,websocket.MessageText)
885889
deferwsNetConn.Close()
886890

887-
gohttpapi.Heartbeat(ctx,conn)
888-
889891
encoder:=json.NewEncoder(wsNetConn)
890892

891893
for {

‎coderd/workspaceagents_internal_test.go‎

Lines changed: 136 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,145 @@ func (fakeAgentProvider) Close() error {
5959
returnnil
6060
}
6161

62+
typechannelCloserstruct {
63+
closeFnfunc()
64+
}
65+
66+
func (c*channelCloser)Close()error {
67+
c.closeFn()
68+
returnnil
69+
}
70+
6271
funcTestWatchAgentContainers(t*testing.T) {
6372
t.Parallel()
6473

65-
t.Run("WebSocketClosesProperly",func(t*testing.T) {
74+
t.Run("CoderdWebSocketCanHandleClientClosing",func(t*testing.T) {
75+
t.Parallel()
76+
77+
// This test ensures that the agent containers `/watch` websocket can gracefully
78+
// handle the client websocket closing. This test was created in
79+
// response to this issue: https://github.com/coder/coder/issues/19449
80+
81+
var (
82+
ctx=testutil.Context(t,testutil.WaitLong)
83+
logger=slogtest.Make(t,&slogtest.Options{IgnoreErrors:true}).Leveled(slog.LevelDebug).Named("coderd")
84+
85+
mCtrl=gomock.NewController(t)
86+
mDB=dbmock.NewMockStore(mCtrl)
87+
mCoordinator=tailnettest.NewMockCoordinator(mCtrl)
88+
mAgentConn=agentconnmock.NewMockAgentConn(mCtrl)
89+
90+
fAgentProvider=fakeAgentProvider{
91+
agentConn:func(ctx context.Context,agentID uuid.UUID) (_ workspacesdk.AgentConn,releasefunc(),_error) {
92+
returnmAgentConn,func() {},nil
93+
},
94+
}
95+
96+
workspaceID=uuid.New()
97+
agentID=uuid.New()
98+
resourceID=uuid.New()
99+
jobID=uuid.New()
100+
buildID=uuid.New()
101+
102+
containersCh=make(chan codersdk.WorkspaceAgentListContainersResponse)
103+
104+
r=chi.NewMux()
105+
106+
api=API{
107+
ctx:ctx,
108+
Options:&Options{
109+
AgentInactiveDisconnectTimeout:testutil.WaitShort,
110+
Database:mDB,
111+
Logger:logger,
112+
DeploymentValues:&codersdk.DeploymentValues{},
113+
TailnetCoordinator:tailnettest.NewFakeCoordinator(),
114+
},
115+
}
116+
)
117+
118+
vartailnetCoordinator tailnet.Coordinator=mCoordinator
119+
api.TailnetCoordinator.Store(&tailnetCoordinator)
120+
api.agentProvider=fAgentProvider
121+
122+
// Setup: Allow `ExtractWorkspaceAgentParams` to complete.
123+
mDB.EXPECT().GetWorkspaceAgentByID(gomock.Any(),agentID).Return(database.WorkspaceAgent{
124+
ID:agentID,
125+
ResourceID:resourceID,
126+
LifecycleState:database.WorkspaceAgentLifecycleStateReady,
127+
FirstConnectedAt: sql.NullTime{Valid:true,Time:dbtime.Now()},
128+
LastConnectedAt: sql.NullTime{Valid:true,Time:dbtime.Now()},
129+
},nil)
130+
mDB.EXPECT().GetWorkspaceResourceByID(gomock.Any(),resourceID).Return(database.WorkspaceResource{
131+
ID:resourceID,
132+
JobID:jobID,
133+
},nil)
134+
mDB.EXPECT().GetProvisionerJobByID(gomock.Any(),jobID).Return(database.ProvisionerJob{
135+
ID:jobID,
136+
Type:database.ProvisionerJobTypeWorkspaceBuild,
137+
},nil)
138+
mDB.EXPECT().GetWorkspaceBuildByJobID(gomock.Any(),jobID).Return(database.WorkspaceBuild{
139+
WorkspaceID:workspaceID,
140+
ID:buildID,
141+
},nil)
142+
143+
// And: Allow `db2dsk.WorkspaceAgent` to complete.
144+
mCoordinator.EXPECT().Node(gomock.Any()).Return(nil)
145+
146+
// And: Allow `WatchContainers` to be called, returing our `containersCh` channel.
147+
mAgentConn.EXPECT().WatchContainers(gomock.Any(),gomock.Any()).
148+
DoAndReturn(func(_ context.Context,_ slog.Logger) (<-chan codersdk.WorkspaceAgentListContainersResponse, io.Closer,error) {
149+
returncontainersCh,&channelCloser{closeFn:func() {
150+
close(containersCh)
151+
}},nil
152+
})
153+
154+
// And: We mount the HTTP Handler
155+
r.With(httpmw.ExtractWorkspaceAgentParam(mDB)).
156+
Get("/workspaceagents/{workspaceagent}/containers/watch",api.watchWorkspaceAgentContainers)
157+
158+
// Given: We create the HTTP server
159+
srv:=httptest.NewServer(r)
160+
defersrv.Close()
161+
162+
// And: Dial the WebSocket
163+
wsURL:=strings.Replace(srv.URL,"http://","ws://",1)
164+
conn,resp,err:=websocket.Dial(ctx,fmt.Sprintf("%s/workspaceagents/%s/containers/watch",wsURL,agentID),nil)
165+
require.NoError(t,err)
166+
ifresp.Body!=nil {
167+
deferresp.Body.Close()
168+
}
169+
170+
// And: Create a streaming decoder
171+
decoder:=wsjson.NewDecoder[codersdk.WorkspaceAgentListContainersResponse](conn,websocket.MessageText,logger)
172+
deferdecoder.Close()
173+
decodeCh:=decoder.Chan()
174+
175+
// And: We can successfully send through the channel.
176+
testutil.RequireSend(ctx,t,containersCh, codersdk.WorkspaceAgentListContainersResponse{
177+
Containers: []codersdk.WorkspaceAgentContainer{{
178+
ID:"test-container-id",
179+
}},
180+
})
181+
182+
// And: Receive the data.
183+
containerResp:=testutil.RequireReceive(ctx,t,decodeCh)
184+
require.Len(t,containerResp.Containers,1)
185+
require.Equal(t,"test-container-id",containerResp.Containers[0].ID)
186+
187+
// When: We close the WebSocket
188+
conn.Close(websocket.StatusNormalClosure,"test closing connection")
189+
190+
// Then: We expect `containersCh` to be closed.
191+
select {
192+
case<-ctx.Done():
193+
t.Fail()
194+
195+
case_,ok:=<-containersCh:
196+
require.False(t,ok,"channel is expected to be closed")
197+
}
198+
})
199+
200+
t.Run("CoderdWebSocketCanHandleAgentClosing",func(t*testing.T) {
66201
t.Parallel()
67202

68203
// This test ensures that the agent containers `/watch` websocket can gracefully

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp