Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1bb96b8

Browse files
defelmnqmafredri
andauthored
fix: resolve flake test on manager (#17702)
Fixescoder/internal#544---------Co-authored-by: Mathias Fredriksson <mafredri@gmail.com>
1 parent857587b commit1bb96b8

File tree

2 files changed

+74
-58
lines changed

2 files changed

+74
-58
lines changed

‎coderd/notifications/manager.go

Lines changed: 52 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ type Manager struct {
4444
storeStore
4545
log slog.Logger
4646

47-
notifier*notifier
4847
handlersmap[database.NotificationMethod]Handler
4948
method database.NotificationMethod
5049
helpers template.FuncMap
@@ -53,11 +52,13 @@ type Manager struct {
5352

5453
success,failurechandispatchResult
5554

56-
runOnce sync.Once
57-
stopOnce sync.Once
58-
doneOnce sync.Once
59-
stopchanany
60-
donechanany
55+
mu sync.Mutex// Protects following.
56+
closedbool
57+
notifier*notifier
58+
59+
runOnce sync.Once
60+
stopchanany
61+
donechanany
6162

6263
// clock is for testing only
6364
clock quartz.Clock
@@ -138,7 +139,7 @@ func (m *Manager) WithHandlers(reg map[database.NotificationMethod]Handler) {
138139
// Manager requires system-level permissions to interact with the store.
139140
// Run is only intended to be run once.
140141
func (m*Manager)Run(ctx context.Context) {
141-
m.log.Info(ctx,"started")
142+
m.log.Debug(ctx,"notification managerstarted")
142143

143144
m.runOnce.Do(func() {
144145
// Closes when Stop() is called or context is canceled.
@@ -155,31 +156,26 @@ func (m *Manager) Run(ctx context.Context) {
155156
// events, creating a notifier, and publishing bulk dispatch result updates to the store.
156157
func (m*Manager)loop(ctx context.Context)error {
157158
deferfunc() {
158-
m.doneOnce.Do(func() {
159-
close(m.done)
160-
})
161-
m.log.Info(context.Background(),"notification manager stopped")
159+
close(m.done)
160+
m.log.Debug(context.Background(),"notification manager stopped")
162161
}()
163162

164-
// Caught a terminal signal before notifier was created, exit immediately.
165-
select {
166-
case<-m.stop:
167-
m.log.Warn(ctx,"gracefully stopped")
168-
returnxerrors.Errorf("gracefully stopped")
169-
case<-ctx.Done():
170-
m.log.Error(ctx,"ungracefully stopped",slog.Error(ctx.Err()))
171-
returnxerrors.Errorf("notifications: %w",ctx.Err())
172-
default:
163+
m.mu.Lock()
164+
ifm.closed {
165+
m.mu.Unlock()
166+
returnxerrors.New("manager already closed")
173167
}
174168

175169
vareg errgroup.Group
176170

177-
// Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications.
178171
m.notifier=newNotifier(ctx,m.cfg,uuid.New(),m.log,m.store,m.handlers,m.helpers,m.metrics,m.clock)
179172
eg.Go(func()error {
173+
// run the notifier which will handle dequeueing and dispatching notifications.
180174
returnm.notifier.run(m.success,m.failure)
181175
})
182176

177+
m.mu.Unlock()
178+
183179
// Periodically flush notification state changes to the store.
184180
eg.Go(func()error {
185181
// Every interval, collect the messages in the channels and bulk update them in the store.
@@ -355,48 +351,46 @@ func (m *Manager) syncUpdates(ctx context.Context) {
355351

356352
// Stop stops the notifier and waits until it has stopped.
357353
func (m*Manager)Stop(ctx context.Context)error {
358-
varerrerror
359-
m.stopOnce.Do(func() {
360-
select {
361-
case<-ctx.Done():
362-
err=ctx.Err()
363-
return
364-
default:
365-
}
354+
m.mu.Lock()
355+
deferm.mu.Unlock()
366356

367-
m.log.Info(context.Background(),"graceful stop requested")
357+
ifm.closed {
358+
returnnil
359+
}
360+
m.closed=true
368361

369-
// If the notifier hasn't been started, we don't need to wait for anything.
370-
// This is only really during testing when we want to enqueue messages only but not deliver them.
371-
ifm.notifier==nil {
372-
m.doneOnce.Do(func() {
373-
close(m.done)
374-
})
375-
}else {
376-
m.notifier.stop()
377-
}
362+
m.log.Debug(context.Background(),"graceful stop requested")
363+
364+
// If the notifier hasn't been started, we don't need to wait for anything.
365+
// This is only really during testing when we want to enqueue messages only but not deliver them.
366+
ifm.notifier!=nil {
367+
m.notifier.stop()
368+
}
378369

379-
// Signal the stop channel to cause loop to exit.
380-
close(m.stop)
370+
// Signal the stop channel to cause loop to exit.
371+
close(m.stop)
381372

382-
// Wait for the manager loop to exit or the context to be canceled, whichever comes first.
383-
select {
384-
case<-ctx.Done():
385-
varerrStrstring
386-
ifctx.Err()!=nil {
387-
errStr=ctx.Err().Error()
388-
}
389-
// For some reason, slog.Error returns {} for a context error.
390-
m.log.Error(context.Background(),"graceful stop failed",slog.F("err",errStr))
391-
err=ctx.Err()
392-
return
393-
case<-m.done:
394-
m.log.Info(context.Background(),"gracefully stopped")
395-
return
396-
}
397-
})
373+
ifm.notifier==nil {
374+
returnnil
375+
}
398376

399-
returnerr
377+
m.mu.Unlock()// Unlock to avoid blocking loop.
378+
deferm.mu.Lock()// Re-lock the mutex due to earlier defer.
379+
380+
// Wait for the manager loop to exit or the context to be canceled, whichever comes first.
381+
select {
382+
case<-ctx.Done():
383+
varerrStrstring
384+
ifctx.Err()!=nil {
385+
errStr=ctx.Err().Error()
386+
}
387+
// For some reason, slog.Error returns {} for a context error.
388+
m.log.Error(context.Background(),"graceful stop failed",slog.F("err",errStr))
389+
returnctx.Err()
390+
case<-m.done:
391+
m.log.Debug(context.Background(),"gracefully stopped")
392+
returnnil
393+
}
400394
}
401395

402396
typedispatchResultstruct {

‎coderd/notifications/manager_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,28 @@ func TestStopBeforeRun(t *testing.T) {
182182
},testutil.WaitShort,testutil.IntervalFast)
183183
}
184184

185+
funcTestRunStopRace(t*testing.T) {
186+
t.Parallel()
187+
188+
// SETUP
189+
190+
// nolint:gocritic // Unit test.
191+
ctx:=dbauthz.AsSystemRestricted(testutil.Context(t,testutil.WaitMedium))
192+
store,ps:=dbtestutil.NewDB(t)
193+
logger:=testutil.Logger(t)
194+
195+
// GIVEN: a standard manager
196+
mgr,err:=notifications.NewManager(defaultNotificationsConfig(database.NotificationMethodSmtp),store,ps,defaultHelpers(),createMetrics(),logger.Named("notifications-manager"))
197+
require.NoError(t,err)
198+
199+
// Start Run and Stop after each other (run does "go loop()").
200+
// This is to catch a (now fixed) race condition where the manager
201+
// would be accessed/stopped while it was being created/starting up.
202+
mgr.Run(ctx)
203+
err=mgr.Stop(ctx)
204+
require.NoError(t,err)
205+
}
206+
185207
typesyncInterceptorstruct {
186208
notifications.Store
187209

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp