|
| 1 | +package notifications |
| 2 | + |
| 3 | +import ( |
| 4 | +"context" |
| 5 | +"encoding/json" |
| 6 | +"fmt" |
| 7 | +"io" |
| 8 | +"net/http" |
| 9 | +"time" |
| 10 | + |
| 11 | +"golang.org/x/xerrors" |
| 12 | + |
| 13 | +"cdr.dev/slog" |
| 14 | +"cdr.dev/slog/sloggers/sloghuman" |
| 15 | + |
| 16 | +"github.com/coder/coder/v2/coderd/notifications" |
| 17 | +"github.com/coder/coder/v2/coderd/tracing" |
| 18 | +"github.com/coder/coder/v2/codersdk" |
| 19 | +"github.com/coder/coder/v2/scaletest/createusers" |
| 20 | +"github.com/coder/coder/v2/scaletest/harness" |
| 21 | +"github.com/coder/coder/v2/scaletest/loadtestutil" |
| 22 | +"github.com/coder/websocket" |
| 23 | +) |
| 24 | + |
| 25 | +typeRunnerstruct { |
| 26 | +client*codersdk.Client |
| 27 | +cfgConfig |
| 28 | + |
| 29 | +createUserRunner*createusers.Runner |
| 30 | + |
| 31 | +userCreatedNotificationLatency time.Duration |
| 32 | +userDeletedNotificationLatency time.Duration |
| 33 | +} |
| 34 | + |
| 35 | +funcNewRunner(client*codersdk.Client,cfgConfig)*Runner { |
| 36 | +return&Runner{ |
| 37 | +client:client, |
| 38 | +cfg:cfg, |
| 39 | +} |
| 40 | +} |
| 41 | + |
| 42 | +var ( |
| 43 | +_ harness.Runnable=&Runner{} |
| 44 | +_ harness.Cleanable=&Runner{} |
| 45 | +_ harness.Collectable=&Runner{} |
| 46 | +) |
| 47 | + |
| 48 | +func (r*Runner)Run(ctx context.Context,idstring,logs io.Writer)error { |
| 49 | +ctx,span:=tracing.StartSpan(ctx) |
| 50 | +deferspan.End() |
| 51 | + |
| 52 | +reachedBarrier:=false |
| 53 | +deferfunc() { |
| 54 | +if!reachedBarrier { |
| 55 | +r.cfg.DialBarrier.Done() |
| 56 | +} |
| 57 | +}() |
| 58 | + |
| 59 | +logs=loadtestutil.NewSyncWriter(logs) |
| 60 | +logger:=slog.Make(sloghuman.Sink(logs)).Leveled(slog.LevelDebug) |
| 61 | +r.client.SetLogger(logger) |
| 62 | +r.client.SetLogBodies(true) |
| 63 | + |
| 64 | +r.createUserRunner=createusers.NewRunner(r.client,r.cfg.User) |
| 65 | +newUserAndToken,err:=r.createUserRunner.RunReturningUser(ctx,id,logs) |
| 66 | +iferr!=nil { |
| 67 | +r.cfg.Metrics.AddError("","create_user") |
| 68 | +returnxerrors.Errorf("create user: %w",err) |
| 69 | +} |
| 70 | +newUser:=newUserAndToken.User |
| 71 | +newUserClient:=codersdk.New(r.client.URL, |
| 72 | +codersdk.WithSessionToken(newUserAndToken.SessionToken), |
| 73 | +codersdk.WithLogger(logger), |
| 74 | +codersdk.WithLogBodies()) |
| 75 | + |
| 76 | +logger.Info(ctx,fmt.Sprintf("user %q created",newUser.Username),slog.F("id",newUser.ID.String())) |
| 77 | + |
| 78 | +ifr.cfg.IsOwner { |
| 79 | +logger.Info(ctx,"assigning Owner role to user") |
| 80 | + |
| 81 | +_,err:=r.client.UpdateUserRoles(ctx,newUser.ID.String(), codersdk.UpdateRoles{ |
| 82 | +Roles: []string{codersdk.RoleOwner}, |
| 83 | +}) |
| 84 | +iferr!=nil { |
| 85 | +r.cfg.Metrics.AddError(newUser.Username,"assign_owner_role") |
| 86 | +returnxerrors.Errorf("assign owner role: %w",err) |
| 87 | +} |
| 88 | +} |
| 89 | + |
| 90 | +logger.Info(ctx,"notification runner is ready") |
| 91 | + |
| 92 | +// We don't need to wait for notifications since we're not an owner |
| 93 | +if!r.cfg.IsOwner { |
| 94 | +reachedBarrier=true |
| 95 | +r.cfg.DialBarrier.Done() |
| 96 | +returnnil |
| 97 | +} |
| 98 | + |
| 99 | +dialCtx,cancel:=context.WithTimeout(ctx,r.cfg.DialTimeout) |
| 100 | +defercancel() |
| 101 | + |
| 102 | +logger.Info(ctx,"connecting to notification websocket") |
| 103 | +conn,err:=r.dialNotificationWebsocket(dialCtx,newUserClient,newUser,logger) |
| 104 | +iferr!=nil { |
| 105 | +returnxerrors.Errorf("dial notification websocket: %w",err) |
| 106 | +} |
| 107 | +deferconn.Close(websocket.StatusNormalClosure,"done") |
| 108 | +logger.Info(ctx,"connected to notification websocket") |
| 109 | + |
| 110 | +reachedBarrier=true |
| 111 | +r.cfg.DialBarrier.Done() |
| 112 | +r.cfg.DialBarrier.Wait() |
| 113 | + |
| 114 | +logger.Info(ctx,fmt.Sprintf("waiting up to %v for notifications...",r.cfg.NotificationTimeout)) |
| 115 | + |
| 116 | +watchCtx,cancel:=context.WithTimeout(ctx,r.cfg.NotificationTimeout) |
| 117 | +defercancel() |
| 118 | + |
| 119 | +iferr:=r.watchNotifications(watchCtx,conn,newUser,logger);err!=nil { |
| 120 | +returnxerrors.Errorf("notification watch failed: %w",err) |
| 121 | +} |
| 122 | + |
| 123 | +returnnil |
| 124 | +} |
| 125 | + |
| 126 | +func (r*Runner)Cleanup(ctx context.Context,idstring,logs io.Writer)error { |
| 127 | +ifr.createUserRunner!=nil { |
| 128 | +_,_=fmt.Fprintln(logs,"Cleaning up user...") |
| 129 | +iferr:=r.createUserRunner.Cleanup(ctx,id,logs);err!=nil { |
| 130 | +returnxerrors.Errorf("cleanup user: %w",err) |
| 131 | +} |
| 132 | +} |
| 133 | + |
| 134 | +returnnil |
| 135 | +} |
| 136 | + |
| 137 | +const ( |
| 138 | +UserCreatedNotificationLatencyMetric="user_created_notification_latency_seconds" |
| 139 | +UserDeletedNotificationLatencyMetric="user_deleted_notification_latency_seconds" |
| 140 | +) |
| 141 | + |
| 142 | +func (r*Runner)GetMetrics()map[string]any { |
| 143 | +metrics:=map[string]any{} |
| 144 | + |
| 145 | +ifr.userCreatedNotificationLatency>0 { |
| 146 | +metrics[UserCreatedNotificationLatencyMetric]=r.userCreatedNotificationLatency.Seconds() |
| 147 | +} |
| 148 | + |
| 149 | +ifr.userDeletedNotificationLatency>0 { |
| 150 | +metrics[UserDeletedNotificationLatencyMetric]=r.userDeletedNotificationLatency.Seconds() |
| 151 | +} |
| 152 | + |
| 153 | +returnmetrics |
| 154 | +} |
| 155 | + |
| 156 | +func (r*Runner)dialNotificationWebsocket(ctx context.Context,client*codersdk.Client,user codersdk.User,logger slog.Logger) (*websocket.Conn,error) { |
| 157 | +u,err:=client.URL.Parse("/api/v2/notifications/inbox/watch") |
| 158 | +iferr!=nil { |
| 159 | +logger.Error(ctx,"parse notification URL",slog.Error(err)) |
| 160 | +r.cfg.Metrics.AddError(user.Username,"parse_url") |
| 161 | +returnnil,xerrors.Errorf("parse notification URL: %w",err) |
| 162 | +} |
| 163 | + |
| 164 | +conn,resp,err:=websocket.Dial(ctx,u.String(),&websocket.DialOptions{ |
| 165 | +HTTPHeader: http.Header{ |
| 166 | +"Coder-Session-Token": []string{client.SessionToken()}, |
| 167 | +}, |
| 168 | +}) |
| 169 | +iferr!=nil { |
| 170 | +ifresp!=nil { |
| 171 | +deferresp.Body.Close() |
| 172 | +ifresp.StatusCode!=http.StatusSwitchingProtocols { |
| 173 | +err=codersdk.ReadBodyAsError(resp) |
| 174 | +} |
| 175 | +} |
| 176 | +logger.Error(ctx,"dial notification websocket",slog.Error(err)) |
| 177 | +r.cfg.Metrics.AddError(user.Username,"dial") |
| 178 | +returnnil,xerrors.Errorf("dial notification websocket: %w",err) |
| 179 | +} |
| 180 | + |
| 181 | +returnconn,nil |
| 182 | +} |
| 183 | + |
| 184 | +// watchNotifications reads notifications from the websockert and returns error or nil |
| 185 | +// once both expected notifications are received. |
| 186 | +func (r*Runner)watchNotifications(ctx context.Context,conn*websocket.Conn,user codersdk.User,logger slog.Logger)error { |
| 187 | +notificationStartTime:=time.Now() |
| 188 | +logger.Info(ctx,"waiting for notifications",slog.F("username",user.Username)) |
| 189 | + |
| 190 | +receivedCreated:=false |
| 191 | +receivedDeleted:=false |
| 192 | + |
| 193 | +// Read notifications until we have both expected types |
| 194 | +for!receivedCreated||!receivedDeleted { |
| 195 | +notif,err:=r.readNotification(ctx,conn) |
| 196 | +iferr!=nil { |
| 197 | +logger.Error(ctx,"read notification",slog.Error(err)) |
| 198 | +r.cfg.Metrics.AddError(user.Username,"read_notification") |
| 199 | +returnxerrors.Errorf("read notification: %w",err) |
| 200 | +} |
| 201 | + |
| 202 | +switchnotif.Notification.TemplateID { |
| 203 | +casenotifications.TemplateUserAccountCreated: |
| 204 | +if!receivedCreated { |
| 205 | +r.userCreatedNotificationLatency=time.Since(notificationStartTime) |
| 206 | +r.cfg.Metrics.RecordLatency(r.userCreatedNotificationLatency,user.Username,"user_created") |
| 207 | +receivedCreated=true |
| 208 | +logger.Info(ctx,"received user created notification") |
| 209 | +} |
| 210 | +casenotifications.TemplateUserAccountDeleted: |
| 211 | +if!receivedDeleted { |
| 212 | +r.userDeletedNotificationLatency=time.Since(notificationStartTime) |
| 213 | +r.cfg.Metrics.RecordLatency(r.userDeletedNotificationLatency,user.Username,"user_deleted") |
| 214 | +receivedDeleted=true |
| 215 | +logger.Info(ctx,"received user deleted notification") |
| 216 | +} |
| 217 | +default: |
| 218 | +logger.Warn(ctx,"received unexpected notification type", |
| 219 | +slog.F("template_id",notif.Notification.TemplateID), |
| 220 | +slog.F("title",notif.Notification.Title)) |
| 221 | +} |
| 222 | +} |
| 223 | +logger.Info(ctx,"received both notifications successfully") |
| 224 | +returnnil |
| 225 | +} |
| 226 | + |
| 227 | +func (*Runner)readNotification(ctx context.Context,conn*websocket.Conn) (codersdk.GetInboxNotificationResponse,error) { |
| 228 | +_,message,err:=conn.Read(ctx) |
| 229 | +iferr!=nil { |
| 230 | +return codersdk.GetInboxNotificationResponse{},err |
| 231 | +} |
| 232 | + |
| 233 | +varnotif codersdk.GetInboxNotificationResponse |
| 234 | +iferr:=json.Unmarshal(message,¬if);err!=nil { |
| 235 | +return codersdk.GetInboxNotificationResponse{},xerrors.Errorf("unmarshal notification: %w",err) |
| 236 | +} |
| 237 | + |
| 238 | +returnnotif,nil |
| 239 | +} |