Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd3c1dd5

Browse files
fix(coderd): ensure agent WebSocket conn is cleaned up
1 parent8f72538 commitd3c1dd5

File tree

2 files changed

+144
-6
lines changed

2 files changed

+144
-6
lines changed

‎coderd/workspaceagents.go‎

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -815,10 +815,13 @@ func (api *API) workspaceAgentListeningPorts(rw http.ResponseWriter, r *http.Req
815815
// @Router /workspaceagents/{workspaceagent}/containers/watch [get]
816816
func (api*API)watchWorkspaceAgentContainers(rw http.ResponseWriter,r*http.Request) {
817817
var (
818-
ctx=r.Context()
819818
workspaceAgent=httpmw.WorkspaceAgentParam(r)
819+
logger=api.Logger.Named("agent_container_watcher").With(slog.F("agent_id",workspaceAgent.ID))
820820
)
821821

822+
ctx,cancelCtx:=context.WithCancel(r.Context())
823+
defercancelCtx()
824+
822825
// If the agent is unreachable, the request will hang. Assume that if we
823826
// don't get a response after 30s that the agent is unreachable.
824827
dialCtx,cancel:=context.WithTimeout(ctx,30*time.Second)
@@ -857,8 +860,7 @@ func (api *API) watchWorkspaceAgentContainers(rw http.ResponseWriter, r *http.Re
857860
}
858861
deferrelease()
859862

860-
watcherLogger:=api.Logger.Named("agent_container_watcher").With(slog.F("agent_id",workspaceAgent.ID))
861-
containersCh,closer,err:=agentConn.WatchContainers(ctx,watcherLogger)
863+
containersCh,closer,err:=agentConn.WatchContainers(ctx,logger)
862864
iferr!=nil {
863865
httpapi.Write(ctx,rw,http.StatusInternalServerError, codersdk.Response{
864866
Message:"Internal error watching agent's containers.",
@@ -881,11 +883,11 @@ func (api *API) watchWorkspaceAgentContainers(rw http.ResponseWriter, r *http.Re
881883
// close frames.
882884
_=conn.CloseRead(context.Background())
883885

886+
gohttpapi.HeartbeatClose(ctx,logger,cancelCtx,conn)
887+
884888
ctx,wsNetConn:=codersdk.WebsocketNetConn(ctx,conn,websocket.MessageText)
885889
deferwsNetConn.Close()
886890

887-
gohttpapi.Heartbeat(ctx,conn)
888-
889891
encoder:=json.NewEncoder(wsNetConn)
890892

891893
for {

‎coderd/workspaceagents_internal_test.go‎

Lines changed: 137 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,146 @@ func (fakeAgentProvider) Close() error {
5959
returnnil
6060
}
6161

62+
typechannelCloserstruct {
63+
closeFnfunc()
64+
}
65+
66+
func (c*channelCloser)Close()error {
67+
c.closeFn()
68+
returnnil
69+
}
70+
6271
funcTestWatchAgentContainers(t*testing.T) {
6372
t.Parallel()
6473

65-
t.Run("WebSocketClosesProperly",func(t*testing.T) {
74+
t.Run("CoderdWebSocketCanHandleClientClosing",func(t*testing.T) {
75+
t.Parallel()
76+
77+
// This test ensures that the agent containers `/watch` websocket can gracefully
78+
// handle the client websocket closing. This test was created in
79+
// response to this issue: https://github.com/coder/coder/issues/19449
80+
81+
var (
82+
ctx=testutil.Context(t,testutil.WaitLong)
83+
logger=slogtest.Make(t,&slogtest.Options{IgnoreErrors:true}).Leveled(slog.LevelDebug).Named("coderd")
84+
85+
mCtrl=gomock.NewController(t)
86+
mDB=dbmock.NewMockStore(mCtrl)
87+
mCoordinator=tailnettest.NewMockCoordinator(mCtrl)
88+
mAgentConn=agentconnmock.NewMockAgentConn(mCtrl)
89+
90+
fAgentProvider=fakeAgentProvider{
91+
agentConn:func(ctx context.Context,agentID uuid.UUID) (_ workspacesdk.AgentConn,releasefunc(),_error) {
92+
returnmAgentConn,func() {},nil
93+
},
94+
}
95+
96+
workspaceID=uuid.New()
97+
agentID=uuid.New()
98+
resourceID=uuid.New()
99+
jobID=uuid.New()
100+
buildID=uuid.New()
101+
102+
containersCh=make(chan codersdk.WorkspaceAgentListContainersResponse)
103+
104+
r=chi.NewMux()
105+
106+
api=API{
107+
ctx:ctx,
108+
Options:&Options{
109+
AgentInactiveDisconnectTimeout:testutil.WaitShort,
110+
Database:mDB,
111+
Logger:logger,
112+
DeploymentValues:&codersdk.DeploymentValues{},
113+
TailnetCoordinator:tailnettest.NewFakeCoordinator(),
114+
},
115+
}
116+
)
117+
118+
vartailnetCoordinator tailnet.Coordinator=mCoordinator
119+
api.TailnetCoordinator.Store(&tailnetCoordinator)
120+
api.agentProvider=fAgentProvider
121+
122+
// Setup: Allow `ExtractWorkspaceAgentParams` to complete.
123+
mDB.EXPECT().GetWorkspaceAgentByID(gomock.Any(),agentID).Return(database.WorkspaceAgent{
124+
ID:agentID,
125+
ResourceID:resourceID,
126+
LifecycleState:database.WorkspaceAgentLifecycleStateReady,
127+
FirstConnectedAt: sql.NullTime{Valid:true,Time:dbtime.Now()},
128+
LastConnectedAt: sql.NullTime{Valid:true,Time:dbtime.Now()},
129+
},nil)
130+
mDB.EXPECT().GetWorkspaceResourceByID(gomock.Any(),resourceID).Return(database.WorkspaceResource{
131+
ID:resourceID,
132+
JobID:jobID,
133+
},nil)
134+
mDB.EXPECT().GetProvisionerJobByID(gomock.Any(),jobID).Return(database.ProvisionerJob{
135+
ID:jobID,
136+
Type:database.ProvisionerJobTypeWorkspaceBuild,
137+
},nil)
138+
mDB.EXPECT().GetWorkspaceBuildByJobID(gomock.Any(),jobID).Return(database.WorkspaceBuild{
139+
WorkspaceID:workspaceID,
140+
ID:buildID,
141+
},nil)
142+
143+
// And: Allow `db2dsk.WorkspaceAgent` to complete.
144+
mCoordinator.EXPECT().Node(gomock.Any()).Return(nil)
145+
146+
// And: Allow `WatchContainers` to be called, returing our `containersCh` channel.
147+
mAgentConn.EXPECT().WatchContainers(gomock.Any(),gomock.Any()).
148+
DoAndReturn(func(_ context.Context,_ slog.Logger) (<-chan codersdk.WorkspaceAgentListContainersResponse, io.Closer,error) {
149+
returncontainersCh,&channelCloser{closeFn:func() {
150+
t.Logf("WE HAVE BEEN CLOSED")
151+
close(containersCh)
152+
}},nil
153+
})
154+
155+
// And: We mount the HTTP Handler
156+
r.With(httpmw.ExtractWorkspaceAgentParam(mDB)).
157+
Get("/workspaceagents/{workspaceagent}/containers/watch",api.watchWorkspaceAgentContainers)
158+
159+
// Given: We create the HTTP server
160+
srv:=httptest.NewServer(r)
161+
defersrv.Close()
162+
163+
// And: Dial the WebSocket
164+
wsURL:=strings.Replace(srv.URL,"http://","ws://",1)
165+
conn,resp,err:=websocket.Dial(ctx,fmt.Sprintf("%s/workspaceagents/%s/containers/watch",wsURL,agentID),nil)
166+
require.NoError(t,err)
167+
ifresp.Body!=nil {
168+
deferresp.Body.Close()
169+
}
170+
171+
// And: Create a streaming decoder
172+
decoder:=wsjson.NewDecoder[codersdk.WorkspaceAgentListContainersResponse](conn,websocket.MessageText,logger)
173+
deferdecoder.Close()
174+
decodeCh:=decoder.Chan()
175+
176+
// And: We can successfully send through the channel.
177+
testutil.RequireSend(ctx,t,containersCh, codersdk.WorkspaceAgentListContainersResponse{
178+
Containers: []codersdk.WorkspaceAgentContainer{{
179+
ID:"test-container-id",
180+
}},
181+
})
182+
183+
// And: Receive the data.
184+
containerResp:=testutil.RequireReceive(ctx,t,decodeCh)
185+
require.Len(t,containerResp.Containers,1)
186+
require.Equal(t,"test-container-id",containerResp.Containers[0].ID)
187+
188+
// When: We close the WebSocket
189+
conn.Close(websocket.StatusNormalClosure,"test closing connection")
190+
191+
// Then: We expect `containersCh` to be closed.
192+
select {
193+
case<-ctx.Done():
194+
t.Fail()
195+
196+
case_,ok:=<-containersCh:
197+
require.False(t,ok,"channel is expected to be closed")
198+
}
199+
})
200+
201+
t.Run("CoderdWebSocketCanHandleAgentClosing",func(t*testing.T) {
66202
t.Parallel()
67203

68204
// This test ensures that the agent containers `/watch` websocket can gracefully

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp