|
| 1 | +package notifications |
| 2 | + |
| 3 | +import ( |
| 4 | +"context" |
| 5 | +"encoding/json" |
| 6 | +"fmt" |
| 7 | +"io" |
| 8 | +"net/http" |
| 9 | +"time" |
| 10 | + |
| 11 | +"github.com/google/uuid" |
| 12 | +"golang.org/x/xerrors" |
| 13 | + |
| 14 | +"cdr.dev/slog" |
| 15 | +"cdr.dev/slog/sloggers/sloghuman" |
| 16 | + |
| 17 | +"github.com/coder/coder/v2/coderd/tracing" |
| 18 | +"github.com/coder/coder/v2/codersdk" |
| 19 | +"github.com/coder/coder/v2/scaletest/createusers" |
| 20 | +"github.com/coder/coder/v2/scaletest/harness" |
| 21 | +"github.com/coder/coder/v2/scaletest/loadtestutil" |
| 22 | +"github.com/coder/websocket" |
| 23 | +) |
| 24 | + |
| 25 | +typeRunnerstruct { |
| 26 | +client*codersdk.Client |
| 27 | +cfgConfig |
| 28 | + |
| 29 | +createUserRunner*createusers.Runner |
| 30 | + |
| 31 | +// notificationLatencies stores the latency for each notification type |
| 32 | +notificationLatenciesmap[uuid.UUID]time.Duration |
| 33 | +} |
| 34 | + |
| 35 | +funcNewRunner(client*codersdk.Client,cfgConfig)*Runner { |
| 36 | +return&Runner{ |
| 37 | +client:client, |
| 38 | +cfg:cfg, |
| 39 | +notificationLatencies:make(map[uuid.UUID]time.Duration), |
| 40 | +} |
| 41 | +} |
| 42 | + |
| 43 | +var ( |
| 44 | +_ harness.Runnable=&Runner{} |
| 45 | +_ harness.Cleanable=&Runner{} |
| 46 | +_ harness.Collectable=&Runner{} |
| 47 | +) |
| 48 | + |
| 49 | +func (r*Runner)Run(ctx context.Context,idstring,logs io.Writer)error { |
| 50 | +ctx,span:=tracing.StartSpan(ctx) |
| 51 | +deferspan.End() |
| 52 | + |
| 53 | +reachedBarrier:=false |
| 54 | +deferfunc() { |
| 55 | +if!reachedBarrier { |
| 56 | +r.cfg.DialBarrier.Done() |
| 57 | +} |
| 58 | +}() |
| 59 | + |
| 60 | +reachedReceivingWatchBarrier:=false |
| 61 | +deferfunc() { |
| 62 | +iflen(r.cfg.ExpectedNotifications)>0&&!reachedReceivingWatchBarrier { |
| 63 | +r.cfg.ReceivingWatchBarrier.Done() |
| 64 | +} |
| 65 | +}() |
| 66 | + |
| 67 | +logs=loadtestutil.NewSyncWriter(logs) |
| 68 | +logger:=slog.Make(sloghuman.Sink(logs)).Leveled(slog.LevelDebug) |
| 69 | +r.client.SetLogger(logger) |
| 70 | +r.client.SetLogBodies(true) |
| 71 | + |
| 72 | +r.createUserRunner=createusers.NewRunner(r.client,r.cfg.User) |
| 73 | +newUserAndToken,err:=r.createUserRunner.RunReturningUser(ctx,id,logs) |
| 74 | +iferr!=nil { |
| 75 | +r.cfg.Metrics.AddError("","create_user") |
| 76 | +returnxerrors.Errorf("create user: %w",err) |
| 77 | +} |
| 78 | +newUser:=newUserAndToken.User |
| 79 | +newUserClient:=codersdk.New(r.client.URL, |
| 80 | +codersdk.WithSessionToken(newUserAndToken.SessionToken), |
| 81 | +codersdk.WithLogger(logger), |
| 82 | +codersdk.WithLogBodies()) |
| 83 | + |
| 84 | +logger.Info(ctx,"runner user created",slog.F("username",newUser.Username),slog.F("user_id",newUser.ID.String())) |
| 85 | + |
| 86 | +iflen(r.cfg.Roles)>0 { |
| 87 | +logger.Info(ctx,"assigning roles to user",slog.F("roles",r.cfg.Roles)) |
| 88 | + |
| 89 | +_,err:=r.client.UpdateUserRoles(ctx,newUser.ID.String(), codersdk.UpdateRoles{ |
| 90 | +Roles:r.cfg.Roles, |
| 91 | +}) |
| 92 | +iferr!=nil { |
| 93 | +r.cfg.Metrics.AddError(newUser.Username,"assign_roles") |
| 94 | +returnxerrors.Errorf("assign roles: %w",err) |
| 95 | +} |
| 96 | +} |
| 97 | + |
| 98 | +logger.Info(ctx,"notification runner is ready") |
| 99 | + |
| 100 | +dialCtx,cancel:=context.WithTimeout(ctx,r.cfg.DialTimeout) |
| 101 | +defercancel() |
| 102 | + |
| 103 | +logger.Info(ctx,"connecting to notification websocket") |
| 104 | +conn,err:=r.dialNotificationWebsocket(dialCtx,newUserClient,newUser,logger) |
| 105 | +iferr!=nil { |
| 106 | +returnxerrors.Errorf("dial notification websocket: %w",err) |
| 107 | +} |
| 108 | +deferconn.Close(websocket.StatusNormalClosure,"done") |
| 109 | +logger.Info(ctx,"connected to notification websocket") |
| 110 | + |
| 111 | +reachedBarrier=true |
| 112 | +r.cfg.DialBarrier.Done() |
| 113 | +r.cfg.DialBarrier.Wait() |
| 114 | + |
| 115 | +iflen(r.cfg.ExpectedNotifications)==0 { |
| 116 | +logger.Info(ctx,"maintaining websocket connection, waiting for receiving users to complete") |
| 117 | + |
| 118 | +// Wait for receiving users to complete |
| 119 | +done:=make(chanstruct{}) |
| 120 | +gofunc() { |
| 121 | +r.cfg.ReceivingWatchBarrier.Wait() |
| 122 | +close(done) |
| 123 | +}() |
| 124 | + |
| 125 | +select { |
| 126 | +case<-done: |
| 127 | +logger.Info(ctx,"receiving users complete, closing connection") |
| 128 | +case<-ctx.Done(): |
| 129 | +logger.Info(ctx,"context canceled, closing connection") |
| 130 | +} |
| 131 | +returnnil |
| 132 | +} |
| 133 | + |
| 134 | +logger.Info(ctx,"waiting for notifications",slog.F("timeout",r.cfg.NotificationTimeout)) |
| 135 | + |
| 136 | +watchCtx,cancel:=context.WithTimeout(ctx,r.cfg.NotificationTimeout) |
| 137 | +defercancel() |
| 138 | + |
| 139 | +iferr:=r.watchNotifications(watchCtx,conn,newUser,logger,r.cfg.ExpectedNotifications);err!=nil { |
| 140 | +returnxerrors.Errorf("notification watch failed: %w",err) |
| 141 | +} |
| 142 | + |
| 143 | +reachedReceivingWatchBarrier=true |
| 144 | +r.cfg.ReceivingWatchBarrier.Done() |
| 145 | + |
| 146 | +returnnil |
| 147 | +} |
| 148 | + |
| 149 | +func (r*Runner)Cleanup(ctx context.Context,idstring,logs io.Writer)error { |
| 150 | +ifr.createUserRunner!=nil { |
| 151 | +_,_=fmt.Fprintln(logs,"Cleaning up user...") |
| 152 | +iferr:=r.createUserRunner.Cleanup(ctx,id,logs);err!=nil { |
| 153 | +returnxerrors.Errorf("cleanup user: %w",err) |
| 154 | +} |
| 155 | +} |
| 156 | + |
| 157 | +returnnil |
| 158 | +} |
| 159 | + |
| 160 | +constNotificationDeliveryLatencyMetric="notification_delivery_latency_seconds" |
| 161 | + |
| 162 | +func (r*Runner)GetMetrics()map[string]any { |
| 163 | +returnmap[string]any{ |
| 164 | +NotificationDeliveryLatencyMetric:r.notificationLatencies, |
| 165 | +} |
| 166 | +} |
| 167 | + |
| 168 | +func (r*Runner)dialNotificationWebsocket(ctx context.Context,client*codersdk.Client,user codersdk.User,logger slog.Logger) (*websocket.Conn,error) { |
| 169 | +u,err:=client.URL.Parse("/api/v2/notifications/inbox/watch") |
| 170 | +iferr!=nil { |
| 171 | +logger.Error(ctx,"parse notification URL",slog.Error(err)) |
| 172 | +r.cfg.Metrics.AddError(user.Username,"parse_url") |
| 173 | +returnnil,xerrors.Errorf("parse notification URL: %w",err) |
| 174 | +} |
| 175 | + |
| 176 | +conn,resp,err:=websocket.Dial(ctx,u.String(),&websocket.DialOptions{ |
| 177 | +HTTPHeader: http.Header{ |
| 178 | +"Coder-Session-Token": []string{client.SessionToken()}, |
| 179 | +}, |
| 180 | +}) |
| 181 | +iferr!=nil { |
| 182 | +ifresp!=nil { |
| 183 | +deferresp.Body.Close() |
| 184 | +ifresp.StatusCode!=http.StatusSwitchingProtocols { |
| 185 | +err=codersdk.ReadBodyAsError(resp) |
| 186 | +} |
| 187 | +} |
| 188 | +logger.Error(ctx,"dial notification websocket",slog.Error(err)) |
| 189 | +r.cfg.Metrics.AddError(user.Username,"dial") |
| 190 | +returnnil,xerrors.Errorf("dial notification websocket: %w",err) |
| 191 | +} |
| 192 | + |
| 193 | +returnconn,nil |
| 194 | +} |
| 195 | + |
| 196 | +// watchNotifications reads notifications from the websocket and returns error or nil |
| 197 | +// once all expected notifications are received. |
| 198 | +func (r*Runner)watchNotifications(ctx context.Context,conn*websocket.Conn,user codersdk.User,logger slog.Logger,expectedNotificationsmap[uuid.UUID]chan time.Time)error { |
| 199 | +logger.Info(ctx,"waiting for notifications", |
| 200 | +slog.F("username",user.Username), |
| 201 | +slog.F("expected_count",len(expectedNotifications))) |
| 202 | + |
| 203 | +receivedNotifications:=make(map[uuid.UUID]struct{}) |
| 204 | + |
| 205 | +for { |
| 206 | +select { |
| 207 | +case<-ctx.Done(): |
| 208 | +returnxerrors.Errorf("context canceled while waiting for notifications: %w",ctx.Err()) |
| 209 | +default: |
| 210 | +} |
| 211 | + |
| 212 | +iflen(receivedNotifications)==len(expectedNotifications) { |
| 213 | +logger.Info(ctx,"received all expected notifications") |
| 214 | +returnnil |
| 215 | +} |
| 216 | + |
| 217 | +notif,err:=readNotification(ctx,conn) |
| 218 | +iferr!=nil { |
| 219 | +logger.Error(ctx,"read notification",slog.Error(err)) |
| 220 | +r.cfg.Metrics.AddError(user.Username,"read_notification") |
| 221 | +returnxerrors.Errorf("read notification: %w",err) |
| 222 | +} |
| 223 | + |
| 224 | +templateID:=notif.Notification.TemplateID |
| 225 | +iftriggerTimeChan,exists:=expectedNotifications[templateID];exists { |
| 226 | +if_,exists:=receivedNotifications[templateID];!exists { |
| 227 | +receiptTime:=time.Now() |
| 228 | +select { |
| 229 | +casetriggerTime:=<-triggerTimeChan: |
| 230 | +latency:=receiptTime.Sub(triggerTime) |
| 231 | +r.notificationLatencies[templateID]=latency |
| 232 | +r.cfg.Metrics.RecordLatency(latency,user.Username,templateID.String()) |
| 233 | +receivedNotifications[templateID]=struct{}{} |
| 234 | + |
| 235 | +logger.Info(ctx,"received expected notification", |
| 236 | +slog.F("template_id",templateID), |
| 237 | +slog.F("title",notif.Notification.Title), |
| 238 | +slog.F("latency",latency)) |
| 239 | +case<-ctx.Done(): |
| 240 | +returnxerrors.Errorf("context canceled while waiting for trigger time: %w",ctx.Err()) |
| 241 | +} |
| 242 | +} |
| 243 | +}else { |
| 244 | +logger.Debug(ctx,"received notification not being tested", |
| 245 | +slog.F("template_id",templateID), |
| 246 | +slog.F("title",notif.Notification.Title)) |
| 247 | +} |
| 248 | +} |
| 249 | +} |
| 250 | + |
| 251 | +funcreadNotification(ctx context.Context,conn*websocket.Conn) (codersdk.GetInboxNotificationResponse,error) { |
| 252 | +_,message,err:=conn.Read(ctx) |
| 253 | +iferr!=nil { |
| 254 | +return codersdk.GetInboxNotificationResponse{},err |
| 255 | +} |
| 256 | + |
| 257 | +varnotif codersdk.GetInboxNotificationResponse |
| 258 | +iferr:=json.Unmarshal(message,¬if);err!=nil { |
| 259 | +return codersdk.GetInboxNotificationResponse{},xerrors.Errorf("unmarshal notification: %w",err) |
| 260 | +} |
| 261 | + |
| 262 | +returnnotif,nil |
| 263 | +} |