- Notifications
You must be signed in to change notification settings - Fork1k
feat(scaletest): add runner for coder connect load gen#19904
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Changes fromall commits
3e4c43f
58d85d6
0dda84b
6c496c8
d904e6f
8c9747d
File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
package workspaceupdates | ||
import ( | ||
"sync" | ||
"time" | ||
"golang.org/x/xerrors" | ||
"github.com/coder/coder/v2/codersdk" | ||
"github.com/coder/coder/v2/scaletest/createusers" | ||
"github.com/coder/coder/v2/scaletest/workspacebuild" | ||
) | ||
type Config struct { | ||
// User is the configuration for the user to create. | ||
User createusers.Config `json:"user"` | ||
// Workspace is the configuration for the workspace to create. The workspace | ||
// will be built using the new user. | ||
// | ||
// OrganizationID is ignored and set to the new user's organization ID. | ||
Workspace workspacebuild.Config `json:"workspace"` | ||
// WorkspaceCount is the number of workspaces to create. | ||
WorkspaceCount int64 `json:"power_user_workspaces"` | ||
// WorkspaceUpdatesTimeout is how long to wait for all expected workspace updates. | ||
WorkspaceUpdatesTimeout time.Duration `json:"workspace_updates_timeout"` | ||
// DialTimeout is how long to wait to successfully dial the Coder Connect | ||
// endpoint. | ||
DialTimeout time.Duration `json:"dial_timeout"` | ||
Metrics *Metrics `json:"-"` | ||
// DialBarrier is used to ensure all runners have dialed the Coder Connect | ||
// endpoint before creating their workspace(s). | ||
DialBarrier *sync.WaitGroup `json:"-"` | ||
} | ||
func (c Config) Validate() error { | ||
if err := c.User.Validate(); err != nil { | ||
return xerrors.Errorf("user config: %w", err) | ||
} | ||
c.Workspace.OrganizationID = c.User.OrganizationID | ||
// This value will be overwritten during the test. | ||
c.Workspace.UserID = codersdk.Me | ||
if err := c.Workspace.Validate(); err != nil { | ||
return xerrors.Errorf("workspace config: %w", err) | ||
} | ||
if c.Workspace.Request.Name != "" { | ||
return xerrors.New("workspace name cannot be overridden") | ||
} | ||
if c.WorkspaceCount <= 0 { | ||
return xerrors.New("workspace_count must be greater than 0") | ||
} | ||
if c.DialBarrier == nil { | ||
return xerrors.New("dial barrier must be set") | ||
} | ||
if c.WorkspaceUpdatesTimeout <= 0 { | ||
return xerrors.New("workspace_updates_timeout must be greater than 0") | ||
} | ||
if c.DialTimeout <= 0 { | ||
return xerrors.New("dial_timeout must be greater than 0") | ||
} | ||
if c.Metrics == nil { | ||
return xerrors.New("metrics must be set") | ||
} | ||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
package workspaceupdates | ||
import ( | ||
"strconv" | ||
"time" | ||
"github.com/prometheus/client_golang/prometheus" | ||
) | ||
type Metrics struct { | ||
WorkspaceUpdatesLatencySeconds prometheus.HistogramVec | ||
WorkspaceUpdatesErrorsTotal prometheus.CounterVec | ||
} | ||
func NewMetrics(reg prometheus.Registerer) *Metrics { | ||
m := &Metrics{ | ||
WorkspaceUpdatesLatencySeconds: *prometheus.NewHistogramVec(prometheus.HistogramOpts{ | ||
Namespace: "coderd", | ||
Subsystem: "scaletest", | ||
Name: "workspace_updates_latency_seconds", | ||
ethanndickson marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
Help: "Time between starting a workspace build and receiving both the agent update and workspace update", | ||
}, []string{"username", "num_owned_workspaces", "workspace_name"}), | ||
WorkspaceUpdatesErrorsTotal: *prometheus.NewCounterVec(prometheus.CounterOpts{ | ||
Namespace: "coderd", | ||
Subsystem: "scaletest", | ||
Name: "workspace_updates_errors_total", | ||
Help: "Total number of workspace updates errors", | ||
}, []string{"username", "num_owned_workspaces", "action"}), | ||
} | ||
reg.MustRegister(m.WorkspaceUpdatesLatencySeconds) | ||
reg.MustRegister(m.WorkspaceUpdatesErrorsTotal) | ||
return m | ||
} | ||
func (m *Metrics) RecordCompletion(elapsed time.Duration, username string, ownedWorkspaces int64, workspace string) { | ||
m.WorkspaceUpdatesLatencySeconds.WithLabelValues(username, strconv.Itoa(int(ownedWorkspaces)), workspace).Observe(elapsed.Seconds()) | ||
} | ||
func (m *Metrics) AddError(username string, ownedWorkspaces int64, action string) { | ||
m.WorkspaceUpdatesErrorsTotal.WithLabelValues(username, strconv.Itoa(int(ownedWorkspaces)), action).Inc() | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,271 @@ | ||
package workspaceupdates | ||
import ( | ||
"context" | ||
"fmt" | ||
"io" | ||
"net/http" | ||
"time" | ||
"golang.org/x/xerrors" | ||
"github.com/coder/websocket" | ||
"cdr.dev/slog" | ||
"cdr.dev/slog/sloggers/sloghuman" | ||
"github.com/coder/coder/v2/coderd/tracing" | ||
"github.com/coder/coder/v2/codersdk" | ||
"github.com/coder/coder/v2/codersdk/workspacesdk" | ||
"github.com/coder/coder/v2/scaletest/createusers" | ||
"github.com/coder/coder/v2/scaletest/harness" | ||
"github.com/coder/coder/v2/scaletest/loadtestutil" | ||
"github.com/coder/coder/v2/scaletest/workspacebuild" | ||
"github.com/coder/coder/v2/tailnet" | ||
tailnetproto "github.com/coder/coder/v2/tailnet/proto" | ||
) | ||
type Runner struct { | ||
client *codersdk.Client | ||
cfg Config | ||
createUserRunner *createusers.Runner | ||
workspacebuildRunners []*workspacebuild.Runner | ||
// workspace name to workspace | ||
workspaces map[string]*workspace | ||
} | ||
type workspace struct { | ||
buildStartTime time.Time | ||
updateLatency time.Duration | ||
} | ||
var ( | ||
_ harness.Runnable = &Runner{} | ||
_ harness.Cleanable = &Runner{} | ||
_ harness.Collectable = &Runner{} | ||
) | ||
func NewRunner(client *codersdk.Client, cfg Config) *Runner { | ||
return &Runner{ | ||
client: client, | ||
cfg: cfg, | ||
workspaces: make(map[string]*workspace), | ||
} | ||
} | ||
func (r *Runner) Run(ctx context.Context, id string, logs io.Writer) error { | ||
ctx, span := tracing.StartSpan(ctx) | ||
defer span.End() | ||
reachedBarrier := false | ||
defer func() { | ||
if !reachedBarrier { | ||
r.cfg.DialBarrier.Done() | ||
} | ||
}() | ||
logs = loadtestutil.NewSyncWriter(logs) | ||
logger := slog.Make(sloghuman.Sink(logs)).Leveled(slog.LevelDebug) | ||
r.client.SetLogger(logger) | ||
r.client.SetLogBodies(true) | ||
r.createUserRunner = createusers.NewRunner(r.client, r.cfg.User) | ||
newUserAndToken, err := r.createUserRunner.RunReturningUser(ctx, id, logs) | ||
if err != nil { | ||
return xerrors.Errorf("create user: %w", err) | ||
} | ||
newUser := newUserAndToken.User | ||
newUserClient := codersdk.New(r.client.URL, | ||
codersdk.WithSessionToken(newUserAndToken.SessionToken), | ||
codersdk.WithLogger(logger), | ||
codersdk.WithLogBodies()) | ||
logger.Info(ctx, fmt.Sprintf("user %q created", newUser.Username), slog.F("id", newUser.ID.String())) | ||
dialCtx, cancel := context.WithTimeout(ctx, r.cfg.DialTimeout) | ||
defer cancel() | ||
logger.Info(ctx, "connecting to workspace updates stream") | ||
clients, err := r.dialTailnet(dialCtx, newUserClient, newUser, logger) | ||
if err != nil { | ||
return xerrors.Errorf("tailnet dial failed: %w", err) | ||
} | ||
defer clients.Closer.Close() | ||
logger.Info(ctx, "connected to workspace updates stream") | ||
watchCtx, cancelWatch := context.WithCancel(ctx) | ||
defer cancelWatch() | ||
completionCh := make(chan error, 1) | ||
go func() { | ||
completionCh <- r.watchWorkspaceUpdates(watchCtx, clients, newUser, logger) | ||
}() | ||
reachedBarrier = true | ||
r.cfg.DialBarrier.Done() | ||
r.cfg.DialBarrier.Wait() | ||
r.workspacebuildRunners = make([]*workspacebuild.Runner, 0, r.cfg.WorkspaceCount) | ||
for i := range r.cfg.WorkspaceCount { | ||
workspaceName, err := loadtestutil.GenerateWorkspaceName(id) | ||
if err != nil { | ||
return xerrors.Errorf("generate random name for workspace: %w", err) | ||
} | ||
workspaceBuildConfig := r.cfg.Workspace | ||
workspaceBuildConfig.OrganizationID = r.cfg.User.OrganizationID | ||
workspaceBuildConfig.UserID = newUser.ID.String() | ||
workspaceBuildConfig.Request.Name = workspaceName | ||
runner := workspacebuild.NewRunner(newUserClient, workspaceBuildConfig) | ||
r.workspacebuildRunners = append(r.workspacebuildRunners, runner) | ||
logger.Info(ctx, fmt.Sprintf("creating workspace %d/%d", i+1, r.cfg.WorkspaceCount)) | ||
// Record build start time before running the workspace build | ||
r.workspaces[workspaceName] = &workspace{ | ||
buildStartTime: time.Now(), | ||
} | ||
err = runner.Run(ctx, fmt.Sprintf("%s-%d", id, i), logs) | ||
if err != nil { | ||
return xerrors.Errorf("create workspace %d: %w", i, err) | ||
} | ||
} | ||
logger.Info(ctx, fmt.Sprintf("waiting up to %v for workspace updates to complete...", r.cfg.WorkspaceUpdatesTimeout)) | ||
waitUpdatesCtx, cancel := context.WithTimeout(ctx, r.cfg.WorkspaceUpdatesTimeout) | ||
defer cancel() | ||
select { | ||
case err := <-completionCh: | ||
if err != nil { | ||
return xerrors.Errorf("workspace updates streaming failed: %w", err) | ||
ethanndickson marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
} | ||
logger.Info(ctx, "workspace updates streaming completed successfully") | ||
return nil | ||
case <-waitUpdatesCtx.Done(): | ||
cancelWatch() | ||
clients.Closer.Close() | ||
<-completionCh // ensure watch goroutine exits | ||
if waitUpdatesCtx.Err() == context.DeadlineExceeded { | ||
return xerrors.Errorf("timeout waiting for workspace updates after %v", r.cfg.WorkspaceUpdatesTimeout) | ||
} | ||
return waitUpdatesCtx.Err() | ||
} | ||
} | ||
func (r *Runner) dialTailnet(ctx context.Context, client *codersdk.Client, user codersdk.User, logger slog.Logger) (*tailnet.ControlProtocolClients, error) { | ||
u, err := client.URL.Parse("/api/v2/tailnet") | ||
if err != nil { | ||
logger.Error(ctx, "failed to parse tailnet URL", slog.Error(err)) | ||
r.cfg.Metrics.AddError(user.Username, r.cfg.WorkspaceCount, "parse_url") | ||
return nil, xerrors.Errorf("parse tailnet URL: %w", err) | ||
} | ||
dialer := workspacesdk.NewWebsocketDialer( | ||
logger, | ||
u, | ||
&websocket.DialOptions{ | ||
HTTPHeader: http.Header{ | ||
"Coder-Session-Token": []string{client.SessionToken()}, | ||
}, | ||
}, | ||
workspacesdk.WithWorkspaceUpdates(&tailnetproto.WorkspaceUpdatesRequest{ | ||
WorkspaceOwnerId: tailnet.UUIDToByteSlice(user.ID), | ||
}), | ||
) | ||
clients, err := dialer.Dial(ctx, nil) | ||
if err != nil { | ||
logger.Error(ctx, "failed to dial workspace updates", slog.Error(err)) | ||
r.cfg.Metrics.AddError(user.Username, r.cfg.WorkspaceCount, "dial") | ||
return nil, xerrors.Errorf("dial workspace updates: %w", err) | ||
} | ||
return &clients, nil | ||
} | ||
// watchWorkspaceUpdates processes workspace updates and returns error or nil | ||
// once all expected workspaces and agents are seen. | ||
func (r *Runner) watchWorkspaceUpdates(ctx context.Context, clients *tailnet.ControlProtocolClients, user codersdk.User, logger slog.Logger) error { | ||
expectedWorkspaces := r.cfg.WorkspaceCount | ||
// workspace name to time the update was seen | ||
seenWorkspaces := make(map[string]time.Time) | ||
logger.Info(ctx, fmt.Sprintf("waiting for %d workspaces and their agents", expectedWorkspaces)) | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
logger.Error(ctx, "context canceled while waiting for workspace updates", slog.Error(ctx.Err())) | ||
r.cfg.Metrics.AddError(user.Username, r.cfg.WorkspaceCount, "context_done") | ||
return ctx.Err() | ||
default: | ||
} | ||
update, err := clients.WorkspaceUpdates.Recv() | ||
if err != nil { | ||
logger.Error(ctx, "workspace updates stream error", slog.Error(err)) | ||
r.cfg.Metrics.AddError(user.Username, r.cfg.WorkspaceCount, "recv") | ||
return xerrors.Errorf("receive workspace update: %w", err) | ||
} | ||
recvTime := time.Now() | ||
for _, ws := range update.UpsertedWorkspaces { | ||
seenWorkspaces[ws.Name] = recvTime | ||
} | ||
if len(seenWorkspaces) == int(expectedWorkspaces) { | ||
for wsName, seenTime := range seenWorkspaces { | ||
// We only receive workspace updates for those that we built. | ||
// If we received a workspace update for a workspace we didn't build, | ||
// we're risking racing with the code that writes workspace | ||
// build start times to this map. | ||
ws, ok := r.workspaces[wsName] | ||
ethanndickson marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
if !ok { | ||
logger.Error(ctx, "received update for unexpected workspace", slog.F("workspace", wsName), slog.F("seen_workspaces", seenWorkspaces)) | ||
r.cfg.Metrics.AddError(user.Username, r.cfg.WorkspaceCount, "unexpected_workspace") | ||
return xerrors.Errorf("received update for unexpected workspace %q", wsName) | ||
} | ||
ws.updateLatency = seenTime.Sub(ws.buildStartTime) | ||
r.cfg.Metrics.RecordCompletion(ws.updateLatency, user.Username, r.cfg.WorkspaceCount, wsName) | ||
} | ||
logger.Info(ctx, fmt.Sprintf("updates received for all %d workspaces and agents", expectedWorkspaces)) | ||
return nil | ||
} | ||
} | ||
} | ||
const ( | ||
WorkspaceUpdatesLatencyMetric = "workspace_updates_latency_seconds" | ||
) | ||
func (r *Runner) GetMetrics() map[string]any { | ||
latencyMap := make(map[string]float64) | ||
for wsName, ws := range r.workspaces { | ||
latencyMap[wsName] = ws.updateLatency.Seconds() | ||
} | ||
return map[string]any{ | ||
WorkspaceUpdatesLatencyMetric: latencyMap, | ||
} | ||
} | ||
func (r *Runner) Cleanup(ctx context.Context, id string, logs io.Writer) error { | ||
for i, runner := range r.workspacebuildRunners { | ||
if runner != nil { | ||
_, _ = fmt.Fprintf(logs, "Cleaning up workspace %d/%d...\n", i+1, len(r.workspacebuildRunners)) | ||
if err := runner.Cleanup(ctx, fmt.Sprintf("%s-%d", id, i), logs); err != nil { | ||
return xerrors.Errorf("cleanup workspace %d: %w", i, err) | ||
} | ||
} | ||
} | ||
if r.createUserRunner != nil { | ||
_, _ = fmt.Fprintln(logs, "Cleaning up user...") | ||
if err := r.createUserRunner.Cleanup(ctx, id, logs); err != nil { | ||
return xerrors.Errorf("cleanup user: %w", err) | ||
} | ||
} | ||
return nil | ||
} |
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.