Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

feat: implement thin vertical slice of system-generated notifications#13537

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
dannykopping merged 30 commits intomainfromdk/system-notifications-lib
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from1 commit
Commits
Show all changes
30 commits
Select commitHold shift + click to select a range
53c9cbb
feat: system-generated notifications
dannykoppingJun 11, 2024
4856aed
Fixing lint errors & minor tests
dannykoppingJun 11, 2024
cda6efb
Fixing dbauthz test
dannykoppingJun 11, 2024
86f937a
TestBufferedUpdates does not need a real db, altering test details sl…
dannykoppingJun 11, 2024
e8f1af2
Correct TestBufferedUpdates to count updated entries, use real db again
dannykoppingJun 12, 2024
a056f54
Use UUID for notifier IDs
dannykoppingJun 27, 2024
8c64d30
Small improvements from review suggestions
dannykoppingJun 27, 2024
ac149ec
Protect notifiers from modification during Stop()
dannykoppingJun 27, 2024
884fadf
Split out enqueuer as separate responsibility, get rid of singleton
dannykoppingJun 28, 2024
4e362e7
Remove unnecessary handler registry
dannykoppingJun 28, 2024
8097290
Remove unused context
dannykoppingJun 28, 2024
1b841ad
Centralise markdown rendering
dannykoppingJun 28, 2024
61f5bd6
Appease the linter
dannykoppingJun 28, 2024
3c8e33b
Only enqueue notification when not initiated by self
dannykoppingJul 1, 2024
757327c
Hide config flags which are unlikely to be modified by operators
dannykoppingJul 1, 2024
6f909ae
Remove unnecessary Labels struct
dannykoppingJul 1, 2024
36698c5
Enable experiment as safe
dannykoppingJul 1, 2024
c5701a6
Correcting bad refactor
dannykoppingJul 1, 2024
9d4c312
Initialize Enqueuer on API startup
dannykoppingJul 1, 2024
9380d8e
Only start one notifier since all dispatches are concurrent anyway
dannykoppingJul 1, 2024
4b7214d
Fix docs
dannykoppingJul 1, 2024
6679ef1
Fix lint error
dannykoppingJul 1, 2024
337997d
Merge branch 'main' of github.com:/coder/coder into dk/system-notific…
dannykoppingJul 2, 2024
ba5f7c6
Merge branch 'main' of github.com:/coder/coder into dk/system-notific…
dannykoppingJul 3, 2024
0f29293
Review feedback
dannykoppingJul 3, 2024
7c6c486
Merge branch 'main' of github.com:/coder/coder into dk/system-notific…
dannykoppingJul 4, 2024
c6e75c2
Fix lint failures
dannykoppingJul 4, 2024
aff9e6c
Review comments
dannykoppingJul 4, 2024
613e074
Avoid race by exposing number of pending updates
dannykoppingJul 4, 2024
faea7fc
Merge branch 'main' of github.com:/coder/coder into dk/system-notific…
dannykoppingJul 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
PrevPrevious commit
NextNext commit
Only start one notifier since all dispatches are concurrent anyway
Signed-off-by: Danny Kopping <danny@coder.com>
  • Loading branch information
@dannykopping
dannykopping committedJul 1, 2024
commit9380d8e6ee0e89b68a5a71dd352c37a447c48f04
11 changes: 6 additions & 5 deletionscli/server.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -55,6 +55,11 @@ import (

"cdr.dev/slog"
"cdr.dev/slog/sloggers/sloghuman"
"github.com/coder/pretty"
"github.com/coder/retry"
"github.com/coder/serpent"
"github.com/coder/wgtunnel/tunnelsdk"

"github.com/coder/coder/v2/buildinfo"
"github.com/coder/coder/v2/cli/clilog"
"github.com/coder/coder/v2/cli/cliui"
Expand DownExpand Up@@ -99,10 +104,6 @@ import (
"github.com/coder/coder/v2/provisionersdk"
sdkproto "github.com/coder/coder/v2/provisionersdk/proto"
"github.com/coder/coder/v2/tailnet"
"github.com/coder/pretty"
"github.com/coder/retry"
"github.com/coder/serpent"
"github.com/coder/wgtunnel/tunnelsdk"
)

func createOIDCConfig(ctx context.Context, vals *codersdk.DeploymentValues) (*coderd.OIDCConfig, error) {
Expand DownExpand Up@@ -999,7 +1000,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
}

// nolint:gocritic // TODO: create own role.
notificationsManager.Run(dbauthz.AsSystemRestricted(ctx), int(cfg.WorkerCount.Value()))
notificationsManager.Run(dbauthz.AsSystemRestricted(ctx))
}

