Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit2c77682

Browse files
committed
fix: send prebuild job notification after job build db commit
1 parent0e21480 commit2c77682

File tree

2 files changed

+68
-27
lines changed

2 files changed

+68
-27
lines changed

‎enterprise/coderd/prebuilds/reconcile.go‎

Lines changed: 67 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -697,7 +697,8 @@ func (c *StoreReconciler) createPrebuiltWorkspace(ctx context.Context, prebuiltW
697697
returnxerrors.Errorf("failed to generate unique prebuild ID: %w",err)
698698
}
699699

700-
returnc.store.InTx(func(db database.Store)error {
700+
varprovisionerJob*database.ProvisionerJob
701+
err=c.store.InTx(func(db database.Store)error {
701702
template,err:=db.GetTemplateByID(ctx,templateID)
702703
iferr!=nil {
703704
returnxerrors.Errorf("failed to get template: %w",err)
@@ -732,11 +733,20 @@ func (c *StoreReconciler) createPrebuiltWorkspace(ctx context.Context, prebuiltW
732733
c.logger.Info(ctx,"attempting to create prebuild",slog.F("name",name),
733734
slog.F("workspace_id",prebuiltWorkspaceID.String()),slog.F("preset_id",presetID.String()))
734735

735-
returnc.provision(ctx,db,prebuiltWorkspaceID,template,presetID,database.WorkspaceTransitionStart,workspace,DeprovisionModeNormal)
736+
provisionerJob,err=c.provision(ctx,db,prebuiltWorkspaceID,template,presetID,database.WorkspaceTransitionStart,workspace,DeprovisionModeNormal)
737+
returnerr
736738
},&database.TxOptions{
737739
Isolation:sql.LevelRepeatableRead,
738740
ReadOnly:false,
739741
})
742+
iferr!=nil {
743+
returnerr
744+
}
745+
746+
// Publish provisioner job event to notify the acquirer that a new job was posted
747+
c.publishProvisionerJob(ctx,provisionerJob,prebuiltWorkspaceID)
748+
749+
returnnil
740750
}
741751

742752
// provisionDelete provisions a delete transition for a prebuilt workspace.
@@ -748,26 +758,26 @@ func (c *StoreReconciler) createPrebuiltWorkspace(ctx context.Context, prebuiltW
748758
//
749759
// IMPORTANT: This function must be called within a database transaction. It does not create its own transaction.
750760
// The caller is responsible for managing the transaction boundary via db.InTx().
751-
func (c*StoreReconciler)provisionDelete(ctx context.Context,db database.Store,workspaceID uuid.UUID,templateID uuid.UUID,presetID uuid.UUID,modeDeprovisionMode)error {
761+
func (c*StoreReconciler)provisionDelete(ctx context.Context,db database.Store,workspaceID uuid.UUID,templateID uuid.UUID,presetID uuid.UUID,modeDeprovisionMode)(*database.ProvisionerJob,error) {
752762
workspace,err:=db.GetWorkspaceByID(ctx,workspaceID)
753763
iferr!=nil {
754-
returnxerrors.Errorf("get workspace by ID: %w",err)
764+
returnnil,xerrors.Errorf("get workspace by ID: %w",err)
755765
}
756766

757767
template,err:=db.GetTemplateByID(ctx,templateID)
758768
iferr!=nil {
759-
returnxerrors.Errorf("failed to get template: %w",err)
769+
returnnil,xerrors.Errorf("failed to get template: %w",err)
760770
}
761771

762772
ifworkspace.OwnerID!=database.PrebuildsSystemUserID {
763-
returnxerrors.Errorf("prebuilt workspace is not owned by prebuild user anymore, probably it was claimed")
773+
returnnil,xerrors.Errorf("prebuilt workspace is not owned by prebuild user anymore, probably it was claimed")
764774
}
765775

766776
c.logger.Info(ctx,"attempting to delete prebuild",slog.F("orphan",mode.String()),
767777
slog.F("name",workspace.Name),slog.F("workspace_id",workspaceID.String()),slog.F("preset_id",presetID.String()))
768778

769-
returnc.provision(ctx,db,workspaceID,template,presetID,
770-
database.WorkspaceTransitionDelete,workspace,mode)
779+
provisionerJob,err:=c.provision(ctx,db,workspaceID,template,presetID,database.WorkspaceTransitionDelete,workspace,mode)
780+
returnprovisionerJob,err
771781
}
772782

773783
// cancelAndOrphanDeletePendingPrebuilds cancels pending prebuild jobs from inactive template versions
@@ -779,7 +789,8 @@ func (c *StoreReconciler) provisionDelete(ctx context.Context, db database.Store
779789
// Since these jobs were never processed by a provisioner, no Terraform resources were created,
780790
// making it safe to orphan-delete the workspaces (skipping Terraform destroy).
781791
func (c*StoreReconciler)cancelAndOrphanDeletePendingPrebuilds(ctx context.Context,templateID uuid.UUID,templateVersionID uuid.UUID,presetID uuid.UUID)error {
782-
returnc.store.InTx(func(db database.Store)error {
792+
provisionerJobs:=make(map[uuid.UUID]*database.ProvisionerJob)
793+
err:=c.store.InTx(func(db database.Store)error {
783794
canceledJobs,err:=db.UpdatePrebuildProvisionerJobWithCancel(
784795
ctx,
785796
database.UpdatePrebuildProvisionerJobWithCancelParams{
@@ -808,11 +819,13 @@ func (c *StoreReconciler) cancelAndOrphanDeletePendingPrebuilds(ctx context.Cont
808819

809820
varmultiErr multierror.Error
810821
for_,job:=rangecanceledJobs {
811-
err=c.provisionDelete(ctx,db,job.WorkspaceID,job.TemplateID,presetID,DeprovisionModeOrphan)
822+
provisionerJob,err:=c.provisionDelete(ctx,db,job.WorkspaceID,job.TemplateID,presetID,DeprovisionModeOrphan)
812823
iferr!=nil {
813824
c.logger.Error(ctx,"failed to orphan delete canceled prebuild",
814825
slog.F("workspace_id",job.WorkspaceID.String()),slog.Error(err))
815826
multiErr.Errors=append(multiErr.Errors,err)
827+
}else {
828+
provisionerJobs[job.WorkspaceID]=provisionerJob
816829
}
817830
}
818831

@@ -821,15 +834,35 @@ func (c *StoreReconciler) cancelAndOrphanDeletePendingPrebuilds(ctx context.Cont
821834
Isolation:sql.LevelRepeatableRead,
822835
ReadOnly:false,
823836
})
837+
iferr!=nil {
838+
returnerr
839+
}
840+
841+
// Publish provisioner job events to notify the acquirer that new jobs were posted
842+
forworkspaceID,job:=rangeprovisionerJobs {
843+
c.publishProvisionerJob(ctx,job,workspaceID)
844+
}
845+
846+
returnnil
824847
}
825848

826849
func (c*StoreReconciler)deletePrebuiltWorkspace(ctx context.Context,prebuiltWorkspaceID uuid.UUID,templateID uuid.UUID,presetID uuid.UUID)error {
827-
returnc.store.InTx(func(db database.Store)error {
828-
returnc.provisionDelete(ctx,db,prebuiltWorkspaceID,templateID,presetID,DeprovisionModeNormal)
850+
varprovisionerJob*database.ProvisionerJob
851+
err:=c.store.InTx(func(db database.Store) (errerror) {
852+
provisionerJob,err=c.provisionDelete(ctx,db,prebuiltWorkspaceID,templateID,presetID,DeprovisionModeNormal)
853+
returnerr
829854
},&database.TxOptions{
830855
Isolation:sql.LevelRepeatableRead,
831856
ReadOnly:false,
832857
})
858+
iferr!=nil {
859+
returnerr
860+
}
861+
862+
// Publish provisioner job event to notify the acquirer that a new job was posted
863+
c.publishProvisionerJob(ctx,provisionerJob,prebuiltWorkspaceID)
864+
865+
returnnil
833866
}
834867

835868
func (c*StoreReconciler)provision(
@@ -841,10 +874,10 @@ func (c *StoreReconciler) provision(
841874
transition database.WorkspaceTransition,
842875
workspace database.Workspace,
843876
modeDeprovisionMode,
844-
)error {
877+
)(*database.ProvisionerJob,error) {
845878
tvp,err:=db.GetPresetParametersByTemplateVersionID(ctx,template.ActiveVersionID)
846879
iferr!=nil {
847-
returnxerrors.Errorf("fetch preset details: %w",err)
880+
returnnil,xerrors.Errorf("fetch preset details: %w",err)
848881
}
849882

850883
varparams []codersdk.WorkspaceBuildParameter
@@ -893,26 +926,34 @@ func (c *StoreReconciler) provision(
893926
audit.WorkspaceBuildBaggage{},
894927
)
895928
iferr!=nil {
896-
returnxerrors.Errorf("provision workspace: %w",err)
929+
returnnil,xerrors.Errorf("provision workspace: %w",err)
897930
}
898-
899931
ifprovisionerJob==nil {
900-
returnnil
901-
}
902-
903-
// Publish provisioner job event outside of transaction.
904-
select {
905-
casec.provisionNotifyCh<-*provisionerJob:
906-
default:// channel full, drop the message; provisioner will pick this job up later with its periodic check, though.
907-
c.logger.Warn(ctx,"provisioner job notification queue full, dropping",
908-
slog.F("job_id",provisionerJob.ID),slog.F("prebuild_id",prebuildID.String()))
932+
// This should not happen, builder.Build() should either return a job or an error.
933+
// Returning an error to fail fast if we hit this unexpected case.
934+
returnnil,xerrors.Errorf("provision succeeded but returned no job")
909935
}
910936

911937
c.logger.Info(ctx,"prebuild job scheduled",slog.F("transition",transition),
912938
slog.F("prebuild_id",prebuildID.String()),slog.F("preset_id",presetID.String()),
913939
slog.F("job_id",provisionerJob.ID))
914940

915-
returnnil
941+
returnprovisionerJob,nil
942+
}
943+
944+
// publishProvisionerJob publishes a provisioner job event to notify the acquirer that a new job has been created.
945+
// This must be called after the database transaction that creates the job has committed to ensure
946+
// the job is visible to provisioners when they query the database.
947+
func (c*StoreReconciler)publishProvisionerJob(ctx context.Context,provisionerJob*database.ProvisionerJob,workspaceID uuid.UUID) {
948+
ifprovisionerJob==nil {
949+
return
950+
}
951+
select {
952+
casec.provisionNotifyCh<-*provisionerJob:
953+
default:// channel full, drop the message; provisioner will pick this job up later with its periodic check
954+
c.logger.Warn(ctx,"provisioner job notification queue full, dropping",
955+
slog.F("job_id",provisionerJob.ID),slog.F("prebuild_id",workspaceID.String()))
956+
}
916957
}
917958

918959
// ForceMetricsUpdate forces the metrics collector, if defined, to update its state (we cache the metrics state to

‎enterprise/coderd/workspaceagents_test.go‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func TestReinitializeAgent(t *testing.T) {
9595

9696
// Ensure that workspace agents can reinitialize against claimed prebuilds in non-default organizations:
9797
for_,useDefaultOrg:=range []bool{true,false} {
98-
t.Run("",func(t*testing.T) {
98+
t.Run(fmt.Sprintf("useDefaultOrg=%t",useDefaultOrg),func(t*testing.T) {
9999
t.Parallel()
100100

101101
tempAgentLog:=testutil.CreateTemp(t,"","testReinitializeAgent")

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp