Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

fix: resolve flake test on manager#17702

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
mafredri merged 8 commits intomainfrominternal-544
May 8, 2025
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 52 additions & 58 deletionscoderd/notifications/manager.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -44,7 +44,6 @@ type Manager struct {
store Store
log slog.Logger

notifier *notifier
handlers map[database.NotificationMethod]Handler
method database.NotificationMethod
helpers template.FuncMap
Expand All@@ -53,11 +52,13 @@ type Manager struct {

success, failure chan dispatchResult

runOnce sync.Once
stopOnce sync.Once
doneOnce sync.Once
stop chan any
done chan any
mu sync.Mutex // Protects following.
closed bool
notifier *notifier

runOnce sync.Once
stop chan any
done chan any

// clock is for testing only
clock quartz.Clock
Expand DownExpand Up@@ -138,7 +139,7 @@ func (m *Manager) WithHandlers(reg map[database.NotificationMethod]Handler) {
// Manager requires system-level permissions to interact with the store.
// Run is only intended to be run once.
func (m *Manager) Run(ctx context.Context) {
m.log.Info(ctx, "started")
m.log.Debug(ctx, "notification managerstarted")

m.runOnce.Do(func() {
// Closes when Stop() is called or context is canceled.
Expand All@@ -155,31 +156,26 @@ func (m *Manager) Run(ctx context.Context) {
// events, creating a notifier, and publishing bulk dispatch result updates to the store.
func (m *Manager) loop(ctx context.Context) error {
defer func() {
m.doneOnce.Do(func() {
close(m.done)
})
m.log.Info(context.Background(), "notification manager stopped")
close(m.done)
m.log.Debug(context.Background(), "notification manager stopped")
}()

// Caught a terminal signal before notifier was created, exit immediately.
select {
case <-m.stop:
m.log.Warn(ctx, "gracefully stopped")
return xerrors.Errorf("gracefully stopped")
case <-ctx.Done():
m.log.Error(ctx, "ungracefully stopped", slog.Error(ctx.Err()))
return xerrors.Errorf("notifications: %w", ctx.Err())
default:
m.mu.Lock()
if m.closed {
m.mu.Unlock()
return xerrors.New("manager already closed")
}

var eg errgroup.Group

// Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications.
m.notifier = newNotifier(ctx, m.cfg, uuid.New(), m.log, m.store, m.handlers, m.helpers, m.metrics, m.clock)
eg.Go(func() error {
// run the notifier which will handle dequeueing and dispatching notifications.
return m.notifier.run(m.success, m.failure)
})

m.mu.Unlock()

// Periodically flush notification state changes to the store.
eg.Go(func() error {
// Every interval, collect the messages in the channels and bulk update them in the store.
Expand DownExpand Up@@ -355,48 +351,46 @@ func (m *Manager) syncUpdates(ctx context.Context) {

// Stop stops the notifier and waits until it has stopped.
func (m *Manager) Stop(ctx context.Context) error {
var err error
m.stopOnce.Do(func() {
select {
case <-ctx.Done():
err = ctx.Err()
return
default:
}
m.mu.Lock()
defer m.mu.Unlock()

m.log.Info(context.Background(), "graceful stop requested")
if m.closed {
return nil
}
m.closed = true

// If the notifier hasn't been started, we don't need to wait for anything.
// This is only really during testing when we want to enqueue messages only but not deliver them.
if m.notifier == nil {
m.doneOnce.Do(func() {
close(m.done)
})
} else {
m.notifier.stop()
}
m.log.Debug(context.Background(), "graceful stop requested")

// If the notifier hasn't been started, we don't need to wait for anything.
// This is only really during testing when we want to enqueue messages only but not deliver them.
if m.notifier != nil {
m.notifier.stop()
}

// Signal the stop channel to cause loop to exit.
close(m.stop)
// Signal the stop channel to cause loop to exit.
close(m.stop)

// Wait for the manager loop to exit or the context to be canceled, whichever comes first.
select {
case <-ctx.Done():
var errStr string
if ctx.Err() != nil {
errStr = ctx.Err().Error()
}
// For some reason, slog.Error returns {} for a context error.
m.log.Error(context.Background(), "graceful stop failed", slog.F("err", errStr))
err = ctx.Err()
return
case <-m.done:
m.log.Info(context.Background(), "gracefully stopped")
return
}
})
if m.notifier == nil {
return nil
}

return err
m.mu.Unlock() // Unlock to avoid blocking loop.
defer m.mu.Lock() // Re-lock the mutex due to earlier defer.

// Wait for the manager loop to exit or the context to be canceled, whichever comes first.
select {
case <-ctx.Done():
var errStr string
if ctx.Err() != nil {
errStr = ctx.Err().Error()
}
// For some reason, slog.Error returns {} for a context error.
m.log.Error(context.Background(), "graceful stop failed", slog.F("err", errStr))
return ctx.Err()
case <-m.done:
m.log.Debug(context.Background(), "gracefully stopped")
return nil
}
}

type dispatchResult struct {
Expand Down
22 changes: 22 additions & 0 deletionscoderd/notifications/manager_test.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -182,6 +182,28 @@ func TestStopBeforeRun(t *testing.T) {
}, testutil.WaitShort, testutil.IntervalFast)
}

func TestRunStopRace(t *testing.T) {
t.Parallel()

// SETUP

// nolint:gocritic // Unit test.
ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitMedium))
store, ps := dbtestutil.NewDB(t)
logger := testutil.Logger(t)

// GIVEN: a standard manager
mgr, err := notifications.NewManager(defaultNotificationsConfig(database.NotificationMethodSmtp), store, ps, defaultHelpers(), createMetrics(), logger.Named("notifications-manager"))
require.NoError(t, err)

// Start Run and Stop after each other (run does "go loop()").
// This is to catch a (now fixed) race condition where the manager
// would be accessed/stopped while it was being created/starting up.
mgr.Run(ctx)
err = mgr.Stop(ctx)
require.NoError(t, err)
}

type syncInterceptor struct {
notifications.Store

Expand Down
Loading

[8]ページ先頭

©2009-2025 Movatter.jp