Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

feat: implement observability of notifications subsystem#13799

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub?Sign in to your account

Merged
dannykopping merged 27 commits intomainfromdk/system-notifications-o11y
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from1 commit
Commits
Show all changes
27 commits
Select commitHold shift + click to select a range
7f60c0f
Implement observability of notification subsystem
dannykoppingJul 3, 2024
d62d704
make lint
dannykoppingJul 5, 2024
96dac65
make gen
dannykoppingJul 5, 2024
130de49
make fmt
dannykoppingJul 8, 2024
e868752
Small fixes
dannykoppingJul 8, 2024
cee93cb
Review comments
dannykoppingJul 8, 2024
387b557
Apply suggestions from code review
dannykoppingJul 8, 2024
114797d
Correcting query
dannykoppingJul 8, 2024
5ff29c0
Merge branch 'main' of github.com:/coder/coder into dk/system-notific…
dannykoppingJul 9, 2024
88451a1
Only return UUID from EnqueueNotificationMessage
dannykoppingJul 9, 2024
09f7305
Review feedback
dannykoppingJul 9, 2024
91e2a23
Minor fixups
dannykoppingJul 9, 2024
9f1d6b3
Revert hack, no output param needed
dannykoppingJul 10, 2024
15c4537
Small touch-ups
dannykoppingJul 10, 2024
53ecad4
Merge branch 'main' of https://github.com/coder/coder into dk/system-…
dannykoppingJul 10, 2024
bc2a4cb
Merge branch 'main' of https://github.com/coder/coder into dk/system-…
dannykoppingJul 10, 2024
716e591
Harden tests, fail early
dannykoppingJul 10, 2024
d408ed2
make fmt
dannykoppingJul 10, 2024
2b9eec3
Restoring deleted line
dannykoppingJul 10, 2024
4211c84
Comments
dannykoppingJul 10, 2024
24417c5
Lock before modification
dannykoppingJul 10, 2024
72bb1be
Remove TestNotifierPaused's unnecessarily fast fetch interval
dannykoppingJul 10, 2024
bfca2c1
Merge branch 'main' of https://github.com/coder/coder into dk/system-…
dannykoppingJul 10, 2024
6602682
Rename migration after numbering conflict
dannykoppingJul 10, 2024
00633a1
Small fixes
dannykoppingJul 11, 2024
f454184
Merge branch 'main' of https://github.com/coder/coder into dk/system-…
dannykoppingJul 11, 2024
84d07d4
Logging improvement
dannykoppingJul 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
NextNext commit
Implement observability of notification subsystem
Minor refactoring to make testing easier

Signed-off-by: Danny Kopping <danny@coder.com>
  • Loading branch information
