@@ -15,6 +15,7 @@ import (
15
15
"cdr.dev/slog"
16
16
"cdr.dev/slog/sloggers/sloghuman"
17
17
"github.com/coder/coder/v2/coderd/tracing"
18
+ "github.com/coder/coder/v2/coderd/util/syncmap"
18
19
"github.com/coder/coder/v2/codersdk"
19
20
"github.com/coder/coder/v2/codersdk/workspacesdk"
20
21
"github.com/coder/coder/v2/scaletest/createusers"
@@ -32,8 +33,14 @@ type Runner struct {
32
33
createUserRunner * createusers.Runner
33
34
workspacebuildRunners []* workspacebuild.Runner
34
35
35
- // startTime records when workspace builds begin (for metrics timing)
36
- startTime time.Time
36
+ // workspace name to workspace
37
+ workspaces * syncmap.Map [string ,* workspace ]
38
+ }
39
+
40
+ type workspace struct {
41
+ workspaceID uuid.UUID
42
+ buildStartTime time.Time
43
+ updateLatency time.Duration
37
44
}
38
45
39
46
var (
44
51
45
52
func NewRunner (client * codersdk.Client ,cfg Config )* Runner {
46
53
return & Runner {
47
- client :client ,
48
- cfg :cfg ,
54
+ client :client ,
55
+ cfg :cfg ,
56
+ workspaces :syncmap .New [string ,* workspace ](),
49
57
}
50
58
}
51
59
@@ -113,24 +121,32 @@ func (r *Runner) Run(ctx context.Context, id string, logs io.Writer) error {
113
121
114
122
completionCh := make (chan error ,1 )
115
123
go func () {
116
- completionCh <- r .watchWorkspaceUpdates (watchCtx ,clients ,logs )
124
+ completionCh <- r .watchWorkspaceUpdates (watchCtx ,clients ,user , logs )
117
125
}()
118
126
119
127
reachedBarrier = true
120
128
r .cfg .DialBarrier .Wait ()
121
129
122
- r .startTime = time .Now ()
123
-
124
130
workspaceRunners := make ([]* workspacebuild.Runner ,0 ,r .cfg .WorkspaceCount )
125
131
for i := range r .cfg .WorkspaceCount {
132
+ workspaceName ,err := loadtestutil .GenerateWorkspaceName (id )
133
+ if err != nil {
134
+ return xerrors .Errorf ("generate random name for workspace: %w" ,err )
135
+ }
126
136
workspaceBuildConfig := r .cfg .Workspace
127
137
workspaceBuildConfig .OrganizationID = r .cfg .User .OrganizationID
128
138
workspaceBuildConfig .UserID = user .ID .String ()
139
+ workspaceBuildConfig .Request .Name = workspaceName
129
140
130
141
runner := workspacebuild .NewRunner (client ,workspaceBuildConfig )
131
142
workspaceRunners = append (workspaceRunners ,runner )
132
143
133
144
_ ,_ = fmt .Fprintf (logs ,"Creating workspace %d/%d...\n " ,i + 1 ,r .cfg .WorkspaceCount )
145
+
146
+ // Record build start time before running the workspace build
147
+ r .workspaces .Store (workspaceName ,& workspace {
148
+ buildStartTime :time .Now (),
149
+ })
134
150
err = runner .Run (ctx ,fmt .Sprintf ("%s-%d" ,id ,i ),logs )
135
151
if err != nil {
136
152
return xerrors .Errorf ("create workspace %d: %w" ,i ,err )
@@ -163,7 +179,7 @@ func (r *Runner) dialCoderConnect(ctx context.Context, client *codersdk.Client,
163
179
u ,err := client .URL .Parse ("/api/v2/tailnet" )
164
180
if err != nil {
165
181
logger .Error (ctx ,"failed to parse tailnet URL" ,slog .Error (err ))
166
- r .cfg .Metrics .AddError (append ( r .cfg .MetricLabelValues ,"parse_url" ) ... )
182
+ r .cfg .Metrics .AddError (user . Username , r .cfg .WorkspaceCount ,"parse_url" )
167
183
return nil ,xerrors .Errorf ("parse tailnet URL: %w" ,err )
168
184
}
169
185
@@ -183,7 +199,7 @@ func (r *Runner) dialCoderConnect(ctx context.Context, client *codersdk.Client,
183
199
clients ,err := dialer .Dial (ctx ,nil )
184
200
if err != nil {
185
201
logger .Error (ctx ,"failed to dial workspace updates" ,slog .Error (err ))
186
- r .cfg .Metrics .AddError (append ( r .cfg .MetricLabelValues ,"dial" ) ... )
202
+ r .cfg .Metrics .AddError (user . Username , r .cfg .WorkspaceCount ,"dial" )
187
203
return nil ,xerrors .Errorf ("dial workspace updates: %w" ,err )
188
204
}
189
205
@@ -192,63 +208,86 @@ func (r *Runner) dialCoderConnect(ctx context.Context, client *codersdk.Client,
192
208
193
209
// watchWorkspaceUpdates processes workspace updates and returns error or nil
194
210
// once all expected workspaces and agents are seen.
195
- func (r * Runner )watchWorkspaceUpdates (ctx context.Context ,clients * tailnet.ControlProtocolClients ,logs io.Writer )error {
211
+ func (r * Runner )watchWorkspaceUpdates (ctx context.Context ,clients * tailnet.ControlProtocolClients ,user codersdk. User , logs io.Writer )error {
196
212
defer clients .Closer .Close ()
197
213
198
- seenWorkspaces := make (map [uuid.UUID ]bool )
199
- seenAgents := make (map [uuid.UUID ]bool )
200
214
expectedWorkspaces := r .cfg .WorkspaceCount
201
- expectedAgents := expectedWorkspaces
202
-
203
- _ ,_ = fmt .Fprintf (logs ,"Waiting for %d workspaces and %d agents\n " ,expectedWorkspaces ,expectedAgents )
215
+ seenWorkspaces := 0
216
+ // Workspace ID to agent update arrival time.
217
+ // At the end, we reconcile to see which took longer, and mark that as the
218
+ // latency.
219
+ agents := make (map [uuid.UUID ]time.Time )
204
220
221
+ _ ,_ = fmt .Fprintf (logs ,"Waiting for %d workspaces and their agents\n " ,expectedWorkspaces )
205
222
for {
206
223
select {
207
224
case <- ctx .Done ():
208
225
_ ,_ = fmt .Fprintf (logs ,"Context canceled while waiting for workspace updates: %v\n " ,ctx .Err ())
209
- r .cfg .Metrics .AddError (append ( r .cfg .MetricLabelValues ,"context_done" ) ... )
226
+ r .cfg .Metrics .AddError (user . Username , r .cfg .WorkspaceCount ,"context_done" )
210
227
return ctx .Err ()
211
228
default :
212
229
}
213
230
214
231
update ,err := clients .WorkspaceUpdates .Recv ()
215
232
if err != nil {
216
233
_ ,_ = fmt .Fprintf (logs ,"Workspace updates stream error: %v\n " ,err )
217
- r .cfg .Metrics .AddError (append ( r .cfg .MetricLabelValues ,"recv" ) ... )
234
+ r .cfg .Metrics .AddError (user . Username , r .cfg .WorkspaceCount ,"recv" )
218
235
return xerrors .Errorf ("receive workspace update: %w" ,err )
219
236
}
220
237
221
238
for _ ,ws := range update .UpsertedWorkspaces {
222
239
wsID ,err := uuid .FromBytes (ws .Id )
223
240
if err != nil {
224
241
_ ,_ = fmt .Fprintf (logs ,"Invalid workspace ID in update: %v\n " ,err )
225
- r .cfg .Metrics .AddError (append ( r .cfg .MetricLabelValues ,"bad_workspace_id" ) ... )
242
+ r .cfg .Metrics .AddError (user . Username , r .cfg .WorkspaceCount ,"bad_workspace_id" )
226
243
continue
227
244
}
228
- if ! seenWorkspaces [wsID ] {
229
- seenWorkspaces [wsID ]= true
230
- _ ,_ = fmt .Fprintf (logs ,"Received workspace update: %s (%d/%d)\n " ,wsID ,len (seenWorkspaces ),expectedWorkspaces )
245
+
246
+ if tracking ,ok := r .workspaces .Load (ws .GetName ());ok {
247
+ if tracking .updateLatency == 0 {
248
+ r .workspaces .Store (ws .GetName (),& workspace {
249
+ workspaceID :wsID ,
250
+ buildStartTime :tracking .buildStartTime ,
251
+ updateLatency :time .Since (tracking .buildStartTime ),
252
+ })
253
+ seenWorkspaces ++
254
+ }
255
+ }else if ! ok {
256
+ return xerrors .Errorf ("received update for unknown workspace %q (id: %s)" ,ws .GetName (),wsID )
231
257
}
232
258
}
233
259
234
260
for _ ,agent := range update .UpsertedAgents {
235
- agentID ,err := uuid .FromBytes (agent .Id )
261
+ wsID ,err := uuid .FromBytes (agent .WorkspaceId )
236
262
if err != nil {
237
- _ ,_ = fmt .Fprintf (logs ,"Invalidagent ID in update: %v\n " ,err )
238
- r .cfg .Metrics .AddError (append ( r .cfg .MetricLabelValues ,"bad_agent_id" ) ... )
263
+ _ ,_ = fmt .Fprintf (logs ,"Invalidworkspace ID in agent update: %v\n " ,err )
264
+ r .cfg .Metrics .AddError (user . Username , r .cfg .WorkspaceCount ,"bad_agent_workspace_id" )
239
265
continue
240
266
}
241
- if ! seenAgents [ agentID ] {
242
- seenAgents [ agentID ] = true
243
- _ , _ = fmt . Fprintf ( logs , "Received agent update: %s (%d/%d) \n " , agentID , len ( seenAgents ), expectedAgents )
267
+
268
+ if _ , ok := agents [ wsID ]; ! ok {
269
+ agents [ wsID ] = time . Now ( )
244
270
}
245
271
}
246
272
247
- if len (seenWorkspaces )>= int (expectedWorkspaces )&& len (seenAgents )>= int (expectedAgents ) {
248
- elapsed := time .Since (r .startTime )
249
- _ ,_ = fmt .Fprintf (logs ,"All expected workspaces (%d) and agents (%d) received in %v\n " ,
250
- len (seenWorkspaces ),len (seenAgents ),elapsed )
251
- r .cfg .Metrics .RecordCompletion (elapsed ,r .cfg .MetricLabelValues ... )
273
+ if seenWorkspaces == int (expectedWorkspaces )&& len (agents )== int (expectedWorkspaces ) {
274
+ // For each workspace, record the latency from build start to
275
+ // workspace update, or agent update, whichever is later.
276
+ r .workspaces .Range (func (wsName string ,ws * workspace )bool {
277
+ if agentTime ,ok := agents [ws .workspaceID ];ok {
278
+ agentLatency := agentTime .Sub (ws .buildStartTime )
279
+ if agentLatency > ws .updateLatency {
280
+ // Update in-place, so GetMetrics is accurate.
281
+ ws .updateLatency = agentLatency
282
+ }
283
+ }else {
284
+ // Unreachable, recorded for debugging
285
+ r .cfg .Metrics .AddError (user .Username ,r .cfg .WorkspaceCount ,"missing_agent" )
286
+ }
287
+ r .cfg .Metrics .RecordCompletion (ws .updateLatency ,user .Username ,r .cfg .WorkspaceCount ,wsName )
288
+ return true
289
+ })
290
+ _ ,_ = fmt .Fprintf (logs ,"Updates received for all %d workspaces and agents\n " ,expectedWorkspaces )
252
291
return nil
253
292
}
254
293
}
@@ -260,9 +299,14 @@ const (
260
299
)
261
300
262
301
func (r * Runner )GetMetrics ()map [string ]any {
302
+ latencyMap := make (map [string ]float64 )
303
+ r .workspaces .Range (func (wsName string ,ws * workspace )bool {
304
+ latencyMap [wsName ]= ws .updateLatency .Seconds ()
305
+ return true
306
+ })
263
307
return map [string ]any {
264
308
WorkspaceUpdatesErrorsTotal :r .cfg .Metrics .numErrors .Load (),
265
- WorkspaceUpdatesLatencyMetric :r . cfg . Metrics . completionDuration . Seconds () ,
309
+ WorkspaceUpdatesLatencyMetric :latencyMap ,
266
310
}
267
311
}
268
312