@@ -221,13 +221,16 @@ func TestPendingUpdatesMetric(t *testing.T) {
221221
222222// GIVEN: a notification manager whose store updates are intercepted so we can read the number of pending updates set in the metric
223223cfg := defaultNotificationsConfig (method )
224- cfg .FetchInterval = serpent .Duration (time .Millisecond * 50 )
225224cfg .RetryInterval = serpent .Duration (time .Hour )// Delay retries so they don't interfere.
226225cfg .StoreSyncInterval = serpent .Duration (time .Millisecond * 100 )
227226
228227syncer := & syncInterceptor {Store :api .Database }
229228interceptor := newUpdateSignallingInterceptor (syncer )
230- mgr ,err := notifications .NewManager (cfg ,interceptor ,defaultHelpers (),metrics ,api .Logger .Named ("manager" ))
229+ mClock := quartz .NewMock (t )
230+ trap := mClock .Trap ().NewTicker ("Manager" ,"storeSync" )
231+ defer trap .Close ()
232+ mgr ,err := notifications .NewManager (cfg ,interceptor ,defaultHelpers (),metrics ,api .Logger .Named ("manager" ),
233+ notifications .WithTestClock (mClock ))
231234require .NoError (t ,err )
232235t .Cleanup (func () {
233236assert .NoError (t ,mgr .Stop (ctx ))
@@ -249,6 +252,7 @@ func TestPendingUpdatesMetric(t *testing.T) {
249252require .NoError (t ,err )
250253
251254mgr .Run (ctx )
255+ trap .MustWait (ctx ).Release ()// ensures ticker has been set
252256
253257// THEN:
254258// Wait until the handler has dispatched the given notifications.
@@ -259,17 +263,20 @@ func TestPendingUpdatesMetric(t *testing.T) {
259263return len (handler .succeeded )== 1 && len (handler .failed )== 1
260264},testutil .WaitShort ,testutil .IntervalFast )
261265
262- // Wait until we intercept the calls to sync the pending updates to the store.
263- success := testutil .RequireRecvCtx (testutil .Context (t ,testutil .WaitShort ),t ,interceptor .updateSuccess )
264- failure := testutil .RequireRecvCtx (testutil .Context (t ,testutil .WaitShort ),t ,interceptor .updateFailure )
265-
266- // Wait for the metric to be updated with the expected count of metrics.
266+ // Both handler calls should be pending in the metrics.
267267require .Eventually (t ,func ()bool {
268- return promtest .ToFloat64 (metrics .PendingUpdates )== float64 (success + failure )
268+ return promtest .ToFloat64 (metrics .PendingUpdates )== float64 (2 )
269269},testutil .WaitShort ,testutil .IntervalFast )
270270
271- // Unpause the interceptor so the updates can proceed.
272- interceptor .unpause ()
271+ // THEN:
272+ // Trigger syncing updates
273+ mClock .Advance (cfg .StoreSyncInterval .Value ()).MustWait (ctx )
274+
275+ // Wait until we intercept the calls to sync the pending updates to the store.
276+ success := testutil .RequireRecvCtx (testutil .Context (t ,testutil .WaitShort ),t ,interceptor .updateSuccess )
277+ require .EqualValues (t ,1 ,success )
278+ failure := testutil .RequireRecvCtx (testutil .Context (t ,testutil .WaitShort ),t ,interceptor .updateFailure )
279+ require .EqualValues (t ,1 ,failure )
273280
274281// Validate that the store synced the expected number of updates.
275282require .Eventually (t ,func ()bool {
@@ -464,43 +471,25 @@ func fingerprintLabels(lbs ...string) model.Fingerprint {
464471// signaled by the caller so it can continue.
465472type updateSignallingInterceptor struct {
466473notifications.Store
467-
468- pause chan any
469-
470474updateSuccess chan int
471475updateFailure chan int
472476}
473477
474478func newUpdateSignallingInterceptor (interceptor notifications.Store )* updateSignallingInterceptor {
475479return & updateSignallingInterceptor {
476- Store :interceptor ,
477-
478- pause :make (chan any ,1 ),
479-
480+ Store :interceptor ,
480481updateSuccess :make (chan int ,1 ),
481482updateFailure :make (chan int ,1 ),
482483}
483484}
484485
485- func (u * updateSignallingInterceptor )unpause () {
486- close (u .pause )
487- }
488-
489486func (u * updateSignallingInterceptor )BulkMarkNotificationMessagesSent (ctx context.Context ,arg database.BulkMarkNotificationMessagesSentParams ) (int64 ,error ) {
490487u .updateSuccess <- len (arg .IDs )
491-
492- // Wait until signaled so we have a chance to read the number of pending updates.
493- <- u .pause
494-
495488return u .Store .BulkMarkNotificationMessagesSent (ctx ,arg )
496489}
497490
498491func (u * updateSignallingInterceptor )BulkMarkNotificationMessagesFailed (ctx context.Context ,arg database.BulkMarkNotificationMessagesFailedParams ) (int64 ,error ) {
499492u .updateFailure <- len (arg .IDs )
500-
501- // Wait until signaled so we have a chance to read the number of pending updates.
502- <- u .pause
503-
504493return u .Store .BulkMarkNotificationMessagesFailed (ctx ,arg )
505494}
506495