@@ -15,6 +15,7 @@ import (
15
15
"cdr.dev/slog"
16
16
"cdr.dev/slog/sloggers/sloghuman"
17
17
"github.com/coder/coder/v2/coderd/tracing"
18
+ "github.com/coder/coder/v2/coderd/util/syncmap"
18
19
"github.com/coder/coder/v2/codersdk"
19
20
"github.com/coder/coder/v2/codersdk/workspacesdk"
20
21
"github.com/coder/coder/v2/scaletest/createusers"
@@ -32,8 +33,14 @@ type Runner struct {
32
33
createUserRunner * createusers.Runner
33
34
workspacebuildRunners []* workspacebuild.Runner
34
35
35
- // startTime records when workspace builds begin (for metrics timing)
36
- startTime time.Time
36
+ // workspace name to workspace
37
+ workspaces * syncmap.Map [string ,* workspace ]
38
+ }
39
+
40
+ type workspace struct {
41
+ workspaceID uuid.UUID
42
+ buildStartTime time.Time
43
+ updateLatency time.Duration
37
44
}
38
45
39
46
var (
44
51
45
52
func NewRunner (client * codersdk.Client ,cfg Config )* Runner {
46
53
return & Runner {
47
- client :client ,
48
- cfg :cfg ,
54
+ client :client ,
55
+ cfg :cfg ,
56
+ workspaces :syncmap .New [string ,* workspace ](),
49
57
}
50
58
}
51
59
@@ -113,24 +121,32 @@ func (r *Runner) Run(ctx context.Context, id string, logs io.Writer) error {
113
121
114
122
completionCh := make (chan error ,1 )
115
123
go func () {
116
- completionCh <- r .watchWorkspaceUpdates (watchCtx ,clients ,logs )
124
+ completionCh <- r .watchWorkspaceUpdates (watchCtx ,clients ,user , logs )
117
125
}()
118
126
119
127
reachedBarrier = true
120
128
r .cfg .DialBarrier .Wait ()
121
129
122
- r .startTime = time .Now ()
123
-
124
130
workspaceRunners := make ([]* workspacebuild.Runner ,0 ,r .cfg .WorkspaceCount )
125
131
for i := range r .cfg .WorkspaceCount {
132
+ workspaceName ,err := loadtestutil .GenerateWorkspaceName (id )
133
+ if err != nil {
134
+ return xerrors .Errorf ("generate random name for workspace: %w" ,err )
135
+ }
126
136
workspaceBuildConfig := r .cfg .Workspace
127
137
workspaceBuildConfig .OrganizationID = r .cfg .User .OrganizationID
128
138
workspaceBuildConfig .UserID = user .ID .String ()
139
+ workspaceBuildConfig .Request .Name = workspaceName
129
140
130
141
runner := workspacebuild .NewRunner (client ,workspaceBuildConfig )
131
142
workspaceRunners = append (workspaceRunners ,runner )
132
143
133
144
_ ,_ = fmt .Fprintf (logs ,"Creating workspace %d/%d...\n " ,i + 1 ,r .cfg .WorkspaceCount )
145
+
146
+ // Record build start time before running the workspace build
147
+ r .workspaces .Store (workspaceName ,& workspace {
148
+ buildStartTime :time .Now (),
149
+ })
134
150
err = runner .Run (ctx ,fmt .Sprintf ("%s-%d" ,id ,i ),logs )
135
151
if err != nil {
136
152
return xerrors .Errorf ("create workspace %d: %w" ,i ,err )
@@ -163,7 +179,7 @@ func (r *Runner) dialCoderConnect(ctx context.Context, client *codersdk.Client,
163
179
u ,err := client .URL .Parse ("/api/v2/tailnet" )
164
180
if err != nil {
165
181
logger .Error (ctx ,"failed to parse tailnet URL" ,slog .Error (err ))
166
- r .cfg .Metrics .AddError (append ( r .cfg .MetricLabelValues ,"parse_url" ) ... )
182
+ r .cfg .Metrics .AddError (user . Username , r .cfg .WorkspaceCount ,"parse_url" )
167
183
return nil ,xerrors .Errorf ("parse tailnet URL: %w" ,err )
168
184
}
169
185
@@ -183,7 +199,7 @@ func (r *Runner) dialCoderConnect(ctx context.Context, client *codersdk.Client,
183
199
clients ,err := dialer .Dial (ctx ,nil )
184
200
if err != nil {
185
201
logger .Error (ctx ,"failed to dial workspace updates" ,slog .Error (err ))
186
- r .cfg .Metrics .AddError (append ( r .cfg .MetricLabelValues ,"dial" ) ... )
202
+ r .cfg .Metrics .AddError (user . Username , r .cfg .WorkspaceCount ,"dial" )
187
203
return nil ,xerrors .Errorf ("dial workspace updates: %w" ,err )
188
204
}
189
205
@@ -192,63 +208,89 @@ func (r *Runner) dialCoderConnect(ctx context.Context, client *codersdk.Client,
192
208
193
209
// watchWorkspaceUpdates processes workspace updates and returns error or nil
194
210
// once all expected workspaces and agents are seen.
195
- func (r * Runner )watchWorkspaceUpdates (ctx context.Context ,clients * tailnet.ControlProtocolClients ,logs io.Writer )error {
211
+ func (r * Runner )watchWorkspaceUpdates (ctx context.Context ,clients * tailnet.ControlProtocolClients ,user codersdk. User , logs io.Writer )error {
196
212
defer clients .Closer .Close ()
197
213
198
- seenWorkspaces := make (map [uuid.UUID ]bool )
199
- seenAgents := make (map [uuid.UUID ]bool )
200
214
expectedWorkspaces := r .cfg .WorkspaceCount
201
- expectedAgents := expectedWorkspaces
215
+ seenWorkspaces := 0
216
+
217
+ // Workspace ID to agent update arrival time.
218
+ // At the end, we reconcile to see which took longer, and mark that as the
219
+ // latency.
220
+ agents := make (map [uuid.UUID ]time.Time )
202
221
203
- _ ,_ = fmt .Fprintf (logs ,"Waiting for %d workspaces and%d agents\n " ,expectedWorkspaces , expectedAgents )
222
+ _ ,_ = fmt .Fprintf (logs ,"Waiting for %d workspaces andtheir agents\n " ,expectedWorkspaces )
204
223
205
224
for {
206
225
select {
207
226
case <- ctx .Done ():
208
227
_ ,_ = fmt .Fprintf (logs ,"Context canceled while waiting for workspace updates: %v\n " ,ctx .Err ())
209
- r .cfg .Metrics .AddError (append ( r .cfg .MetricLabelValues ,"context_done" ) ... )
228
+ r .cfg .Metrics .AddError (user . Username , r .cfg .WorkspaceCount ,"context_done" )
210
229
return ctx .Err ()
211
230
default :
212
231
}
213
232
214
233
update ,err := clients .WorkspaceUpdates .Recv ()
215
234
if err != nil {
216
235
_ ,_ = fmt .Fprintf (logs ,"Workspace updates stream error: %v\n " ,err )
217
- r .cfg .Metrics .AddError (append ( r .cfg .MetricLabelValues ,"recv" ) ... )
236
+ r .cfg .Metrics .AddError (user . Username , r .cfg .WorkspaceCount ,"recv" )
218
237
return xerrors .Errorf ("receive workspace update: %w" ,err )
219
238
}
220
239
221
240
for _ ,ws := range update .UpsertedWorkspaces {
222
241
wsID ,err := uuid .FromBytes (ws .Id )
223
242
if err != nil {
224
243
_ ,_ = fmt .Fprintf (logs ,"Invalid workspace ID in update: %v\n " ,err )
225
- r .cfg .Metrics .AddError (append ( r .cfg .MetricLabelValues ,"bad_workspace_id" ) ... )
244
+ r .cfg .Metrics .AddError (user . Username , r .cfg .WorkspaceCount ,"bad_workspace_id" )
226
245
continue
227
246
}
228
- if ! seenWorkspaces [wsID ] {
229
- seenWorkspaces [wsID ]= true
230
- _ ,_ = fmt .Fprintf (logs ,"Received workspace update: %s (%d/%d)\n " ,wsID ,len (seenWorkspaces ),expectedWorkspaces )
247
+
248
+ if tracking ,ok := r .workspaces .Load (ws .GetName ());ok {
249
+ if tracking .updateLatency == 0 {
250
+ r .workspaces .Store (ws .GetName (),& workspace {
251
+ workspaceID :wsID ,
252
+ buildStartTime :tracking .buildStartTime ,
253
+ updateLatency :time .Since (tracking .buildStartTime ),
254
+ })
255
+ seenWorkspaces ++
256
+ }
257
+ }else if ! ok {
258
+ return xerrors .Errorf ("received update for unknown workspace %q (id: %s)" ,ws .GetName (),wsID )
231
259
}
232
260
}
233
261
234
262
for _ ,agent := range update .UpsertedAgents {
235
- agentID ,err := uuid .FromBytes (agent .Id )
263
+ wsID ,err := uuid .FromBytes (agent .WorkspaceId )
236
264
if err != nil {
237
- _ ,_ = fmt .Fprintf (logs ,"Invalidagent ID in update: %v\n " ,err )
238
- r .cfg .Metrics .AddError (append ( r .cfg .MetricLabelValues ,"bad_agent_id" ) ... )
265
+ _ ,_ = fmt .Fprintf (logs ,"Invalidworkspace ID in agent update: %v\n " ,err )
266
+ r .cfg .Metrics .AddError (user . Username , r .cfg .WorkspaceCount ,"bad_agent_workspace_id" )
239
267
continue
240
268
}
241
- if ! seenAgents [ agentID ] {
242
- seenAgents [ agentID ] = true
243
- _ , _ = fmt . Fprintf ( logs , "Received agent update: %s (%d/%d) \n " , agentID , len ( seenAgents ), expectedAgents )
269
+
270
+ if _ , ok := agents [ wsID ]; ! ok {
271
+ agents [ wsID ] = time . Now ( )
244
272
}
245
273
}
246
274
247
- if len (seenWorkspaces )>= int (expectedWorkspaces )&& len (seenAgents )>= int (expectedAgents ) {
248
- elapsed := time .Since (r .startTime )
249
- _ ,_ = fmt .Fprintf (logs ,"All expected workspaces (%d) and agents (%d) received in %v\n " ,
250
- len (seenWorkspaces ),len (seenAgents ),elapsed )
251
- r .cfg .Metrics .RecordCompletion (elapsed ,r .cfg .MetricLabelValues ... )
275
+ if seenWorkspaces == int (expectedWorkspaces )&& len (agents )== int (expectedWorkspaces ) {
276
+ // For each workspace, record the latency from build start to
277
+ // workspace update, or agent update, whichever is later.
278
+ r .workspaces .Range (func (wsName string ,ws * workspace )bool {
279
+ if agentTime ,ok := agents [ws .workspaceID ];ok {
280
+ agentLatency := agentTime .Sub (ws .buildStartTime )
281
+ if agentLatency > ws .updateLatency {
282
+ // Update in-place, so our final metrics reporting is
283
+ // correct.
284
+ ws .updateLatency = agentLatency
285
+ }
286
+ }else {
287
+ // Unreachable, recorded for debugging
288
+ r .cfg .Metrics .AddError (user .Username ,r .cfg .WorkspaceCount ,"missing_agent" )
289
+ }
290
+ r .cfg .Metrics .RecordCompletion (ws .updateLatency ,user .Username ,r .cfg .WorkspaceCount ,wsName )
291
+ return true
292
+ })
293
+ _ ,_ = fmt .Fprintf (logs ,"Updates received for all %d workspaces and agents\n " ,expectedWorkspaces )
252
294
return nil
253
295
}
254
296
}
@@ -260,9 +302,14 @@ const (
260
302
)
261
303
262
304
func (r * Runner )GetMetrics ()map [string ]any {
305
+ latencyMap := make (map [string ]float64 )
306
+ r .workspaces .Range (func (wsName string ,ws * workspace )bool {
307
+ latencyMap [wsName ]= ws .updateLatency .Seconds ()
308
+ return true
309
+ })
263
310
return map [string ]any {
264
311
WorkspaceUpdatesErrorsTotal :r .cfg .Metrics .numErrors .Load (),
265
- WorkspaceUpdatesLatencyMetric :r . cfg . Metrics . completionDuration . Seconds () ,
312
+ WorkspaceUpdatesLatencyMetric :latencyMap ,
266
313
}
267
314
}
268
315