// Wrap the server in middleware that redirects to the access URL if
Expand Down
8 changes: 2 additions & 6 deletionscli/testdata/server-config.yaml.golden
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -534,10 +534,6 @@ notifications:
# this option at its default value.
# (default: 50, type: int)
store-sync-buffer-size: 50
# How many workers should be processing messages in the queue; increase this count
# if notifications are not being processed fast enough.
# (default: 2, type: int)
worker-count: 2
# How long a notifier should lease a message. This is effectively how long a
# notification is 'owned' by a notifier, and once this period expires it will be
# available for lease by another notifier. Leasing is important in order for
Expand All@@ -547,8 +543,8 @@ notifications:
# (default: 2m0s, type: duration)
lease-period: 2m0s
# How many notifications a notifier should lease per fetch interval.
# (default:10, type: int)
lease-count:10
# (default:20, type: int)
lease-count:20
# How often to query the database for queued notifications.
# (default: 15s, type: duration)
fetch-interval: 15s
5 changes: 1 addition & 4 deletionscoderd/apidoc/docs.go
View file
Open in desktop

Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.

5 changes: 1 addition & 4 deletionscoderd/apidoc/swagger.json
View file
Open in desktop

Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.

84 changes: 35 additions & 49 deletionscoderd/notifications/manager.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -18,10 +18,10 @@ import (

// Manager manages all notifications being enqueued and dispatched.
//
// Manager maintains agroup of notifiers: these consume the queue of notification messages in the store.
// Manager maintains anotifier: this consumes the queue of notification messages in the store.
//
//Notifiers dequeuemessages from the store _CODER_NOTIFICATIONS_LEASE_COUNT_ at a time and concurrently "dispatch" these messages, meaning they are
// sent by their respective methods (email, webhook, etc).
//The notifier dequeuesmessages from the store _CODER_NOTIFICATIONS_LEASE_COUNT_ at a time and concurrently "dispatches"
//these messages, meaning they aresent by their respective methods (email, webhook, etc).
//
// To reduce load on the store, successful and failed dispatches are accumulated in two separate buffers (success/failure)
// of size CODER_NOTIFICATIONS_STORE_SYNC_INTERVAL in the Manager, and updates are sent to the store about which messages
Expand All@@ -30,20 +30,19 @@ import (
// sent but they start failing too quickly, the buffers (receive channels) will fill up and block senders, which will
// slow down the dispatch rate.
//
// NOTE: The above backpressure mechanism only worksif all notifiers livewithin the same process, which may not be true
//forever, such as ifwe split notifiers out into separate targets for greater processing throughput; in this case we
//will need analternative mechanism for handling backpressure.
// NOTE: The above backpressure mechanism only works within the same process, which may not be true forever, such as if
// we split notifiers out into separate targets for greater processing throughput; in this case we will need an
// alternative mechanism for handling backpressure.
type Manager struct {
cfg codersdk.NotificationsConfig

store Store
log slog.Logger

notifiers []*notifier
notifierMu sync.Mutex

notifier *notifier
handlers map[database.NotificationMethod]Handler

runOnce sync.Once
stopOnce sync.Once
stop chan any
done chan any
Expand DownExpand Up@@ -81,25 +80,28 @@ func (m *Manager) WithHandlers(reg map[database.NotificationMethod]Handler) {

// Run initiates the control loop in the background, which spawns a given number of notifier goroutines.
// Manager requires system-level permissions to interact with the store.
func (m *Manager) Run(ctx context.Context, notifiers int) {
// Closes when Stop() is called or context is canceled.
go func() {
err := m.loop(ctx, notifiers)
if err != nil {
m.log.Error(ctx, "notification manager stopped with error", slog.Error(err))
}
}()
// Run is only intended to be run once.
func (m *Manager) Run(ctx context.Context) {
m.runOnce.Do(func() {
// Closes when Stop() is called or context is canceled.
go func() {
err := m.loop(ctx)
if err != nil {
m.log.Error(ctx, "notification manager stopped with error", slog.Error(err))
}
}()
})
}

// loop contains the main business logic of the notification manager. It is responsible for subscribing to notification
// events, creatingnotifiers, and publishing bulk dispatch result updates to the store.
func (m *Manager) loop(ctx context.Context, notifiers int) error {
// events, creatinga notifier, and publishing bulk dispatch result updates to the store.
func (m *Manager) loop(ctx context.Context) error {
defer func() {
close(m.done)
m.log.Info(context.Background(), "notification manager stopped")
}()

// Caught a terminal signal beforenotifiers were created, exit immediately.
// Caught a terminal signal beforenotifier was created, exit immediately.
select {
case <-m.stop:
m.log.Warn(ctx, "gracefully stopped")
Expand All@@ -121,21 +123,17 @@ func (m *Manager) loop(ctx context.Context, notifiers int) error {
failure = make(chan dispatchResult, m.cfg.StoreSyncBufferSize)
)

// Create a specific number of notifiers to run, and run them concurrently.
var eg errgroup.Group
m.notifierMu.Lock()
for i := 0; i < notifiers; i++ {
n := newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers)
m.notifiers = append(m.notifiers, n)

eg.Go(func() error {
return n.run(ctx, success, failure)
})
}
m.notifierMu.Unlock()

// Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications.
m.notifier = newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers)
eg.Go(func() error {
// Every interval, collect the messages in the channels and bulk update them in the database.
return m.notifier.run(ctx, success, failure)
})

// Periodically flush notification state changes to the store.
eg.Go(func() error {
// Every interval, collect the messages in the channels and bulk update them in the store.
tick := time.NewTicker(m.cfg.StoreSyncInterval.Value())
defer tick.Stop()
for {
Expand DownExpand Up@@ -281,12 +279,8 @@ func (m *Manager) bulkUpdate(ctx context.Context, success, failure <-chan dispat
wg.Wait()
}

// Stop stopsall notifiers and waits untilthey have stopped.
// Stop stopsthe notifier and waits untilit has stopped.
func (m *Manager) Stop(ctx context.Context) error {
// Prevent notifiers from being modified while we're stopping them.
m.notifierMu.Lock()
defer m.notifierMu.Unlock()

var err error
m.stopOnce.Do(func() {
select {
Expand All@@ -298,22 +292,14 @@ func (m *Manager) Stop(ctx context.Context) error {

m.log.Info(context.Background(), "graceful stop requested")

// If thenotifiers haven't been started, we don't need to wait for anything.
// If thenotifier hasn't been started, we don't need to wait for anything.
// This is only really during testing when we want to enqueue messages only but not deliver them.
iflen(m.notifiers) ==0 {
ifm.notifier ==nil {
close(m.done)
} else {
m.notifier.stop()
}

// Stop all notifiers.
var eg errgroup.Group
for _, n := range m.notifiers {
eg.Go(func() error {
n.stop()
return nil
})
}
_ = eg.Wait()

// Signal the stop channel to cause loop to exit.
close(m.stop)

Expand Down
15 changes: 14 additions & 1 deletioncoderd/notifications/manager_test.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -60,7 +60,7 @@ func TestBufferedUpdates(t *testing.T) {
}

// when
mgr.Run(ctx, 1)
mgr.Run(ctx)

// then

Expand DownExpand Up@@ -137,6 +137,19 @@ func TestBuildPayload(t *testing.T) {
}
}

func TestStopBeforeRun(t *testing.T) {
ctx := context.Background()
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true, IgnoredErrorIs: []error{}}).Leveled(slog.LevelDebug)
mgr, err := notifications.NewManager(defaultNotificationsConfig(database.NotificationMethodSmtp), dbmem.New(), logger.Named("notifications-manager"))
require.NoError(t, err)

// Call stop before notifier is started with Run().
require.Eventually(t, func() bool {
assert.NoError(t, mgr.Stop(ctx))
return true
}, testutil.WaitShort, testutil.IntervalFast)
}

type bulkUpdateInterceptor struct {
notifications.Store

Expand Down
27 changes: 12 additions & 15 deletionscoderd/notifications/notifications_test.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -68,7 +68,7 @@ func TestBasicNotificationRoundtrip(t *testing.T) {
fid, err := enq.Enqueue(ctx, user.UserID, notifications.TemplateWorkspaceDeleted, map[string]string{"type": "failure"}, "test")
require.NoError(t, err)

mgr.Run(ctx, 1)
mgr.Run(ctx)

// then
require.Eventually(t, func() bool { return handler.succeeded == sid.String() }, testutil.WaitLong, testutil.IntervalMedium)
Expand DownExpand Up@@ -124,7 +124,7 @@ func TestSMTPDispatch(t *testing.T) {
msgID, err := enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{}, "test")
require.NoError(t, err)

mgr.Run(ctx, 1)
mgr.Run(ctx)

// then
require.Eventually(t, func() bool {
Expand DownExpand Up@@ -209,7 +209,7 @@ func TestWebhookDispatch(t *testing.T) {
msgID, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, input, "test")
require.NoError(t, err)

mgr.Run(ctx, 1)
mgr.Run(ctx)

// then
require.Eventually(t, func() bool { return <-sent }, testutil.WaitShort, testutil.IntervalFast)
Expand DownExpand Up@@ -289,26 +289,25 @@ func TestBackpressure(t *testing.T) {
require.NoError(t, err)
}

// Start two notifiers.
const notifiers = 2
mgr.Run(ctx, notifiers)
// Start the notifier.
mgr.Run(ctx)

// then

// Wait for 3 fetch intervals, then check progress.
time.Sleep(fetchInterval * 3)

// We expect thenotifiers will have dispatched ONLY the initial batch of messages.
// In other words, thenotifiers should have dispatched 3 batches by now, but because the buffered updates have not
// been processed there is backpressure.
require.EqualValues(t,notifiers*batchSize, handler.sent.Load()+handler.err.Load())
// We expect thenotifier will have dispatched ONLY the initial batch of messages.
// In other words, thenotifier should have dispatched 3 batches by now, but because the buffered updates have not
// been processed: there is backpressure.
require.EqualValues(t, batchSize, handler.sent.Load()+handler.err.Load())
// We expect that the store will have received NO updates.
require.EqualValues(t, 0, storeInterceptor.sent.Load()+storeInterceptor.failed.Load())

// However, when we Stop() the manager the backpressure will be relieved and the buffered updates will ALL be flushed,
// since all the goroutines blocked on writing updates to the buffer will be unblocked and will complete.
// since all the goroutinesthat wereblocked(on writing updates to the buffer) will be unblocked and will complete.
require.NoError(t, mgr.Stop(ctx))
require.EqualValues(t,notifiers*batchSize, storeInterceptor.sent.Load()+storeInterceptor.failed.Load())
require.EqualValues(t, batchSize, storeInterceptor.sent.Load()+storeInterceptor.failed.Load())
}

func TestRetries(t *testing.T) {
Expand DownExpand Up@@ -394,9 +393,7 @@ func TestRetries(t *testing.T) {
require.NoError(t, err)
}

// Start two notifiers.
const notifiers = 2
mgr.Run(ctx, notifiers)
mgr.Run(ctx)

// then
require.Eventually(t, func() bool {
Expand Down
15 changes: 1 addition & 14 deletionscodersdk/deployment.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -467,7 +467,6 @@ type NotificationsConfig struct {
StoreSyncBufferSize serpent.Int64 `json:"sync_buffer_size" typescript:",notnull"`

// Queue.
WorkerCount serpent.Int64 `json:"worker_count"`
LeasePeriod serpent.Duration `json:"lease_period"`
LeaseCount serpent.Int64 `json:"lease_count"`
FetchInterval serpent.Duration `json:"fetch_interval"`
Expand DownExpand Up@@ -2196,18 +2195,6 @@ Write out the current server config as YAML to stdout.`,
YAML: "store-sync-buffer-size",
Hidden: true, // Hidden because most operators should not need to modify this.
},
{
Name: "Notifications: Worker Count",
Description: "How many workers should be processing messages in the queue; increase this count if notifications " +
"are not being processed fast enough.",
Flag: "notifications-worker-count",
Env: "CODER_NOTIFICATIONS_WORKER_COUNT",
Value: &c.Notifications.WorkerCount,
Default: "2",
Group: &deploymentGroupNotifications,
YAML: "worker-count",
Hidden: true, // Hidden because most operators should not need to modify this.
},
{
Name: "Notifications: Lease Period",
Description: "How long a notifier should lease a message. This is effectively how long a notification is 'owned' " +
Expand All@@ -2230,7 +2217,7 @@ Write out the current server config as YAML to stdout.`,
Flag: "notifications-lease-count",
Env: "CODER_NOTIFICATIONS_LEASE_COUNT",
Value: &c.Notifications.LeaseCount,
Default: "10",
Default: "20",
Group: &deploymentGroupNotifications,
YAML: "lease-count",
Hidden: true, // Hidden because most operators should not need to modify this.
Expand Down
3 changes: 1 addition & 2 deletionsdocs/api/general.md
View file
Open in desktop

Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.

Loading

[8]ページ先頭

©2009-2025 Movatter.jp