Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

fix: send prebuild job notification after job build db commit #20693

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Open
ssncferreira wants to merge 1 commit into main
base:main
Choose a base branch
Loading
from ssncferreira/fix-prebuild-job-notification-race
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 66 additions & 26 deletions — enterprise/coderd/prebuilds/reconcile.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -697,7 +697,8 @@ func (c *StoreReconciler) createPrebuiltWorkspace(ctx context.Context, prebuiltW
return xerrors.Errorf("failed to generate unique prebuild ID: %w", err)
}

return c.store.InTx(func(db database.Store) error {
var provisionerJob *database.ProvisionerJob
err = c.store.InTx(func(db database.Store) error {
template, err := db.GetTemplateByID(ctx, templateID)
if err != nil {
return xerrors.Errorf("failed to get template: %w", err)
Expand DownExpand Up@@ -732,11 +733,20 @@ func (c *StoreReconciler) createPrebuiltWorkspace(ctx context.Context, prebuiltW
c.logger.Info(ctx, "attempting to create prebuild", slog.F("name", name),
slog.F("workspace_id", prebuiltWorkspaceID.String()), slog.F("preset_id", presetID.String()))

return c.provision(ctx, db, prebuiltWorkspaceID, template, presetID, database.WorkspaceTransitionStart, workspace, DeprovisionModeNormal)
provisionerJob, err = c.provision(ctx, db, prebuiltWorkspaceID, template, presetID, database.WorkspaceTransitionStart, workspace, DeprovisionModeNormal)
return err
}, &database.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: false,
})
if err != nil {
return err
}

// Publish provisioner job event to notify the acquirer that a new job was posted
c.publishProvisionerJob(ctx, provisionerJob, prebuiltWorkspaceID)

Comment on lines +746 to +748
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Should we only do this if `provisionerJob` is not nil?

Copy link
ContributorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

`provisionerJob` should never be `nil`; we already check it in `provision` and return an error in that case:

if provisionerJob == nil {
    // This should not happen, builder.Build() should either return a job or an error.
    // Returning an error to fail fast if we hit this unexpected case.
    return nil, xerrors.Errorf("provision succeeded but returned no job")
}

Nevertheless, I'm also checking in `publishProvisionerJob` whether the job is `nil`, just in case:

if provisionerJob == nil {
    return
}

return nil
}

// provisionDelete provisions a delete transition for a prebuilt workspace.
Expand All@@ -748,26 +758,25 @@ func (c *StoreReconciler) createPrebuiltWorkspace(ctx context.Context, prebuiltW
//
// IMPORTANT: This function must be called within a database transaction. It does not create its own transaction.
// The caller is responsible for managing the transaction boundary via db.InTx().
func (c *StoreReconciler) provisionDelete(ctx context.Context, db database.Store, workspaceID uuid.UUID, templateID uuid.UUID, presetID uuid.UUID, mode DeprovisionMode) error {
func (c *StoreReconciler) provisionDelete(ctx context.Context, db database.Store, workspaceID uuid.UUID, templateID uuid.UUID, presetID uuid.UUID, mode DeprovisionMode)(*database.ProvisionerJob,error) {
workspace, err := db.GetWorkspaceByID(ctx, workspaceID)
if err != nil {
return xerrors.Errorf("get workspace by ID: %w", err)
returnnil,xerrors.Errorf("get workspace by ID: %w", err)
}

template, err := db.GetTemplateByID(ctx, templateID)
if err != nil {
return xerrors.Errorf("failed to get template: %w", err)
returnnil,xerrors.Errorf("failed to get template: %w", err)
}

if workspace.OwnerID != database.PrebuildsSystemUserID {
return xerrors.Errorf("prebuilt workspace is not owned by prebuild user anymore, probably it was claimed")
returnnil,xerrors.Errorf("prebuilt workspace is not owned by prebuild user anymore, probably it was claimed")
}

c.logger.Info(ctx, "attempting to delete prebuild", slog.F("orphan", mode.String()),
slog.F("name", workspace.Name), slog.F("workspace_id", workspaceID.String()), slog.F("preset_id", presetID.String()))

return c.provision(ctx, db, workspaceID, template, presetID,
database.WorkspaceTransitionDelete, workspace, mode)
return c.provision(ctx, db, workspaceID, template, presetID, database.WorkspaceTransitionDelete, workspace, mode)
}

// cancelAndOrphanDeletePendingPrebuilds cancels pending prebuild jobs from inactive template versions
Expand All@@ -779,7 +788,8 @@ func (c *StoreReconciler) provisionDelete(ctx context.Context, db database.Store
// Since these jobs were never processed by a provisioner, no Terraform resources were created,
// making it safe to orphan-delete the workspaces (skipping Terraform destroy).
func (c *StoreReconciler) cancelAndOrphanDeletePendingPrebuilds(ctx context.Context, templateID uuid.UUID, templateVersionID uuid.UUID, presetID uuid.UUID) error {
return c.store.InTx(func(db database.Store) error {
provisionerJobs := make(map[uuid.UUID]*database.ProvisionerJob)
err := c.store.InTx(func(db database.Store) error {
canceledJobs, err := db.UpdatePrebuildProvisionerJobWithCancel(
ctx,
database.UpdatePrebuildProvisionerJobWithCancelParams{
Expand DownExpand Up@@ -808,11 +818,13 @@ func (c *StoreReconciler) cancelAndOrphanDeletePendingPrebuilds(ctx context.Cont

var multiErr multierror.Error
for _, job := range canceledJobs {
err = c.provisionDelete(ctx, db, job.WorkspaceID, job.TemplateID, presetID, DeprovisionModeOrphan)
provisionerJob,err:= c.provisionDelete(ctx, db, job.WorkspaceID, job.TemplateID, presetID, DeprovisionModeOrphan)
if err != nil {
c.logger.Error(ctx, "failed to orphan delete canceled prebuild",
slog.F("workspace_id", job.WorkspaceID.String()), slog.Error(err))
multiErr.Errors = append(multiErr.Errors, err)
} else {
provisionerJobs[job.WorkspaceID] = provisionerJob
}
}

Expand All@@ -821,15 +833,35 @@ func (c *StoreReconciler) cancelAndOrphanDeletePendingPrebuilds(ctx context.Cont
Isolation: sql.LevelRepeatableRead,
ReadOnly: false,
})
if err != nil {
return err
}

// Publish provisioner job events to notify the acquirer that new jobs were posted
for workspaceID, job := range provisionerJobs {
c.publishProvisionerJob(ctx, job, workspaceID)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

If the same channel name is signaled multiple times with identical payload strings within the same transaction, only one instance of the notification event is delivered to listeners.

Link

We are now publishing these job events outside of the transaction, so we don't get this automatic de-duping. I'm not sure if this will cause issues with larger deployments?

Copy link
ContributorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

AFAIK, we were never publishing the events within the transaction. Previously, we would send the job to the `provisionNotifyCh` channel, which would then publish the job outside the transaction: https://github.com/coder/coder/blob/main/enterprise/coderd/prebuilds/reconcile.go#L164
Therefore, this deduplication was not happening in the previous implementation as well. The only difference here is that we guarantee that we only publish the job after the transaction commits, ensuring provisioners can see the job when they query the database.

Also, there seems to be a rule to avoid calling publish within a transaction: https://github.com/coder/coder/blob/8274251f/scripts/rules.go#L143

Nevertheless, this is a good point about deduping. Because it seems the job is published as (link):

msg, err := json.Marshal(JobPosting{
    OrganizationID:  job.OrganizationID,
    ProvisionerType: job.Provisioner,
    Tags:            job.Tags,
})

Meaning that all of these canceled jobs would have the same payload. So theoretically, we only need to send one job event, right? 🤔

Copy link
Member

@johnstcnjohnstcnNov 10, 2025
edited
Loading

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Therefore, this deduplication was not happening in the previous implementation as well.

Fair enough 👍 Just wanted to make sure this was called out.

Also, there seems to be a rule to avoid calling publish within a transaction: https://github.com/coder/coder/blob/8274251f/scripts/rules.go#L143

I completely forgot about that 😁 good callout!

Meaning that all of these canceled jobs would have the same payload. So theoretically, we only need to send one job event, right? 🤔

That's how I understand it, yes! `provisionerdserver.Acquirer` listens to all `provisioner_job_posted` events and 'wakes up' its provisioner if the tags match.

EDIT: now that I think about it more, I'm not sure that pubsub events actually impact canceled jobs.


return nil
}

func (c *StoreReconciler) deletePrebuiltWorkspace(ctx context.Context, prebuiltWorkspaceID uuid.UUID, templateID uuid.UUID, presetID uuid.UUID) error {
return c.store.InTx(func(db database.Store) error {
return c.provisionDelete(ctx, db, prebuiltWorkspaceID, templateID, presetID, DeprovisionModeNormal)
var provisionerJob *database.ProvisionerJob
err := c.store.InTx(func(db database.Store) (err error) {
provisionerJob, err = c.provisionDelete(ctx, db, prebuiltWorkspaceID, templateID, presetID, DeprovisionModeNormal)
return err
}, &database.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: false,
})
if err != nil {
return err
}

// Publish provisioner job event to notify the acquirer that a new job was posted
c.publishProvisionerJob(ctx, provisionerJob, prebuiltWorkspaceID)

return nil
}

func (c *StoreReconciler) provision(
Expand All@@ -841,10 +873,10 @@ func (c *StoreReconciler) provision(
transition database.WorkspaceTransition,
workspace database.Workspace,
mode DeprovisionMode,
) error {
)(*database.ProvisionerJob,error) {
tvp, err := db.GetPresetParametersByTemplateVersionID(ctx, template.ActiveVersionID)
if err != nil {
return xerrors.Errorf("fetch preset details: %w", err)
returnnil,xerrors.Errorf("fetch preset details: %w", err)
}

var params []codersdk.WorkspaceBuildParameter
Expand DownExpand Up@@ -893,26 +925,34 @@ func (c *StoreReconciler) provision(
audit.WorkspaceBuildBaggage{},
)
if err != nil {
return xerrors.Errorf("provision workspace: %w", err)
returnnil,xerrors.Errorf("provision workspace: %w", err)
}

if provisionerJob == nil {
return nil
}

// Publish provisioner job event outside of transaction.
select {
case c.provisionNotifyCh <- *provisionerJob:
default: // channel full, drop the message; provisioner will pick this job up later with its periodic check, though.
c.logger.Warn(ctx, "provisioner job notification queue full, dropping",
slog.F("job_id", provisionerJob.ID), slog.F("prebuild_id", prebuildID.String()))
// This should not happen, builder.Build() should either return a job or an error.
// Returning an error to fail fast if we hit this unexpected case.
return nil, xerrors.Errorf("provision succeeded but returned no job")
}

c.logger.Info(ctx, "prebuild job scheduled", slog.F("transition", transition),
slog.F("prebuild_id", prebuildID.String()), slog.F("preset_id", presetID.String()),
slog.F("job_id", provisionerJob.ID))

return nil
return provisionerJob, nil
}

// publishProvisionerJob publishes a provisioner job event to notify the acquirer that a new job has been created.
// This must be called after the database transaction that creates the job has committed to ensure
// the job is visible to provisioners when they query the database.
//
// Within this function the job is only handed off to c.provisionNotifyCh. A nil provisionerJob is
// ignored. The hand-off is non-blocking: if the channel cannot accept the job immediately, the
// notification is dropped and a warning is logged. workspaceID is used only for that log entry,
// where it is reported under the "prebuild_id" key.
func (c *StoreReconciler) publishProvisionerJob(ctx context.Context, provisionerJob *database.ProvisionerJob, workspaceID uuid.UUID) {
// Defensive guard: without a job there is nothing to send.
if provisionerJob == nil {
return
}
// Send the dereferenced job value; the default case keeps the caller from blocking on a busy channel.
select {
case c.provisionNotifyCh <- *provisionerJob:
default: // channel full, drop the message; provisioner will pick this job up later with its periodic check
c.logger.Warn(ctx, "provisioner job notification queue full, dropping",
slog.F("job_id", provisionerJob.ID), slog.F("prebuild_id", workspaceID.String()))
}
}

// ForceMetricsUpdate forces the metrics collector, if defined, to update its state (we cache the metrics state to
Expand Down
2 changes: 1 addition & 1 deletion — enterprise/coderd/workspaceagents_test.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -95,7 +95,7 @@ func TestReinitializeAgent(t *testing.T) {

// Ensure that workspace agents can reinitialize against claimed prebuilds in non-default organizations:
for _, useDefaultOrg := range []bool{true, false} {
t.Run("", func(t *testing.T) {
t.Run(fmt.Sprintf("useDefaultOrg=%t", useDefaultOrg), func(t *testing.T) {
t.Parallel()

tempAgentLog := testutil.CreateTemp(t, "", "testReinitializeAgent")
Expand Down
Loading

[8]ページ先頭

©2009-2025 Movatter.jp