- Notifications
You must be signed in to change notification settings - Fork1k
fix: ensure wsproxyMultiAgent
is closed when websocket dies#11414
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Changes fromall commits
File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -4,6 +4,7 @@ import ( | ||
"context" | ||
"time" | ||
"cdr.dev/slog" | ||
"nhooyr.io/websocket" | ||
) | ||
@@ -26,10 +27,10 @@ func Heartbeat(ctx context.Context, conn *websocket.Conn) { | ||
} | ||
} | ||
// Heartbeat loops to ping a WebSocket to keep it alive. Itcalls `exit` on ping | ||
// failure. | ||
func HeartbeatClose(ctx context.Context,logger slog.Logger,exit func(), conn *websocket.Conn) { | ||
ticker := time.NewTicker(15 * time.Second) | ||
defer ticker.Stop() | ||
for { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. | ||
@@ -41,6 +42,7 @@ func HeartbeatClose(ctx context.Context, exit func(), conn *websocket.Conn) { | ||
err := conn.Ping(ctx) | ||
if err != nil { | ||
_ = conn.Close(websocket.StatusGoingAway, "Ping failed") | ||
logger.Info(ctx, "failed to heartbeat ping", slog.Error(err)) | ||
exit() | ||
return | ||
} | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -431,6 +431,7 @@ type CoordinateNodes struct { | ||
func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, error) { | ||
ctx, cancel := context.WithCancel(ctx) | ||
logger := c.SDKClient.Logger().Named("multiagent") | ||
coordinateURL, err := c.SDKClient.URL.Parse("/api/v2/workspaceproxies/me/coordinate") | ||
if err != nil { | ||
@@ -454,12 +455,13 @@ func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, erro | ||
return nil, xerrors.Errorf("dial coordinate websocket: %w", err) | ||
} | ||
go httpapi.HeartbeatClose(ctx,logger,cancel, conn) | ||
nc := websocket.NetConn(ctx, conn, websocket.MessageText) | ||
rma := remoteMultiAgentHandler{ | ||
sdk: c, | ||
nc: nc, | ||
cancel: cancel, | ||
legacyAgentCache: map[uuid.UUID]bool{}, | ||
} | ||
@@ -472,6 +474,11 @@ func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, erro | ||
OnRemove: func(agpl.Queue) { conn.Close(websocket.StatusGoingAway, "closed") }, | ||
}).Init() | ||
go func() { | ||
<-ctx.Done() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. If I'm understanding this correctly, we're depending on the fact that the reader goroutine below cancels the context on a failed read. I think we shouldalso tear down the multi-agent on a failed write of subscription messages. It's unlikely that we'd have a failure that leaves the connection half-open (e.g. for reads but not writes), but such things are possible and you don't want the proxy limping on unable to subscribe to new agents. | ||
ma.Close() | ||
}() | ||
go func() { | ||
defer cancel() | ||
dec := json.NewDecoder(nc) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. [Re: lines 488 to 488] I think it's worth dropping an INFO log here. See this comment inline onGraphite. | ||
@@ -480,16 +487,17 @@ func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, erro | ||
err := dec.Decode(&msg) | ||
if err != nil { | ||
if xerrors.Is(err, io.EOF) { | ||
logger.Info(ctx, "websocket connection severed", slog.Error(err)) | ||
return | ||
} | ||
logger.Error(ctx, "decode coordinator nodes", slog.Error(err)) | ||
return | ||
} | ||
err = ma.Enqueue(msg.Nodes) | ||
if err != nil { | ||
logger.Error(ctx, "enqueue nodes from coordinator", slog.Error(err)) | ||
continue | ||
} | ||
} | ||
@@ -499,8 +507,9 @@ func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, erro | ||
} | ||
type remoteMultiAgentHandler struct { | ||
sdk *Client | ||
nc net.Conn | ||
cancel func() | ||
legacyMu sync.RWMutex | ||
legacyAgentCache map[uuid.UUID]bool | ||
@@ -517,10 +526,12 @@ func (a *remoteMultiAgentHandler) writeJSON(v interface{}) error { | ||
// Node updates are tiny, so even the dinkiest connection can handle them if it's not hung. | ||
err = a.nc.SetWriteDeadline(time.Now().Add(agpl.WriteTimeout)) | ||
if err != nil { | ||
a.cancel() | ||
return xerrors.Errorf("set write deadline: %w", err) | ||
} | ||
_, err = a.nc.Write(data) | ||
if err != nil { | ||
a.cancel() | ||
return xerrors.Errorf("write message: %w", err) | ||
} | ||
@@ -531,6 +542,7 @@ func (a *remoteMultiAgentHandler) writeJSON(v interface{}) error { | ||
// our successful write, it is important that we reset the deadline before it fires. | ||
err = a.nc.SetWriteDeadline(time.Time{}) | ||
if err != nil { | ||
a.cancel() | ||
return xerrors.Errorf("clear write deadline: %w", err) | ||
} | ||
@@ -573,7 +585,7 @@ func (a *remoteMultiAgentHandler) AgentIsLegacy(agentID uuid.UUID) bool { | ||
return a.sdk.AgentIsLegacy(ctx, agentID) | ||
}) | ||
if err != nil { | ||
a.sdk.SDKClient.Logger().Error(ctx, "failed to check agent legacy status", slog.F("agent_id", agentID), slog.Error(err)) | ||
// Assume that the agent is legacy since this failed, while less | ||
// efficient it will always work. | ||