Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

feat: integrate Acquirer for provisioner jobs#9717

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
spikecurtis merged 12 commits intomainfromspike/9428-pt2-integrate-acquirer
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
Show all changes
12 commits
Select commitHold shift + click to select a range
1e53245
chore: add Acquirer to provisionerdserver pkg
spikecurtisSep 8, 2023
1ceac51
code review improvements & fixes
spikecurtisSep 13, 2023
23d0e3c
feat: integrate Acquirer for provisioner jobs
spikecurtisSep 13, 2023
bb22d9b
Merge branch 'main' of https://github.com/coder/coder into spike/9428…
spikecurtisSep 18, 2023
403831c
Fix imports, whitespace
spikecurtisSep 18, 2023
83df4d1
provisionerdserver always closes; remove poll interval from playwright
spikecurtisSep 18, 2023
76d7ac7
post jobs outside transactions
spikecurtisSep 18, 2023
13181b3
graceful shutdown in test
spikecurtisSep 18, 2023
ab3a73b
Mark AcquireJob deprecated
spikecurtisSep 18, 2023
d4070d2
Graceful shutdown on all provisionerd tests
spikecurtisSep 18, 2023
e845140
Deprecate, not remove CLI flags
spikecurtisSep 18, 2023
27162b7
Merge branch 'main' of https://github.com/coder/coder into spike/9428…
spikecurtisSep 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletionscli/server.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -1012,7 +1012,8 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.

autobuildTicker := time.NewTicker(vals.AutobuildPollInterval.Value())
defer autobuildTicker.Stop()
autobuildExecutor := autobuild.NewExecutor(ctx, options.Database, coderAPI.TemplateScheduleStore, logger, autobuildTicker.C)
autobuildExecutor := autobuild.NewExecutor(
ctx, options.Database, options.Pubsub, coderAPI.TemplateScheduleStore, logger, autobuildTicker.C)
autobuildExecutor.Run()

hangDetectorTicker := time.NewTicker(vals.JobHangDetectorInterval.Value())
Expand DownExpand Up@@ -1378,16 +1379,12 @@ func newProvisionerDaemon(
connector[string(database.ProvisionerTypeTerraform)] = sdkproto.NewDRPCProvisionerClient(terraformClient)
}

