|
| 1 | +package workspaceupdates |
| 2 | + |
| 3 | +import ( |
| 4 | +"context" |
| 5 | +"fmt" |
| 6 | +"io" |
| 7 | +"net/http" |
| 8 | +"time" |
| 9 | + |
| 10 | +"golang.org/x/xerrors" |
| 11 | + |
| 12 | +"github.com/coder/websocket" |
| 13 | + |
| 14 | +"cdr.dev/slog" |
| 15 | +"cdr.dev/slog/sloggers/sloghuman" |
| 16 | +"github.com/coder/coder/v2/coderd/tracing" |
| 17 | +"github.com/coder/coder/v2/codersdk" |
| 18 | +"github.com/coder/coder/v2/codersdk/workspacesdk" |
| 19 | +"github.com/coder/coder/v2/scaletest/createusers" |
| 20 | +"github.com/coder/coder/v2/scaletest/harness" |
| 21 | +"github.com/coder/coder/v2/scaletest/loadtestutil" |
| 22 | +"github.com/coder/coder/v2/scaletest/workspacebuild" |
| 23 | +"github.com/coder/coder/v2/tailnet" |
| 24 | +tailnetproto"github.com/coder/coder/v2/tailnet/proto" |
| 25 | +) |
| 26 | + |
// Runner executes a single workspace-updates scaletest iteration: it creates
// a user, dials the tailnet workspace-updates stream as that user, builds a
// number of workspaces, and records how long each workspace takes to appear
// on the stream.
type Runner struct {
	client *codersdk.Client // admin client used to create the test user
	cfg    Config

	// Sub-runners are retained so Cleanup can tear down what Run created.
	createUserRunner      *createusers.Runner
	workspacebuildRunners []*workspacebuild.Runner

	// workspace name to workspace
	// Written by Run before each build and read by the watch goroutine once
	// all expected updates have been seen.
	// NOTE(review): accessed from two goroutines without a mutex — Run's
	// writes and watchWorkspaceUpdates' reads appear temporally ordered, but
	// there is no happens-before edge; confirm with -race.
	workspaces map[string]*workspace
}
| 37 | + |
// workspace tracks per-workspace timing for the latency metric.
type workspace struct {
	buildStartTime time.Time     // recorded by Run just before the workspace build starts
	updateLatency  time.Duration // seen-time minus buildStartTime, set by watchWorkspaceUpdates
}
| 42 | + |
| 43 | +var ( |
| 44 | +_ harness.Runnable=&Runner{} |
| 45 | +_ harness.Cleanable=&Runner{} |
| 46 | +_ harness.Collectable=&Runner{} |
| 47 | +) |
| 48 | + |
| 49 | +funcNewRunner(client*codersdk.Client,cfgConfig)*Runner { |
| 50 | +return&Runner{ |
| 51 | +client:client, |
| 52 | +cfg:cfg, |
| 53 | +workspaces:make(map[string]*workspace), |
| 54 | +} |
| 55 | +} |
| 56 | + |
// Run executes one scaletest iteration: create a user, connect to that user's
// workspace-updates stream, rendezvous with the other runners, build
// cfg.WorkspaceCount workspaces, and wait (up to cfg.WorkspaceUpdatesTimeout)
// for an update for every workspace to arrive on the stream.
// Implements harness.Runnable.
func (r *Runner) Run(ctx context.Context, id string, logs io.Writer) error {
	ctx, span := tracing.StartSpan(ctx)
	defer span.End()

	// Every runner must hit DialBarrier exactly once; if we bail out before
	// reaching it, release our slot so the other runners are not blocked
	// forever in Wait.
	reachedBarrier := false
	defer func() {
		if !reachedBarrier {
			r.cfg.DialBarrier.Done()
		}
	}()

	logs = loadtestutil.NewSyncWriter(logs)
	logger := slog.Make(sloghuman.Sink(logs)).Leveled(slog.LevelDebug)
	r.client.SetLogger(logger)
	r.client.SetLogBodies(true)

	// Create a fresh user; the workspaces are built, and the stream watched,
	// as that user.
	r.createUserRunner = createusers.NewRunner(r.client, r.cfg.User)
	newUserAndToken, err := r.createUserRunner.RunReturningUser(ctx, id, logs)
	if err != nil {
		return xerrors.Errorf("create user: %w", err)
	}
	newUser := newUserAndToken.User
	newUserClient := codersdk.New(r.client.URL,
		codersdk.WithSessionToken(newUserAndToken.SessionToken),
		codersdk.WithLogger(logger),
		codersdk.WithLogBodies())

	logger.Info(ctx, fmt.Sprintf("user %q created", newUser.Username), slog.F("id", newUser.ID.String()))

	dialCtx, cancel := context.WithTimeout(ctx, r.cfg.DialTimeout)
	defer cancel()

	logger.Info(ctx, "connecting to workspace updates stream")
	clients, err := r.dialTailnet(dialCtx, newUserClient, newUser, logger)
	if err != nil {
		return xerrors.Errorf("tailnet dial failed: %w", err)
	}
	defer clients.Closer.Close()
	logger.Info(ctx, "connected to workspace updates stream")

	watchCtx, cancelWatch := context.WithCancel(ctx)
	defer cancelWatch()

	// Buffered so the watch goroutine can always deliver its result and exit
	// even if we return early without reading from the channel.
	completionCh := make(chan error, 1)
	go func() {
		completionCh <- r.watchWorkspaceUpdates(watchCtx, clients, newUser, logger)
	}()

	// Rendezvous: wait until every runner has an open stream before any of
	// them starts building workspaces.
	reachedBarrier = true
	r.cfg.DialBarrier.Done()
	r.cfg.DialBarrier.Wait()

	r.workspacebuildRunners = make([]*workspacebuild.Runner, 0, r.cfg.WorkspaceCount)
	for i := range r.cfg.WorkspaceCount {
		workspaceName, err := loadtestutil.GenerateWorkspaceName(id)
		if err != nil {
			return xerrors.Errorf("generate random name for workspace: %w", err)
		}
		workspaceBuildConfig := r.cfg.Workspace
		workspaceBuildConfig.OrganizationID = r.cfg.User.OrganizationID
		workspaceBuildConfig.UserID = newUser.ID.String()
		workspaceBuildConfig.Request.Name = workspaceName

		runner := workspacebuild.NewRunner(newUserClient, workspaceBuildConfig)
		r.workspacebuildRunners = append(r.workspacebuildRunners, runner)

		logger.Info(ctx, fmt.Sprintf("creating workspace %d/%d", i+1, r.cfg.WorkspaceCount))

		// Record build start time before running the workspace build
		// NOTE(review): this map is read by the watch goroutine without
		// synchronization; see the comment inside watchWorkspaceUpdates.
		r.workspaces[workspaceName] = &workspace{
			buildStartTime: time.Now(),
		}
		err = runner.Run(ctx, fmt.Sprintf("%s-%d", id, i), logs)
		if err != nil {
			return xerrors.Errorf("create workspace %d: %w", i, err)
		}
	}

	logger.Info(ctx, fmt.Sprintf("waiting up to %v for workspace updates to complete...", r.cfg.WorkspaceUpdatesTimeout))

	waitUpdatesCtx, cancel := context.WithTimeout(ctx, r.cfg.WorkspaceUpdatesTimeout)
	defer cancel()

	select {
	case err := <-completionCh:
		if err != nil {
			return xerrors.Errorf("workspace updates streaming failed: %w", err)
		}
		logger.Info(ctx, "workspace updates streaming completed successfully")
		return nil
	case <-waitUpdatesCtx.Done():
		// Unblock the watch goroutine: cancel its context and close the
		// connection so its blocking Recv returns, then wait for it to exit.
		cancelWatch()
		clients.Closer.Close()
		<-completionCh // ensure watch goroutine exits
		if waitUpdatesCtx.Err() == context.DeadlineExceeded {
			return xerrors.Errorf("timeout waiting for workspace updates after %v", r.cfg.WorkspaceUpdatesTimeout)
		}
		return waitUpdatesCtx.Err()
	}
}
| 157 | + |
| 158 | +func (r*Runner)dialTailnet(ctx context.Context,client*codersdk.Client,user codersdk.User,logger slog.Logger) (*tailnet.ControlProtocolClients,error) { |
| 159 | +u,err:=client.URL.Parse("/api/v2/tailnet") |
| 160 | +iferr!=nil { |
| 161 | +logger.Error(ctx,"failed to parse tailnet URL",slog.Error(err)) |
| 162 | +r.cfg.Metrics.AddError(user.Username,r.cfg.WorkspaceCount,"parse_url") |
| 163 | +returnnil,xerrors.Errorf("parse tailnet URL: %w",err) |
| 164 | +} |
| 165 | + |
| 166 | +dialer:=workspacesdk.NewWebsocketDialer( |
| 167 | +logger, |
| 168 | +u, |
| 169 | +&websocket.DialOptions{ |
| 170 | +HTTPHeader: http.Header{ |
| 171 | +"Coder-Session-Token": []string{client.SessionToken()}, |
| 172 | +}, |
| 173 | +}, |
| 174 | +workspacesdk.WithWorkspaceUpdates(&tailnetproto.WorkspaceUpdatesRequest{ |
| 175 | +WorkspaceOwnerId:tailnet.UUIDToByteSlice(user.ID), |
| 176 | +}), |
| 177 | +) |
| 178 | + |
| 179 | +clients,err:=dialer.Dial(ctx,nil) |
| 180 | +iferr!=nil { |
| 181 | +logger.Error(ctx,"failed to dial workspace updates",slog.Error(err)) |
| 182 | +r.cfg.Metrics.AddError(user.Username,r.cfg.WorkspaceCount,"dial") |
| 183 | +returnnil,xerrors.Errorf("dial workspace updates: %w",err) |
| 184 | +} |
| 185 | + |
| 186 | +return&clients,nil |
| 187 | +} |
| 188 | + |
// watchWorkspaceUpdates processes workspace updates and returns error or nil
// once all expected workspaces and agents are seen.
//
// It blocks in Recv between updates; the caller unblocks it by canceling ctx
// and closing the tailnet connection (which makes Recv return an error). On
// success it records one completion metric per workspace with the latency
// between that workspace's build start and the time its update was received.
func (r *Runner) watchWorkspaceUpdates(ctx context.Context, clients *tailnet.ControlProtocolClients, user codersdk.User, logger slog.Logger) error {
	expectedWorkspaces := r.cfg.WorkspaceCount
	// workspace name to time the update was seen
	seenWorkspaces := make(map[string]time.Time)

	logger.Info(ctx, fmt.Sprintf("waiting for %d workspaces and their agents", expectedWorkspaces))
	for {
		// Non-blocking cancellation check; Recv below can still block until
		// the caller closes the connection.
		select {
		case <-ctx.Done():
			logger.Error(ctx, "context canceled while waiting for workspace updates", slog.Error(ctx.Err()))
			r.cfg.Metrics.AddError(user.Username, r.cfg.WorkspaceCount, "context_done")
			return ctx.Err()
		default:
		}

		update, err := clients.WorkspaceUpdates.Recv()
		if err != nil {
			logger.Error(ctx, "workspace updates stream error", slog.Error(err))
			r.cfg.Metrics.AddError(user.Username, r.cfg.WorkspaceCount, "recv")
			return xerrors.Errorf("receive workspace update: %w", err)
		}
		// One receive timestamp per message; every workspace upserted in this
		// message shares it.
		recvTime := time.Now()

		for _, ws := range update.UpsertedWorkspaces {
			seenWorkspaces[ws.Name] = recvTime
		}

		if len(seenWorkspaces) == int(expectedWorkspaces) {
			for wsName, seenTime := range seenWorkspaces {
				// We only receive workspace updates for those that we built.
				// If we received a workspace update for a workspace we didn't build,
				// we're risking racing with the code that writes workspace
				// build start times to this map.
				ws, ok := r.workspaces[wsName]
				if !ok {
					logger.Error(ctx, "received update for unexpected workspace", slog.F("workspace", wsName), slog.F("seen_workspaces", seenWorkspaces))
					r.cfg.Metrics.AddError(user.Username, r.cfg.WorkspaceCount, "unexpected_workspace")
					return xerrors.Errorf("received update for unexpected workspace %q", wsName)
				}
				ws.updateLatency = seenTime.Sub(ws.buildStartTime)
				r.cfg.Metrics.RecordCompletion(ws.updateLatency, user.Username, r.cfg.WorkspaceCount, wsName)
			}
			logger.Info(ctx, fmt.Sprintf("updates received for all %d workspaces and agents", expectedWorkspaces))
			return nil
		}
	}
}
| 238 | + |
const (
	// WorkspaceUpdatesLatencyMetric is the key under which GetMetrics reports
	// the per-workspace update latency map (values in seconds).
	WorkspaceUpdatesLatencyMetric = "workspace_updates_latency_seconds"
)
| 242 | + |
| 243 | +func (r*Runner)GetMetrics()map[string]any { |
| 244 | +latencyMap:=make(map[string]float64) |
| 245 | +forwsName,ws:=ranger.workspaces { |
| 246 | +latencyMap[wsName]=ws.updateLatency.Seconds() |
| 247 | +} |
| 248 | +returnmap[string]any{ |
| 249 | +WorkspaceUpdatesLatencyMetric:latencyMap, |
| 250 | +} |
| 251 | +} |
| 252 | + |
| 253 | +func (r*Runner)Cleanup(ctx context.Context,idstring,logs io.Writer)error { |
| 254 | +fori,runner:=ranger.workspacebuildRunners { |
| 255 | +ifrunner!=nil { |
| 256 | +_,_=fmt.Fprintf(logs,"Cleaning up workspace %d/%d...\n",i+1,len(r.workspacebuildRunners)) |
| 257 | +iferr:=runner.Cleanup(ctx,fmt.Sprintf("%s-%d",id,i),logs);err!=nil { |
| 258 | +returnxerrors.Errorf("cleanup workspace %d: %w",i,err) |
| 259 | +} |
| 260 | +} |
| 261 | +} |
| 262 | + |
| 263 | +ifr.createUserRunner!=nil { |
| 264 | +_,_=fmt.Fprintln(logs,"Cleaning up user...") |
| 265 | +iferr:=r.createUserRunner.Cleanup(ctx,id,logs);err!=nil { |
| 266 | +returnxerrors.Errorf("cleanup user: %w",err) |
| 267 | +} |
| 268 | +} |
| 269 | + |
| 270 | +returnnil |
| 271 | +} |