@@ -71,35 +71,34 @@ func (r *Runner) Run(ctx context.Context, id string, logs io.Writer) error {
71
71
r .client .SetLogBodies (true )
72
72
73
73
r .createUserRunner = createusers .NewRunner (r .client ,r .cfg .User )
74
- newUser ,err := r .createUserRunner .RunReturningUser (ctx ,id ,logs )
74
+ newUserAndToken ,err := r .createUserRunner .RunReturningUser (ctx ,id ,logs )
75
75
if err != nil {
76
76
return xerrors .Errorf ("create user: %w" ,err )
77
77
}
78
- user := newUser .User
79
- client := codersdk .New (r .client .URL ,
80
- codersdk .WithSessionToken (newUser .SessionToken ),
78
+ newUser := newUserAndToken .User
79
+ newUserClient := codersdk .New (r .client .URL ,
80
+ codersdk .WithSessionToken (newUserAndToken .SessionToken ),
81
81
codersdk .WithLogger (logger ),
82
82
codersdk .WithLogBodies ())
83
83
84
- logger .Info (ctx ,fmt .Sprintf ("user %q created" ,user .Username ),slog .F ("id" ,user .ID .String ()))
84
+ logger .Info (ctx ,fmt .Sprintf ("user %q created" ,newUser .Username ),slog .F ("id" ,newUser .ID .String ()))
85
85
86
86
dialCtx ,cancel := context .WithTimeout (ctx ,r .cfg .DialTimeout )
87
87
defer cancel ()
88
88
89
89
logger .Info (ctx ,"connecting to workspace updates stream" )
90
- clients ,err := r .dialTailnet (dialCtx ,client , user ,logger )
90
+ clients ,err := r .dialTailnet (dialCtx ,newUserClient , newUser ,logger )
91
91
if err != nil {
92
92
return xerrors .Errorf ("tailnet dial failed: %w" ,err )
93
93
}
94
94
logger .Info (ctx ,"connected to workspace updates stream" )
95
- defer clients .Closer .Close ()
96
95
97
96
watchCtx ,cancelWatch := context .WithCancel (ctx )
98
97
defer cancelWatch ()
99
98
100
99
completionCh := make (chan error ,1 )
101
100
go func () {
102
- completionCh <- r .watchWorkspaceUpdates (watchCtx ,clients ,user ,logger )
101
+ completionCh <- r .watchWorkspaceUpdates (watchCtx ,clients ,newUser ,logger )
103
102
}()
104
103
105
104
reachedBarrier = true
@@ -110,14 +109,15 @@ func (r *Runner) Run(ctx context.Context, id string, logs io.Writer) error {
110
109
for i := range r .cfg .WorkspaceCount {
111
110
workspaceName ,err := loadtestutil .GenerateWorkspaceName (id )
112
111
if err != nil {
112
+ clients .Closer .Close ()
113
113
return xerrors .Errorf ("generate random name for workspace: %w" ,err )
114
114
}
115
115
workspaceBuildConfig := r .cfg .Workspace
116
116
workspaceBuildConfig .OrganizationID = r .cfg .User .OrganizationID
117
- workspaceBuildConfig .UserID = user .ID .String ()
117
+ workspaceBuildConfig .UserID = newUser .ID .String ()
118
118
workspaceBuildConfig .Request .Name = workspaceName
119
119
120
- runner := workspacebuild .NewRunner (client ,workspaceBuildConfig )
120
+ runner := workspacebuild .NewRunner (newUserClient ,workspaceBuildConfig )
121
121
r .workspacebuildRunners = append (r .workspacebuildRunners ,runner )
122
122
123
123
logger .Info (ctx ,fmt .Sprintf ("creating workspace %d/%d" ,i + 1 ,r .cfg .WorkspaceCount ))
@@ -128,6 +128,7 @@ func (r *Runner) Run(ctx context.Context, id string, logs io.Writer) error {
128
128
}
129
129
err = runner .Run (ctx ,fmt .Sprintf ("%s-%d" ,id ,i ),logs )
130
130
if err != nil {
131
+ clients .Closer .Close ()
131
132
return xerrors .Errorf ("create workspace %d: %w" ,i ,err )
132
133
}
133
134
}
@@ -143,8 +144,11 @@ func (r *Runner) Run(ctx context.Context, id string, logs io.Writer) error {
143
144
return xerrors .Errorf ("workspace updates streaming failed: %w" ,err )
144
145
}
145
146
logger .Info (ctx ,"workspace updates streaming completed successfully" )
147
+ clients .Closer .Close ()
146
148
return nil
147
149
case <- waitUpdatesCtx .Done ():
150
+ clients .Closer .Close ()
151
+ <- completionCh // ensure watch goroutine exits
148
152
if waitUpdatesCtx .Err ()== context .DeadlineExceeded {
149
153
return xerrors .Errorf ("timeout waiting for workspace updates after %v" ,r .cfg .WorkspaceUpdatesTimeout )
150
154
}