Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit65ac6cb

Browse files
feat(scaletest): add runner for coder connect load gen (#19904)
Relates tocoder/internal#889.This PR adds a scaletest runner that simulates a single Coder Connect client receiving workspace updates.An instance of a workspace updates runner does the following:- Creates a user, if a session token is not supplied.- Attempts to repeatedly dial the Coder Connect endpoint, with a configurable (two minutes by default) timeout. - Once dialed successfully, waits for any other concurrently executing runners to also dial successfully, or timeout (using the barrier).- Starts a configurable number of workspace builds.- Waits for that many workspaces to be seen over the workspace updates stream (with a configurable timeout).Exposes two prometheus metrics:- `workspace_updates_latency_seconds` - `HistogramVec`. Labels = `{username, num_owned_workspaces, workspace_name}` - This is the time between starting a workspace build, and receiving both the corresponding workspace update.- `workspace_updates_errors_total` - `NewCounterVec`. Labels = `{username, num_owned_workspaces, action}` - The number of times a specific action of the runner has failed, per user/client.
1 parent2bd11df commit65ac6cb

File tree

4 files changed

+529
-0
lines changed

4 files changed

+529
-0
lines changed
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package workspaceupdates
2+
3+
import (
4+
"sync"
5+
"time"
6+
7+
"golang.org/x/xerrors"
8+
9+
"github.com/coder/coder/v2/codersdk"
10+
"github.com/coder/coder/v2/scaletest/createusers"
11+
"github.com/coder/coder/v2/scaletest/workspacebuild"
12+
)
13+
14+
typeConfigstruct {
15+
// User is the configuration for the user to create.
16+
User createusers.Config`json:"user"`
17+
18+
// Workspace is the configuration for the workspace to create. The workspace
19+
// will be built using the new user.
20+
//
21+
// OrganizationID is ignored and set to the new user's organization ID.
22+
Workspace workspacebuild.Config`json:"workspace"`
23+
24+
// WorkspaceCount is the number of workspaces to create.
25+
WorkspaceCountint64`json:"power_user_workspaces"`
26+
27+
// WorkspaceUpdatesTimeout is how long to wait for all expected workspace updates.
28+
WorkspaceUpdatesTimeout time.Duration`json:"workspace_updates_timeout"`
29+
30+
// DialTimeout is how long to wait to successfully dial the Coder Connect
31+
// endpoint.
32+
DialTimeout time.Duration`json:"dial_timeout"`
33+
34+
Metrics*Metrics`json:"-"`
35+
36+
// DialBarrier is used to ensure all runners have dialed the Coder Connect
37+
// endpoint before creating their workspace(s).
38+
DialBarrier*sync.WaitGroup`json:"-"`
39+
}
40+
41+
func (cConfig)Validate()error {
42+
iferr:=c.User.Validate();err!=nil {
43+
returnxerrors.Errorf("user config: %w",err)
44+
}
45+
c.Workspace.OrganizationID=c.User.OrganizationID
46+
// This value will be overwritten during the test.
47+
c.Workspace.UserID=codersdk.Me
48+
iferr:=c.Workspace.Validate();err!=nil {
49+
returnxerrors.Errorf("workspace config: %w",err)
50+
}
51+
52+
ifc.Workspace.Request.Name!="" {
53+
returnxerrors.New("workspace name cannot be overridden")
54+
}
55+
56+
ifc.WorkspaceCount<=0 {
57+
returnxerrors.New("workspace_count must be greater than 0")
58+
}
59+
60+
ifc.DialBarrier==nil {
61+
returnxerrors.New("dial barrier must be set")
62+
}
63+
64+
ifc.WorkspaceUpdatesTimeout<=0 {
65+
returnxerrors.New("workspace_updates_timeout must be greater than 0")
66+
}
67+
68+
ifc.DialTimeout<=0 {
69+
returnxerrors.New("dial_timeout must be greater than 0")
70+
}
71+
72+
ifc.Metrics==nil {
73+
returnxerrors.New("metrics must be set")
74+
}
75+
76+
returnnil
77+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package workspaceupdates
2+
3+
import (
4+
"strconv"
5+
"time"
6+
7+
"github.com/prometheus/client_golang/prometheus"
8+
)
9+
10+
typeMetricsstruct {
11+
WorkspaceUpdatesLatencySeconds prometheus.HistogramVec
12+
WorkspaceUpdatesErrorsTotal prometheus.CounterVec
13+
}
14+
15+
funcNewMetrics(reg prometheus.Registerer)*Metrics {
16+
m:=&Metrics{
17+
WorkspaceUpdatesLatencySeconds:*prometheus.NewHistogramVec(prometheus.HistogramOpts{
18+
Namespace:"coderd",
19+
Subsystem:"scaletest",
20+
Name:"workspace_updates_latency_seconds",
21+
Help:"Time between starting a workspace build and receiving both the agent update and workspace update",
22+
}, []string{"username","num_owned_workspaces","workspace_name"}),
23+
WorkspaceUpdatesErrorsTotal:*prometheus.NewCounterVec(prometheus.CounterOpts{
24+
Namespace:"coderd",
25+
Subsystem:"scaletest",
26+
Name:"workspace_updates_errors_total",
27+
Help:"Total number of workspace updates errors",
28+
}, []string{"username","num_owned_workspaces","action"}),
29+
}
30+
31+
reg.MustRegister(m.WorkspaceUpdatesLatencySeconds)
32+
reg.MustRegister(m.WorkspaceUpdatesErrorsTotal)
33+
returnm
34+
}
35+
36+
func (m*Metrics)RecordCompletion(elapsed time.Duration,usernamestring,ownedWorkspacesint64,workspacestring) {
37+
m.WorkspaceUpdatesLatencySeconds.WithLabelValues(username,strconv.Itoa(int(ownedWorkspaces)),workspace).Observe(elapsed.Seconds())
38+
}
39+
40+
func (m*Metrics)AddError(usernamestring,ownedWorkspacesint64,actionstring) {
41+
m.WorkspaceUpdatesErrorsTotal.WithLabelValues(username,strconv.Itoa(int(ownedWorkspaces)),action).Inc()
42+
}

‎scaletest/workspaceupdates/run.go‎

Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,271 @@
1+
package workspaceupdates
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io"
7+
"net/http"
8+
"time"
9+
10+
"golang.org/x/xerrors"
11+
12+
"github.com/coder/websocket"
13+
14+
"cdr.dev/slog"
15+
"cdr.dev/slog/sloggers/sloghuman"
16+
"github.com/coder/coder/v2/coderd/tracing"
17+
"github.com/coder/coder/v2/codersdk"
18+
"github.com/coder/coder/v2/codersdk/workspacesdk"
19+
"github.com/coder/coder/v2/scaletest/createusers"
20+
"github.com/coder/coder/v2/scaletest/harness"
21+
"github.com/coder/coder/v2/scaletest/loadtestutil"
22+
"github.com/coder/coder/v2/scaletest/workspacebuild"
23+
"github.com/coder/coder/v2/tailnet"
24+
tailnetproto"github.com/coder/coder/v2/tailnet/proto"
25+
)
26+
27+
typeRunnerstruct {
28+
client*codersdk.Client
29+
cfgConfig
30+
31+
createUserRunner*createusers.Runner
32+
workspacebuildRunners []*workspacebuild.Runner
33+
34+
// workspace name to workspace
35+
workspacesmap[string]*workspace
36+
}
37+
38+
typeworkspacestruct {
39+
buildStartTime time.Time
40+
updateLatency time.Duration
41+
}
42+
43+
var (
44+
_ harness.Runnable=&Runner{}
45+
_ harness.Cleanable=&Runner{}
46+
_ harness.Collectable=&Runner{}
47+
)
48+
49+
funcNewRunner(client*codersdk.Client,cfgConfig)*Runner {
50+
return&Runner{
51+
client:client,
52+
cfg:cfg,
53+
workspaces:make(map[string]*workspace),
54+
}
55+
}
56+
57+
func (r*Runner)Run(ctx context.Context,idstring,logs io.Writer)error {
58+
ctx,span:=tracing.StartSpan(ctx)
59+
deferspan.End()
60+
61+
reachedBarrier:=false
62+
deferfunc() {
63+
if!reachedBarrier {
64+
r.cfg.DialBarrier.Done()
65+
}
66+
}()
67+
68+
logs=loadtestutil.NewSyncWriter(logs)
69+
logger:=slog.Make(sloghuman.Sink(logs)).Leveled(slog.LevelDebug)
70+
r.client.SetLogger(logger)
71+
r.client.SetLogBodies(true)
72+
73+
r.createUserRunner=createusers.NewRunner(r.client,r.cfg.User)
74+
newUserAndToken,err:=r.createUserRunner.RunReturningUser(ctx,id,logs)
75+
iferr!=nil {
76+
returnxerrors.Errorf("create user: %w",err)
77+
}
78+
newUser:=newUserAndToken.User
79+
newUserClient:=codersdk.New(r.client.URL,
80+
codersdk.WithSessionToken(newUserAndToken.SessionToken),
81+
codersdk.WithLogger(logger),
82+
codersdk.WithLogBodies())
83+
84+
logger.Info(ctx,fmt.Sprintf("user %q created",newUser.Username),slog.F("id",newUser.ID.String()))
85+
86+
dialCtx,cancel:=context.WithTimeout(ctx,r.cfg.DialTimeout)
87+
defercancel()
88+
89+
logger.Info(ctx,"connecting to workspace updates stream")
90+
clients,err:=r.dialTailnet(dialCtx,newUserClient,newUser,logger)
91+
iferr!=nil {
92+
returnxerrors.Errorf("tailnet dial failed: %w",err)
93+
}
94+
deferclients.Closer.Close()
95+
logger.Info(ctx,"connected to workspace updates stream")
96+
97+
watchCtx,cancelWatch:=context.WithCancel(ctx)
98+
defercancelWatch()
99+
100+
completionCh:=make(chanerror,1)
101+
gofunc() {
102+
completionCh<-r.watchWorkspaceUpdates(watchCtx,clients,newUser,logger)
103+
}()
104+
105+
reachedBarrier=true
106+
r.cfg.DialBarrier.Done()
107+
r.cfg.DialBarrier.Wait()
108+
109+
r.workspacebuildRunners=make([]*workspacebuild.Runner,0,r.cfg.WorkspaceCount)
110+
fori:=ranger.cfg.WorkspaceCount {
111+
workspaceName,err:=loadtestutil.GenerateWorkspaceName(id)
112+
iferr!=nil {
113+
returnxerrors.Errorf("generate random name for workspace: %w",err)
114+
}
115+
workspaceBuildConfig:=r.cfg.Workspace
116+
workspaceBuildConfig.OrganizationID=r.cfg.User.OrganizationID
117+
workspaceBuildConfig.UserID=newUser.ID.String()
118+
workspaceBuildConfig.Request.Name=workspaceName
119+
120+
runner:=workspacebuild.NewRunner(newUserClient,workspaceBuildConfig)
121+
r.workspacebuildRunners=append(r.workspacebuildRunners,runner)
122+
123+
logger.Info(ctx,fmt.Sprintf("creating workspace %d/%d",i+1,r.cfg.WorkspaceCount))
124+
125+
// Record build start time before running the workspace build
126+
r.workspaces[workspaceName]=&workspace{
127+
buildStartTime:time.Now(),
128+
}
129+
err=runner.Run(ctx,fmt.Sprintf("%s-%d",id,i),logs)
130+
iferr!=nil {
131+
returnxerrors.Errorf("create workspace %d: %w",i,err)
132+
}
133+
}
134+
135+
logger.Info(ctx,fmt.Sprintf("waiting up to %v for workspace updates to complete...",r.cfg.WorkspaceUpdatesTimeout))
136+
137+
waitUpdatesCtx,cancel:=context.WithTimeout(ctx,r.cfg.WorkspaceUpdatesTimeout)
138+
defercancel()
139+
140+
select {
141+
caseerr:=<-completionCh:
142+
iferr!=nil {
143+
returnxerrors.Errorf("workspace updates streaming failed: %w",err)
144+
}
145+
logger.Info(ctx,"workspace updates streaming completed successfully")
146+
returnnil
147+
case<-waitUpdatesCtx.Done():
148+
cancelWatch()
149+
clients.Closer.Close()
150+
<-completionCh// ensure watch goroutine exits
151+
ifwaitUpdatesCtx.Err()==context.DeadlineExceeded {
152+
returnxerrors.Errorf("timeout waiting for workspace updates after %v",r.cfg.WorkspaceUpdatesTimeout)
153+
}
154+
returnwaitUpdatesCtx.Err()
155+
}
156+
}
157+
158+
func (r*Runner)dialTailnet(ctx context.Context,client*codersdk.Client,user codersdk.User,logger slog.Logger) (*tailnet.ControlProtocolClients,error) {
159+
u,err:=client.URL.Parse("/api/v2/tailnet")
160+
iferr!=nil {
161+
logger.Error(ctx,"failed to parse tailnet URL",slog.Error(err))
162+
r.cfg.Metrics.AddError(user.Username,r.cfg.WorkspaceCount,"parse_url")
163+
returnnil,xerrors.Errorf("parse tailnet URL: %w",err)
164+
}
165+
166+
dialer:=workspacesdk.NewWebsocketDialer(
167+
logger,
168+
u,
169+
&websocket.DialOptions{
170+
HTTPHeader: http.Header{
171+
"Coder-Session-Token": []string{client.SessionToken()},
172+
},
173+
},
174+
workspacesdk.WithWorkspaceUpdates(&tailnetproto.WorkspaceUpdatesRequest{
175+
WorkspaceOwnerId:tailnet.UUIDToByteSlice(user.ID),
176+
}),
177+
)
178+
179+
clients,err:=dialer.Dial(ctx,nil)
180+
iferr!=nil {
181+
logger.Error(ctx,"failed to dial workspace updates",slog.Error(err))
182+
r.cfg.Metrics.AddError(user.Username,r.cfg.WorkspaceCount,"dial")
183+
returnnil,xerrors.Errorf("dial workspace updates: %w",err)
184+
}
185+
186+
return&clients,nil
187+
}
188+
189+
// watchWorkspaceUpdates processes workspace updates and returns error or nil
190+
// once all expected workspaces and agents are seen.
191+
func (r*Runner)watchWorkspaceUpdates(ctx context.Context,clients*tailnet.ControlProtocolClients,user codersdk.User,logger slog.Logger)error {
192+
expectedWorkspaces:=r.cfg.WorkspaceCount
193+
// workspace name to time the update was seen
194+
seenWorkspaces:=make(map[string]time.Time)
195+
196+
logger.Info(ctx,fmt.Sprintf("waiting for %d workspaces and their agents",expectedWorkspaces))
197+
for {
198+
select {
199+
case<-ctx.Done():
200+
logger.Error(ctx,"context canceled while waiting for workspace updates",slog.Error(ctx.Err()))
201+
r.cfg.Metrics.AddError(user.Username,r.cfg.WorkspaceCount,"context_done")
202+
returnctx.Err()
203+
default:
204+
}
205+
206+
update,err:=clients.WorkspaceUpdates.Recv()
207+
iferr!=nil {
208+
logger.Error(ctx,"workspace updates stream error",slog.Error(err))
209+
r.cfg.Metrics.AddError(user.Username,r.cfg.WorkspaceCount,"recv")
210+
returnxerrors.Errorf("receive workspace update: %w",err)
211+
}
212+
recvTime:=time.Now()
213+
214+
for_,ws:=rangeupdate.UpsertedWorkspaces {
215+
seenWorkspaces[ws.Name]=recvTime
216+
}
217+
218+
iflen(seenWorkspaces)==int(expectedWorkspaces) {
219+
forwsName,seenTime:=rangeseenWorkspaces {
220+
// We only receive workspace updates for those that we built.
221+
// If we received a workspace update for a workspace we didn't build,
222+
// we're risking racing with the code that writes workspace
223+
// build start times to this map.
224+
ws,ok:=r.workspaces[wsName]
225+
if!ok {
226+
logger.Error(ctx,"received update for unexpected workspace",slog.F("workspace",wsName),slog.F("seen_workspaces",seenWorkspaces))
227+
r.cfg.Metrics.AddError(user.Username,r.cfg.WorkspaceCount,"unexpected_workspace")
228+
returnxerrors.Errorf("received update for unexpected workspace %q",wsName)
229+
}
230+
ws.updateLatency=seenTime.Sub(ws.buildStartTime)
231+
r.cfg.Metrics.RecordCompletion(ws.updateLatency,user.Username,r.cfg.WorkspaceCount,wsName)
232+
}
233+
logger.Info(ctx,fmt.Sprintf("updates received for all %d workspaces and agents",expectedWorkspaces))
234+
returnnil
235+
}
236+
}
237+
}
238+
239+
const (
240+
WorkspaceUpdatesLatencyMetric="workspace_updates_latency_seconds"
241+
)
242+
243+
func (r*Runner)GetMetrics()map[string]any {
244+
latencyMap:=make(map[string]float64)
245+
forwsName,ws:=ranger.workspaces {
246+
latencyMap[wsName]=ws.updateLatency.Seconds()
247+
}
248+
returnmap[string]any{
249+
WorkspaceUpdatesLatencyMetric:latencyMap,
250+
}
251+
}
252+
253+
func (r*Runner)Cleanup(ctx context.Context,idstring,logs io.Writer)error {
254+
fori,runner:=ranger.workspacebuildRunners {
255+
ifrunner!=nil {
256+
_,_=fmt.Fprintf(logs,"Cleaning up workspace %d/%d...\n",i+1,len(r.workspacebuildRunners))
257+
iferr:=runner.Cleanup(ctx,fmt.Sprintf("%s-%d",id,i),logs);err!=nil {
258+
returnxerrors.Errorf("cleanup workspace %d: %w",i,err)
259+
}
260+
}
261+
}
262+
263+
ifr.createUserRunner!=nil {
264+
_,_=fmt.Fprintln(logs,"Cleaning up user...")
265+
iferr:=r.createUserRunner.Cleanup(ctx,id,logs);err!=nil {
266+
returnxerrors.Errorf("cleanup user: %w",err)
267+
}
268+
}
269+
270+
returnnil
271+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp