- Notifications
You must be signed in to change notification settings - Fork1.1k
feat: reinitialize agents when a prebuilt workspace is claimed#17475
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Changes from1 commit
c09c9b9476fe718c8bca67ce4eea52ac64e362db7cdcc7379ff66b3fefff5d9cebd5db26791389feebefb117b5ca22b4149bbd2c758042017e8dcee725f97ba9b156721ee970e54d7e727998581d93003763fc120f879c761784c9604eb27bf4d2cf38b4f0d20df5384bb3b6883972db146b1585eb16cd730d803150adc0b4ecf103fa3edf7e45919a63250872125ecb65eea7e1339f3c1a8ba65363dcc7ad9b6d394571d890747bb3870dbFile filter
Filter by extension
Conversations
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
Minor code quality enhancements
- Loading branch information
Uh oh!
There was an error while loading.Please reload this page.
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -25,7 +25,6 @@ func TestPubsubWorkspaceClaimPublisher(t *testing.T) { | ||
| logger := testutil.Logger(t) | ||
| ps := pubsub.NewInMemory() | ||
| workspaceID := uuid.New() | ||
| reinitEvents := make(chan agentsdk.ReinitializationEvent, 1) | ||
| publisher := prebuilds.NewPubsubWorkspaceClaimPublisher(ps) | ||
| listener := prebuilds.NewPubsubWorkspaceClaimListener(ps, logger) | ||
| @@ -35,15 +34,15 @@ func TestPubsubWorkspaceClaimPublisher(t *testing.T) { | ||
| defer cancel() | ||
| claim := agentsdk.ReinitializationEvent{ | ||
| WorkspaceID: workspaceID, | ||
| Reason: agentsdk.ReinitializeReasonPrebuildClaimed, | ||
| } | ||
| err = publisher.PublishWorkspaceClaim(claim) | ||
| require.NoError(t, err) | ||
| gotEvent := testutil.RequireReceive(ctx, t, reinitEvents) | ||
| require.Equal(t, workspaceID, gotEvent.WorkspaceID) | ||
| require.Equal(t, claim.Reason, gotEvent.Reason) | ||
| }) | ||
| t.Run("fail to publish claim", func(t *testing.T) { | ||
| @@ -53,7 +52,6 @@ func TestPubsubWorkspaceClaimPublisher(t *testing.T) { | ||
| publisher := prebuilds.NewPubsubWorkspaceClaimPublisher(ps) | ||
| claim := agentsdk.ReinitializationEvent{ | ||
| WorkspaceID: uuid.New(), | ||
| Reason: agentsdk.ReinitializeReasonPrebuildClaimed, | ||
| } | ||
| @@ -74,26 +72,21 @@ func TestPubsubWorkspaceClaimListener(t *testing.T) { | ||
| claims := make(chan agentsdk.ReinitializationEvent, 1) // Buffer to avoid messing with goroutines in the rest of the test | ||
| workspaceID := uuid.New() | ||
| cancelFunc, err := listener.ListenForWorkspaceClaims(context.Background(), workspaceID, claims) | ||
| require.NoError(t, err) | ||
| defer cancelFunc() | ||
| // Publish a claim | ||
| channel := agentsdk.PrebuildClaimedChannel(workspaceID) | ||
| reason := agentsdk.ReinitializeReasonPrebuildClaimed | ||
| err = ps.Publish(channel, []byte(reason)) | ||
| require.NoError(t, err) | ||
| // Verify we receive the claim | ||
| ctx := testutil.Context(t, testutil.WaitShort) | ||
| claim := testutil.RequireReceive(ctx, t, claims) | ||
| require.Equal(t, workspaceID, claim.WorkspaceID) | ||
| require.Equal(t, reason, claim.Reason) | ||
| }) | ||
| t.Run("ignores claim events for other workspaces", func(t *testing.T) { | ||
SasSwart marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
| @@ -111,7 +104,7 @@ func TestPubsubWorkspaceClaimListener(t *testing.T) { | ||
| // Publish a claim for a different workspace | ||
| channel := agentsdk.PrebuildClaimedChannel(otherWorkspaceID) | ||
| err = ps.Publish(channel, []byte(agentsdk.ReinitializeReasonPrebuildClaimed)) | ||
| require.NoError(t, err) | ||
| // Verify we don't receive the claim | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -2647,62 +2647,60 @@ func TestAgentConnectionInfo(t *testing.T) { | ||
| func TestReinit(t *testing.T) { | ||
| t.Parallel() | ||
| db, ps := dbtestutil.NewDB(t) | ||
| pubsubSpy := pubsubReinitSpy{ | ||
| Pubsub: ps, | ||
| subscribed: make(chan string), | ||
| } | ||
| client := coderdtest.New(t, &coderdtest.Options{ | ||
| Database: db, | ||
| Pubsub: &pubsubSpy, | ||
| }) | ||
| user := coderdtest.CreateFirstUser(t, client) | ||
| r := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{ | ||
| OrganizationID: user.OrganizationID, | ||
| OwnerID: user.UserID, | ||
| }).WithAgent().Do() | ||
| pubsubSpy.expectedEvent = agentsdk.PrebuildClaimedChannel(r.Workspace.ID) | ||
SasSwart marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
| agentCtx := testutil.Context(t, testutil.WaitShort) | ||
| agentClient := agentsdk.New(client.URL) | ||
| agentClient.SetSessionToken(r.AgentToken) | ||
| agentReinitializedCh := make(chan *agentsdk.ReinitializationEvent) | ||
| go func() { | ||
| reinitEvent, err := agentClient.WaitForReinit(agentCtx) | ||
| assert.NoError(t, err) | ||
| agentReinitializedCh <- reinitEvent | ||
| }() | ||
| // We need to subscribe before we publish, lest we miss the event | ||
| ctx := testutil.Context(t, testutil.WaitShort) | ||
| testutil.TryReceive(ctx, t, pubsubSpy.subscribed) // Wait for the appropriate subscription | ||
| // Now that we're subscribed, publish the event | ||
| err :=prebuilds.NewPubsubWorkspaceClaimPublisher(ps).PublishWorkspaceClaim(agentsdk.ReinitializationEvent{ | ||
| WorkspaceID: r.Workspace.ID, | ||
| Reason: agentsdk.ReinitializeReasonPrebuildClaimed, | ||
| }) | ||
| require.NoError(t, err) | ||
| ctx = testutil.Context(t, testutil.WaitShort) | ||
| reinitEvent := testutil.TryReceive(ctx, t, agentReinitializedCh) | ||
| require.NotNil(t, reinitEvent) | ||
| require.Equal(t, r.Workspace.ID, reinitEvent.WorkspaceID) | ||
| } | ||
| type pubsubReinitSpy struct { | ||
| pubsub.Pubsub | ||
| subscribed chan string | ||
| expectedEvent string | ||
| } | ||
| func (p *pubsubReinitSpy) Subscribe(event string, listener pubsub.Listener) (cancel func(), err error) { | ||
| if p.expectedEvent != "" && event == p.expectedEvent { | ||
| close(p.subscribed) | ||
| } | ||
| return p.Pubsub.Subscribe(event, listener) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -698,7 +698,6 @@ const ( | ||
| type ReinitializationEvent struct { | ||
| WorkspaceID uuid.UUID | ||
| Reason ReinitializationReason `json:"reason"` | ||
| } | ||
| @@ -806,8 +805,9 @@ func (s *SSEAgentReinitTransmitter) Transmit(ctx context.Context, reinitEvents < | ||
| return xerrors.Errorf("failed to create sse transmitter: %w", err) | ||
| } | ||
| defer func() { | ||
| // Block returning until the ServerSentEventSender is closed | ||
| // to avoid a race condition where we might write or flush to rw after the handler returns. | ||
| <-sseSenderClosed | ||
| }() | ||
SasSwart marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.