- Notifications
You must be signed in to change notification settings - Fork1k
feat: implement thin vertical slice of system-generated notifications#13537
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Changes from1 commit
53c9cbb
4856aed
cda6efb
86f937a
e8f1af2
a056f54
8c64d30
ac149ec
884fadf
4e362e7
8097290
1b841ad
61f5bd6
3c8e33b
757327c
6f909ae
36698c5
c5701a6
9d4c312
9380d8e
4b7214d
6679ef1
337997d
ba5f7c6
0f29293
7c6c486
c6e75c2
aff9e6c
613e074
faea7fc
File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
Signed-off-by: Danny Kopping <danny@coder.com>
- Loading branch information
Uh oh!
There was an error while loading.Please reload this page.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.
Uh oh!
There was an error while loading.Please reload this page.
Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.
Uh oh!
There was an error while loading.Please reload this page.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -18,10 +18,10 @@ import ( | ||
// Manager manages all notifications being enqueued and dispatched. | ||
// | ||
// Manager maintains anotifier: this consumes the queue of notification messages in the store. | ||
// | ||
//The notifier dequeuesmessages from the store _CODER_NOTIFICATIONS_LEASE_COUNT_ at a time and concurrently "dispatches" | ||
//these messages, meaning they aresent by their respective methods (email, webhook, etc). | ||
// | ||
// To reduce load on the store, successful and failed dispatches are accumulated in two separate buffers (success/failure) | ||
// of size CODER_NOTIFICATIONS_STORE_SYNC_INTERVAL in the Manager, and updates are sent to the store about which messages | ||
@@ -30,20 +30,19 @@ import ( | ||
// sent but they start failing too quickly, the buffers (receive channels) will fill up and block senders, which will | ||
// slow down the dispatch rate. | ||
// | ||
// NOTE: The above backpressure mechanism only works within the same process, which may not be true forever, such as if | ||
// we split notifiers out into separate targets for greater processing throughput; in this case we will need an | ||
// alternative mechanism for handling backpressure. | ||
type Manager struct { | ||
cfg codersdk.NotificationsConfig | ||
store Store | ||
log slog.Logger | ||
notifier *notifier | ||
handlers map[database.NotificationMethod]Handler | ||
runOnce sync.Once | ||
stopOnce sync.Once | ||
stop chan any | ||
done chan any | ||
@@ -81,25 +80,28 @@ func (m *Manager) WithHandlers(reg map[database.NotificationMethod]Handler) { | ||
// Run initiates the control loop in the background, which spawns a given number of notifier goroutines. | ||
// Manager requires system-level permissions to interact with the store. | ||
// Run is only intended to be run once. | ||
func (m *Manager) Run(ctx context.Context) { | ||
m.runOnce.Do(func() { | ||
// Closes when Stop() is called or context is canceled. | ||
go func() { | ||
err := m.loop(ctx) | ||
dannykopping marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
if err != nil { | ||
m.log.Error(ctx, "notification manager stopped with error", slog.Error(err)) | ||
} | ||
}() | ||
}) | ||
} | ||
// loop contains the main business logic of the notification manager. It is responsible for subscribing to notification | ||
// events, creatinga notifier, and publishing bulk dispatch result updates to the store. | ||
func (m *Manager) loop(ctx context.Context) error { | ||
defer func() { | ||
close(m.done) | ||
m.log.Info(context.Background(), "notification manager stopped") | ||
}() | ||
// Caught a terminal signal beforenotifier was created, exit immediately. | ||
select { | ||
case <-m.stop: | ||
m.log.Warn(ctx, "gracefully stopped") | ||
@@ -121,21 +123,17 @@ func (m *Manager) loop(ctx context.Context, notifiers int) error { | ||
failure = make(chan dispatchResult, m.cfg.StoreSyncBufferSize) | ||
) | ||
var eg errgroup.Group | ||
// Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications. | ||
m.notifier = newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers) | ||
eg.Go(func() error { | ||
return m.notifier.run(ctx, success, failure) | ||
}) | ||
// Periodically flush notification state changes to the store. | ||
eg.Go(func() error { | ||
// Every interval, collect the messages in the channels and bulk update them in the store. | ||
tick := time.NewTicker(m.cfg.StoreSyncInterval.Value()) | ||
dannykopping marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
defer tick.Stop() | ||
for { | ||
@@ -281,12 +279,8 @@ func (m *Manager) bulkUpdate(ctx context.Context, success, failure <-chan dispat | ||
wg.Wait() | ||
} | ||
// Stop stopsthe notifier and waits untilit has stopped. | ||
func (m *Manager) Stop(ctx context.Context) error { | ||
var err error | ||
m.stopOnce.Do(func() { | ||
select { | ||
@@ -298,22 +292,14 @@ func (m *Manager) Stop(ctx context.Context) error { | ||
m.log.Info(context.Background(), "graceful stop requested") | ||
// If thenotifier hasn't been started, we don't need to wait for anything. | ||
// This is only really during testing when we want to enqueue messages only but not deliver them. | ||
ifm.notifier ==nil { | ||
close(m.done) | ||
} else { | ||
m.notifier.stop() | ||
} | ||
// Signal the stop channel to cause loop to exit. | ||
close(m.stop) | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -68,7 +68,7 @@ func TestBasicNotificationRoundtrip(t *testing.T) { | ||
fid, err := enq.Enqueue(ctx, user.UserID, notifications.TemplateWorkspaceDeleted, map[string]string{"type": "failure"}, "test") | ||
require.NoError(t, err) | ||
mgr.Run(ctx) | ||
// then | ||
require.Eventually(t, func() bool { return handler.succeeded == sid.String() }, testutil.WaitLong, testutil.IntervalMedium) | ||
@@ -124,7 +124,7 @@ func TestSMTPDispatch(t *testing.T) { | ||
msgID, err := enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{}, "test") | ||
require.NoError(t, err) | ||
mgr.Run(ctx) | ||
// then | ||
require.Eventually(t, func() bool { | ||
@@ -209,7 +209,7 @@ func TestWebhookDispatch(t *testing.T) { | ||
msgID, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, input, "test") | ||
require.NoError(t, err) | ||
mgr.Run(ctx) | ||
// then | ||
require.Eventually(t, func() bool { return <-sent }, testutil.WaitShort, testutil.IntervalFast) | ||
dannykopping marked this conversation as resolved. OutdatedShow resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
@@ -289,26 +289,25 @@ func TestBackpressure(t *testing.T) { | ||
require.NoError(t, err) | ||
} | ||
// Start the notifier. | ||
mgr.Run(ctx) | ||
// then | ||
// Wait for 3 fetch intervals, then check progress. | ||
time.Sleep(fetchInterval * 3) | ||
// We expect thenotifier will have dispatched ONLY the initial batch of messages. | ||
// In other words, thenotifier should have dispatched 3 batches by now, but because the buffered updates have not | ||
// been processed: there is backpressure. | ||
require.EqualValues(t, batchSize, handler.sent.Load()+handler.err.Load()) | ||
// We expect that the store will have received NO updates. | ||
require.EqualValues(t, 0, storeInterceptor.sent.Load()+storeInterceptor.failed.Load()) | ||
// However, when we Stop() the manager the backpressure will be relieved and the buffered updates will ALL be flushed, | ||
// since all the goroutinesthat wereblocked(on writing updates to the buffer) will be unblocked and will complete. | ||
require.NoError(t, mgr.Stop(ctx)) | ||
require.EqualValues(t, batchSize, storeInterceptor.sent.Load()+storeInterceptor.failed.Load()) | ||
} | ||
func TestRetries(t *testing.T) { | ||
@@ -394,9 +393,7 @@ func TestRetries(t *testing.T) { | ||
require.NoError(t, err) | ||
} | ||
mgr.Run(ctx) | ||
// then | ||
require.Eventually(t, func() bool { | ||
Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.