Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commite324a42

Browse files
committed
Implement observability of notification subsystem
Minor refactoring to make testing easierSigned-off-by: Danny Kopping <danny@coder.com>
1 parentbc4125b commite324a42

17 files changed

+634
-137
lines changed

‎cli/server.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -983,6 +983,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
983983
)
984984
ifexperiments.Enabled(codersdk.ExperimentNotifications) {
985985
cfg:=options.DeploymentValues.Notifications
986+
metrics:=notifications.NewMetrics(options.PrometheusRegistry)
986987

987988
// The enqueuer is responsible for enqueueing notifications to the given store.
988989
enqueuer,err:=notifications.NewStoreEnqueuer(cfg,options.Database,templateHelpers(options),logger.Named("notifications.enqueuer"))
@@ -994,7 +995,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
994995
// The notification manager is responsible for:
995996
// - creating notifiers and managing their lifecycles (notifiers are responsible for dequeueing/sending notifications)
996997
// - keeping the store updated with status updates
997-
notificationsManager,err=notifications.NewManager(cfg,options.Database,logger.Named("notifications.manager"))
998+
notificationsManager,err=notifications.NewManager(cfg,options.Database,metrics,logger.Named("notifications.manager"))
998999
iferr!=nil {
9991000
returnxerrors.Errorf("failed to instantiate notification manager: %w",err)
10001001
}

‎coderd/database/dbmem/dbmem.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -929,15 +929,20 @@ func (q *FakeQuerier) AcquireNotificationMessages(_ context.Context, arg databas
929929
returnnil,err
930930
}
931931

932+
// Shift the first "Count" notifications off the slice (FIFO).
933+
sz:=len(q.notificationMessages)
934+
ifsz>int(arg.Count) {
935+
sz=int(arg.Count)
936+
}
937+
938+
list:=q.notificationMessages[:sz]
939+
q.notificationMessages=q.notificationMessages[sz:]
940+
932941
q.mutex.Lock()
933942
deferq.mutex.Unlock()
934943

935944
varout []database.AcquireNotificationMessagesRow
936-
for_,nm:=rangeq.notificationMessages {
937-
iflen(out)>=int(arg.Count) {
938-
break
939-
}
940-
945+
for_,nm:=rangelist {
941946
acquirableStatuses:= []database.NotificationMessageStatus{database.NotificationMessageStatusPending,database.NotificationMessageStatusTemporaryFailure}
942947
if!slices.Contains(acquirableStatuses,nm.Status) {
943948
continue
@@ -953,9 +958,9 @@ func (q *FakeQuerier) AcquireNotificationMessages(_ context.Context, arg databas
953958
ID:nm.ID,
954959
Payload:nm.Payload,
955960
Method:nm.Method,
956-
CreatedBy:nm.CreatedBy,
957961
TitleTemplate:"This is a title with {{.Labels.variable}}",
958962
BodyTemplate:"This is a body with {{.Labels.variable}}",
963+
TemplateID:nm.NotificationTemplateID,
959964
})
960965
}
961966

@@ -1229,15 +1234,15 @@ func (*FakeQuerier) BulkMarkNotificationMessagesFailed(_ context.Context, arg da
12291234
iferr!=nil {
12301235
return0,err
12311236
}
1232-
return-1,nil
1237+
returnint64(len(arg.IDs)),nil
12331238
}
12341239

12351240
func (*FakeQuerier)BulkMarkNotificationMessagesSent(_ context.Context,arg database.BulkMarkNotificationMessagesSentParams) (int64,error) {
12361241
err:=validateDatabaseType(arg)
12371242
iferr!=nil {
12381243
return0,err
12391244
}
1240-
return-1,nil
1245+
returnint64(len(arg.IDs)),nil
12411246
}
12421247

12431248
func (*FakeQuerier)CleanTailnetCoordinators(_ context.Context)error {

‎coderd/database/dump.sql

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more aboutcustomizing how changed files appear on GitHub.
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
ALTERTABLE notification_messages
2+
DROP COLUMN IF EXISTS queued_seconds;
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
ALTERTABLE notification_messages
2+
ADD COLUMN queued_seconds FLOATNULL;

‎coderd/database/models.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more aboutcustomizing how changed files appear on GitHub.

‎coderd/database/queries.sql.go

Lines changed: 15 additions & 7 deletions
Some generated files are not rendered by default. Learn more aboutcustomizing how changed files appear on GitHub.

‎coderd/database/queries/notifications.sql

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ RETURNING *;
3636
WITH acquiredAS (
3737
UPDATE
3838
notification_messages
39-
SET updated_at= NOW(),
39+
SET queued_seconds= GREATEST(0, EXTRACT(EPOCHFROM (NOW()- updated_at)))::FLOAT,
40+
updated_at= NOW(),
4041
status='leased'::notification_message_status,
4142
status_reason='Leased by notifier'||sqlc.arg('notifier_id')::uuid,
4243
leased_until= NOW()+ CONCAT(sqlc.arg('lease_seconds')::int,' seconds')::interval
@@ -78,8 +79,10 @@ SELECT
7879
nm.id,
7980
nm.payload,
8081
nm.method,
81-
nm.created_by,
82+
nm.attempt_count::intAS attempt_count,
83+
nm.queued_seconds::floatAS queued_seconds,
8284
-- template
85+
nt.idAS template_id,
8386
nt.title_template,
8487
nt.body_template
8588
FROM acquired nm
@@ -111,7 +114,7 @@ SET updated_at = new_values.sent_at,
111114
status_reason=NULL,
112115
leased_until=NULL,
113116
next_retry_after=NULL
114-
FROM (SELECT UNNEST(@ids::uuid[])AS id,
117+
FROM (SELECT UNNEST(@ids::uuid[])AS id,
115118
UNNEST(@sent_ats::timestamptz[])AS sent_at)
116119
AS new_values
117120
WHEREnotification_messages.id=new_values.id;

‎coderd/notifications/manager.go

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ type Manager struct {
4343

4444
notifier*notifier
4545
handlersmap[database.NotificationMethod]Handler
46+
method database.NotificationMethod
47+
48+
metrics*Metrics
4649

4750
success,failurechandispatchResult
4851

@@ -56,7 +59,16 @@ type Manager struct {
5659
//
5760
// helpers is a map of template helpers which are used to customize notification messages to use global settings like
5861
// access URL etc.
59-
funcNewManager(cfg codersdk.NotificationsConfig,storeStore,log slog.Logger) (*Manager,error) {
62+
funcNewManager(cfg codersdk.NotificationsConfig,storeStore,metrics*Metrics,log slog.Logger) (*Manager,error) {
63+
ifmetrics==nil {
64+
panic("nil metrics passed to notifications manager")
65+
}
66+
67+
method,err:=dispatchMethodFromCfg(cfg)
68+
iferr!=nil {
69+
returnnil,err
70+
}
71+
6072
// If dispatch timeout exceeds lease period, it is possible that messages can be delivered in duplicate because the
6173
// lease can expire before the notifier gives up on the dispatch, which results in the message becoming eligible for
6274
// being re-acquired.
@@ -78,6 +90,9 @@ func NewManager(cfg codersdk.NotificationsConfig, store Store, log slog.Logger)
7890
success:make(chandispatchResult,cfg.StoreSyncBufferSize),
7991
failure:make(chandispatchResult,cfg.StoreSyncBufferSize),
8092

93+
metrics:metrics,
94+
method:method,
95+
8196
stop:make(chanany),
8297
done:make(chanany),
8398

@@ -137,7 +152,7 @@ func (m *Manager) loop(ctx context.Context) error {
137152
vareg errgroup.Group
138153

139154
// Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications.
140-
m.notifier=newNotifier(m.cfg,uuid.New(),m.log,m.store,m.handlers)
155+
m.notifier=newNotifier(m.cfg,uuid.New(),m.log,m.store,m.handlers,m.method,m.metrics)
141156
eg.Go(func()error {
142157
returnm.notifier.run(ctx,m.success,m.failure)
143158
})
@@ -171,12 +186,12 @@ func (m *Manager) loop(ctx context.Context) error {
171186
iflen(m.success)+len(m.failure)>0 {
172187
m.log.Warn(ctx,"flushing buffered updates before stop",
173188
slog.F("success_count",len(m.success)),slog.F("failure_count",len(m.failure)))
174-
m.bulkUpdate(ctx)
189+
m.syncUpdates(ctx)
175190
m.log.Warn(ctx,"flushing updates done")
176191
}
177192
returnnil
178193
case<-tick.C:
179-
m.bulkUpdate(ctx)
194+
m.syncUpdates(ctx)
180195
}
181196
}
182197
})
@@ -194,8 +209,8 @@ func (m *Manager) BufferedUpdatesCount() (success int, failure int) {
194209
returnlen(m.success),len(m.failure)
195210
}
196211

197-
//bulkUpdate updates messages in the store based on the given successful and failed message dispatch results.
198-
func (m*Manager)bulkUpdate(ctx context.Context) {
212+
//syncUpdates updates messages in the store based on the given successful and failed message dispatch results.
213+
func (m*Manager)syncUpdates(ctx context.Context) {
199214
select {
200215
case<-ctx.Done():
201216
return
@@ -205,6 +220,10 @@ func (m *Manager) bulkUpdate(ctx context.Context) {
205220
nSuccess:=len(m.success)
206221
nFailure:=len(m.failure)
207222

223+
deferfunc() {
224+
m.metrics.PendingUpdates.Set(float64(len(m.success)+len(m.failure)))
225+
}()
226+
208227
// Nothing to do.
209228
ifnSuccess+nFailure==0 {
210229
return
@@ -347,21 +366,3 @@ type dispatchResult struct {
347366
errerror
348367
retryablebool
349368
}
350-
351-
funcnewSuccessfulDispatch(notifier,msg uuid.UUID)dispatchResult {
352-
returndispatchResult{
353-
notifier:notifier,
354-
msg:msg,
355-
ts:time.Now(),
356-
}
357-
}
358-
359-
funcnewFailedDispatch(notifier,msg uuid.UUID,errerror,retryablebool)dispatchResult {
360-
returndispatchResult{
361-
notifier:notifier,
362-
msg:msg,
363-
ts:time.Now(),
364-
err:err,
365-
retryable:retryable,
366-
}
367-
}

‎coderd/notifications/manager_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,13 @@ func TestBufferedUpdates(t *testing.T) {
3535
}
3636

3737
ctx,logger,db:=setup(t)
38-
interceptor:=&bulkUpdateInterceptor{Store:db}
38+
interceptor:=&syncInterceptor{Store:db}
3939
santa:=&santaHandler{}
4040

4141
cfg:=defaultNotificationsConfig(database.NotificationMethodSmtp)
4242
cfg.StoreSyncInterval=serpent.Duration(time.Hour)// Ensure we don't sync the store automatically.
4343

44-
mgr,err:=notifications.NewManager(cfg,interceptor,logger.Named("notifications-manager"))
44+
mgr,err:=notifications.NewManager(cfg,interceptor,createMetrics(),logger.Named("notifications-manager"))
4545
require.NoError(t,err)
4646
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
4747
database.NotificationMethodSmtp:santa,
@@ -153,7 +153,7 @@ func TestStopBeforeRun(t *testing.T) {
153153

154154
ctx:=context.Background()
155155
logger:=slogtest.Make(t,&slogtest.Options{IgnoreErrors:true,IgnoredErrorIs: []error{}}).Leveled(slog.LevelDebug)
156-
mgr,err:=notifications.NewManager(defaultNotificationsConfig(database.NotificationMethodSmtp),dbmem.New(),logger.Named("notifications-manager"))
156+
mgr,err:=notifications.NewManager(defaultNotificationsConfig(database.NotificationMethodSmtp),dbmem.New(),createMetrics(),logger.Named("notifications-manager"))
157157
require.NoError(t,err)
158158

159159
// Call stop before notifier is started with Run().
@@ -163,15 +163,15 @@ func TestStopBeforeRun(t *testing.T) {
163163
},testutil.WaitShort,testutil.IntervalFast)
164164
}
165165

166-
typebulkUpdateInterceptorstruct {
166+
typesyncInterceptorstruct {
167167
notifications.Store
168168

169169
sent atomic.Int32
170170
failed atomic.Int32
171171
err atomic.Value
172172
}
173173

174-
func (b*bulkUpdateInterceptor)BulkMarkNotificationMessagesSent(ctx context.Context,arg database.BulkMarkNotificationMessagesSentParams) (int64,error) {
174+
func (b*syncInterceptor)BulkMarkNotificationMessagesSent(ctx context.Context,arg database.BulkMarkNotificationMessagesSentParams) (int64,error) {
175175
updated,err:=b.Store.BulkMarkNotificationMessagesSent(ctx,arg)
176176
b.sent.Add(int32(updated))
177177
iferr!=nil {
@@ -180,7 +180,7 @@ func (b *bulkUpdateInterceptor) BulkMarkNotificationMessagesSent(ctx context.Con
180180
returnupdated,err
181181
}
182182

183-
func (b*bulkUpdateInterceptor)BulkMarkNotificationMessagesFailed(ctx context.Context,arg database.BulkMarkNotificationMessagesFailedParams) (int64,error) {
183+
func (b*syncInterceptor)BulkMarkNotificationMessagesFailed(ctx context.Context,arg database.BulkMarkNotificationMessagesFailedParams) (int64,error) {
184184
updated,err:=b.Store.BulkMarkNotificationMessagesFailed(ctx,arg)
185185
b.failed.Add(int32(updated))
186186
iferr!=nil {

‎coderd/notifications/method.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package notifications
2+
3+
import (
4+
"golang.org/x/xerrors"
5+
6+
"github.com/coder/coder/v2/coderd/database"
7+
"github.com/coder/coder/v2/codersdk"
8+
)
9+
10+
funcdispatchMethodFromCfg(cfg codersdk.NotificationsConfig) (database.NotificationMethod,error) {
11+
varmethod database.NotificationMethod
12+
iferr:=method.Scan(cfg.Method.String());err!=nil {
13+
return"",xerrors.Errorf("given notification method %q is invalid",cfg.Method)
14+
}
15+
returnmethod,nil
16+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp