feat: implement thin vertical slice of system-generated notifications #13537


Merged: dannykopping merged 30 commits into main from dk/system-notifications-lib on Jul 8, 2024
Changes from 1 commit

Commits (30):
53c9cbb feat: system-generated notifications (dannykopping, Jun 11, 2024)
4856aed Fixing lint errors & minor tests (dannykopping, Jun 11, 2024)
cda6efb Fixing dbauthz test (dannykopping, Jun 11, 2024)
86f937a TestBufferedUpdates does not need a real db, altering test details sl… (dannykopping, Jun 11, 2024)
e8f1af2 Correct TestBufferedUpdates to count updated entries, use real db again (dannykopping, Jun 12, 2024)
a056f54 Use UUID for notifier IDs (dannykopping, Jun 27, 2024)
8c64d30 Small improvements from review suggestions (dannykopping, Jun 27, 2024)
ac149ec Protect notifiers from modification during Stop() (dannykopping, Jun 27, 2024)
884fadf Split out enqueuer as separate responsibility, get rid of singleton (dannykopping, Jun 28, 2024)
4e362e7 Remove unnecessary handler registry (dannykopping, Jun 28, 2024)
8097290 Remove unused context (dannykopping, Jun 28, 2024)
1b841ad Centralise markdown rendering (dannykopping, Jun 28, 2024)
61f5bd6 Appease the linter (dannykopping, Jun 28, 2024)
3c8e33b Only enqueue notification when not initiated by self (dannykopping, Jul 1, 2024)
757327c Hide config flags which are unlikely to be modified by operators (dannykopping, Jul 1, 2024)
6f909ae Remove unnecessary Labels struct (dannykopping, Jul 1, 2024)
36698c5 Enable experiment as safe (dannykopping, Jul 1, 2024)
c5701a6 Correcting bad refactor (dannykopping, Jul 1, 2024)
9d4c312 Initialize Enqueuer on API startup (dannykopping, Jul 1, 2024)
9380d8e Only start one notifier since all dispatches are concurrent anyway (dannykopping, Jul 1, 2024)
4b7214d Fix docs (dannykopping, Jul 1, 2024)
6679ef1 Fix lint error (dannykopping, Jul 1, 2024)
337997d Merge branch 'main' of github.com:/coder/coder into dk/system-notific… (dannykopping, Jul 2, 2024)
ba5f7c6 Merge branch 'main' of github.com:/coder/coder into dk/system-notific… (dannykopping, Jul 3, 2024)
0f29293 Review feedback (dannykopping, Jul 3, 2024)
7c6c486 Merge branch 'main' of github.com:/coder/coder into dk/system-notific… (dannykopping, Jul 4, 2024)
c6e75c2 Fix lint failures (dannykopping, Jul 4, 2024)
aff9e6c Review comments (dannykopping, Jul 4, 2024)
613e074 Avoid race by exposing number of pending updates (dannykopping, Jul 4, 2024)
faea7fc Merge branch 'main' of github.com:/coder/coder into dk/system-notific… (dannykopping, Jul 8, 2024)
Commit 613e07444884ca33fadd3bd60fdb6452e18e84b4
Avoid race by exposing number of pending updates
Signed-off-by: Danny Kopping <danny@coder.com>
dannykopping committed Jul 4, 2024
coderd/notifications/manager.go (52 changes: 29 additions & 23 deletions)
@@ -44,6 +44,8 @@ type Manager struct {
notifier *notifier
handlers map[database.NotificationMethod]Handler

success, failure chan dispatchResult

runOnce sync.Once
stopOnce sync.Once
stop chan any
@@ -67,6 +69,15 @@ func NewManager(cfg codersdk.NotificationsConfig, store Store, log slog.Logger)
cfg: cfg,
store: store,

// Buffer successful/failed notification dispatches in memory to reduce load on the store.
//
// We keep separate buffers for success/failure right now because the bulk updates are already a bit janky,
// see BulkMarkNotificationMessagesSent/BulkMarkNotificationMessagesFailed. If we had the ability to batch updates,
// like is offered in https://docs.sqlc.dev/en/stable/reference/query-annotations.html#batchmany, we'd have a cleaner
// approach to this - but for now this will work fine.
success: make(chan dispatchResult, cfg.StoreSyncBufferSize),
failure: make(chan dispatchResult, cfg.StoreSyncBufferSize),

stop: make(chan any),
done: make(chan any),

@@ -123,23 +134,12 @@ func (m *Manager) loop(ctx context.Context) error {
default:
}

var (
// Buffer successful/failed notification dispatches in memory to reduce load on the store.
//
// We keep separate buffered for success/failure right now because the bulk updates are already a bit janky,
// see BulkMarkNotificationMessagesSent/BulkMarkNotificationMessagesFailed. If we had the ability to batch updates,
// like is offered in https://docs.sqlc.dev/en/stable/reference/query-annotations.html#batchmany, we'd have a cleaner
// approach to this - but for now this will work fine.
success = make(chan dispatchResult, m.cfg.StoreSyncBufferSize)
failure = make(chan dispatchResult, m.cfg.StoreSyncBufferSize)
)

var eg errgroup.Group

// Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications.
m.notifier = newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers)
eg.Go(func() error {
return m.notifier.run(ctx, success, failure)
return m.notifier.run(ctx, m.success, m.failure)
})

