@@ -44,7 +44,6 @@ type Manager struct {
4444store Store
4545log slog.Logger
4646
47- notifier * notifier
4847handlers map [database.NotificationMethod ]Handler
4948method database.NotificationMethod
5049helpers template.FuncMap
@@ -53,11 +52,13 @@ type Manager struct {
5352
5453success ,failure chan dispatchResult
5554
56- runOnce sync.Once
57- stopOnce sync.Once
58- doneOnce sync.Once
59- stop chan any
60- done chan any
55+ mu sync.Mutex // Protects following.
56+ closed bool
57+ notifier * notifier
58+
59+ runOnce sync.Once
60+ stop chan any
61+ done chan any
6162
6263// clock is for testing only
6364clock quartz.Clock
@@ -138,7 +139,7 @@ func (m *Manager) WithHandlers(reg map[database.NotificationMethod]Handler) {
138139// Manager requires system-level permissions to interact with the store.
139140// Run is only intended to be run once.
140141func (m * Manager )Run (ctx context.Context ) {
141- m .log .Info (ctx ,"started" )
142+ m .log .Debug (ctx ,"notification manager started" )
142143
143144m .runOnce .Do (func () {
144145// Closes when Stop() is called or context is canceled.
@@ -155,31 +156,26 @@ func (m *Manager) Run(ctx context.Context) {
155156// events, creating a notifier, and publishing bulk dispatch result updates to the store.
156157func (m * Manager )loop (ctx context.Context )error {
157158defer func () {
158- m .doneOnce .Do (func () {
159- close (m .done )
160- })
161- m .log .Info (context .Background (),"notification manager stopped" )
159+ close (m .done )
160+ m .log .Debug (context .Background (),"notification manager stopped" )
162161}()
163162
164- // Caught a terminal signal before notifier was created, exit immediately.
165- select {
166- case <- m .stop :
167- m .log .Warn (ctx ,"gracefully stopped" )
168- return xerrors .Errorf ("gracefully stopped" )
169- case <- ctx .Done ():
170- m .log .Error (ctx ,"ungracefully stopped" ,slog .Error (ctx .Err ()))
171- return xerrors .Errorf ("notifications: %w" ,ctx .Err ())
172- default :
163+ m .mu .Lock ()
164+ if m .closed {
165+ m .mu .Unlock ()
166+ return xerrors .New ("manager already closed" )
173167}
174168
175169var eg errgroup.Group
176170
177- // Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications.
178171m .notifier = newNotifier (ctx ,m .cfg ,uuid .New (),m .log ,m .store ,m .handlers ,m .helpers ,m .metrics ,m .clock )
179172eg .Go (func ()error {
173+ // run the notifier which will handle dequeueing and dispatching notifications.
180174return m .notifier .run (m .success ,m .failure )
181175})
182176
177+ m .mu .Unlock ()
178+
183179// Periodically flush notification state changes to the store.
184180eg .Go (func ()error {
185181// Every interval, collect the messages in the channels and bulk update them in the store.
@@ -355,48 +351,46 @@ func (m *Manager) syncUpdates(ctx context.Context) {
355351
356352// Stop stops the notifier and waits until it has stopped.
357353func (m * Manager )Stop (ctx context.Context )error {
358- var err error
359- m .stopOnce .Do (func () {
360- select {
361- case <- ctx .Done ():
362- err = ctx .Err ()
363- return
364- default :
365- }
354+ m .mu .Lock ()
355+ defer m .mu .Unlock ()
366356
367- m .log .Info (context .Background (),"graceful stop requested" )
357+ if m .closed {
358+ return nil
359+ }
360+ m .closed = true
368361
369- // If the notifier hasn't been started, we don't need to wait for anything.
370- // This is only really during testing when we want to enqueue messages only but not deliver them.
371- if m .notifier == nil {
372- m .doneOnce .Do (func () {
373- close (m .done )
374- })
375- }else {
376- m .notifier .stop ()
377- }
362+ m .log .Debug (context .Background (),"graceful stop requested" )
363+
364+ // If the notifier hasn't been started, we don't need to wait for anything.
365+ // This is only really during testing when we want to enqueue messages only but not deliver them.
366+ if m .notifier != nil {
367+ m .notifier .stop ()
368+ }
378369
379- // Signal the stop channel to cause loop to exit.
380- close (m .stop )
370+ // Signal the stop channel to cause loop to exit.
371+ close (m .stop )
381372
382- // Wait for the manager loop to exit or the context to be canceled, whichever comes first.
383- select {
384- case <- ctx .Done ():
385- var errStr string
386- if ctx .Err ()!= nil {
387- errStr = ctx .Err ().Error ()
388- }
389- // For some reason, slog.Error returns {} for a context error.
390- m .log .Error (context .Background (),"graceful stop failed" ,slog .F ("err" ,errStr ))
391- err = ctx .Err ()
392- return
393- case <- m .done :
394- m .log .Info (context .Background (),"gracefully stopped" )
395- return
396- }
397- })
373+ if m .notifier == nil {
374+ return nil
375+ }
398376
399- return err
377+ m .mu .Unlock ()// Unlock to avoid blocking loop.
378+ defer m .mu .Lock ()// Re-lock the mutex due to earlier defer.
379+
380+ // Wait for the manager loop to exit or the context to be canceled, whichever comes first.
381+ select {
382+ case <- ctx .Done ():
383+ var errStr string
384+ if ctx .Err ()!= nil {
385+ errStr = ctx .Err ().Error ()
386+ }
387+ // For some reason, slog.Error returns {} for a context error.
388+ m .log .Error (context .Background (),"graceful stop failed" ,slog .F ("err" ,errStr ))
389+ return ctx .Err ()
390+ case <- m .done :
391+ m .log .Debug (context .Background (),"gracefully stopped" )
392+ return nil
393+ }
400394}
401395
402396type dispatchResult struct {