debounce := time.Second
return provisionerd.New(func(ctx context.Context) (proto.DRPCProvisionerDaemonClient, error) {
// This debounces calls to listen every second. Read the comment
// in provisionerdserver.go to learn more!
return coderAPI.CreateInMemoryProvisionerDaemon(ctx, debounce)
return coderAPI.CreateInMemoryProvisionerDaemon(ctx)
}, &provisionerd.Options{
Logger: logger.Named("provisionerd"),
JobPollInterval: cfg.Provisioner.DaemonPollInterval.Value(),
JobPollJitter: cfg.Provisioner.DaemonPollJitter.Value(),
JobPollDebounce: debounce,
UpdateInterval: time.Second,
ForceCancelInterval: cfg.Provisioner.ForceCancelInterval.Value(),
Connector: connector,
Expand Down
4 changes: 2 additions & 2 deletionscli/testdata/coder_server_--help.golden
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -393,10 +393,10 @@ updating, and deleting workspace resources.
Time to force cancel provisioning tasks that are stuck.

--provisioner-daemon-poll-interval duration, $CODER_PROVISIONER_DAEMON_POLL_INTERVAL (default: 1s)
Time to wait before polling for a new job.
Deprecated and ignored.

--provisioner-daemon-poll-jitter duration, $CODER_PROVISIONER_DAEMON_POLL_JITTER (default: 100ms)
Random jitter added to the poll interval.
Deprecated and ignored.

--provisioner-daemon-psk string, $CODER_PROVISIONER_DAEMON_PSK
Pre-shared key to authenticate external provisioner daemons to Coder
Expand Down
4 changes: 2 additions & 2 deletionscli/testdata/server-config.yaml.golden
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -348,10 +348,10 @@ provisioning:
# tests.
# (default: false, type: bool)
daemonsEcho: false
#Time to wait before polling for a new job.
#Deprecated and ignored.
# (default: 1s, type: duration)
daemonPollInterval: 1s
#Random jitter added to the poll interval.
#Deprecated and ignored.
# (default: 100ms, type: duration)
daemonPollJitter: 100ms
# Time to force cancel provisioning tasks that are stuck.
Expand Down
4 changes: 2 additions & 2 deletionscoderd/activitybump_internal_test.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -134,7 +134,7 @@ func Test_ActivityBumpWorkspace(t *testing.T) {
TemplateID: template.ID,
Ttl: sql.NullInt64{Valid: true, Int64: int64(tt.workspaceTTL)},
})
job = dbgen.ProvisionerJob(t, db, database.ProvisionerJob{
job = dbgen.ProvisionerJob(t, db,nil,database.ProvisionerJob{
OrganizationID: org.ID,
CompletedAt: tt.jobCompletedAt,
})
Expand DownExpand Up@@ -225,7 +225,7 @@ func Test_ActivityBumpWorkspace(t *testing.T) {
func insertPrevWorkspaceBuild(t *testing.T, db database.Store, orgID, tvID, workspaceID uuid.UUID, transition database.WorkspaceTransition, buildNumber int32) {
t.Helper()

job := dbgen.ProvisionerJob(t, db, database.ProvisionerJob{
job := dbgen.ProvisionerJob(t, db,nil,database.ProvisionerJob{
OrganizationID: orgID,
})
_ = dbgen.WorkspaceResource(t, db, database.WorkspaceResource{
Expand Down
21 changes: 19 additions & 2 deletionscoderd/autobuild/lifecycle_executor.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -16,6 +16,8 @@ import (
"github.com/coder/coder/v2/coderd/database/db2sdk"
"github.com/coder/coder/v2/coderd/database/dbauthz"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/database/provisionerjobs"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/schedule"
"github.com/coder/coder/v2/coderd/schedule/cron"
"github.com/coder/coder/v2/coderd/wsbuilder"
Expand All@@ -26,6 +28,7 @@ import (
type Executor struct {
ctx context.Context
db database.Store
ps pubsub.Pubsub
templateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore]
log slog.Logger
tick <-chan time.Time
Expand All@@ -40,11 +43,12 @@ type Stats struct {
}

// New returns a new wsactions executor.
func NewExecutor(ctx context.Context, db database.Store, tss *atomic.Pointer[schedule.TemplateScheduleStore], log slog.Logger, tick <-chan time.Time) *Executor {
func NewExecutor(ctx context.Context, db database.Store,ps pubsub.Pubsub,tss *atomic.Pointer[schedule.TemplateScheduleStore], log slog.Logger, tick <-chan time.Time) *Executor {
le := &Executor{
//nolint:gocritic // Autostart has a limited set of permissions.
ctx: dbauthz.AsAutostart(ctx),
db: db,
ps: ps,
templateScheduleStore: tss,
tick: tick,
log: log.Named("autobuild"),
Expand DownExpand Up@@ -129,6 +133,7 @@ func (e *Executor) runOnce(t time.Time) Stats {
log := e.log.With(slog.F("workspace_id", wsID))

eg.Go(func() error {
var job *database.ProvisionerJob
err := e.db.InTx(func(tx database.Store) error {
// Re-check eligibility since the first check was outside the
// transaction and the workspace settings may have changed.
Expand DownExpand Up@@ -168,7 +173,8 @@ func (e *Executor) runOnce(t time.Time) Stats {
SetLastWorkspaceBuildJobInTx(&latestJob).
Reason(reason)

if _, _, err := builder.Build(e.ctx, tx, nil); err != nil {
_, job, err = builder.Build(e.ctx, tx, nil)
if err != nil {
log.Error(e.ctx, "unable to transition workspace",
slog.F("transition", nextTransition),
slog.Error(err),
Expand DownExpand Up@@ -230,6 +236,17 @@ func (e *Executor) runOnce(t time.Time) Stats {
if err != nil {
log.Error(e.ctx, "workspace scheduling failed", slog.Error(err))
}
if job != nil && err == nil {
// Note that we can't refactor such that posting the job happens inside wsbuilder because it's called
// with an outer transaction like this, and we need to make sure the outer transaction commits before
// posting the job. If we post before the transaction commits, provisionerd might try to acquire the
// job, fail, and then sit idle instead of picking up the job.
err = provisionerjobs.PostJob(e.ps, *job)
if err != nil {
// Client probably doesn't care about this error, so just log it.
log.Error(e.ctx, "failed to post provisioner job to pubsub", slog.Error(err))
}
}
return nil
})
}
Expand Down
11 changes: 6 additions & 5 deletionscoderd/batchstats/batcher_internal_test.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -14,6 +14,7 @@ import (
"github.com/coder/coder/v2/coderd/database/dbgen"
"github.com/coder/coder/v2/coderd/database/dbtestutil"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/rbac"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/cryptorand"
Expand All@@ -26,11 +27,11 @@ func TestBatchStats(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
log := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
store,_ := dbtestutil.NewDB(t)
store,ps := dbtestutil.NewDB(t)

// Set up some test dependencies.
deps1 := setupDeps(t, store)
deps2 := setupDeps(t, store)
deps1 := setupDeps(t, store, ps)
deps2 := setupDeps(t, store, ps)
tick := make(chan time.Time)
flushed := make(chan int, 1)

Expand DownExpand Up@@ -168,7 +169,7 @@ type deps struct {
// It creates an organization, user, template, workspace, and agent
// along with all the other miscellaneous plumbing required to link
// them together.
func setupDeps(t *testing.T, store database.Store) deps {
func setupDeps(t *testing.T, store database.Store, ps pubsub.Pubsub) deps {
t.Helper()

org := dbgen.Organization(t, store, database.Organization{})
Expand All@@ -194,7 +195,7 @@ func setupDeps(t *testing.T, store database.Store) deps {
OrganizationID: org.ID,
LastUsedAt: time.Now().Add(-time.Hour),
})
pj := dbgen.ProvisionerJob(t, store, database.ProvisionerJob{
pj := dbgen.ProvisionerJob(t, store,ps,database.ProvisionerJob{
InitiatorID: user.ID,
OrganizationID: org.ID,
})
Expand Down
17 changes: 10 additions & 7 deletionscoderd/coderd.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -4,7 +4,6 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"flag"
"fmt"
"io"
Expand DownExpand Up@@ -366,6 +365,11 @@ func New(options *Options) *API {
UserQuietHoursScheduleStore: options.UserQuietHoursScheduleStore,
Experiments: experiments,
healthCheckGroup: &singleflight.Group[string, *healthcheck.Report]{},
Acquirer: provisionerdserver.NewAcquirer(
ctx,
options.Logger.Named("acquirer"),
options.Database,
options.Pubsub),
}
if options.UpdateCheckOptions != nil {
api.updateChecker = updatecheck.New(
Expand DownExpand Up@@ -1016,6 +1020,8 @@ type API struct {
healthCheckCache atomic.Pointer[healthcheck.Report]

statsBatcher *batchstats.Batcher

Acquirer *provisionerdserver.Acquirer
}

// Close waits for all WebSocket connections to drain before returning.
Expand DownExpand Up@@ -1067,7 +1073,7 @@ func compressHandler(h http.Handler) http.Handler {

// CreateInMemoryProvisionerDaemon is an in-memory connection to a provisionerd.
// Useful when starting coderd and provisionerd in the same process.
func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, debounce time.Duration) (client proto.DRPCProvisionerDaemonClient, err error) {
func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context) (client proto.DRPCProvisionerDaemonClient, err error) {
tracer := api.TracerProvider.Tracer(tracing.TracerName)
clientSession, serverSession := provisionersdk.MemTransportPipe()
defer func() {
Expand All@@ -1077,11 +1083,8 @@ func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, debounce ti
}
}()

tags, err:=json.Marshal(database.StringMap{
tags:=provisionerdserver.Tags{
provisionerdserver.TagScope: provisionerdserver.ScopeOrganization,
})
if err != nil {
return nil, xerrors.Errorf("marshal tags: %w", err)
}

mux := drpcmux.New()
Expand All@@ -1098,14 +1101,14 @@ func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, debounce ti
tags,
api.Database,
api.Pubsub,
api.Acquirer,
api.Telemetry,
tracer,
&api.QuotaCommitter,
&api.Auditor,
api.TemplateScheduleStore,
api.UserQuietHoursScheduleStore,
api.DeploymentValues,
debounce,
provisionerdserver.Options{
OIDCConfig: api.OIDCConfig,
GitAuthConfigs: api.GitAuthConfigs,
Expand Down
35 changes: 30 additions & 5 deletionscoderd/coderdtest/coderdtest.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -266,6 +266,7 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
lifecycleExecutor := autobuild.NewExecutor(
ctx,
options.Database,
options.Pubsub,
&templateScheduleStore,
slogtest.Make(t, nil).Named("autobuild.executor").Leveled(slog.LevelDebug),
options.AutobuildTicker,
Expand DownExpand Up@@ -453,6 +454,30 @@ func NewWithAPI(t testing.TB, options *Options) (*codersdk.Client, io.Closer, *c
return client, provisionerCloser, coderAPI
}

// provisionerdCloser wraps a provisioner daemon as an io.Closer that can be called multiple times
type provisionerdCloser struct {
mu sync.Mutex
closed bool
d *provisionerd.Server
}

func (c *provisionerdCloser) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
return nil
}
c.closed = true
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancel()
shutdownErr := c.d.Shutdown(ctx)
closeErr := c.d.Close()
if shutdownErr != nil {
return shutdownErr
}
return closeErr
}

// NewProvisionerDaemon launches a provisionerd instance configured to work
// well with coderd testing. It registers the "echo" provisioner for
// quick testing.
Expand DownExpand Up@@ -482,17 +507,17 @@ func NewProvisionerDaemon(t testing.TB, coderAPI *coderd.API) io.Closer {
assert.NoError(t, err)
}()

closer := provisionerd.New(func(ctx context.Context) (provisionerdproto.DRPCProvisionerDaemonClient, error) {
return coderAPI.CreateInMemoryProvisionerDaemon(ctx, 0)
daemon := provisionerd.New(func(ctx context.Context) (provisionerdproto.DRPCProvisionerDaemonClient, error) {
return coderAPI.CreateInMemoryProvisionerDaemon(ctx)
}, &provisionerd.Options{
Logger: coderAPI.Logger.Named("provisionerd").Leveled(slog.LevelDebug),
JobPollInterval: 50 * time.Millisecond,
UpdateInterval: 250 * time.Millisecond,
ForceCancelInterval: time.Second,
Connector: provisionerd.LocalProvisioners{
string(database.ProvisionerTypeEcho): sdkproto.NewDRPCProvisionerClient(echoClient),
},
})
closer := &provisionerdCloser{d: daemon}
t.Cleanup(func() {
_ = closer.Close()
})
Expand All@@ -518,21 +543,21 @@ func NewExternalProvisionerDaemon(t *testing.T, client *codersdk.Client, org uui
assert.NoError(t, err)
}()

closer := provisionerd.New(func(ctx context.Context) (provisionerdproto.DRPCProvisionerDaemonClient, error) {
daemon := provisionerd.New(func(ctx context.Context) (provisionerdproto.DRPCProvisionerDaemonClient, error) {
return client.ServeProvisionerDaemon(ctx, codersdk.ServeProvisionerDaemonRequest{
Organization: org,
Provisioners: []codersdk.ProvisionerType{codersdk.ProvisionerTypeEcho},
Tags: tags,
})
}, &provisionerd.Options{
Logger: slogtest.Make(t, nil).Named("provisionerd").Leveled(slog.LevelDebug),
JobPollInterval: 50 * time.Millisecond,
UpdateInterval: 250 * time.Millisecond,
ForceCancelInterval: time.Second,
Connector: provisionerd.LocalProvisioners{
string(database.ProvisionerTypeEcho): sdkproto.NewDRPCProvisionerClient(echoClient),
},
})
closer := &provisionerdCloser{d: daemon}
t.Cleanup(func() {
_ = closer.Close()
})
Expand Down
Loading

[8]ページ先頭

©2009-2025 Movatter.jp