// Periodically flush notification state changes to the store.
@@ -162,21 +162,21 @@ func (m *Manager) loop(ctx context.Context) error {
// TODO: mention the above tradeoff in documentation.
m.log.Warn(ctx, "exiting ungracefully", slog.Error(ctx.Err()))

if len(success)+len(failure) > 0 {
if len(m.success)+len(m.failure) > 0 {
m.log.Warn(ctx, "context canceled with pending updates in buffer, these messages will be sent again after lease expires",
slog.F("success_count", len(success)), slog.F("failure_count", len(failure)))
slog.F("success_count", len(m.success)), slog.F("failure_count", len(m.failure)))
}
return ctx.Err()
case <-m.stop:
if len(success)+len(failure) > 0 {
if len(m.success)+len(m.failure) > 0 {
m.log.Warn(ctx, "flushing buffered updates before stop",
slog.F("success_count", len(success)), slog.F("failure_count", len(failure)))
m.bulkUpdate(ctx, success, failure)
slog.F("success_count", len(m.success)), slog.F("failure_count", len(m.failure)))
m.bulkUpdate(ctx)
m.log.Warn(ctx, "flushing updates done")
}
return nil
case <-tick.C:
m.bulkUpdate(ctx, success, failure)
m.bulkUpdate(ctx)
}
}
})
@@ -188,16 +188,22 @@ func (m *Manager) loop(ctx context.Context) error {
return err
}

// BufferedUpdatesCount returns the number of buffered updates which are currently waiting to be flushed to the store.
// The returned values are for success & failure, respectively.
func (m *Manager) BufferedUpdatesCount() (success int, failure int) {
return len(m.success), len(m.failure)
}

