Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit375c70d

Browse files
authored
feat: integrate Acquirer for provisioner jobs (#9717)
* chore: add Acquirer to provisionerdserver pkgSigned-off-by: Spike Curtis <spike@coder.com>* code review improvements & fixesSigned-off-by: Spike Curtis <spike@coder.com>* feat: integrate Acquirer for provisioner jobsSigned-off-by: Spike Curtis <spike@coder.com>* Fix imports, whitespaceSigned-off-by: Spike Curtis <spike@coder.com>* provisionerdserver always closes; remove poll interval from playwrightSigned-off-by: Spike Curtis <spike@coder.com>* post jobs outside transactionsSigned-off-by: Spike Curtis <spike@coder.com>* graceful shutdown in testSigned-off-by: Spike Curtis <spike@coder.com>* Mark AcquireJob deprecatedSigned-off-by: Spike Curtis <spike@coder.com>* Graceful shutdown on all provisionerd testsSigned-off-by: Spike Curtis <spike@coder.com>* Deprecate, not remove CLI flagsSigned-off-by: Spike Curtis <spike@coder.com>---------Signed-off-by: Spike Curtis <spike@coder.com>
1 parent6cf531b commit375c70d

File tree

41 files changed

+1486
-1085
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+1486
-1085
lines changed

‎cli/server.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,7 +1012,8 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
10121012

10131013
autobuildTicker:=time.NewTicker(vals.AutobuildPollInterval.Value())
10141014
deferautobuildTicker.Stop()
1015-
autobuildExecutor:=autobuild.NewExecutor(ctx,options.Database,coderAPI.TemplateScheduleStore,logger,autobuildTicker.C)
1015+
autobuildExecutor:=autobuild.NewExecutor(
1016+
ctx,options.Database,options.Pubsub,coderAPI.TemplateScheduleStore,logger,autobuildTicker.C)
10161017
autobuildExecutor.Run()
10171018

10181019
hangDetectorTicker:=time.NewTicker(vals.JobHangDetectorInterval.Value())
@@ -1378,16 +1379,12 @@ func newProvisionerDaemon(
13781379
connector[string(database.ProvisionerTypeTerraform)]=sdkproto.NewDRPCProvisionerClient(terraformClient)
13791380
}
13801381

1381-
debounce:=time.Second
13821382
returnprovisionerd.New(func(ctx context.Context) (proto.DRPCProvisionerDaemonClient,error) {
13831383
// This debounces calls to listen every second. Read the comment
13841384
// in provisionerdserver.go to learn more!
1385-
returncoderAPI.CreateInMemoryProvisionerDaemon(ctx,debounce)
1385+
returncoderAPI.CreateInMemoryProvisionerDaemon(ctx)
13861386
},&provisionerd.Options{
13871387
Logger:logger.Named("provisionerd"),
1388-
JobPollInterval:cfg.Provisioner.DaemonPollInterval.Value(),
1389-
JobPollJitter:cfg.Provisioner.DaemonPollJitter.Value(),
1390-
JobPollDebounce:debounce,
13911388
UpdateInterval:time.Second,
13921389
ForceCancelInterval:cfg.Provisioner.ForceCancelInterval.Value(),
13931390
Connector:connector,

‎cli/testdata/coder_server_--help.golden

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -393,10 +393,10 @@ updating, and deleting workspace resources.
393393
Time to force cancel provisioning tasks that are stuck.
394394

395395
--provisioner-daemon-poll-interval duration, $CODER_PROVISIONER_DAEMON_POLL_INTERVAL (default: 1s)
396-
Time to wait before polling for a new job.
396+
Deprecated and ignored.
397397

398398
--provisioner-daemon-poll-jitter duration, $CODER_PROVISIONER_DAEMON_POLL_JITTER (default: 100ms)
399-
Random jitter added to the poll interval.
399+
Deprecated and ignored.
400400

401401
--provisioner-daemon-psk string, $CODER_PROVISIONER_DAEMON_PSK
402402
Pre-shared key to authenticate external provisioner daemons to Coder

‎cli/testdata/server-config.yaml.golden

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -348,10 +348,10 @@ provisioning:
348348
# tests.
349349
# (default: false, type: bool)
350350
daemonsEcho: false
351-
#Time to wait before polling for a new job.
351+
#Deprecated and ignored.
352352
# (default: 1s, type: duration)
353353
daemonPollInterval: 1s
354-
#Random jitter added to the poll interval.
354+
#Deprecated and ignored.
355355
# (default: 100ms, type: duration)
356356
daemonPollJitter: 100ms
357357
# Time to force cancel provisioning tasks that are stuck.

‎coderd/activitybump_internal_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func Test_ActivityBumpWorkspace(t *testing.T) {
134134
TemplateID:template.ID,
135135
Ttl: sql.NullInt64{Valid:true,Int64:int64(tt.workspaceTTL)},
136136
})
137-
job=dbgen.ProvisionerJob(t,db, database.ProvisionerJob{
137+
job=dbgen.ProvisionerJob(t,db,nil,database.ProvisionerJob{
138138
OrganizationID:org.ID,
139139
CompletedAt:tt.jobCompletedAt,
140140
})
@@ -225,7 +225,7 @@ func Test_ActivityBumpWorkspace(t *testing.T) {
225225
funcinsertPrevWorkspaceBuild(t*testing.T,db database.Store,orgID,tvID,workspaceID uuid.UUID,transition database.WorkspaceTransition,buildNumberint32) {
226226
t.Helper()
227227

228-
job:=dbgen.ProvisionerJob(t,db, database.ProvisionerJob{
228+
job:=dbgen.ProvisionerJob(t,db,nil,database.ProvisionerJob{
229229
OrganizationID:orgID,
230230
})
231231
_=dbgen.WorkspaceResource(t,db, database.WorkspaceResource{

‎coderd/autobuild/lifecycle_executor.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import (
1616
"github.com/coder/coder/v2/coderd/database/db2sdk"
1717
"github.com/coder/coder/v2/coderd/database/dbauthz"
1818
"github.com/coder/coder/v2/coderd/database/dbtime"
19+
"github.com/coder/coder/v2/coderd/database/provisionerjobs"
20+
"github.com/coder/coder/v2/coderd/database/pubsub"
1921
"github.com/coder/coder/v2/coderd/schedule"
2022
"github.com/coder/coder/v2/coderd/schedule/cron"
2123
"github.com/coder/coder/v2/coderd/wsbuilder"
@@ -26,6 +28,7 @@ import (
2628
typeExecutorstruct {
2729
ctx context.Context
2830
db database.Store
31+
ps pubsub.Pubsub
2932
templateScheduleStore*atomic.Pointer[schedule.TemplateScheduleStore]
3033
log slog.Logger
3134
tick<-chan time.Time
@@ -40,11 +43,12 @@ type Stats struct {
4043
}
4144

4245
// New returns a new wsactions executor.
43-
funcNewExecutor(ctx context.Context,db database.Store,tss*atomic.Pointer[schedule.TemplateScheduleStore],log slog.Logger,tick<-chan time.Time)*Executor {
46+
funcNewExecutor(ctx context.Context,db database.Store,ps pubsub.Pubsub,tss*atomic.Pointer[schedule.TemplateScheduleStore],log slog.Logger,tick<-chan time.Time)*Executor {
4447
le:=&Executor{
4548
//nolint:gocritic // Autostart has a limited set of permissions.
4649
ctx:dbauthz.AsAutostart(ctx),
4750
db:db,
51+
ps:ps,
4852
templateScheduleStore:tss,
4953
tick:tick,
5054
log:log.Named("autobuild"),
@@ -129,6 +133,7 @@ func (e *Executor) runOnce(t time.Time) Stats {
129133
log:=e.log.With(slog.F("workspace_id",wsID))
130134

131135
eg.Go(func()error {
136+
varjob*database.ProvisionerJob
132137
err:=e.db.InTx(func(tx database.Store)error {
133138
// Re-check eligibility since the first check was outside the
134139
// transaction and the workspace settings may have changed.
@@ -168,7 +173,8 @@ func (e *Executor) runOnce(t time.Time) Stats {
168173
SetLastWorkspaceBuildJobInTx(&latestJob).
169174
Reason(reason)
170175

171-
if_,_,err:=builder.Build(e.ctx,tx,nil);err!=nil {
176+
_,job,err=builder.Build(e.ctx,tx,nil)
177+
iferr!=nil {
172178
log.Error(e.ctx,"unable to transition workspace",
173179
slog.F("transition",nextTransition),
174180
slog.Error(err),
@@ -230,6 +236,17 @@ func (e *Executor) runOnce(t time.Time) Stats {
230236
iferr!=nil {
231237
log.Error(e.ctx,"workspace scheduling failed",slog.Error(err))
232238
}
239+
ifjob!=nil&&err==nil {
240+
// Note that we can't refactor such that posting the job happens inside wsbuilder because it's called
241+
// with an outer transaction like this, and we need to make sure the outer transaction commits before
242+
// posting the job. If we post before the transaction commits, provisionerd might try to acquire the
243+
// job, fail, and then sit idle instead of picking up the job.
244+
err=provisionerjobs.PostJob(e.ps,*job)
245+
iferr!=nil {
246+
// Client probably doesn't care about this error, so just log it.
247+
log.Error(e.ctx,"failed to post provisioner job to pubsub",slog.Error(err))
248+
}
249+
}
233250
returnnil
234251
})
235252
}

‎coderd/batchstats/batcher_internal_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/coder/coder/v2/coderd/database/dbgen"
1515
"github.com/coder/coder/v2/coderd/database/dbtestutil"
1616
"github.com/coder/coder/v2/coderd/database/dbtime"
17+
"github.com/coder/coder/v2/coderd/database/pubsub"
1718
"github.com/coder/coder/v2/coderd/rbac"
1819
"github.com/coder/coder/v2/codersdk/agentsdk"
1920
"github.com/coder/coder/v2/cryptorand"
@@ -26,11 +27,11 @@ func TestBatchStats(t *testing.T) {
2627
ctx,cancel:=context.WithCancel(context.Background())
2728
t.Cleanup(cancel)
2829
log:=slogtest.Make(t,&slogtest.Options{IgnoreErrors:true}).Leveled(slog.LevelDebug)
29-
store,_:=dbtestutil.NewDB(t)
30+
store,ps:=dbtestutil.NewDB(t)
3031

3132
// Set up some test dependencies.
32-
deps1:=setupDeps(t,store)
33-
deps2:=setupDeps(t,store)
33+
deps1:=setupDeps(t,store,ps)
34+
deps2:=setupDeps(t,store,ps)
3435
tick:=make(chan time.Time)
3536
flushed:=make(chanint,1)
3637

@@ -168,7 +169,7 @@ type deps struct {
168169
// It creates an organization, user, template, workspace, and agent
169170
// along with all the other miscellaneous plumbing required to link
170171
// them together.
171-
funcsetupDeps(t*testing.T,store database.Store)deps {
172+
funcsetupDeps(t*testing.T,store database.Store,ps pubsub.Pubsub)deps {
172173
t.Helper()
173174

174175
org:=dbgen.Organization(t,store, database.Organization{})
@@ -194,7 +195,7 @@ func setupDeps(t *testing.T, store database.Store) deps {
194195
OrganizationID:org.ID,
195196
LastUsedAt:time.Now().Add(-time.Hour),
196197
})
197-
pj:=dbgen.ProvisionerJob(t,store, database.ProvisionerJob{
198+
pj:=dbgen.ProvisionerJob(t,store,ps,database.ProvisionerJob{
198199
InitiatorID:user.ID,
199200
OrganizationID:org.ID,
200201
})

‎coderd/coderd.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"crypto/tls"
66
"crypto/x509"
7-
"encoding/json"
87
"flag"
98
"fmt"
109
"io"
@@ -366,6 +365,11 @@ func New(options *Options) *API {
366365
UserQuietHoursScheduleStore:options.UserQuietHoursScheduleStore,
367366
Experiments:experiments,
368367
healthCheckGroup:&singleflight.Group[string,*healthcheck.Report]{},
368+
Acquirer:provisionerdserver.NewAcquirer(
369+
ctx,
370+
options.Logger.Named("acquirer"),
371+
options.Database,
372+
options.Pubsub),
369373
}
370374
ifoptions.UpdateCheckOptions!=nil {
371375
api.updateChecker=updatecheck.New(
@@ -1016,6 +1020,8 @@ type API struct {
10161020
healthCheckCache atomic.Pointer[healthcheck.Report]
10171021

10181022
statsBatcher*batchstats.Batcher
1023+
1024+
Acquirer*provisionerdserver.Acquirer
10191025
}
10201026

10211027
// Close waits for all WebSocket connections to drain before returning.
@@ -1067,7 +1073,7 @@ func compressHandler(h http.Handler) http.Handler {
10671073

10681074
// CreateInMemoryProvisionerDaemon is an in-memory connection to a provisionerd.
10691075
// Useful when starting coderd and provisionerd in the same process.
1070-
func (api*API)CreateInMemoryProvisionerDaemon(ctx context.Context,debounce time.Duration) (client proto.DRPCProvisionerDaemonClient,errerror) {
1076+
func (api*API)CreateInMemoryProvisionerDaemon(ctx context.Context) (client proto.DRPCProvisionerDaemonClient,errerror) {
10711077
tracer:=api.TracerProvider.Tracer(tracing.TracerName)
10721078
clientSession,serverSession:=provisionersdk.MemTransportPipe()
10731079
deferfunc() {
@@ -1077,11 +1083,8 @@ func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, debounce ti
10771083
}
10781084
}()
10791085

1080-
tags,err:=json.Marshal(database.StringMap{
1086+
tags:=provisionerdserver.Tags{
10811087
provisionerdserver.TagScope:provisionerdserver.ScopeOrganization,
1082-
})
1083-
iferr!=nil {
1084-
returnnil,xerrors.Errorf("marshal tags: %w",err)
10851088
}
10861089

10871090
mux:=drpcmux.New()
@@ -1098,14 +1101,14 @@ func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, debounce ti
10981101
tags,
10991102
api.Database,
11001103
api.Pubsub,
1104+
api.Acquirer,
11011105
api.Telemetry,
11021106
tracer,
11031107
&api.QuotaCommitter,
11041108
&api.Auditor,
11051109
api.TemplateScheduleStore,
11061110
api.UserQuietHoursScheduleStore,
11071111
api.DeploymentValues,
1108-
debounce,
11091112
provisionerdserver.Options{
11101113
OIDCConfig:api.OIDCConfig,
11111114
GitAuthConfigs:api.GitAuthConfigs,

‎coderd/coderdtest/coderdtest.go

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
266266
lifecycleExecutor:=autobuild.NewExecutor(
267267
ctx,
268268
options.Database,
269+
options.Pubsub,
269270
&templateScheduleStore,
270271
slogtest.Make(t,nil).Named("autobuild.executor").Leveled(slog.LevelDebug),
271272
options.AutobuildTicker,
@@ -453,6 +454,30 @@ func NewWithAPI(t testing.TB, options *Options) (*codersdk.Client, io.Closer, *c
453454
returnclient,provisionerCloser,coderAPI
454455
}
455456

457+
// provisionerdCloser wraps a provisioner daemon as an io.Closer that can be called multiple times
458+
typeprovisionerdCloserstruct {
459+
mu sync.Mutex
460+
closedbool
461+
d*provisionerd.Server
462+
}
463+
464+
func (c*provisionerdCloser)Close()error {
465+
c.mu.Lock()
466+
deferc.mu.Unlock()
467+
ifc.closed {
468+
returnnil
469+
}
470+
c.closed=true
471+
ctx,cancel:=context.WithTimeout(context.Background(),testutil.WaitShort)
472+
defercancel()
473+
shutdownErr:=c.d.Shutdown(ctx)
474+
closeErr:=c.d.Close()
475+
ifshutdownErr!=nil {
476+
returnshutdownErr
477+
}
478+
returncloseErr
479+
}
480+
456481
// NewProvisionerDaemon launches a provisionerd instance configured to work
457482
// well with coderd testing. It registers the "echo" provisioner for
458483
// quick testing.
@@ -482,17 +507,17 @@ func NewProvisionerDaemon(t testing.TB, coderAPI *coderd.API) io.Closer {
482507
assert.NoError(t,err)
483508
}()
484509

485-
closer:=provisionerd.New(func(ctx context.Context) (provisionerdproto.DRPCProvisionerDaemonClient,error) {
486-
returncoderAPI.CreateInMemoryProvisionerDaemon(ctx,0)
510+
daemon:=provisionerd.New(func(ctx context.Context) (provisionerdproto.DRPCProvisionerDaemonClient,error) {
511+
returncoderAPI.CreateInMemoryProvisionerDaemon(ctx)
487512
},&provisionerd.Options{
488513
Logger:coderAPI.Logger.Named("provisionerd").Leveled(slog.LevelDebug),
489-
JobPollInterval:50*time.Millisecond,
490514
UpdateInterval:250*time.Millisecond,
491515
ForceCancelInterval:time.Second,
492516
Connector: provisionerd.LocalProvisioners{
493517
string(database.ProvisionerTypeEcho):sdkproto.NewDRPCProvisionerClient(echoClient),
494518
},
495519
})
520+
closer:=&provisionerdCloser{d:daemon}
496521
t.Cleanup(func() {
497522
_=closer.Close()
498523
})
@@ -518,21 +543,21 @@ func NewExternalProvisionerDaemon(t *testing.T, client *codersdk.Client, org uui
518543
assert.NoError(t,err)
519544
}()
520545

521-
closer:=provisionerd.New(func(ctx context.Context) (provisionerdproto.DRPCProvisionerDaemonClient,error) {
546+
daemon:=provisionerd.New(func(ctx context.Context) (provisionerdproto.DRPCProvisionerDaemonClient,error) {
522547
returnclient.ServeProvisionerDaemon(ctx, codersdk.ServeProvisionerDaemonRequest{
523548
Organization:org,
524549
Provisioners: []codersdk.ProvisionerType{codersdk.ProvisionerTypeEcho},
525550
Tags:tags,
526551
})
527552
},&provisionerd.Options{
528553
Logger:slogtest.Make(t,nil).Named("provisionerd").Leveled(slog.LevelDebug),
529-
JobPollInterval:50*time.Millisecond,
530554
UpdateInterval:250*time.Millisecond,
531555
ForceCancelInterval:time.Second,
532556
Connector: provisionerd.LocalProvisioners{
533557
string(database.ProvisionerTypeEcho):sdkproto.NewDRPCProvisionerClient(echoClient),
534558
},
535559
})
560+
closer:=&provisionerdCloser{d:daemon}
536561
t.Cleanup(func() {
537562
_=closer.Close()
538563
})

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp