@@ -71,35 +71,35 @@ func (r *Runner) Run(ctx context.Context, id string, logs io.Writer) error {
71
71
r .client .SetLogBodies (true )
72
72
73
73
r .createUserRunner = createusers .NewRunner (r .client ,r .cfg .User )
74
- newUser ,err := r .createUserRunner .RunReturningUser (ctx ,id ,logs )
74
+ newUserAndToken ,err := r .createUserRunner .RunReturningUser (ctx ,id ,logs )
75
75
if err != nil {
76
76
return xerrors .Errorf ("create user: %w" ,err )
77
77
}
78
- user := newUser .User
79
- client := codersdk .New (r .client .URL ,
80
- codersdk .WithSessionToken (newUser .SessionToken ),
78
+ newUser := newUserAndToken .User
79
+ newUserClient := codersdk .New (r .client .URL ,
80
+ codersdk .WithSessionToken (newUserAndToken .SessionToken ),
81
81
codersdk .WithLogger (logger ),
82
82
codersdk .WithLogBodies ())
83
83
84
- logger .Info (ctx ,fmt .Sprintf ("user %q created" ,user .Username ),slog .F ("id" ,user .ID .String ()))
84
+ logger .Info (ctx ,fmt .Sprintf ("user %q created" ,newUser .Username ),slog .F ("id" ,newUser .ID .String ()))
85
85
86
86
dialCtx ,cancel := context .WithTimeout (ctx ,r .cfg .DialTimeout )
87
87
defer cancel ()
88
88
89
89
logger .Info (ctx ,"connecting to workspace updates stream" )
90
- clients ,err := r .dialTailnet (dialCtx ,client , user ,logger )
90
+ clients ,err := r .dialTailnet (dialCtx ,newUserClient , newUser ,logger )
91
91
if err != nil {
92
92
return xerrors .Errorf ("tailnet dial failed: %w" ,err )
93
93
}
94
- logger .Info (ctx ,"connected to workspace updates stream" )
95
94
defer clients .Closer .Close ()
95
+ logger .Info (ctx ,"connected to workspace updates stream" )
96
96
97
97
watchCtx ,cancelWatch := context .WithCancel (ctx )
98
98
defer cancelWatch ()
99
99
100
100
completionCh := make (chan error ,1 )
101
101
go func () {
102
- completionCh <- r .watchWorkspaceUpdates (watchCtx ,clients ,user ,logger )
102
+ completionCh <- r .watchWorkspaceUpdates (watchCtx ,clients ,newUser ,logger )
103
103
}()
104
104
105
105
reachedBarrier = true
@@ -114,10 +114,10 @@ func (r *Runner) Run(ctx context.Context, id string, logs io.Writer) error {
114
114
}
115
115
workspaceBuildConfig := r .cfg .Workspace
116
116
workspaceBuildConfig .OrganizationID = r .cfg .User .OrganizationID
117
- workspaceBuildConfig .UserID = user .ID .String ()
117
+ workspaceBuildConfig .UserID = newUser .ID .String ()
118
118
workspaceBuildConfig .Request .Name = workspaceName
119
119
120
- runner := workspacebuild .NewRunner (client ,workspaceBuildConfig )
120
+ runner := workspacebuild .NewRunner (newUserClient ,workspaceBuildConfig )
121
121
r .workspacebuildRunners = append (r .workspacebuildRunners ,runner )
122
122
123
123
logger .Info (ctx ,fmt .Sprintf ("creating workspace %d/%d" ,i + 1 ,r .cfg .WorkspaceCount ))
@@ -145,6 +145,9 @@ func (r *Runner) Run(ctx context.Context, id string, logs io.Writer) error {
145
145
logger .Info (ctx ,"workspace updates streaming completed successfully" )
146
146
return nil
147
147
case <- waitUpdatesCtx .Done ():
148
+ cancelWatch ()
149
+ clients .Closer .Close ()
150
+ <- completionCh // ensure watch goroutine exits
148
151
if waitUpdatesCtx .Err ()== context .DeadlineExceeded {
149
152
return xerrors .Errorf ("timeout waiting for workspace updates after %v" ,r .cfg .WorkspaceUpdatesTimeout )
150
153
}
@@ -214,6 +217,10 @@ func (r *Runner) watchWorkspaceUpdates(ctx context.Context, clients *tailnet.Con
214
217
215
218
if len (seenWorkspaces )== int (expectedWorkspaces ) {
216
219
for wsName ,seenTime := range seenWorkspaces {
220
+ // We only receive workspace updates for those that we built.
221
+ // If we received a workspace update for a workspace we didn't build,
222
+ // we're risking racing with the code that writes workspace
223
+ // build start times to this map.
217
224
ws ,ok := r .workspaces [wsName ]
218
225
if ! ok {
219
226
logger .Error (ctx ,"received update for unexpected workspace" ,slog .F ("workspace" ,wsName ),slog .F ("seen_workspaces" ,seenWorkspaces ))