@@ -3,6 +3,7 @@ package notifications_test
3
3
import (
4
4
"context"
5
5
"strconv"
6
+ "sync"
6
7
"testing"
7
8
"time"
8
9
@@ -317,10 +318,12 @@ func TestInflightDispatchesMetric(t *testing.T) {
317
318
})
318
319
319
320
handler := & fakeHandler {}
320
- // Delayer will delay all dispatches by 2x fetch intervals to ensure we catch the requests inflight.
321
- delayer := newDelayingHandler (cfg .FetchInterval .Value ()* 2 ,handler )
321
+ const msgCount = 2
322
+
323
+ // Barrier handler will wait until all notification messages are in-flight.
324
+ barrier := newBarrierHandler (msgCount ,handler )
322
325
mgr .WithHandlers (map [database.NotificationMethod ]notifications.Handler {
323
- method :delayer ,
326
+ method :barrier ,
324
327
})
325
328
326
329
enq ,err := notifications .NewStoreEnqueuer (cfg ,api .Database ,defaultHelpers (),api .Logger .Named ("enqueuer" ),quartz .NewReal ())
@@ -329,7 +332,6 @@ func TestInflightDispatchesMetric(t *testing.T) {
329
332
user := createSampleUser (t ,api .Database )
330
333
331
334
// WHEN: notifications are enqueued which will succeed (and be delayed during dispatch)
332
- const msgCount = 2
333
335
for i := 0 ;i < msgCount ;i ++ {
334
336
_ ,err = enq .Enqueue (ctx ,user .ID ,template ,map [string ]string {"type" :"success" ,"i" :strconv .Itoa (i )},"test" )
335
337
require .NoError (t ,err )
@@ -343,6 +345,10 @@ func TestInflightDispatchesMetric(t *testing.T) {
343
345
return promtest .ToFloat64 (metrics .InflightDispatches .WithLabelValues (string (method ),template .String ()))== msgCount
344
346
},testutil .WaitShort ,testutil .IntervalFast )
345
347
348
+ for i := 0 ;i < msgCount ;i ++ {
349
+ barrier .wg .Done ()
350
+ }
351
+
346
352
// Wait until the handler has dispatched the given notifications.
347
353
require .Eventually (t ,func ()bool {
348
354
handler .mu .RLock ()
@@ -493,27 +499,30 @@ func (u *updateSignallingInterceptor) BulkMarkNotificationMessagesFailed(ctx con
493
499
return u .Store .BulkMarkNotificationMessagesFailed (ctx ,arg )
494
500
}
495
501
496
- type delayingHandler struct {
502
+ type barrierHandler struct {
497
503
h notifications.Handler
498
504
499
- delay time. Duration
505
+ wg * sync. WaitGroup
500
506
}
501
507
502
- func newDelayingHandler (delay time.Duration ,handler notifications.Handler )* delayingHandler {
503
- return & delayingHandler {
504
- delay :delay ,
505
- h :handler ,
508
+ func newBarrierHandler (total int ,handler notifications.Handler )* barrierHandler {
509
+ var wg sync.WaitGroup
510
+ wg .Add (total )
511
+
512
+ return & barrierHandler {
513
+ h :handler ,
514
+ wg :& wg ,
506
515
}
507
516
}
508
517
509
- func (d * delayingHandler )Dispatcher (payload types.MessagePayload ,title ,body string ) (dispatch.DeliveryFunc ,error ) {
510
- deliverFn ,err := d .h .Dispatcher (payload ,title ,body )
518
+ func (bh * barrierHandler )Dispatcher (payload types.MessagePayload ,title ,body string ) (dispatch.DeliveryFunc ,error ) {
519
+ deliverFn ,err := bh .h .Dispatcher (payload ,title ,body )
511
520
if err != nil {
512
521
return nil ,err
513
522
}
514
523
515
524
return func (ctx context.Context ,msgID uuid.UUID ) (retryable bool ,err error ) {
516
- time . Sleep ( d . delay )
525
+ bh . wg . Wait ( )
517
526
518
527
return deliverFn (ctx ,msgID )
519
528
},nil