@@ -7,15 +7,13 @@ import (
7
7
"net/http"
8
8
"time"
9
9
10
- "github.com/google/uuid"
11
10
"golang.org/x/xerrors"
12
11
13
12
"github.com/coder/websocket"
14
13
15
14
"cdr.dev/slog"
16
15
"cdr.dev/slog/sloggers/sloghuman"
17
16
"github.com/coder/coder/v2/coderd/tracing"
18
- "github.com/coder/coder/v2/coderd/util/syncmap"
19
17
"github.com/coder/coder/v2/codersdk"
20
18
"github.com/coder/coder/v2/codersdk/workspacesdk"
21
19
"github.com/coder/coder/v2/scaletest/createusers"
@@ -34,11 +32,10 @@ type Runner struct {
34
32
workspacebuildRunners []* workspacebuild.Runner
35
33
36
34
// workspace name to workspace
37
- workspaces * syncmap. Map [string , * workspace ]
35
+ workspaces map [string ] * workspace
38
36
}
39
37
40
38
type workspace struct {
41
- workspaceID uuid.UUID
42
39
buildStartTime time.Time
43
40
updateLatency time.Duration
44
41
}
@@ -53,7 +50,7 @@ func NewRunner(client *codersdk.Client, cfg Config) *Runner {
53
50
return & Runner {
54
51
client :client ,
55
52
cfg :cfg ,
56
- workspaces :syncmap . New [string , * workspace ]( ),
53
+ workspaces :make ( map [string ] * workspace ),
57
54
}
58
55
}
59
56
@@ -73,62 +70,43 @@ func (r *Runner) Run(ctx context.Context, id string, logs io.Writer) error {
73
70
r .client .SetLogger (logger )
74
71
r .client .SetLogBodies (true )
75
72
76
- var (
77
- client = r .client
78
- user codersdk.User
79
- err error
80
- )
81
- if r .cfg .User .SessionToken != "" {
82
- user ,err = client .User (ctx ,"me" )
83
- if err != nil {
84
- return xerrors .Errorf ("get user with session token: %w" ,err )
85
- }
86
- }else {
87
- createUserConfig := createusers.Config {
88
- OrganizationID :r .cfg .User .OrganizationID ,
89
- Username :r .cfg .User .Username ,
90
- Email :r .cfg .User .Email ,
91
- }
92
- if err := createUserConfig .Validate ();err != nil {
93
- return xerrors .Errorf ("validate create user config: %w" ,err )
94
- }
95
- r .createUserRunner = createusers .NewRunner (r .client ,createUserConfig )
96
- newUser ,err := r .createUserRunner .RunReturningUser (ctx ,id ,logs )
97
- if err != nil {
98
- return xerrors .Errorf ("create user: %w" ,err )
99
- }
100
- user = newUser .User
101
- client = codersdk .New (r .client .URL )
102
- client .SetSessionToken (newUser .SessionToken )
103
- client .SetLogger (logger )
104
- client .SetLogBodies (true )
73
+ r .createUserRunner = createusers .NewRunner (r .client ,r .cfg .User )
74
+ newUser ,err := r .createUserRunner .RunReturningUser (ctx ,id ,logs )
75
+ if err != nil {
76
+ return xerrors .Errorf ("create user: %w" ,err )
105
77
}
78
+ user := newUser .User
79
+ client := codersdk .New (r .client .URL ,
80
+ codersdk .WithSessionToken (newUser .SessionToken ),
81
+ codersdk .WithLogger (logger ),
82
+ codersdk .WithLogBodies ())
106
83
107
- _ , _ = fmt .Fprintf ( logs , "Using user %q(id: %s) \n " ,user .Username , user .ID )
84
+ logger . Info ( ctx , fmt .Sprintf ( " user %qcreated " ,user .Username ), slog . F ( "id" , user .ID . String ()) )
108
85
109
86
dialCtx ,cancel := context .WithTimeout (ctx ,r .cfg .DialTimeout )
110
87
defer cancel ()
111
88
112
- _ , _ = fmt . Fprintf ( logs ,"Connecting to workspace updates stream for user %s \n " , user . Username )
89
+ logger . Info ( ctx ,"connecting to workspace updates stream" )
113
90
clients ,err := r .dialCoderConnect (dialCtx ,client ,user ,logger )
114
91
if err != nil {
115
92
return xerrors .Errorf ("coder connect dial failed: %w" ,err )
116
93
}
117
- _ ,_ = fmt .Fprintf (logs ,"Successfully connected to workspace updates stream\n " )
94
+ logger .Info (ctx ,"connected to workspace updates stream" )
95
+ defer clients .Closer .Close ()
118
96
119
97
watchCtx ,cancelWatch := context .WithCancel (ctx )
120
98
defer cancelWatch ()
121
99
122
100
completionCh := make (chan error ,1 )
123
101
go func () {
124
- completionCh <- r .watchWorkspaceUpdates (watchCtx ,clients ,user ,logs )
102
+ completionCh <- r .watchWorkspaceUpdates (watchCtx ,clients ,user ,logger )
125
103
}()
126
104
127
105
reachedBarrier = true
128
106
r .cfg .DialBarrier .Done ()
129
107
r .cfg .DialBarrier .Wait ()
130
108
131
- workspaceRunners : =make ([]* workspacebuild.Runner ,0 ,r .cfg .WorkspaceCount )
109
+ r . workspacebuildRunners = make ([]* workspacebuild.Runner ,0 ,r .cfg .WorkspaceCount )
132
110
for i := range r .cfg .WorkspaceCount {
133
111
workspaceName ,err := loadtestutil .GenerateWorkspaceName (id )
134
112
if err != nil {
@@ -140,23 +118,21 @@ func (r *Runner) Run(ctx context.Context, id string, logs io.Writer) error {
140
118
workspaceBuildConfig .Request .Name = workspaceName
141
119
142
120
runner := workspacebuild .NewRunner (client ,workspaceBuildConfig )
143
- workspaceRunners = append (workspaceRunners ,runner )
121
+ r . workspacebuildRunners = append (r . workspacebuildRunners ,runner )
144
122
145
- _ , _ = fmt .Fprintf ( logs , "Creating workspace %d/%d... \n " ,i + 1 ,r .cfg .WorkspaceCount )
123
+ logger . Info ( ctx , fmt .Sprintf ( "creating workspace %d/%d" ,i + 1 ,r .cfg .WorkspaceCount ) )
146
124
147
125
// Record build start time before running the workspace build
148
- r .workspaces . Store ( workspaceName , & workspace {
126
+ r .workspaces [ workspaceName ] = & workspace {
149
127
buildStartTime :time .Now (),
150
- })
128
+ }
151
129
err = runner .Run (ctx ,fmt .Sprintf ("%s-%d" ,id ,i ),logs )
152
130
if err != nil {
153
131
return xerrors .Errorf ("create workspace %d: %w" ,i ,err )
154
132
}
155
133
}
156
134
157
- r .workspacebuildRunners = workspaceRunners
158
-
159
- _ ,_ = fmt .Fprintf (logs ,"Waiting up to %v for workspace updates to complete...\n " ,r .cfg .WorkspaceUpdatesTimeout )
135
+ logger .Info (ctx ,fmt .Sprintf ("waiting up to %v for workspace updates to complete..." ,r .cfg .WorkspaceUpdatesTimeout ))
160
136
161
137
waitUpdatesCtx ,cancel := context .WithTimeout (ctx ,r .cfg .WorkspaceUpdatesTimeout )
162
138
defer cancel ()
@@ -166,7 +142,7 @@ func (r *Runner) Run(ctx context.Context, id string, logs io.Writer) error {
166
142
if err != nil {
167
143
return xerrors .Errorf ("workspace updates streaming failed: %w" ,err )
168
144
}
169
- _ , _ = fmt . Fprintf ( logs ,"Workspace updates streaming completed successfully\n " )
145
+ logger . Info ( ctx ,"workspace updates streaming completed successfully" )
170
146
return nil
171
147
case <- waitUpdatesCtx .Done ():
172
148
if waitUpdatesCtx .Err ()== context .DeadlineExceeded {
@@ -209,114 +185,65 @@ func (r *Runner) dialCoderConnect(ctx context.Context, client *codersdk.Client,
209
185
210
186
// watchWorkspaceUpdates processes workspace updates and returns error or nil
211
187
// once all expected workspaces and agents are seen.
212
- func (r * Runner )watchWorkspaceUpdates (ctx context.Context ,clients * tailnet.ControlProtocolClients ,user codersdk.User ,logs io.Writer )error {
213
- defer clients .Closer .Close ()
214
-
188
+ func (r * Runner )watchWorkspaceUpdates (ctx context.Context ,clients * tailnet.ControlProtocolClients ,user codersdk.User ,logger slog.Logger )error {
215
189
expectedWorkspaces := r .cfg .WorkspaceCount
216
- seenWorkspaces := 0
217
- // Workspace ID to agent update arrival time.
218
- // At the end, we reconcile to see which took longer, and mark that as the
219
- // latency.
220
- agents := make (map [uuid.UUID ]time.Time )
190
+ // workspace name to time the update was seen
191
+ seenWorkspaces := make (map [string ]time.Time )
221
192
222
- _ , _ = fmt .Fprintf ( logs , "Waiting for %d workspaces and their agents\n " ,expectedWorkspaces )
193
+ logger . Info ( ctx , fmt .Sprintf ( "waiting for %d workspaces and their agents" ,expectedWorkspaces ) )
223
194
for {
224
195
select {
225
196
case <- ctx .Done ():
226
- _ , _ = fmt . Fprintf ( logs ,"Context canceled while waiting for workspace updates: %v \n " ,ctx .Err ())
197
+ logger . Error ( ctx ,"context canceled while waiting for workspace updates" ,slog . Error ( ctx .Err () ))
227
198
r .cfg .Metrics .AddError (user .Username ,r .cfg .WorkspaceCount ,"context_done" )
228
199
return ctx .Err ()
229
200
default :
230
201
}
231
202
232
203
update ,err := clients .WorkspaceUpdates .Recv ()
233
204
if err != nil {
234
- _ , _ = fmt . Fprintf ( logs ,"Workspace updates stream error: %v \n " ,err )
205
+ logger . Error ( ctx ,"workspace updates stream error" ,slog . Error ( err ) )
235
206
r .cfg .Metrics .AddError (user .Username ,r .cfg .WorkspaceCount ,"recv" )
236
207
return xerrors .Errorf ("receive workspace update: %w" ,err )
237
208
}
209
+ recvTime := time .Now ()
238
210
239
211
for _ ,ws := range update .UpsertedWorkspaces {
240
- wsID ,err := uuid .FromBytes (ws .Id )
241
- if err != nil {
242
- _ ,_ = fmt .Fprintf (logs ,"Invalid workspace ID in update: %v\n " ,err )
243
- r .cfg .Metrics .AddError (user .Username ,r .cfg .WorkspaceCount ,"bad_workspace_id" )
244
- continue
245
- }
246
-
247
- if tracking ,ok := r .workspaces .Load (ws .GetName ());ok {
248
- if tracking .updateLatency == 0 {
249
- r .workspaces .Store (ws .GetName (),& workspace {
250
- workspaceID :wsID ,
251
- buildStartTime :tracking .buildStartTime ,
252
- updateLatency :time .Since (tracking .buildStartTime ),
253
- })
254
- seenWorkspaces ++
255
- }
256
- }else if ! ok {
257
- return xerrors .Errorf ("received update for unknown workspace %q (id: %s)" ,ws .GetName (),wsID )
258
- }
212
+ seenWorkspaces [ws .Name ]= recvTime
259
213
}
260
214
261
- for _ ,agent := range update .UpsertedAgents {
262
- wsID ,err := uuid .FromBytes (agent .WorkspaceId )
263
- if err != nil {
264
- _ ,_ = fmt .Fprintf (logs ,"Invalid workspace ID in agent update: %v\n " ,err )
265
- r .cfg .Metrics .AddError (user .Username ,r .cfg .WorkspaceCount ,"bad_agent_workspace_id" )
266
- continue
267
- }
268
-
269
- if _ ,ok := agents [wsID ];! ok {
270
- agents [wsID ]= time .Now ()
271
- }
272
- }
273
-
274
- if seenWorkspaces == int (expectedWorkspaces )&& len (agents )== int (expectedWorkspaces ) {
275
- // For each workspace, record the latency from build start to
276
- // workspace update, or agent update, whichever is later.
277
- r .workspaces .Range (func (wsName string ,ws * workspace )bool {
278
- if agentTime ,ok := agents [ws .workspaceID ];ok {
279
- agentLatency := agentTime .Sub (ws .buildStartTime )
280
- if agentLatency > ws .updateLatency {
281
- // Update in-place, so GetMetrics is accurate.
282
- ws .updateLatency = agentLatency
283
- }
284
- }else {
285
- // Unreachable, recorded for debugging
286
- r .cfg .Metrics .AddError (user .Username ,r .cfg .WorkspaceCount ,"missing_agent" )
215
+ if len (seenWorkspaces )== int (expectedWorkspaces ) {
216
+ for wsName ,seenTime := range seenWorkspaces {
217
+ ws ,ok := r .workspaces [wsName ]
218
+ if ! ok {
219
+ logger .Error (ctx ,"received update for unexpected workspace" ,slog .F ("workspace" ,wsName ),slog .F ("seen_workspaces" ,seenWorkspaces ))
220
+ r .cfg .Metrics .AddError (user .Username ,r .cfg .WorkspaceCount ,"unexpected_workspace" )
221
+ return xerrors .Errorf ("received update for unexpected workspace %q" ,wsName )
287
222
}
223
+ ws .updateLatency = seenTime .Sub (ws .buildStartTime )
288
224
r .cfg .Metrics .RecordCompletion (ws .updateLatency ,user .Username ,r .cfg .WorkspaceCount ,wsName )
289
- return true
290
- })
291
- _ ,_ = fmt .Fprintf (logs ,"Updates received for all %d workspaces and agents\n " ,expectedWorkspaces )
225
+ }
226
+ logger .Info (ctx ,fmt .Sprintf ("updates received for all %d workspaces and agents" ,expectedWorkspaces ))
292
227
return nil
293
228
}
294
229
}
295
230
}
296
231
297
232
const (
298
233
WorkspaceUpdatesLatencyMetric = "workspace_updates_latency_seconds"
299
- WorkspaceUpdatesErrorsTotal = "workspace_updates_errors_total"
300
234
)
301
235
302
236
func (r * Runner )GetMetrics ()map [string ]any {
303
237
latencyMap := make (map [string ]float64 )
304
- r . workspaces . Range ( func ( wsName string ,ws * workspace ) bool {
238
+ for wsName ,ws := range r . workspaces {
305
239
latencyMap [wsName ]= ws .updateLatency .Seconds ()
306
- return true
307
- })
240
+ }
308
241
return map [string ]any {
309
- WorkspaceUpdatesErrorsTotal :r .cfg .Metrics .numErrors .Load (),
310
242
WorkspaceUpdatesLatencyMetric :latencyMap ,
311
243
}
312
244
}
313
245
314
246
func (r * Runner )Cleanup (ctx context.Context ,id string ,logs io.Writer )error {
315
- if r .cfg .NoCleanup {
316
- _ ,_ = fmt .Fprintln (logs ,"skipping cleanup" )
317
- return nil
318
- }
319
-
320
247
for i ,runner := range r .workspacebuildRunners {
321
248
if runner != nil {
322
249
_ ,_ = fmt .Fprintf (logs ,"Cleaning up workspace %d/%d...\n " ,i + 1 ,len (r .workspacebuildRunners ))
@@ -327,6 +254,7 @@ func (r *Runner) Cleanup(ctx context.Context, id string, logs io.Writer) error {
327
254
}
328
255
329
256
if r .createUserRunner != nil {
257
+ _ ,_ = fmt .Fprintln (logs ,"Cleaning up user..." )
330
258
if err := r .createUserRunner .Cleanup (ctx ,id ,logs );err != nil {
331
259
return xerrors .Errorf ("cleanup user: %w" ,err )
332
260
}