- Notifications
You must be signed in to change notification settings - Fork925
fix: Avoid use of r.Context() after r.Hijack()#1978
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Merged
Uh oh!
There was an error while loading.Please reload this page.
Merged
Changes fromall commits
Commits
Show all changes
6 commits Select commitHold shift + click to select a range
877959b
fix: Fix goroutine leak by propagating websocket closure
mafredri0a354b6
fix: Use of r.Context() in workspaceAgentDial
mafredrie0fb30c
fix: Use of rw and r.Context() in workspaceAgentListen
mafredrie90a0c0
fix: Use of r.Context() in workspaceAgentPTY
mafredrie0a2f8e
chore: Fix PR comments
mafredri9e6727e
fix: wsNetConn cancel on any error
mafredriFile filter
Filter by extension
Conversations
Failed to load comments.
Loading
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Jump to file
Failed to load files.
Loading
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
There are no files selected for viewing
104 changes: 74 additions & 30 deletionscoderd/workspaceagents.go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
package coderd | ||
import ( | ||
"context" | ||
"database/sql" | ||
"encoding/json" | ||
"fmt" | ||
@@ -16,6 +17,7 @@ import ( | ||
"nhooyr.io/websocket" | ||
"cdr.dev/slog" | ||
"github.com/coder/coder/agent" | ||
"github.com/coder/coder/coderd/database" | ||
"github.com/coder/coder/coderd/httpapi" | ||
@@ -69,17 +71,18 @@ func (api *API) workspaceAgentDial(rw http.ResponseWriter, r *http.Request) { | ||
}) | ||
return | ||
} | ||
ctx, wsNetConn := websocketNetConn(r.Context(), conn, websocket.MessageBinary) | ||
defer wsNetConn.Close() // Also closes conn. | ||
config := yamux.DefaultConfig() | ||
config.LogOutput = io.Discard | ||
session, err := yamux.Server(wsNetConn, config) | ||
if err != nil { | ||
_ = conn.Close(websocket.StatusAbnormalClosure, err.Error()) | ||
return | ||
} | ||
err = peerbroker.ProxyListen(ctx, session, peerbroker.ProxyOptions{ | ||
ChannelID: workspaceAgent.ID.String(), | ||
Logger: api.Logger.Named("peerbroker-proxy-dial"), | ||
Pubsub: api.Pubsub, | ||
@@ -193,13 +196,12 @@ func (api *API) workspaceAgentListen(rw http.ResponseWriter, r *http.Request) { | ||
return | ||
} | ||
ctx, wsNetConn := websocketNetConn(r.Context(), conn, websocket.MessageBinary) | ||
defer wsNetConn.Close() // Also closes conn. | ||
config := yamux.DefaultConfig() | ||
config.LogOutput = io.Discard | ||
session, err := yamux.Server(wsNetConn, config) | ||
if err != nil { | ||
_ = conn.Close(websocket.StatusAbnormalClosure, err.Error()) | ||
return | ||
@@ -229,7 +231,7 @@ func (api *API) workspaceAgentListen(rw http.ResponseWriter, r *http.Request) { | ||
} | ||
disconnectedAt := workspaceAgent.DisconnectedAt | ||
updateConnectionTimes := func() error { | ||
err = api.Database.UpdateWorkspaceAgentConnectionByID(ctx, database.UpdateWorkspaceAgentConnectionByIDParams{ | ||
ID: workspaceAgent.ID, | ||
FirstConnectedAt: firstConnectedAt, | ||
LastConnectedAt: lastConnectedAt, | ||
@@ -255,7 +257,7 @@ func (api *API) workspaceAgentListen(rw http.ResponseWriter, r *http.Request) { | ||
return | ||
} | ||
api.Logger.Info(ctx, "accepting agent", slog.F("resource", resource), slog.F("agent", workspaceAgent)) | ||
ticker := time.NewTicker(api.AgentConnectionUpdateFrequency) | ||
defer ticker.Stop() | ||
@@ -324,16 +326,16 @@ func (api *API) workspaceAgentTurn(rw http.ResponseWriter, r *http.Request) { | ||
}) | ||
return | ||
} | ||
ctx, wsNetConn := websocketNetConn(r.Context(), wsConn, websocket.MessageBinary) | ||
defer wsNetConn.Close() // Also closes conn. | ||
api.Logger.Debug(ctx, "accepting turn connection", slog.F("remote-address", r.RemoteAddr), slog.F("local-address", localAddress)) | ||
select { | ||
case <-api.TURNServer.Accept(wsNetConn, remoteAddress, localAddress).Closed(): | ||
case <-ctx.Done(): | ||
} | ||
api.Logger.Debug(ctx, "completed turn connection", slog.F("remote-address", r.RemoteAddr), slog.F("local-address", localAddress)) | ||
} | ||
// workspaceAgentPTY spawns a PTY and pipes it over a WebSocket. | ||
@@ -384,12 +386,11 @@ func (api *API) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) { | ||
}) | ||
return | ||
} | ||
mafredri marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
ctx, wsNetConn := websocketNetConn(r.Context(), conn, websocket.MessageBinary) | ||
defer wsNetConn.Close() // Also closes conn. | ||
agentConn, err := api.dialWorkspaceAgent(ctx, r, workspaceAgent.ID) | ||
if err != nil { | ||
_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("dial workspace agent: %s", err)) | ||
return | ||
@@ -408,11 +409,13 @@ func (api *API) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) { | ||
_, _ = io.Copy(ptNetConn, wsNetConn) | ||
} | ||
// dialWorkspaceAgent connects to a workspace agent by ID. Only rely on | ||
// r.Context() for cancellation if it's use is safe or r.Hijack() has | ||
// not been performed. | ||
func (api *API) dialWorkspaceAgent(ctx context.Context, r *http.Request, agentID uuid.UUID) (*agent.Conn, error) { | ||
client, server := provisionersdk.TransportPipe() | ||
go func() { | ||
_ = peerbroker.ProxyListen(ctx, server, peerbroker.ProxyOptions{ | ||
ChannelID: agentID.String(), | ||
Logger: api.Logger.Named("peerbroker-proxy-dial"), | ||
Pubsub: api.Pubsub, | ||
@@ -422,7 +425,7 @@ func (api *API) dialWorkspaceAgent(r *http.Request, agentID uuid.UUID) (*agent.C | ||
}() | ||
peerClient := proto.NewDRPCPeerBrokerClient(provisionersdk.Conn(client)) | ||
stream, err := peerClient.NegotiateConnection(ctx) | ||
if err != nil { | ||
return nil, xerrors.Errorf("negotiate: %w", err) | ||
} | ||
@@ -434,7 +437,7 @@ func (api *API) dialWorkspaceAgent(r *http.Request, agentID uuid.UUID) (*agent.C | ||
options.SettingEngine.SetICEProxyDialer(turnconn.ProxyDialer(func() (c net.Conn, err error) { | ||
clientPipe, serverPipe := net.Pipe() | ||
go func() { | ||
<-ctx.Done() | ||
_ = clientPipe.Close() | ||
_ = serverPipe.Close() | ||
}() | ||
@@ -515,3 +518,44 @@ func convertWorkspaceAgent(dbAgent database.WorkspaceAgent, agentUpdateFrequency | ||
return workspaceAgent, nil | ||
} | ||
// wsNetConn wraps net.Conn created by websocket.NetConn(). Cancel func | ||
// is called if a read or write error is encountered. | ||
type wsNetConn struct { | ||
cancel context.CancelFunc | ||
net.Conn | ||
} | ||
func (c *wsNetConn) Read(b []byte) (n int, err error) { | ||
n, err = c.Conn.Read(b) | ||
if err != nil { | ||
c.cancel() | ||
} | ||
return n, err | ||
} | ||
func (c *wsNetConn) Write(b []byte) (n int, err error) { | ||
n, err = c.Conn.Write(b) | ||
if err != nil { | ||
c.cancel() | ||
} | ||
return n, err | ||
} | ||
func (c *wsNetConn) Close() error { | ||
defer c.cancel() | ||
return c.Conn.Close() | ||
} | ||
// websocketNetConn wraps websocket.NetConn and returns a context that | ||
// is tied to the parent context and the lifetime of the conn. Any error | ||
// during read or write will cancel the context, but not close the | ||
// conn. Close should be called to release context resources. | ||
func websocketNetConn(ctx context.Context, conn *websocket.Conn, msgType websocket.MessageType) (context.Context, net.Conn) { | ||
ctx, cancel := context.WithCancel(ctx) | ||
nc := websocket.NetConn(ctx, conn, msgType) | ||
return ctx, &wsNetConn{ | ||
cancel: cancel, | ||
Conn: nc, | ||
} | ||
} |
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.