@dannykopping
dannykopping committedJul 8, 2024
commit7f60c0faca328d10af48aa643d86f0b37d277d4b
3 changes: 2 additions & 1 deletioncli/server.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -983,6 +983,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
)
if experiments.Enabled(codersdk.ExperimentNotifications) {
cfg := options.DeploymentValues.Notifications
metrics := notifications.NewMetrics(options.PrometheusRegistry)

// The enqueuer is responsible for enqueueing notifications to the given store.
enqueuer, err := notifications.NewStoreEnqueuer(cfg, options.Database, templateHelpers(options), logger.Named("notifications.enqueuer"))
Expand All@@ -994,7 +995,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
// The notification manager is responsible for:
// - creating notifiers and managing their lifecycles (notifiers are responsible for dequeueing/sending notifications)
// - keeping the store updated with status updates
notificationsManager, err = notifications.NewManager(cfg, options.Database, logger.Named("notifications.manager"))
notificationsManager, err = notifications.NewManager(cfg, options.Database,metrics,logger.Named("notifications.manager"))
if err != nil {
return xerrors.Errorf("failed to instantiate notification manager: %w", err)
}
Expand Down
21 changes: 13 additions & 8 deletionscoderd/database/dbmem/dbmem.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -929,15 +929,20 @@ func (q *FakeQuerier) AcquireNotificationMessages(_ context.Context, arg databas
return nil, err
}

// Shift the first "Count" notifications off the slice (FIFO).
sz := len(q.notificationMessages)
if sz > int(arg.Count) {
sz = int(arg.Count)
}

list := q.notificationMessages[:sz]
q.notificationMessages = q.notificationMessages[sz:]

q.mutex.Lock()
defer q.mutex.Unlock()

var out []database.AcquireNotificationMessagesRow
for _, nm := range q.notificationMessages {
if len(out) >= int(arg.Count) {
break
}

for _, nm := range list {
acquirableStatuses := []database.NotificationMessageStatus{database.NotificationMessageStatusPending, database.NotificationMessageStatusTemporaryFailure}
if !slices.Contains(acquirableStatuses, nm.Status) {
continue
Expand All@@ -953,9 +958,9 @@ func (q *FakeQuerier) AcquireNotificationMessages(_ context.Context, arg databas
ID: nm.ID,
Payload: nm.Payload,
Method: nm.Method,
CreatedBy: nm.CreatedBy,
TitleTemplate: "This is a title with {{.Labels.variable}}",
BodyTemplate: "This is a body with {{.Labels.variable}}",
TemplateID: nm.NotificationTemplateID,
})
}

Expand DownExpand Up@@ -1229,15 +1234,15 @@ func (*FakeQuerier) BulkMarkNotificationMessagesFailed(_ context.Context, arg da
if err != nil {
return 0, err
}
return-1, nil
returnint64(len(arg.IDs)), nil
}

func (*FakeQuerier) BulkMarkNotificationMessagesSent(_ context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error) {
err := validateDatabaseType(arg)
if err != nil {
return 0, err
}
return-1, nil
returnint64(len(arg.IDs)), nil
}

func (*FakeQuerier) CleanTailnetCoordinators(_ context.Context) error {
Expand Down
3 changes: 2 additions & 1 deletioncoderd/database/dump.sql
View file
Open in desktop

Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.

View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
-- Removes the queued_seconds column from notification_messages; IF EXISTS
-- makes the statement a no-op when the column is already absent.
-- NOTE(review): presumably the rollback of the migration that adds the column — confirm.
ALTER TABLE notification_messages
DROP COLUMN IF EXISTS queued_seconds;
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
-- Adds a nullable floating-point queued_seconds column to notification_messages.
ALTER TABLE notification_messages
ADD COLUMN queued_seconds FLOAT NULL;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Nit: The name `queued_seconds` implies an integer; consider changing the type or the name.

Copy link
ContributorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

I don't think it necessarily implies an integer; `time.Duration`'s `Seconds()` returns a `float64`.

Copy link
Member

@mafredrimafredriJul 8, 2024
edited
Loading

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Fair enough 😄.

In the DB, usually it means seconds:

    lifetime_seconds bigint DEFAULT 86400 NOT NULL,    timeout_seconds integer NOT NULL    connection_timeout_seconds integer DEFAULT 0 NOT NULL,

But fine to keep as float if we care about the precision (although I'd prefer to multiply the number to required precision in int).

Copy link
ContributorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

I suppose I don't strictly need to keep sub-second precision on the length of time messages were queued for.
I'll switch to `integer` 👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

This is backing a Prometheus metric, which are always floats, and by convention end in the SI unit, which for time is seconds. It implies no precision.

This should be a float: there just isn't any compelling reason to pre-judge the required precision. This is not a banking application, nor do we need to worry about the relative speed of integer vs floating point arithmetic.

The name `queued_seconds` is appropriate.

dannykopping reacted with thumbs up emoji
1 change: 1 addition & 0 deletionscoderd/database/models.go
View file
Open in desktop

Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.

22 changes: 15 additions & 7 deletionscoderd/database/queries.sql.go
View file
Open in desktop

Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.

9 changes: 6 additions & 3 deletionscoderd/database/queries/notifications.sql
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -36,7 +36,8 @@ RETURNING *;
WITH acquired AS (
UPDATE
notification_messages
SET updated_at = NOW(),
SET queued_seconds = GREATEST(0, EXTRACT(EPOCH FROM (NOW() - updated_at)))::FLOAT,
updated_at = NOW(),
status = 'leased'::notification_message_status,
status_reason = 'Leased by notifier ' || sqlc.arg('notifier_id')::uuid,
leased_until = NOW() + CONCAT(sqlc.arg('lease_seconds')::int, ' seconds')::interval
Expand DownExpand Up@@ -78,8 +79,10 @@ SELECT
nm.id,
nm.payload,
nm.method,
nm.created_by,
nm.attempt_count::int AS attempt_count,
nm.queued_seconds::float AS queued_seconds,
-- template
nt.id AS template_id,
nt.title_template,
nt.body_template
FROM acquired nm
Expand DownExpand Up@@ -111,7 +114,7 @@ SET updated_at = new_values.sent_at,
status_reason = NULL,
leased_until = NULL,
next_retry_after = NULL
FROM (SELECT UNNEST(@ids::uuid[]) AS id,
FROM (SELECT UNNEST(@ids::uuid[])AS id,
UNNEST(@sent_ats::timestamptz[]) AS sent_at)
AS new_values
WHERE notification_messages.id = new_values.id;
Expand Down
49 changes: 25 additions & 24 deletionscoderd/notifications/manager.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -43,6 +43,9 @@ type Manager struct {

notifier *notifier
handlers map[database.NotificationMethod]Handler
method database.NotificationMethod

metrics *Metrics

success, failure chan dispatchResult

Expand All@@ -56,7 +59,16 @@ type Manager struct {
//
// helpers is a map of template helpers which are used to customize notification messages to use global settings like
// access URL etc.
func NewManager(cfg codersdk.NotificationsConfig, store Store, log slog.Logger) (*Manager, error) {
func NewManager(cfg codersdk.NotificationsConfig, store Store, metrics *Metrics, log slog.Logger) (*Manager, error) {
if metrics == nil {
panic("nil metrics passed to notifications manager")
}

method, err := dispatchMethodFromCfg(cfg)
if err != nil {
return nil, err
}

// If dispatch timeout exceeds lease period, it is possible that messages can be delivered in duplicate because the
// lease can expire before the notifier gives up on the dispatch, which results in the message becoming eligible for
// being re-acquired.
Expand All@@ -78,6 +90,9 @@ func NewManager(cfg codersdk.NotificationsConfig, store Store, log slog.Logger)
success: make(chan dispatchResult, cfg.StoreSyncBufferSize),
failure: make(chan dispatchResult, cfg.StoreSyncBufferSize),

metrics: metrics,
method: method,

stop: make(chan any),
done: make(chan any),

Expand DownExpand Up@@ -137,7 +152,7 @@ func (m *Manager) loop(ctx context.Context) error {
var eg errgroup.Group

// Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications.
m.notifier = newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers)
m.notifier = newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers, m.method, m.metrics)
eg.Go(func() error {
return m.notifier.run(ctx, m.success, m.failure)
})
Expand DownExpand Up@@ -171,12 +186,12 @@ func (m *Manager) loop(ctx context.Context) error {
if len(m.success)+len(m.failure) > 0 {
m.log.Warn(ctx, "flushing buffered updates before stop",
slog.F("success_count", len(m.success)), slog.F("failure_count", len(m.failure)))
m.bulkUpdate(ctx)
m.syncUpdates(ctx)
m.log.Warn(ctx, "flushing updates done")
}
return nil
case <-tick.C:
m.bulkUpdate(ctx)
m.syncUpdates(ctx)
}
}
})
Expand All@@ -194,8 +209,8 @@ func (m *Manager) BufferedUpdatesCount() (success int, failure int) {
return len(m.success), len(m.failure)
}

//bulkUpdate updates messages in the store based on the given successful and failed message dispatch results.
func (m *Manager)bulkUpdate(ctx context.Context) {
//syncUpdates updates messages in the store based on the given successful and failed message dispatch results.
func (m *Manager)syncUpdates(ctx context.Context) {
select {
case <-ctx.Done():
return
Expand All@@ -205,6 +220,10 @@ func (m *Manager) bulkUpdate(ctx context.Context) {
nSuccess := len(m.success)
nFailure := len(m.failure)

defer func() {
m.metrics.PendingUpdates.Set(float64(len(m.success) + len(m.failure)))
}()

// Nothing to do.
if nSuccess+nFailure == 0 {
return
Expand DownExpand Up@@ -347,21 +366,3 @@ type dispatchResult struct {
err error
retryable bool
}

// newSuccessfulDispatch builds the dispatchResult recorded when the given
// notifier has dispatched message msg. The result is stamped with the current
// time; its err and retryable fields are left at their zero values.
func newSuccessfulDispatch(notifier, msg uuid.UUID) dispatchResult {
	var res dispatchResult
	res.notifier = notifier
	res.msg = msg
	res.ts = time.Now()
	return res
}

// newFailedDispatch builds the dispatchResult recorded when the given notifier
// failed to dispatch message msg. The result is stamped with the current time
// and carries the causing err along with the caller-supplied retryable flag.
func newFailedDispatch(notifier, msg uuid.UUID, err error, retryable bool) dispatchResult {
	var res dispatchResult
	res.notifier = notifier
	res.msg = msg
	res.ts = time.Now()
	res.err = err
	res.retryable = retryable
	return res
}
12 changes: 6 additions & 6 deletionscoderd/notifications/manager_test.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -35,13 +35,13 @@ func TestBufferedUpdates(t *testing.T) {
}

ctx, logger, db := setup(t)
interceptor := &bulkUpdateInterceptor{Store: db}
interceptor := &syncInterceptor{Store: db}
santa := &santaHandler{}

cfg := defaultNotificationsConfig(database.NotificationMethodSmtp)
cfg.StoreSyncInterval = serpent.Duration(time.Hour) // Ensure we don't sync the store automatically.

mgr, err := notifications.NewManager(cfg, interceptor, logger.Named("notifications-manager"))
mgr, err := notifications.NewManager(cfg, interceptor,createMetrics(),logger.Named("notifications-manager"))
require.NoError(t, err)
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
database.NotificationMethodSmtp: santa,
Expand DownExpand Up@@ -153,7 +153,7 @@ func TestStopBeforeRun(t *testing.T) {

ctx := context.Background()
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true, IgnoredErrorIs: []error{}}).Leveled(slog.LevelDebug)
mgr, err := notifications.NewManager(defaultNotificationsConfig(database.NotificationMethodSmtp), dbmem.New(), logger.Named("notifications-manager"))
mgr, err := notifications.NewManager(defaultNotificationsConfig(database.NotificationMethodSmtp), dbmem.New(),createMetrics(),logger.Named("notifications-manager"))
require.NoError(t, err)

// Call stop before notifier is started with Run().
Expand All@@ -163,15 +163,15 @@ func TestStopBeforeRun(t *testing.T) {
}, testutil.WaitShort, testutil.IntervalFast)
}

typebulkUpdateInterceptor struct {
typesyncInterceptor struct {
notifications.Store

sent atomic.Int32
failed atomic.Int32
err atomic.Value
}

func (b *bulkUpdateInterceptor) BulkMarkNotificationMessagesSent(ctx context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error) {
func (b *syncInterceptor) BulkMarkNotificationMessagesSent(ctx context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error) {
updated, err := b.Store.BulkMarkNotificationMessagesSent(ctx, arg)
b.sent.Add(int32(updated))
if err != nil {
Expand All@@ -180,7 +180,7 @@ func (b *bulkUpdateInterceptor) BulkMarkNotificationMessagesSent(ctx context.Con
return updated, err
}

func (b *bulkUpdateInterceptor) BulkMarkNotificationMessagesFailed(ctx context.Context, arg database.BulkMarkNotificationMessagesFailedParams) (int64, error) {
func (b *syncInterceptor) BulkMarkNotificationMessagesFailed(ctx context.Context, arg database.BulkMarkNotificationMessagesFailedParams) (int64, error) {
updated, err := b.Store.BulkMarkNotificationMessagesFailed(ctx, arg)
b.failed.Add(int32(updated))
if err != nil {
Expand Down
16 changes: 16 additions & 0 deletionscoderd/notifications/method.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
package notifications

import (
"golang.org/x/xerrors"

"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/codersdk"
)

// dispatchMethodFromCfg resolves the notification method named in cfg into a
// database.NotificationMethod.
//
// The string form of cfg.Method is handed to the method's Scan; if Scan
// reports an error, an empty method is returned together with an error that
// quotes the configured value.
func dispatchMethodFromCfg(cfg codersdk.NotificationsConfig) (database.NotificationMethod, error) {
	var parsed database.NotificationMethod

	scanErr := parsed.Scan(cfg.Method.String())
	if scanErr != nil {
		return "", xerrors.Errorf("given notification method %q is invalid", cfg.Method)
	}

	return parsed, nil
}
Loading

[8]ページ先頭

©2009-2025 Movatter.jp