@@ -6,9 +6,11 @@ import (
6
6
"fmt"
7
7
"io"
8
8
"net/http"
9
+ "sync"
9
10
"time"
10
11
11
12
"github.com/google/uuid"
13
+ "golang.org/x/sync/errgroup"
12
14
"golang.org/x/xerrors"
13
15
14
16
"cdr.dev/slog"
@@ -19,6 +21,7 @@ import (
19
21
"github.com/coder/coder/v2/scaletest/createusers"
20
22
"github.com/coder/coder/v2/scaletest/harness"
21
23
"github.com/coder/coder/v2/scaletest/loadtestutil"
24
+ "github.com/coder/coder/v2/scaletest/smtpmock"
22
25
"github.com/coder/websocket"
23
26
)
24
27
@@ -28,15 +31,21 @@ type Runner struct {
28
31
29
32
createUserRunner * createusers.Runner
30
33
31
- // notificationLatencies stores the latency for each notification type
32
- notificationLatencies map [uuid.UUID ]time.Duration
34
+ // websocketLatencies stores the latency for websocket notifications
35
+ websocketLatencies map [uuid.UUID ]time.Duration
36
+ websocketLatenciesMu sync.RWMutex
37
+
38
+ // smtpLatencies stores the latency for SMTP notifications
39
+ smtpLatencies map [uuid.UUID ]time.Duration
40
+ smtpLatenciesMu sync.RWMutex
33
41
}
34
42
35
43
func NewRunner (client * codersdk.Client ,cfg Config )* Runner {
36
44
return & Runner {
37
- client :client ,
38
- cfg :cfg ,
39
- notificationLatencies :make (map [uuid.UUID ]time.Duration ),
45
+ client :client ,
46
+ cfg :cfg ,
47
+ websocketLatencies :make (map [uuid.UUID ]time.Duration ),
48
+ smtpLatencies :make (map [uuid.UUID ]time.Duration ),
40
49
}
41
50
}
42
51
@@ -136,7 +145,20 @@ func (r *Runner) Run(ctx context.Context, id string, logs io.Writer) error {
136
145
watchCtx ,cancel := context .WithTimeout (ctx ,r .cfg .NotificationTimeout )
137
146
defer cancel ()
138
147
139
- if err := r .watchNotifications (watchCtx ,conn ,newUser ,logger ,r .cfg .ExpectedNotifications );err != nil {
148
+ eg ,egCtx := errgroup .WithContext (watchCtx )
149
+
150
+ eg .Go (func ()error {
151
+ return r .watchNotifications (egCtx ,conn ,newUser ,logger ,r .cfg .ExpectedNotifications )
152
+ })
153
+
154
+ if r .cfg .SMTPApiUrl != "" {
155
+ logger .Info (ctx ,"running SMTP notification watcher" )
156
+ eg .Go (func ()error {
157
+ return r .watchNotificationsSMTP (egCtx ,newUser ,logger ,r .cfg .ExpectedNotifications )
158
+ })
159
+ }
160
+
161
+ if err := eg .Wait ();err != nil {
140
162
return xerrors .Errorf ("notification watch failed: %w" ,err )
141
163
}
142
164
@@ -157,11 +179,29 @@ func (r *Runner) Cleanup(ctx context.Context, id string, logs io.Writer) error {
157
179
return nil
158
180
}
159
181
160
- const NotificationDeliveryLatencyMetric = "notification_delivery_latency_seconds"
182
+ const (
183
+ WebsocketNotificationLatencyMetric = "notification_websocket_latency_seconds"
184
+ SMTPNotificationLatencyMetric = "notification_smtp_latency_seconds"
185
+ )
161
186
162
187
func (r * Runner )GetMetrics ()map [string ]any {
188
+ r .websocketLatenciesMu .RLock ()
189
+ websocketLatencies := make (map [uuid.UUID ]time.Duration ,len (r .websocketLatencies ))
190
+ for id ,latency := range r .websocketLatencies {
191
+ websocketLatencies [id ]= latency
192
+ }
193
+ r .websocketLatenciesMu .RUnlock ()
194
+
195
+ r .smtpLatenciesMu .RLock ()
196
+ smtpLatencies := make (map [uuid.UUID ]time.Duration ,len (r .smtpLatencies ))
197
+ for id ,latency := range r .smtpLatencies {
198
+ smtpLatencies [id ]= latency
199
+ }
200
+ r .smtpLatenciesMu .RUnlock ()
201
+
163
202
return map [string ]any {
164
- NotificationDeliveryLatencyMetric :r .notificationLatencies ,
203
+ WebsocketNotificationLatencyMetric :websocketLatencies ,
204
+ SMTPNotificationLatencyMetric :smtpLatencies ,
165
205
}
166
206
}
167
207
@@ -228,8 +268,10 @@ func (r *Runner) watchNotifications(ctx context.Context, conn *websocket.Conn, u
228
268
select {
229
269
case triggerTime := <- triggerTimeChan :
230
270
latency := receiptTime .Sub (triggerTime )
231
- r .notificationLatencies [templateID ]= latency
232
- r .cfg .Metrics .RecordLatency (latency ,user .Username ,templateID .String ())
271
+ r .websocketLatenciesMu .Lock ()
272
+ r .websocketLatencies [templateID ]= latency
273
+ r .websocketLatenciesMu .Unlock ()
274
+ r .cfg .Metrics .RecordLatency (latency ,user .Username ,templateID .String (),NotificationTypeWebsocket )
233
275
receivedNotifications [templateID ]= struct {}{}
234
276
235
277
logger .Info (ctx ,"received expected notification" ,
@@ -248,6 +290,104 @@ func (r *Runner) watchNotifications(ctx context.Context, conn *websocket.Conn, u
248
290
}
249
291
}
250
292
293
+ const smtpPollInterval = 2 * time .Second
294
+
295
+ // watchNotificationsSMTP polls the SMTP HTTP API for notifications and returns error or nil
296
+ // once all expected notifications are received.
297
+ func (r * Runner )watchNotificationsSMTP (ctx context.Context ,user codersdk.User ,logger slog.Logger ,expectedNotifications map [uuid.UUID ]chan time.Time )error {
298
+ logger .Info (ctx ,"polling SMTP API for notifications" ,
299
+ slog .F ("username" ,user .Username ),
300
+ slog .F ("email" ,user .Email ),
301
+ slog .F ("expected_count" ,len (expectedNotifications )),
302
+ )
303
+
304
+ receivedNotifications := make (map [uuid.UUID ]struct {})
305
+ ticker := time .NewTicker (smtpPollInterval )
306
+ defer ticker .Stop ()
307
+
308
+ apiURL := fmt .Sprintf ("%s/messages?email=%s" ,r .cfg .SMTPApiUrl ,user .Email )
309
+ httpClient := & http.Client {
310
+ Timeout :10 * time .Second ,
311
+ }
312
+
313
+ for {
314
+ select {
315
+ case <- ctx .Done ():
316
+ return xerrors .Errorf ("context canceled while waiting for notifications: %w" ,ctx .Err ())
317
+ case <- ticker .C :
318
+ if len (receivedNotifications )== len (expectedNotifications ) {
319
+ logger .Info (ctx ,"received all expected notifications via SMTP" )
320
+ return nil
321
+ }
322
+
323
+ req ,err := http .NewRequestWithContext (ctx ,http .MethodGet ,apiURL ,nil )
324
+ if err != nil {
325
+ logger .Error (ctx ,"create SMTP API request" ,slog .Error (err ))
326
+ r .cfg .Metrics .AddError (user .Username ,"smtp_create_request" )
327
+ return xerrors .Errorf ("create SMTP API request: %w" ,err )
328
+ }
329
+
330
+ resp ,err := httpClient .Do (req )
331
+ if err != nil {
332
+ logger .Error (ctx ,"poll smtp api for notifications" ,slog .Error (err ))
333
+ r .cfg .Metrics .AddError (user .Username ,"smtp_poll" )
334
+ return xerrors .Errorf ("poll smtp api: %w" ,err )
335
+ }
336
+
337
+ if resp .StatusCode != http .StatusOK {
338
+ resp .Body .Close ()
339
+ logger .Error (ctx ,"smtp api returned non-200 status" ,slog .F ("status" ,resp .StatusCode ))
340
+ r .cfg .Metrics .AddError (user .Username ,"smtp_bad_status" )
341
+ return xerrors .Errorf ("smtp api returned status %d" ,resp .StatusCode )
342
+ }
343
+
344
+ var summaries []smtpmock.EmailSummary
345
+ if err := json .NewDecoder (resp .Body ).Decode (& summaries );err != nil {
346
+ resp .Body .Close ()
347
+ logger .Error (ctx ,"decode smtp api response" ,slog .Error (err ))
348
+ r .cfg .Metrics .AddError (user .Username ,"smtp_decode" )
349
+ return xerrors .Errorf ("decode smtp api response: %w" ,err )
350
+ }
351
+ resp .Body .Close ()
352
+
353
+ // Process each email summary
354
+ for _ ,summary := range summaries {
355
+ notificationID := summary .NotificationID
356
+ if notificationID == uuid .Nil {
357
+ continue
358
+ }
359
+
360
+ if triggerTimeChan ,exists := expectedNotifications [notificationID ];exists {
361
+ if _ ,received := receivedNotifications [notificationID ];! received {
362
+ receiptTime := summary .Date
363
+ if receiptTime .IsZero () {
364
+ receiptTime = time .Now ()
365
+ }
366
+
367
+ select {
368
+ case triggerTime := <- triggerTimeChan :
369
+ latency := receiptTime .Sub (triggerTime )
370
+ r .smtpLatenciesMu .Lock ()
371
+ r .smtpLatencies [notificationID ]= latency
372
+ r .smtpLatenciesMu .Unlock ()
373
+ r .cfg .Metrics .RecordLatency (latency ,user .Username ,notificationID .String (),NotificationTypeSMTP )
374
+ receivedNotifications [notificationID ]= struct {}{}
375
+
376
+ logger .Info (ctx ,"received expected notification via SMTP" ,
377
+ slog .F ("notification_id" ,notificationID ),
378
+ slog .F ("subject" ,summary .Subject ),
379
+ slog .F ("latency" ,latency ))
380
+ case <- ctx .Done ():
381
+ return xerrors .Errorf ("context canceled while waiting for trigger time: %w" ,ctx .Err ())
382
+ default :
383
+ }
384
+ }
385
+ }
386
+ }
387
+ }
388
+ }
389
+ }
390
+
251
391
func readNotification (ctx context.Context ,conn * websocket.Conn ) (codersdk.GetInboxNotificationResponse ,error ) {
252
392
_ ,message ,err := conn .Read (ctx )
253
393
if err != nil {