Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitb2dab33

Browse files
authored
feat: implement observability of notifications subsystem (#13799)
1 parenta6d66cc commitb2dab33

22 files changed

+769
-186
lines changed

‎cli/server.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -983,6 +983,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
983983
)
984984
ifexperiments.Enabled(codersdk.ExperimentNotifications) {
985985
cfg:=options.DeploymentValues.Notifications
986+
metrics:=notifications.NewMetrics(options.PrometheusRegistry)
986987

987988
// The enqueuer is responsible for enqueueing notifications to the given store.
988989
enqueuer,err:=notifications.NewStoreEnqueuer(cfg,options.Database,templateHelpers(options),logger.Named("notifications.enqueuer"))
@@ -994,7 +995,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
994995
// The notification manager is responsible for:
995996
// - creating notifiers and managing their lifecycles (notifiers are responsible for dequeueing/sending notifications)
996997
// - keeping the store updated with status updates
997-
notificationsManager,err=notifications.NewManager(cfg,options.Database,logger.Named("notifications.manager"))
998+
notificationsManager,err=notifications.NewManager(cfg,options.Database,metrics,logger.Named("notifications.manager"))
998999
iferr!=nil {
9991000
returnxerrors.Errorf("failed to instantiate notification manager: %w",err)
10001001
}

‎coderd/database/dbauthz/dbauthz.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1143,9 +1143,9 @@ func (q *querier) DeleteWorkspaceAgentPortSharesByTemplate(ctx context.Context,
11431143
returnq.db.DeleteWorkspaceAgentPortSharesByTemplate(ctx,templateID)
11441144
}
11451145

1146-
func (q*querier)EnqueueNotificationMessage(ctx context.Context,arg database.EnqueueNotificationMessageParams)(database.NotificationMessage,error) {
1146+
func (q*querier)EnqueueNotificationMessage(ctx context.Context,arg database.EnqueueNotificationMessageParams)error {
11471147
iferr:=q.authorizeContext(ctx,policy.ActionCreate,rbac.ResourceSystem);err!=nil {
1148-
returndatabase.NotificationMessage{},err
1148+
returnerr
11491149
}
11501150
returnq.db.EnqueueNotificationMessage(ctx,arg)
11511151
}

‎coderd/database/dbmem/dbmem.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -935,12 +935,17 @@ func (q *FakeQuerier) AcquireNotificationMessages(_ context.Context, arg databas
935935
q.mutex.Lock()
936936
deferq.mutex.Unlock()
937937

938-
varout []database.AcquireNotificationMessagesRow
939-
for_,nm:=rangeq.notificationMessages {
940-
iflen(out)>=int(arg.Count) {
941-
break
942-
}
938+
// Shift the first "Count" notifications off the slice (FIFO).
939+
sz:=len(q.notificationMessages)
940+
ifsz>int(arg.Count) {
941+
sz=int(arg.Count)
942+
}
943943

944+
list:=q.notificationMessages[:sz]
945+
q.notificationMessages=q.notificationMessages[sz:]
946+
947+
varout []database.AcquireNotificationMessagesRow
948+
for_,nm:=rangelist {
944949
acquirableStatuses:= []database.NotificationMessageStatus{database.NotificationMessageStatusPending,database.NotificationMessageStatusTemporaryFailure}
945950
if!slices.Contains(acquirableStatuses,nm.Status) {
946951
continue
@@ -956,9 +961,9 @@ func (q *FakeQuerier) AcquireNotificationMessages(_ context.Context, arg databas
956961
ID:nm.ID,
957962
Payload:nm.Payload,
958963
Method:nm.Method,
959-
CreatedBy:nm.CreatedBy,
960964
TitleTemplate:"This is a title with {{.Labels.variable}}",
961965
BodyTemplate:"This is a body with {{.Labels.variable}}",
966+
TemplateID:nm.NotificationTemplateID,
962967
})
963968
}
964969

@@ -1815,10 +1820,10 @@ func (q *FakeQuerier) DeleteWorkspaceAgentPortSharesByTemplate(_ context.Context
18151820
returnnil
18161821
}
18171822

1818-
func (q*FakeQuerier)EnqueueNotificationMessage(_ context.Context,arg database.EnqueueNotificationMessageParams)(database.NotificationMessage,error) {
1823+
func (q*FakeQuerier)EnqueueNotificationMessage(_ context.Context,arg database.EnqueueNotificationMessageParams)error {
18191824
err:=validateDatabaseType(arg)
18201825
iferr!=nil {
1821-
returndatabase.NotificationMessage{},err
1826+
returnerr
18221827
}
18231828

18241829
q.mutex.Lock()
@@ -1827,7 +1832,7 @@ func (q *FakeQuerier) EnqueueNotificationMessage(_ context.Context, arg database
18271832
varpayload types.MessagePayload
18281833
err=json.Unmarshal(arg.Payload,&payload)
18291834
iferr!=nil {
1830-
returndatabase.NotificationMessage{},err
1835+
returnerr
18311836
}
18321837

18331838
nm:= database.NotificationMessage{
@@ -1845,7 +1850,7 @@ func (q *FakeQuerier) EnqueueNotificationMessage(_ context.Context, arg database
18451850

18461851
q.notificationMessages=append(q.notificationMessages,nm)
18471852

1848-
returnnm,err
1853+
returnerr
18491854
}
18501855

18511856
func (q*FakeQuerier)FavoriteWorkspace(_ context.Context,arg uuid.UUID)error {

‎coderd/database/dbmetrics/dbmetrics.go

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more aboutcustomizing how changed files appear on GitHub.

‎coderd/database/dbmock/dbmock.go

Lines changed: 3 additions & 4 deletions
Some generated files are not rendered by default. Learn more aboutcustomizing how changed files appear on GitHub.

‎coderd/database/dump.sql

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more aboutcustomizing how changed files appear on GitHub.
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
ALTERTABLE notification_messages
2+
DROP COLUMN IF EXISTS queued_seconds;
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
ALTERTABLE notification_messages
2+
ADD COLUMN queued_seconds FLOATNULL;

‎coderd/database/models.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more aboutcustomizing how changed files appear on GitHub.

‎coderd/database/querier.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more aboutcustomizing how changed files appear on GitHub.

‎coderd/database/queries.sql.go

Lines changed: 23 additions & 31 deletions
Some generated files are not rendered by default. Learn more aboutcustomizing how changed files appear on GitHub.

‎coderd/database/queries/notifications.sql

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,15 @@ FROM notification_templates nt,
1010
WHEREnt.id= @notification_template_id
1111
ANDu.id= @user_id;
1212

13-
-- name: EnqueueNotificationMessage :one
13+
-- name: EnqueueNotificationMessage :exec
1414
INSERT INTO notification_messages (id, notification_template_id, user_id, method, payload, targets, created_by)
1515
VALUES (@id,
1616
@notification_template_id,
1717
@user_id,
1818
@method::notification_method,
1919
@payload::jsonb,
2020
@targets,
21-
@created_by)
22-
RETURNING*;
21+
@created_by);
2322

2423
-- Acquires the lease for a given count of notification messages, to enable concurrent dequeuing and subsequent sending.
2524
-- Only rows that aren't already leased (or ones which are leased but have exceeded their lease period) are returned.
@@ -36,7 +35,8 @@ RETURNING *;
3635
WITH acquiredAS (
3736
UPDATE
3837
notification_messages
39-
SET updated_at= NOW(),
38+
SET queued_seconds= GREATEST(0, EXTRACT(EPOCHFROM (NOW()- updated_at)))::FLOAT,
39+
updated_at= NOW(),
4040
status='leased'::notification_message_status,
4141
status_reason='Leased by notifier'||sqlc.arg('notifier_id')::uuid,
4242
leased_until= NOW()+ CONCAT(sqlc.arg('lease_seconds')::int,' seconds')::interval
@@ -78,16 +78,19 @@ SELECT
7878
nm.id,
7979
nm.payload,
8080
nm.method,
81-
nm.created_by,
81+
nm.attempt_count::intAS attempt_count,
82+
nm.queued_seconds::floatAS queued_seconds,
8283
-- template
84+
nt.idAS template_id,
8385
nt.title_template,
8486
nt.body_template
8587
FROM acquired nm
8688
JOIN notification_templates ntONnm.notification_template_id=nt.id;
8789

8890
-- name: BulkMarkNotificationMessagesFailed :execrows
8991
UPDATE notification_messages
90-
SET updated_at=subquery.failed_at,
92+
SET queued_seconds=0,
93+
updated_at=subquery.failed_at,
9194
attempt_count= attempt_count+1,
9295
status= CASE
9396
WHEN attempt_count+1< @max_attempts::int THENsubquery.status
@@ -105,13 +108,14 @@ WHERE notification_messages.id = subquery.id;
105108

106109
-- name: BulkMarkNotificationMessagesSent :execrows
107110
UPDATE notification_messages
108-
SET updated_at=new_values.sent_at,
111+
SET queued_seconds=0,
112+
updated_at=new_values.sent_at,
109113
attempt_count= attempt_count+1,
110114
status='sent'::notification_message_status,
111115
status_reason=NULL,
112116
leased_until=NULL,
113117
next_retry_after=NULL
114-
FROM (SELECT UNNEST(@ids::uuid[])AS id,
118+
FROM (SELECT UNNEST(@ids::uuid[])AS id,
115119
UNNEST(@sent_ats::timestamptz[])AS sent_at)
116120
AS new_values
117121
WHEREnotification_messages.id=new_values.id;

‎coderd/notifications/enqueuer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUI
5959
}
6060

6161
id:=uuid.New()
62-
msg,err:=s.store.EnqueueNotificationMessage(ctx, database.EnqueueNotificationMessageParams{
62+
err=s.store.EnqueueNotificationMessage(ctx, database.EnqueueNotificationMessageParams{
6363
ID:id,
6464
UserID:userID,
6565
NotificationTemplateID:templateID,
@@ -73,7 +73,7 @@ func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUI
7373
returnnil,xerrors.Errorf("enqueue notification: %w",err)
7474
}
7575

76-
s.log.Debug(ctx,"enqueued notification",slog.F("msg_id",msg.ID))
76+
s.log.Debug(ctx,"enqueued notification",slog.F("msg_id",id))
7777
return&id,nil
7878
}
7979

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp