Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

feat(scaletest): add runner for coder connect load gen#19904

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
ethanndickson merged 6 commits intomainfromethan/coder-connect-load-gen
Sep 30, 2025
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletionsscaletest/workspaceupdates/config.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
package workspaceupdates

import (
"sync"
"time"

"golang.org/x/xerrors"

"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/scaletest/createusers"
"github.com/coder/coder/v2/scaletest/workspacebuild"
)

type Config struct {
// User is the configuration for the user to create.
User createusers.Config `json:"user"`

// Workspace is the configuration for the workspace to create. The workspace
// will be built using the new user.
//
// OrganizationID is ignored and set to the new user's organization ID.
Workspace workspacebuild.Config `json:"workspace"`

// WorkspaceCount is the number of workspaces to create.
WorkspaceCount int64 `json:"power_user_workspaces"`

// WorkspaceUpdatesTimeout is how long to wait for all expected workspace updates.
WorkspaceUpdatesTimeout time.Duration `json:"workspace_updates_timeout"`

// DialTimeout is how long to wait to successfully dial the Coder Connect
// endpoint.
DialTimeout time.Duration `json:"dial_timeout"`

Metrics *Metrics `json:"-"`

// DialBarrier is used to ensure all runners have dialed the Coder Connect
// endpoint before creating their workspace(s).
DialBarrier *sync.WaitGroup `json:"-"`
}

func (c Config) Validate() error {
if err := c.User.Validate(); err != nil {
return xerrors.Errorf("user config: %w", err)
}
c.Workspace.OrganizationID = c.User.OrganizationID
// This value will be overwritten during the test.
c.Workspace.UserID = codersdk.Me
if err := c.Workspace.Validate(); err != nil {
return xerrors.Errorf("workspace config: %w", err)
}

if c.Workspace.Request.Name != "" {
return xerrors.New("workspace name cannot be overridden")
}

if c.WorkspaceCount <= 0 {
return xerrors.New("workspace_count must be greater than 0")
}

if c.DialBarrier == nil {
return xerrors.New("dial barrier must be set")
}

if c.WorkspaceUpdatesTimeout <= 0 {
return xerrors.New("workspace_updates_timeout must be greater than 0")
}

if c.DialTimeout <= 0 {
return xerrors.New("dial_timeout must be greater than 0")
}

if c.Metrics == nil {
return xerrors.New("metrics must be set")
}

return nil
}
42 changes: 42 additions & 0 deletionsscaletest/workspaceupdates/metrics.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
package workspaceupdates

import (
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"
)

type Metrics struct {
WorkspaceUpdatesLatencySeconds prometheus.HistogramVec
WorkspaceUpdatesErrorsTotal prometheus.CounterVec
}

func NewMetrics(reg prometheus.Registerer) *Metrics {
m := &Metrics{
WorkspaceUpdatesLatencySeconds: *prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "coderd",
Subsystem: "scaletest",
Name: "workspace_updates_latency_seconds",
Help: "Time between starting a workspace build and receiving both the agent update and workspace update",
}, []string{"username", "num_owned_workspaces", "workspace_name"}),
WorkspaceUpdatesErrorsTotal: *prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "coderd",
Subsystem: "scaletest",
Name: "workspace_updates_errors_total",
Help: "Total number of workspace updates errors",
}, []string{"username", "num_owned_workspaces", "action"}),
}

reg.MustRegister(m.WorkspaceUpdatesLatencySeconds)
reg.MustRegister(m.WorkspaceUpdatesErrorsTotal)
return m
}

func (m *Metrics) RecordCompletion(elapsed time.Duration, username string, ownedWorkspaces int64, workspace string) {
m.WorkspaceUpdatesLatencySeconds.WithLabelValues(username, strconv.Itoa(int(ownedWorkspaces)), workspace).Observe(elapsed.Seconds())
}

func (m *Metrics) AddError(username string, ownedWorkspaces int64, action string) {
m.WorkspaceUpdatesErrorsTotal.WithLabelValues(username, strconv.Itoa(int(ownedWorkspaces)), action).Inc()
}
271 changes: 271 additions & 0 deletionsscaletest/workspaceupdates/run.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,271 @@
package workspaceupdates

import (
"context"
"fmt"
"io"
"net/http"
"time"

"golang.org/x/xerrors"

"github.com/coder/websocket"

"cdr.dev/slog"
"cdr.dev/slog/sloggers/sloghuman"
"github.com/coder/coder/v2/coderd/tracing"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/workspacesdk"
"github.com/coder/coder/v2/scaletest/createusers"
"github.com/coder/coder/v2/scaletest/harness"
"github.com/coder/coder/v2/scaletest/loadtestutil"
"github.com/coder/coder/v2/scaletest/workspacebuild"
"github.com/coder/coder/v2/tailnet"
tailnetproto "github.com/coder/coder/v2/tailnet/proto"
)

