- Notifications
You must be signed in to change notification settings - Fork1.1k
fix: send prebuild job notification after job build db commit#20693
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
base:main
Are you sure you want to change the base?
Changes fromall commits
File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -697,7 +697,8 @@ func (c *StoreReconciler) createPrebuiltWorkspace(ctx context.Context, prebuiltW | ||||||||||||||||||
| return xerrors.Errorf("failed to generate unique prebuild ID: %w", err) | ||||||||||||||||||
| } | ||||||||||||||||||
| var provisionerJob *database.ProvisionerJob | ||||||||||||||||||
| err = c.store.InTx(func(db database.Store) error { | ||||||||||||||||||
| template, err := db.GetTemplateByID(ctx, templateID) | ||||||||||||||||||
| if err != nil { | ||||||||||||||||||
| return xerrors.Errorf("failed to get template: %w", err) | ||||||||||||||||||
| @@ -732,11 +733,20 @@ func (c *StoreReconciler) createPrebuiltWorkspace(ctx context.Context, prebuiltW | ||||||||||||||||||
| c.logger.Info(ctx, "attempting to create prebuild", slog.F("name", name), | ||||||||||||||||||
| slog.F("workspace_id", prebuiltWorkspaceID.String()), slog.F("preset_id", presetID.String())) | ||||||||||||||||||
| provisionerJob, err = c.provision(ctx, db, prebuiltWorkspaceID, template, presetID, database.WorkspaceTransitionStart, workspace, DeprovisionModeNormal) | ||||||||||||||||||
| return err | ||||||||||||||||||
| }, &database.TxOptions{ | ||||||||||||||||||
| Isolation: sql.LevelRepeatableRead, | ||||||||||||||||||
| ReadOnly: false, | ||||||||||||||||||
| }) | ||||||||||||||||||
| if err != nil { | ||||||||||||||||||
| return err | ||||||||||||||||||
| } | ||||||||||||||||||
| // Publish provisioner job event to notify the acquirer that a new job was posted | ||||||||||||||||||
| c.publishProvisionerJob(ctx, provisionerJob, prebuiltWorkspaceID) | ||||||||||||||||||
Comment on lines +746 to +748 Member There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. Should we only do this if ContributorAuthor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more.
coder/enterprise/coderd/prebuilds/reconcile.go Lines 930 to 934 inf161c82
Nevertheless, I'm also checking in coder/enterprise/coderd/prebuilds/reconcile.go Lines 947 to 949 inf161c82
| ||||||||||||||||||
| return nil | ||||||||||||||||||
| } | ||||||||||||||||||
| // provisionDelete provisions a delete transition for a prebuilt workspace. | ||||||||||||||||||
| @@ -748,26 +758,25 @@ func (c *StoreReconciler) createPrebuiltWorkspace(ctx context.Context, prebuiltW | ||||||||||||||||||
| // | ||||||||||||||||||
| // IMPORTANT: This function must be called within a database transaction. It does not create its own transaction. | ||||||||||||||||||
| // The caller is responsible for managing the transaction boundary via db.InTx(). | ||||||||||||||||||
| func (c *StoreReconciler) provisionDelete(ctx context.Context, db database.Store, workspaceID uuid.UUID, templateID uuid.UUID, presetID uuid.UUID, mode DeprovisionMode)(*database.ProvisionerJob,error) { | ||||||||||||||||||
| workspace, err := db.GetWorkspaceByID(ctx, workspaceID) | ||||||||||||||||||
| if err != nil { | ||||||||||||||||||
| returnnil,xerrors.Errorf("get workspace by ID: %w", err) | ||||||||||||||||||
| } | ||||||||||||||||||
| template, err := db.GetTemplateByID(ctx, templateID) | ||||||||||||||||||
| if err != nil { | ||||||||||||||||||
| returnnil,xerrors.Errorf("failed to get template: %w", err) | ||||||||||||||||||
| } | ||||||||||||||||||
| if workspace.OwnerID != database.PrebuildsSystemUserID { | ||||||||||||||||||
| returnnil,xerrors.Errorf("prebuilt workspace is not owned by prebuild user anymore, probably it was claimed") | ||||||||||||||||||
| } | ||||||||||||||||||
| c.logger.Info(ctx, "attempting to delete prebuild", slog.F("orphan", mode.String()), | ||||||||||||||||||
| slog.F("name", workspace.Name), slog.F("workspace_id", workspaceID.String()), slog.F("preset_id", presetID.String())) | ||||||||||||||||||
| return c.provision(ctx, db, workspaceID, template, presetID, database.WorkspaceTransitionDelete, workspace, mode) | ||||||||||||||||||
| } | ||||||||||||||||||
| // cancelAndOrphanDeletePendingPrebuilds cancels pending prebuild jobs from inactive template versions | ||||||||||||||||||
| @@ -779,7 +788,8 @@ func (c *StoreReconciler) provisionDelete(ctx context.Context, db database.Store | ||||||||||||||||||
| // Since these jobs were never processed by a provisioner, no Terraform resources were created, | ||||||||||||||||||
| // making it safe to orphan-delete the workspaces (skipping Terraform destroy). | ||||||||||||||||||
| func (c *StoreReconciler) cancelAndOrphanDeletePendingPrebuilds(ctx context.Context, templateID uuid.UUID, templateVersionID uuid.UUID, presetID uuid.UUID) error { | ||||||||||||||||||
| provisionerJobs := make(map[uuid.UUID]*database.ProvisionerJob) | ||||||||||||||||||
| err := c.store.InTx(func(db database.Store) error { | ||||||||||||||||||
| canceledJobs, err := db.UpdatePrebuildProvisionerJobWithCancel( | ||||||||||||||||||
| ctx, | ||||||||||||||||||
| database.UpdatePrebuildProvisionerJobWithCancelParams{ | ||||||||||||||||||
| @@ -808,11 +818,13 @@ func (c *StoreReconciler) cancelAndOrphanDeletePendingPrebuilds(ctx context.Cont | ||||||||||||||||||
| var multiErr multierror.Error | ||||||||||||||||||
| for _, job := range canceledJobs { | ||||||||||||||||||
| provisionerJob,err:= c.provisionDelete(ctx, db, job.WorkspaceID, job.TemplateID, presetID, DeprovisionModeOrphan) | ||||||||||||||||||
| if err != nil { | ||||||||||||||||||
| c.logger.Error(ctx, "failed to orphan delete canceled prebuild", | ||||||||||||||||||
| slog.F("workspace_id", job.WorkspaceID.String()), slog.Error(err)) | ||||||||||||||||||
| multiErr.Errors = append(multiErr.Errors, err) | ||||||||||||||||||
| } else { | ||||||||||||||||||
| provisionerJobs[job.WorkspaceID] = provisionerJob | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
| @@ -821,15 +833,35 @@ func (c *StoreReconciler) cancelAndOrphanDeletePendingPrebuilds(ctx context.Cont | ||||||||||||||||||
| Isolation: sql.LevelRepeatableRead, | ||||||||||||||||||
| ReadOnly: false, | ||||||||||||||||||
| }) | ||||||||||||||||||
| if err != nil { | ||||||||||||||||||
| return err | ||||||||||||||||||
| } | ||||||||||||||||||
| // Publish provisioner job events to notify the acquirer that new jobs were posted | ||||||||||||||||||
| for workspaceID, job := range provisionerJobs { | ||||||||||||||||||
| c.publishProvisionerJob(ctx, job, workspaceID) | ||||||||||||||||||
| } | ||||||||||||||||||
Member There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more.
We are now publishing these job events outside of the transaction, so we don't get this automatic de-duping. I'm not sure if this will cause issues with larger deployments? ContributorAuthor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. AFAIK, we were never publishing the events within the transaction. Previously, we would send the job to the Also, there seems to be a rule to avoid calling publish within a transaction:https://github.com/coder/coder/blob/8274251f/scripts/rules.go#L143 Nevertheless, this is a good point about deduping. Because it seems the job is published as (link): Meaning that all of these canceled jobs would have the same payload. So theoretically, we only need to send one job event, right? 🤔 Member There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more.
Fair enough 👍 Just wanted to make sure this was called out.
I completely forgot about that 😁 good callout!
That's how I understand it, yes! EDIT: now that I think about it more, I'm not sure that pubsub events actually impact canceled jobs. | ||||||||||||||||||
| return nil | ||||||||||||||||||
| } | ||||||||||||||||||
| func (c *StoreReconciler) deletePrebuiltWorkspace(ctx context.Context, prebuiltWorkspaceID uuid.UUID, templateID uuid.UUID, presetID uuid.UUID) error { | ||||||||||||||||||
| var provisionerJob *database.ProvisionerJob | ||||||||||||||||||
| err := c.store.InTx(func(db database.Store) (err error) { | ||||||||||||||||||
| provisionerJob, err = c.provisionDelete(ctx, db, prebuiltWorkspaceID, templateID, presetID, DeprovisionModeNormal) | ||||||||||||||||||
| return err | ||||||||||||||||||
| }, &database.TxOptions{ | ||||||||||||||||||
| Isolation: sql.LevelRepeatableRead, | ||||||||||||||||||
| ReadOnly: false, | ||||||||||||||||||
| }) | ||||||||||||||||||
| if err != nil { | ||||||||||||||||||
| return err | ||||||||||||||||||
| } | ||||||||||||||||||
| // Publish provisioner job event to notify the acquirer that a new job was posted | ||||||||||||||||||
| c.publishProvisionerJob(ctx, provisionerJob, prebuiltWorkspaceID) | ||||||||||||||||||
| return nil | ||||||||||||||||||
| } | ||||||||||||||||||
| func (c *StoreReconciler) provision( | ||||||||||||||||||
| @@ -841,10 +873,10 @@ func (c *StoreReconciler) provision( | ||||||||||||||||||
| transition database.WorkspaceTransition, | ||||||||||||||||||
| workspace database.Workspace, | ||||||||||||||||||
| mode DeprovisionMode, | ||||||||||||||||||
| )(*database.ProvisionerJob,error) { | ||||||||||||||||||
| tvp, err := db.GetPresetParametersByTemplateVersionID(ctx, template.ActiveVersionID) | ||||||||||||||||||
| if err != nil { | ||||||||||||||||||
| returnnil,xerrors.Errorf("fetch preset details: %w", err) | ||||||||||||||||||
| } | ||||||||||||||||||
| var params []codersdk.WorkspaceBuildParameter | ||||||||||||||||||
| @@ -893,26 +925,34 @@ func (c *StoreReconciler) provision( | ||||||||||||||||||
| audit.WorkspaceBuildBaggage{}, | ||||||||||||||||||
| ) | ||||||||||||||||||
| if err != nil { | ||||||||||||||||||
| returnnil,xerrors.Errorf("provision workspace: %w", err) | ||||||||||||||||||
| } | ||||||||||||||||||
| if provisionerJob == nil { | ||||||||||||||||||
| // This should not happen, builder.Build() should either return a job or an error. | ||||||||||||||||||
| // Returning an error to fail fast if we hit this unexpected case. | ||||||||||||||||||
| return nil, xerrors.Errorf("provision succeeded but returned no job") | ||||||||||||||||||
| } | ||||||||||||||||||
| c.logger.Info(ctx, "prebuild job scheduled", slog.F("transition", transition), | ||||||||||||||||||
| slog.F("prebuild_id", prebuildID.String()), slog.F("preset_id", presetID.String()), | ||||||||||||||||||
| slog.F("job_id", provisionerJob.ID)) | ||||||||||||||||||
| return provisionerJob, nil | ||||||||||||||||||
| } | ||||||||||||||||||
| // publishProvisionerJob publishes a provisioner job event to notify the acquirer that a new job has been created. | ||||||||||||||||||
| // This must be called after the database transaction that creates the job has committed to ensure | ||||||||||||||||||
| // the job is visible to provisioners when they query the database. | ||||||||||||||||||
| func (c *StoreReconciler) publishProvisionerJob(ctx context.Context, provisionerJob *database.ProvisionerJob, workspaceID uuid.UUID) { | ||||||||||||||||||
| if provisionerJob == nil { | ||||||||||||||||||
| return | ||||||||||||||||||
| } | ||||||||||||||||||
| select { | ||||||||||||||||||
| case c.provisionNotifyCh <- *provisionerJob: | ||||||||||||||||||
| default: // channel full, drop the message; provisioner will pick this job up later with its periodic check | ||||||||||||||||||
| c.logger.Warn(ctx, "provisioner job notification queue full, dropping", | ||||||||||||||||||
| slog.F("job_id", provisionerJob.ID), slog.F("prebuild_id", workspaceID.String())) | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
| // ForceMetricsUpdate forces the metrics collector, if defined, to update its state (we cache the metrics state to | ||||||||||||||||||
Uh oh!
There was an error while loading.Please reload this page.