- Notifications
You must be signed in to change notification settings - Fork 1k
feat(scaletest): add runner for thundering herd autostart #19998
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading. Please reload this page.
Jump to
Uh oh!
There was an error while loading. Please reload this page.
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
package autostart | ||
import ( | ||
"sync" | ||
"time" | ||
"golang.org/x/xerrors" | ||
"github.com/coder/coder/v2/codersdk" | ||
"github.com/coder/coder/v2/scaletest/createusers" | ||
"github.com/coder/coder/v2/scaletest/workspacebuild" | ||
) | ||
typeConfigstruct { | ||
// User is the configuration for the user to create. | ||
User createusers.Config`json:"user"` | ||
// Workspace is the configuration for the workspace to create. The workspace | ||
// will be built using the new user. | ||
// | ||
// OrganizationID is ignored and set to the new user's organization ID. | ||
Workspace workspacebuild.Config`json:"workspace"` | ||
// WorkspaceJobTimeout is how long to wait for any one workspace job | ||
// (start or stop) to complete. | ||
WorkspaceJobTimeout time.Duration`json:"workspace_job_timeout"` | ||
// AutostartDelay is how long after all the workspaces have been stopped | ||
// to schedule them to be started again. | ||
AutostartDelay time.Duration`json:"autostart_delay"` | ||
// AutostartTimeout is how long to wait for the autostart build to be | ||
// initiated after the scheduled time. | ||
AutostartTimeout time.Duration`json:"autostart_timeout"` | ||
Metrics*Metrics`json:"-"` | ||
// SetupBarrier is used to ensure all runners own stopped workspaces | ||
// before setting the autostart schedule on each. | ||
SetupBarrier*sync.WaitGroup`json:"-"` | ||
} | ||
func (cConfig)Validate()error { | ||
iferr:=c.User.Validate();err!=nil { | ||
returnxerrors.Errorf("user config: %w",err) | ||
} | ||
c.Workspace.OrganizationID=c.User.OrganizationID | ||
// This value will be overwritten during the test. | ||
c.Workspace.UserID=codersdk.Me | ||
spikecurtis marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
iferr:=c.Workspace.Validate();err!=nil { | ||
returnxerrors.Errorf("workspace config: %w",err) | ||
} | ||
ifc.SetupBarrier==nil { | ||
returnxerrors.New("setup barrier must be set") | ||
} | ||
ifc.WorkspaceJobTimeout<=0 { | ||
returnxerrors.New("workspace_job_timeout must be greater than 0") | ||
} | ||
ifc.AutostartDelay<time.Minute*2 { | ||
returnxerrors.New("autostart_delay must be at least 2 minutes") | ||
} | ||
ifc.AutostartTimeout<=0 { | ||
returnxerrors.New("autostart_timeout must be greater than 0") | ||
} | ||
ifc.Metrics==nil { | ||
returnxerrors.New("metrics must be set") | ||
} | ||
returnnil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
package autostart | ||
import ( | ||
"time" | ||
"github.com/prometheus/client_golang/prometheus" | ||
) | ||
type Metrics struct { | ||
AutostartJobCreationLatencySeconds prometheus.HistogramVec | ||
AutostartJobAcquiredLatencySeconds prometheus.HistogramVec | ||
AutostartTotalLatencySeconds prometheus.HistogramVec | ||
AutostartErrorsTotal prometheus.CounterVec | ||
} | ||
func NewMetrics(reg prometheus.Registerer) *Metrics { | ||
m := &Metrics{ | ||
AutostartJobCreationLatencySeconds: *prometheus.NewHistogramVec(prometheus.HistogramOpts{ | ||
Namespace: "coderd", | ||
Subsystem: "scaletest", | ||
Name: "autostart_job_creation_latency_seconds", | ||
Help: "Time from when the workspace is scheduled to be autostarted to when the autostart job has been created.", | ||
}, []string{"username", "workspace_name"}), | ||
AutostartJobAcquiredLatencySeconds: *prometheus.NewHistogramVec(prometheus.HistogramOpts{ | ||
Namespace: "coderd", | ||
Subsystem: "scaletest", | ||
Name: "autostart_job_acquired_latency_seconds", | ||
Help: "Time from when the workspace is scheduled to be autostarted to when the job has been acquired by a provisioner daemon.", | ||
}, []string{"username", "workspace_name"}), | ||
AutostartTotalLatencySeconds: *prometheus.NewHistogramVec(prometheus.HistogramOpts{ | ||
Namespace: "coderd", | ||
Subsystem: "scaletest", | ||
Name: "autostart_total_latency_seconds", | ||
Help: "Time from when the workspace is scheduled to be autostarted to when the autostart build has finished.", | ||
}, []string{"username", "workspace_name"}), | ||
AutostartErrorsTotal: *prometheus.NewCounterVec(prometheus.CounterOpts{ | ||
Namespace: "coderd", | ||
Subsystem: "scaletest", | ||
Name: "autostart_errors_total", | ||
Help: "Total number of autostart errors", | ||
}, []string{"username", "action"}), | ||
} | ||
reg.MustRegister(m.AutostartTotalLatencySeconds) | ||
reg.MustRegister(m.AutostartJobCreationLatencySeconds) | ||
reg.MustRegister(m.AutostartJobAcquiredLatencySeconds) | ||
reg.MustRegister(m.AutostartErrorsTotal) | ||
return m | ||
} | ||
func (m *Metrics) RecordCompletion(elapsed time.Duration, username string, workspace string) { | ||
m.AutostartTotalLatencySeconds.WithLabelValues(username, workspace).Observe(elapsed.Seconds()) | ||
} | ||
func (m *Metrics) RecordJobCreation(elapsed time.Duration, username string, workspace string) { | ||
m.AutostartJobCreationLatencySeconds.WithLabelValues(username, workspace).Observe(elapsed.Seconds()) | ||
} | ||
func (m *Metrics) RecordJobAcquired(elapsed time.Duration, username string, workspace string) { | ||
m.AutostartJobAcquiredLatencySeconds.WithLabelValues(username, workspace).Observe(elapsed.Seconds()) | ||
} | ||
func (m *Metrics) AddError(username string, action string) { | ||
m.AutostartErrorsTotal.WithLabelValues(username, action).Inc() | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,245 @@ | ||
package autostart | ||
import ( | ||
"context" | ||
"fmt" | ||
"io" | ||
"time" | ||
"golang.org/x/xerrors" | ||
"cdr.dev/slog" | ||
"cdr.dev/slog/sloggers/sloghuman" | ||
"github.com/coder/coder/v2/coderd/tracing" | ||
"github.com/coder/coder/v2/codersdk" | ||
"github.com/coder/coder/v2/scaletest/createusers" | ||
"github.com/coder/coder/v2/scaletest/harness" | ||
"github.com/coder/coder/v2/scaletest/loadtestutil" | ||
"github.com/coder/coder/v2/scaletest/workspacebuild" | ||
) | ||
type Runner struct { | ||
client *codersdk.Client | ||
cfg Config | ||
createUserRunner *createusers.Runner | ||
workspacebuildRunner *workspacebuild.Runner | ||
autostartTotalLatency time.Duration | ||
autostartJobCreationLatency time.Duration | ||
autostartJobAcquiredLatency time.Duration | ||
} | ||
func NewRunner(client *codersdk.Client, cfg Config) *Runner { | ||
return &Runner{ | ||
client: client, | ||
cfg: cfg, | ||
} | ||
} | ||
var ( | ||
_ harness.Runnable = &Runner{} | ||
_ harness.Cleanable = &Runner{} | ||
_ harness.Collectable = &Runner{} | ||
) | ||
func (r *Runner) Run(ctx context.Context, id string, logs io.Writer) error { | ||
ctx, span := tracing.StartSpan(ctx) | ||
defer span.End() | ||
reachedBarrier := false | ||
defer func() { | ||
if !reachedBarrier { | ||
r.cfg.SetupBarrier.Done() | ||
} | ||
}() | ||
logs = loadtestutil.NewSyncWriter(logs) | ||
logger := slog.Make(sloghuman.Sink(logs)).Leveled(slog.LevelDebug) | ||
r.client.SetLogger(logger) | ||
r.client.SetLogBodies(true) | ||
r.createUserRunner = createusers.NewRunner(r.client, r.cfg.User) | ||
newUserAndToken, err := r.createUserRunner.RunReturningUser(ctx, id, logs) | ||
if err != nil { | ||
r.cfg.Metrics.AddError("", "create_user") | ||
return xerrors.Errorf("create user: %w", err) | ||
} | ||
newUser := newUserAndToken.User | ||
newUserClient := codersdk.New(r.client.URL, | ||
codersdk.WithSessionToken(newUserAndToken.SessionToken), | ||
codersdk.WithLogger(logger), | ||
codersdk.WithLogBodies()) | ||
//nolint:gocritic // short log is fine | ||
logger.Info(ctx, "user created", slog.F("username", newUser.Username), slog.F("user_id", newUser.ID.String())) | ||
workspaceBuildConfig := r.cfg.Workspace | ||
workspaceBuildConfig.OrganizationID = r.cfg.User.OrganizationID | ||
workspaceBuildConfig.UserID = newUser.ID.String() | ||
// We'll wait for the build ourselves to avoid multiple API requests | ||
workspaceBuildConfig.NoWaitForBuild = true | ||
r.workspacebuildRunner = workspacebuild.NewRunner(newUserClient, workspaceBuildConfig) | ||
workspace, err := r.workspacebuildRunner.RunReturningWorkspace(ctx, id, logs) | ||
if err != nil { | ||
r.cfg.Metrics.AddError(newUser.Username, "create_workspace") | ||
return xerrors.Errorf("create workspace: %w", err) | ||
} | ||
watchCtx, cancel := context.WithCancel(ctx) | ||
defer cancel() | ||
workspaceUpdates, err := newUserClient.WatchWorkspace(watchCtx, workspace.ID) | ||
if err != nil { | ||
r.cfg.Metrics.AddError(newUser.Username, "watch_workspace") | ||
return xerrors.Errorf("watch workspace: %w", err) | ||
} | ||
createWorkspaceCtx, cancel2 := context.WithTimeout(ctx, r.cfg.WorkspaceJobTimeout) | ||
defer cancel2() | ||
err = waitForWorkspaceUpdate(createWorkspaceCtx, logger, workspaceUpdates, func(ws codersdk.Workspace) bool { | ||
return ws.LatestBuild.Transition == codersdk.WorkspaceTransitionStart && | ||
ws.LatestBuild.Job.Status == codersdk.ProvisionerJobSucceeded | ||
}) | ||
if err != nil { | ||
r.cfg.Metrics.AddError(newUser.Username, "wait_for_initial_build") | ||
return xerrors.Errorf("timeout waiting for initial workspace build to complete: %w", err) | ||
} | ||
logger.Info(ctx, "stopping workspace", slog.F("workspace_name", workspace.Name)) | ||
_, err = newUserClient.CreateWorkspaceBuild(ctx, workspace.ID, codersdk.CreateWorkspaceBuildRequest{ | ||
Transition: codersdk.WorkspaceTransitionStop, | ||
}) | ||
if err != nil { | ||
r.cfg.Metrics.AddError(newUser.Username, "create_stop_build") | ||
return xerrors.Errorf("create stop build: %w", err) | ||
} | ||
stopBuildCtx, cancel3 := context.WithTimeout(ctx, r.cfg.WorkspaceJobTimeout) | ||
defer cancel3() | ||
err = waitForWorkspaceUpdate(stopBuildCtx, logger, workspaceUpdates, func(ws codersdk.Workspace) bool { | ||
return ws.LatestBuild.Transition == codersdk.WorkspaceTransitionStop && | ||
ws.LatestBuild.Job.Status == codersdk.ProvisionerJobSucceeded | ||
}) | ||
if err != nil { | ||
r.cfg.Metrics.AddError(newUser.Username, "wait_for_stop_build") | ||
return xerrors.Errorf("timeout waiting for stop build to complete: %w", err) | ||
} | ||
logger.Info(ctx, "workspace stopped successfully", slog.F("workspace_name", workspace.Name)) | ||
logger.Info(ctx, "waiting for all runners to reach barrier") | ||
reachedBarrier = true | ||
r.cfg.SetupBarrier.Done() | ||
ethanndickson marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
r.cfg.SetupBarrier.Wait() | ||
logger.Info(ctx, "all runners reached barrier, proceeding with autostart schedule") | ||
testStartTime := time.Now().UTC() | ||
autostartTime := testStartTime.Add(r.cfg.AutostartDelay).Round(time.Minute) | ||
schedule := fmt.Sprintf("CRON_TZ=UTC %d %d * * *", autostartTime.Minute(), autostartTime.Hour()) | ||
logger.Info(ctx, "setting autostart schedule for workspace", slog.F("workspace_name", workspace.Name), slog.F("schedule", schedule)) | ||
err = newUserClient.UpdateWorkspaceAutostart(ctx, workspace.ID, codersdk.UpdateWorkspaceAutostartRequest{ | ||
Schedule: &schedule, | ||
}) | ||
if err != nil { | ||
r.cfg.Metrics.AddError(newUser.Username, "update_workspace_autostart") | ||
return xerrors.Errorf("update workspace autostart: %w", err) | ||
} | ||
logger.Info(ctx, "waiting for workspace to autostart", slog.F("workspace_name", workspace.Name)) | ||
autostartInitiateCtx, cancel4 := context.WithDeadline(ctx, autostartTime.Add(r.cfg.AutostartDelay)) | ||
defer cancel4() | ||
logger.Info(ctx, "listening for workspace updates to detect autostart build") | ||
err = waitForWorkspaceUpdate(autostartInitiateCtx, logger, workspaceUpdates, func(ws codersdk.Workspace) bool { | ||
if ws.LatestBuild.Transition != codersdk.WorkspaceTransitionStart { | ||
return false | ||
} | ||
// The job has been created, but it might be pending | ||
if r.autostartJobCreationLatency == 0 { | ||
r.autostartJobCreationLatency = time.Since(autostartTime) | ||
ethanndickson marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
r.cfg.Metrics.RecordJobCreation(r.autostartJobCreationLatency, newUser.Username, workspace.Name) | ||
} | ||
if ws.LatestBuild.Job.Status == codersdk.ProvisionerJobRunning || | ||
ws.LatestBuild.Job.Status == codersdk.ProvisionerJobSucceeded { | ||
// Job is no longer pending, but it might not have finished | ||
if r.autostartJobAcquiredLatency == 0 { | ||
r.autostartJobAcquiredLatency = time.Since(autostartTime) | ||
ethanndickson marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
r.cfg.Metrics.RecordJobAcquired(r.autostartJobAcquiredLatency, newUser.Username, workspace.Name) | ||
} | ||
ethanndickson marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
return ws.LatestBuild.Job.Status == codersdk.ProvisionerJobSucceeded | ||
} | ||
return false | ||
}) | ||
if err != nil { | ||
r.cfg.Metrics.AddError(newUser.Username, "wait_for_autostart_build") | ||
return xerrors.Errorf("timeout waiting for autostart build to be created: %w", err) | ||
} | ||
r.autostartTotalLatency = time.Since(autostartTime) | ||
logger.Info(ctx, "autostart workspace build complete", slog.F("duration", r.autostartTotalLatency)) | ||
r.cfg.Metrics.RecordCompletion(r.autostartTotalLatency, newUser.Username, workspace.Name) | ||
return nil | ||
} | ||
func waitForWorkspaceUpdate(ctx context.Context, logger slog.Logger, updates <-chan codersdk.Workspace, shouldBreak func(codersdk.Workspace) bool) error { | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return ctx.Err() | ||
case updatedWorkspace, ok := <-updates: | ||
if !ok { | ||
return xerrors.New("workspace updates channel closed") | ||
} | ||
logger.Debug(ctx, "received workspace update", slog.F("update", updatedWorkspace)) | ||
if shouldBreak(updatedWorkspace) { | ||
return nil | ||
} | ||
} | ||
} | ||
} | ||
func (r *Runner) Cleanup(ctx context.Context, id string, logs io.Writer) error { | ||
if r.workspacebuildRunner != nil { | ||
_, _ = fmt.Fprintln(logs, "Cleaning up workspace...") | ||
if err := r.workspacebuildRunner.Cleanup(ctx, id, logs); err != nil { | ||
return xerrors.Errorf("cleanup workspace: %w", err) | ||
} | ||
} | ||
if r.createUserRunner != nil { | ||
_, _ = fmt.Fprintln(logs, "Cleaning up user...") | ||
if err := r.createUserRunner.Cleanup(ctx, id, logs); err != nil { | ||
return xerrors.Errorf("cleanup user: %w", err) | ||
} | ||
} | ||
return nil | ||
} | ||
// Keys of the map returned by (*Runner).GetMetrics.
const (
	// AutostartTotalLatencyMetric is the key for the total autostart latency.
	AutostartTotalLatencyMetric = "autostart_total_latency_seconds"
	// AutostartJobCreationLatencyMetric is the key for the job-creation latency.
	AutostartJobCreationLatencyMetric = "autostart_job_creation_latency_seconds"
	// AutostartJobAcquiredLatencyMetric is the key for the job-acquired latency.
	AutostartJobAcquiredLatencyMetric = "autostart_job_acquired_latency_seconds"
)
func (r *Runner) GetMetrics() map[string]any { | ||
return map[string]any{ | ||
AutostartTotalLatencyMetric: r.autostartTotalLatency.Seconds(), | ||
AutostartJobCreationLatencyMetric: r.autostartJobCreationLatency.Seconds(), | ||
AutostartJobAcquiredLatencyMetric: r.autostartJobAcquiredLatency.Seconds(), | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.
Uh oh!
There was an error while loading. Please reload this page.