type Runner struct {
client *codersdk.Client
cfg Config

createUserRunner *createusers.Runner
workspacebuildRunners []*workspacebuild.Runner

// workspace name to workspace
workspaces map[string]*workspace
}

type workspace struct {
buildStartTime time.Time
updateLatency time.Duration
}

var (
_ harness.Runnable = &Runner{}
_ harness.Cleanable = &Runner{}
_ harness.Collectable = &Runner{}
)

func NewRunner(client *codersdk.Client, cfg Config) *Runner {
return &Runner{
client: client,
cfg: cfg,
workspaces: make(map[string]*workspace),
}
}

func (r *Runner) Run(ctx context.Context, id string, logs io.Writer) error {
ctx, span := tracing.StartSpan(ctx)
defer span.End()

reachedBarrier := false
defer func() {
if !reachedBarrier {
r.cfg.DialBarrier.Done()
}
}()

logs = loadtestutil.NewSyncWriter(logs)
logger := slog.Make(sloghuman.Sink(logs)).Leveled(slog.LevelDebug)
r.client.SetLogger(logger)
r.client.SetLogBodies(true)

r.createUserRunner = createusers.NewRunner(r.client, r.cfg.User)
newUserAndToken, err := r.createUserRunner.RunReturningUser(ctx, id, logs)
if err != nil {
return xerrors.Errorf("create user: %w", err)
}
newUser := newUserAndToken.User
newUserClient := codersdk.New(r.client.URL,
codersdk.WithSessionToken(newUserAndToken.SessionToken),
codersdk.WithLogger(logger),
codersdk.WithLogBodies())

logger.Info(ctx, fmt.Sprintf("user %q created", newUser.Username), slog.F("id", newUser.ID.String()))

dialCtx, cancel := context.WithTimeout(ctx, r.cfg.DialTimeout)
defer cancel()

logger.Info(ctx, "connecting to workspace updates stream")
clients, err := r.dialTailnet(dialCtx, newUserClient, newUser, logger)
if err != nil {
return xerrors.Errorf("tailnet dial failed: %w", err)
}
defer clients.Closer.Close()
logger.Info(ctx, "connected to workspace updates stream")

watchCtx, cancelWatch := context.WithCancel(ctx)
defer cancelWatch()

completionCh := make(chan error, 1)
go func() {
completionCh <- r.watchWorkspaceUpdates(watchCtx, clients, newUser, logger)
}()

reachedBarrier = true
r.cfg.DialBarrier.Done()
r.cfg.DialBarrier.Wait()

r.workspacebuildRunners = make([]*workspacebuild.Runner, 0, r.cfg.WorkspaceCount)
for i := range r.cfg.WorkspaceCount {
workspaceName, err := loadtestutil.GenerateWorkspaceName(id)
if err != nil {
return xerrors.Errorf("generate random name for workspace: %w", err)
}
workspaceBuildConfig := r.cfg.Workspace
workspaceBuildConfig.OrganizationID = r.cfg.User.OrganizationID
workspaceBuildConfig.UserID = newUser.ID.String()
workspaceBuildConfig.Request.Name = workspaceName

runner := workspacebuild.NewRunner(newUserClient, workspaceBuildConfig)
r.workspacebuildRunners = append(r.workspacebuildRunners, runner)

logger.Info(ctx, fmt.Sprintf("creating workspace %d/%d", i+1, r.cfg.WorkspaceCount))

// Record build start time before running the workspace build
r.workspaces[workspaceName] = &workspace{
buildStartTime: time.Now(),
}
err = runner.Run(ctx, fmt.Sprintf("%s-%d", id, i), logs)
if err != nil {
return xerrors.Errorf("create workspace %d: %w", i, err)
}
}

logger.Info(ctx, fmt.Sprintf("waiting up to %v for workspace updates to complete...", r.cfg.WorkspaceUpdatesTimeout))

waitUpdatesCtx, cancel := context.WithTimeout(ctx, r.cfg.WorkspaceUpdatesTimeout)
defer cancel()

select {
case err := <-completionCh:
if err != nil {
return xerrors.Errorf("workspace updates streaming failed: %w", err)
}
logger.Info(ctx, "workspace updates streaming completed successfully")
return nil
case <-waitUpdatesCtx.Done():
cancelWatch()
clients.Closer.Close()
<-completionCh // ensure watch goroutine exits
if waitUpdatesCtx.Err() == context.DeadlineExceeded {
return xerrors.Errorf("timeout waiting for workspace updates after %v", r.cfg.WorkspaceUpdatesTimeout)
}
return waitUpdatesCtx.Err()
}
}