// bulkUpdate updates messages in the store based on the given successful and failed message dispatch results.
func (m *Manager) bulkUpdate(ctx context.Context, success, failure <-chan dispatchResult) {
func (m *Manager) bulkUpdate(ctx context.Context) {
select {
case <-ctx.Done():
return
default:
}

nSuccess := len(success)
nFailure := len(failure)
nSuccess := len(m.success)
nFailure := len(m.failure)

// Nothing to do.
if nSuccess+nFailure == 0 {
@@ -217,12 +223,12 @@ func (m *Manager) bulkUpdate(ctx context.Context, success, failure <-chan dispat
// will be processed on the next bulk update.

for i := 0; i < nSuccess; i++ {
res := <-success
res := <-m.success
successParams.IDs = append(successParams.IDs, res.msg)
successParams.SentAts = append(successParams.SentAts, res.ts)
}
for i := 0; i < nFailure; i++ {
res := <-failure
res := <-m.failure

status := database.NotificationMessageStatusPermanentFailure
if res.retryable {
coderd/notifications/manager_test.go (33 changes: 26 additions & 7 deletions)
@@ -5,7 +5,9 @@ import (
"encoding/json"
"sync/atomic"
"testing"
"time"

"github.com/coder/serpent"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -31,11 +33,14 @@ func TestBufferedUpdates(t *testing.T) {
if !dbtestutil.WillUsePostgres() {
t.Skip("This test requires postgres")
}

ctx, logger, db := setup(t)
interceptor := &bulkUpdateInterceptor{Store: db}

santa := &santaHandler{}

cfg := defaultNotificationsConfig(database.NotificationMethodSmtp)
cfg.StoreSyncInterval = serpent.Duration(time.Hour) // Ensure we don't sync the store automatically.

mgr, err := notifications.NewManager(cfg, interceptor, logger.Named("notifications-manager"))
require.NoError(t, err)
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
@@ -47,20 +52,34 @@ func TestBufferedUpdates(t *testing.T) {
user := dbgen.User(t, db, database.User{})

// given
_, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"nice": "true"}, "")
_, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"nice": "true"}, "") // Will succeed.
require.NoError(t, err)
_, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"nice": "true"}, "")
_, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"nice": "true"}, "") // Will succeed.
require.NoError(t, err)
_, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"nice": "false"}, "")
_, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"nice": "false"}, "") // Will fail.
require.NoError(t, err)

// when
mgr.Run(ctx)

// then

const (
expectedSuccess = 2
expectedFailure = 1
)

// Wait for messages to be dispatched.
require.Eventually(t, func() bool { return santa.naughty.Load() == 1 && santa.nice.Load() == 2 }, testutil.WaitMedium, testutil.IntervalFast)
require.Eventually(t, func() bool {
return santa.naughty.Load() == expectedFailure &&
santa.nice.Load() == expectedSuccess
}, testutil.WaitMedium, testutil.IntervalFast)

// Wait for the expected number of buffered updates to be accumulated.
require.Eventually(t, func() bool {
success, failure := mgr.BufferedUpdatesCount()
return success == expectedSuccess && failure == expectedFailure
}, testutil.WaitShort, testutil.IntervalFast)

// Stop the manager which forces an update of buffered updates.
require.NoError(t, mgr.Stop(ctx))
Contributor:
there is a race condition here, which is that above we wait for notifications to be sent to santa, but sending to santa happens before sending on the success and failure channels. So, triggering stop and the final bulk update might miss one or more success/failure updates and cause this test to flake.

Quartz can help here. The manager's bulk update and the notifier's processing loop depend on Tickers. If you convert them to use a quartz.TickerFunc, then you can control the mock clock and trigger the processing loop and bulk update, and wait for them to finish.

This lets us get rid of the Eventually that checks for dispatched notifications (Quartz allows you to wait until the tick processing is done, so there is no race in just asserting santa got what we expected).

It also means we can get rid of the Eventually below for the same reason: we can wait for bulk processing to complete before asserting the interceptor's state.

Contributor (Author):
I've got a follow-up item to implement Quartz in a subsequent PR.

Contributor (Author):

Addressed in 613e074

@@ -73,8 +92,8 @@ func TestBufferedUpdates(t *testing.T) {
ct.FailNow()
}

assert.EqualValues(ct, 1, interceptor.failed.Load())
assert.EqualValues(ct, 2, interceptor.sent.Load())
assert.EqualValues(ct, expectedFailure, interceptor.failed.Load())
assert.EqualValues(ct, expectedSuccess, interceptor.sent.Load())
}, testutil.WaitMedium, testutil.IntervalFast)
}
