- Notifications
You must be signed in to change notification settings - Fork 929
feat: reinitialize agents when a prebuilt workspace is claimed #17475
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Changes from 1 commit
c09c9b9
476fe71
8c8bca6
7ce4eea
52ac64e
362db7c
dcc7379
ff66b3f
efff5d9
cebd5db
2679138
9feebef
b117b5c
a22b414
9bbd2c7
5804201
7e8dcee
725f97b
a9b1567
21ee970
e54d7e7
2799858
1d93003
763fc12
0f879c7
61784c9
604eb27
bf4d2cf
38b4f0d
20df538
4bb3b68
83972db
146b158
5eb16cd
730d803
150adc0
b4ecf10
3fa3edf
7e45919
a632508
72125ec
b65eea7
e1339f3
c1a8ba6
5363dcc
7ad9b6d
394571d
890747b
b3870db
File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
- Loading branch information
Uh oh!
There was an error while loading.Please reload this page.
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -2,23 +2,16 @@ package prebuilds | ||
import ( | ||
"context" | ||
"sync" | ||
"github.com/google/uuid" | ||
"golang.org/x/xerrors" | ||
"cdr.dev/slog" | ||
"github.com/coder/coder/v2/coderd/database/pubsub" | ||
"github.com/coder/coder/v2/codersdk/agentsdk" | ||
) | ||
func NewPubsubWorkspaceClaimPublisher(ps pubsub.Pubsub) *PubsubWorkspaceClaimPublisher { | ||
return &PubsubWorkspaceClaimPublisher{ps: ps} | ||
} | ||
@@ -35,10 +28,6 @@ func (p PubsubWorkspaceClaimPublisher) PublishWorkspaceClaim(claim agentsdk.Rein | ||
return nil | ||
} | ||
func NewPubsubWorkspaceClaimListener(ps pubsub.Pubsub, logger slog.Logger) *PubsubWorkspaceClaimListener { | ||
return &PubsubWorkspaceClaimListener{ps: ps, logger: logger} | ||
} | ||
@@ -49,6 +38,12 @@ type PubsubWorkspaceClaimListener struct { | ||
} | ||
func (p PubsubWorkspaceClaimListener) ListenForWorkspaceClaims(ctx context.Context, workspaceID uuid.UUID) (func(), <-chan agentsdk.ReinitializationEvent, error) { | ||
select { | ||
case <-ctx.Done(): | ||
return func() {}, nil, ctx.Err() | ||
default: | ||
} | ||
workspaceClaims := make(chan agentsdk.ReinitializationEvent, 1) | ||
SasSwart marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
cancelSub, err := p.ps.Subscribe(agentsdk.PrebuildClaimedChannel(workspaceID), func(inner context.Context, id []byte) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. As we talked about on a call --- the pubsub is not considered a reliable transport, and we can miss events. This can lead to a situation where the agent misses the reinit signal and never reinitializes, even though the workspace has been claimed. The deep problem here is that we are using the PubSub to send material information (who the new owner is), rather than just a kick that there is new information available (the workspace has a new owner). In the latter case, when there is an error, we can recover by re-querying the database to find the owner, and then decide whether we need to signal the agent with new information. This requires the handler to keep track of the last owner it sent, but that's a trivial amount of memory to keep. I don't necessarily think this needs to be fixed in this PR, since the plan is to move to a new "stream of manifest" architecture, but as we implement that, we need to keep error handling in mind on both sides: coderd recovers from a pubsub error by querying the database (or closing the connection if the database query fails), and then deciding whether there is something new to send. The agent recovers from a dropped connection by redialing, and then checking the new manifest against its existing one to see if it needs to take any action. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thanks for identifying this. Let's defer it beyond this PR to the next release if there are no objections. I'd like to get to this as part of the manifest streaming work. | ||
claimantID, err := uuid.ParseBytes(id) | ||
@@ -91,52 +86,3 @@ func (p PubsubWorkspaceClaimListener) ListenForWorkspaceClaims(ctx context.Conte | ||
return cancel, workspaceClaims, nil | ||
} | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -44,6 +44,7 @@ import ( | ||
"github.com/coder/coder/v2/coderd/tracing" | ||
"github.com/coder/coder/v2/coderd/wspubsub" | ||
"github.com/coder/coder/v2/codersdk" | ||
"github.com/coder/coder/v2/codersdk/agentsdk" | ||
"github.com/coder/coder/v2/codersdk/drpc" | ||
"github.com/coder/coder/v2/provisioner" | ||
"github.com/coder/coder/v2/provisionerd/proto" | ||
@@ -1749,7 +1750,12 @@ func (s *server) CompleteJob(ctx context.Context, completed *proto.CompletedJob) | ||
slog.F("user", input.PrebuildClaimedByUser.String()), | ||
slog.F("workspace_id", workspace.ID)) | ||
err = prebuilds.NewPubsubWorkspaceClaimPublisher(s.Pubsub).PublishWorkspaceClaim(agentsdk.ReinitializationEvent{ | ||
UserID: input.PrebuildClaimedByUser, | ||
WorkspaceID: workspace.ID, | ||
Reason: agentsdk.ReinitializeReasonPrebuildClaimed, | ||
SasSwart marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
}) | ||
if err != nil { | ||
s.Logger.Error(ctx, "failed to publish workspace claim event", slog.Error(err)) | ||
} | ||
} | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1181,14 +1181,24 @@ func (api *API) workspaceAgentReinit(rw http.ResponseWriter, r *http.Request) { | ||
log.Info(ctx, "agent waiting for reinit instruction") | ||
cancel, reinitEvents, err := prebuilds.NewPubsubWorkspaceClaimListener(api.Pubsub, log).ListenForWorkspaceClaims(ctx, workspace.ID) | ||
if err != nil { | ||
log.Error(ctx, "failed to subscribe to prebuild claimed channel", slog.Error(err)) | ||
httpapi.InternalServerError(rw, xerrors.New("failed to subscribe to prebuild claimed channel")) | ||
return | ||
} | ||
defer cancel() | ||
transmitter := agentsdk.NewSSEAgentReinitTransmitter(log, rw, r) | ||
err = transmitter.Transmit(ctx, reinitEvents) | ||
SasSwart marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
if err != nil { | ||
log.Error(ctx, "failed to stream agent reinit events", slog.Error(err)) | ||
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ | ||
Message: "Internal error streaming agent reinitialization events.", | ||
Detail: err.Error(), | ||
}) | ||
} | ||
} | ||
// convertProvisionedApps converts applications that are in the middle of provisioning process. | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -24,6 +24,7 @@ import ( | ||
"github.com/coder/coder/v2/agent/proto" | ||
"github.com/coder/coder/v2/apiversion" | ||
"github.com/coder/coder/v2/coderd/httpapi" | ||
"github.com/coder/coder/v2/codersdk" | ||
drpcsdk "github.com/coder/coder/v2/codersdk/drpc" | ||
tailnetproto "github.com/coder/coder/v2/tailnet/proto" | ||
@@ -730,8 +731,86 @@ func (c *Client) WaitForReinit(ctx context.Context) (*ReinitializationEvent, err | ||
return nil, codersdk.ReadBodyAsError(res) | ||
} | ||
reinitEvent, err := NewSSEAgentReinitReceiver(res.Body).Receive(ctx) | ||
if err != nil { | ||
return nil, xerrors.Errorf("listening for reinitialization events: %w", err) | ||
} | ||
return reinitEvent, nil | ||
} | ||
func WaitForReinitLoop(ctx context.Context, logger slog.Logger, client *Client) <-chan ReinitializationEvent { | ||
reinitEvents := make(chan ReinitializationEvent) | ||
go func() { | ||
for retrier := retry.New(100*time.Millisecond, 10*time.Second); retrier.Wait(ctx); { | ||
logger.Debug(ctx, "waiting for agent reinitialization instructions") | ||
reinitEvent, err := client.WaitForReinit(ctx) | ||
SasSwart marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
if err != nil { | ||
logger.Error(ctx, "failed to wait for agent reinitialization instructions", slog.Error(err)) | ||
continue | ||
} | ||
SasSwart marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
select { | ||
case <-ctx.Done(): | ||
close(reinitEvents) | ||
return | ||
case reinitEvents <- *reinitEvent: | ||
} | ||
} | ||
}() | ||
return reinitEvents | ||
} | ||
func NewSSEAgentReinitTransmitter(logger slog.Logger, rw http.ResponseWriter, r *http.Request) *SSEAgentReinitTransmitter { | ||
return &SSEAgentReinitTransmitter{logger: logger, rw: rw, r: r} | ||
} | ||
type SSEAgentReinitTransmitter struct { | ||
rw http.ResponseWriter | ||
r *http.Request | ||
logger slog.Logger | ||
} | ||
func (s *SSEAgentReinitTransmitter) Transmit(ctx context.Context, reinitEvents <-chan ReinitializationEvent) error { | ||
select { | ||
case <-ctx.Done(): | ||
return ctx.Err() | ||
default: | ||
} | ||
sseSendEvent, sseSenderClosed, err := httpapi.ServerSentEventSender(s.rw, s.r) | ||
if err != nil { | ||
return xerrors.Errorf("failed to create sse transmitter: %w", err) | ||
} | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return ctx.Err() | ||
case <-sseSenderClosed: | ||
return xerrors.New("sse connection closed") | ||
SasSwart marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
case reinitEvent := <-reinitEvents: | ||
err := sseSendEvent(codersdk.ServerSentEvent{ | ||
Type: codersdk.ServerSentEventTypeData, | ||
Data: reinitEvent, | ||
}) | ||
if err != nil { | ||
s.logger.Warn(ctx, "failed to send SSE response to trigger reinit", slog.Error(err)) | ||
} | ||
} | ||
} | ||
} | ||
func NewSSEAgentReinitReceiver(r io.ReadCloser) *SSEAgentReinitReceiver { | ||
return &SSEAgentReinitReceiver{r: r} | ||
} | ||
// SSEAgentReinitReceiver reads reinitialization events from an underlying
// stream.
type SSEAgentReinitReceiver struct {
	// r is the stream handed to codersdk.ServerSentEventReader by Receive.
	r io.ReadCloser
}
func (s *SSEAgentReinitReceiver) Receive(ctx context.Context) (*ReinitializationEvent, error) { | ||
nextEvent := codersdk.ServerSentEventReader(ctx, s.r) | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
@@ -763,26 +842,3 @@ func (c *Client) WaitForReinit(ctx context.Context) (*ReinitializationEvent, err | ||
} | ||
} | ||
} | ||
Uh oh!
There was an error while loading.Please reload this page.