func (r *Runner) dialTailnet(ctx context.Context, client *codersdk.Client, user codersdk.User, logger slog.Logger) (*tailnet.ControlProtocolClients, error) {
u, err := client.URL.Parse("/api/v2/tailnet")
if err != nil {
logger.Error(ctx, "failed to parse tailnet URL", slog.Error(err))
r.cfg.Metrics.AddError(user.Username, r.cfg.WorkspaceCount, "parse_url")
return nil, xerrors.Errorf("parse tailnet URL: %w", err)
}

dialer := workspacesdk.NewWebsocketDialer(
logger,
u,
&websocket.DialOptions{
HTTPHeader: http.Header{
"Coder-Session-Token": []string{client.SessionToken()},
},
},
workspacesdk.WithWorkspaceUpdates(&tailnetproto.WorkspaceUpdatesRequest{
WorkspaceOwnerId: tailnet.UUIDToByteSlice(user.ID),
}),
)

clients, err := dialer.Dial(ctx, nil)
if err != nil {
logger.Error(ctx, "failed to dial workspace updates", slog.Error(err))
r.cfg.Metrics.AddError(user.Username, r.cfg.WorkspaceCount, "dial")
return nil, xerrors.Errorf("dial workspace updates: %w", err)
}

return &clients, nil
}

// watchWorkspaceUpdates processes workspace updates and returns error or nil
// once all expected workspaces and agents are seen.
func (r *Runner) watchWorkspaceUpdates(ctx context.Context, clients *tailnet.ControlProtocolClients, user codersdk.User, logger slog.Logger) error {
expectedWorkspaces := r.cfg.WorkspaceCount
// workspace name to time the update was seen
seenWorkspaces := make(map[string]time.Time)

logger.Info(ctx, fmt.Sprintf("waiting for %d workspaces and their agents", expectedWorkspaces))
for {
select {
case <-ctx.Done():
logger.Error(ctx, "context canceled while waiting for workspace updates", slog.Error(ctx.Err()))
r.cfg.Metrics.AddError(user.Username, r.cfg.WorkspaceCount, "context_done")
return ctx.Err()
default:
}

update, err := clients.WorkspaceUpdates.Recv()
if err != nil {
logger.Error(ctx, "workspace updates stream error", slog.Error(err))
r.cfg.Metrics.AddError(user.Username, r.cfg.WorkspaceCount, "recv")
return xerrors.Errorf("receive workspace update: %w", err)
}
recvTime := time.Now()

for _, ws := range update.UpsertedWorkspaces {
seenWorkspaces[ws.Name] = recvTime
}

if len(seenWorkspaces) == int(expectedWorkspaces) {
for wsName, seenTime := range seenWorkspaces {
// We only receive workspace updates for those that we built.
// If we received a workspace update for a workspace we didn't build,
// we're risking racing with the code that writes workspace
// build start times to this map.
ws, ok := r.workspaces[wsName]
if !ok {
logger.Error(ctx, "received update for unexpected workspace", slog.F("workspace", wsName), slog.F("seen_workspaces", seenWorkspaces))
r.cfg.Metrics.AddError(user.Username, r.cfg.WorkspaceCount, "unexpected_workspace")
return xerrors.Errorf("received update for unexpected workspace %q", wsName)
}
ws.updateLatency = seenTime.Sub(ws.buildStartTime)
r.cfg.Metrics.RecordCompletion(ws.updateLatency, user.Username, r.cfg.WorkspaceCount, wsName)
}
logger.Info(ctx, fmt.Sprintf("updates received for all %d workspaces and agents", expectedWorkspaces))
return nil
}
}
}

const (
WorkspaceUpdatesLatencyMetric = "workspace_updates_latency_seconds"
)

func (r *Runner) GetMetrics() map[string]any {
latencyMap := make(map[string]float64)
for wsName, ws := range r.workspaces {
latencyMap[wsName] = ws.updateLatency.Seconds()
}
return map[string]any{
WorkspaceUpdatesLatencyMetric: latencyMap,
}
}

func (r *Runner) Cleanup(ctx context.Context, id string, logs io.Writer) error {
for i, runner := range r.workspacebuildRunners {
if runner != nil {
_, _ = fmt.Fprintf(logs, "Cleaning up workspace %d/%d...\n", i+1, len(r.workspacebuildRunners))
if err := runner.Cleanup(ctx, fmt.Sprintf("%s-%d", id, i), logs); err != nil {
return xerrors.Errorf("cleanup workspace %d: %w", i, err)
}
}
}

if r.createUserRunner != nil {
_, _ = fmt.Fprintln(logs, "Cleaning up user...")
if err := r.createUserRunner.Cleanup(ctx, id, logs); err != nil {
return xerrors.Errorf("cleanup user: %w", err)
}
}

return nil
}
Loading
Loading

[8]ページ先頭

©2009-2025 Movatter.jp