- Notifications
You must be signed in to change notification settings - Fork927
feat: add notification deduplication trigger#14172
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Changes fromall commits
File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.
Uh oh!
There was an error while loading.Please reload this page.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
DROP TRIGGER IF EXISTS update_notification_message_dedupe_hash ON notification_messages; | ||
DROP FUNCTION IF EXISTS compute_notification_message_dedupe_hash(); | ||
ALTER TABLE IF EXISTS notification_messages | ||
DROP COLUMN IF EXISTS dedupe_hash; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
-- Add a column to store the hash. | ||
ALTER TABLE IF EXISTS notification_messages | ||
ADD COLUMN IF NOT EXISTS dedupe_hash TEXT NULL; | ||
COMMENT ON COLUMN notification_messages.dedupe_hash IS 'Auto-generated by insert/update trigger, used to prevent duplicate notifications from being enqueued on the same day'; | ||
-- Ensure that multiple notifications with identical hashes cannot be inserted into the table. | ||
CREATE UNIQUE INDEX ON notification_messages (dedupe_hash); | ||
-- Computes a hash from all unique messages fields and the current day; this will help prevent duplicate messages from being sent within the same day. | ||
-- It is possible that a message could be sent at 23:59:59 and again at 00:00:00, but this should be good enough for now. | ||
-- This could have been a unique index, but we cannot immutably create an index on a timestamp with a timezone. | ||
CREATE OR REPLACE FUNCTION compute_notification_message_dedupe_hash() RETURNS TRIGGER AS | ||
$$ | ||
BEGIN | ||
NEW.dedupe_hash := MD5(CONCAT_WS(':', | ||
NEW.notification_template_id, | ||
NEW.user_id, | ||
NEW.method, | ||
NEW.payload::text, | ||
ARRAY_TO_STRING(NEW.targets, ','), | ||
DATE_TRUNC('day', NEW.created_at AT TIME ZONE 'UTC')::text | ||
)); | ||
RETURN NEW; | ||
END; | ||
$$ LANGUAGE plpgsql; | ||
COMMENT ON FUNCTION compute_notification_message_dedupe_hash IS 'Computes a unique hash which will be used to prevent duplicate messages from being enqueued on the same day'; | ||
CREATE TRIGGER update_notification_message_dedupe_hash | ||
BEFORE INSERT OR UPDATE | ||
ON notification_messages | ||
FOR EACH ROW | ||
EXECUTE FUNCTION compute_notification_message_dedupe_hash(); |
Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.
Uh oh!
There was an error while loading.Please reload this page.
Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.
Uh oh!
There was an error while loading.Please reload this page.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -13,14 +13,15 @@ WHERE nt.id = @notification_template_id | ||
AND u.id = @user_id; | ||
-- name: EnqueueNotificationMessage :exec | ||
INSERT INTO notification_messages (id, notification_template_id, user_id, method, payload, targets, created_by, created_at) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. I added | ||
VALUES (@id, | ||
@notification_template_id, | ||
@user_id, | ||
@method::notification_method, | ||
@payload::jsonb, | ||
@targets, | ||
@created_by, | ||
@created_at); | ||
-- Acquires the lease for a given count of notification messages, to enable concurrent dequeuing and subsequent sending. | ||
-- Only rows that aren't already leased (or ones which are leased but have exceeded their lease period) are returned. | ||
Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.
Uh oh!
There was an error while loading.Please reload this page.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -10,14 +10,19 @@ import ( | ||
"golang.org/x/xerrors" | ||
"cdr.dev/slog" | ||
"github.com/coder/quartz" | ||
"github.com/coder/coder/v2/coderd/database" | ||
"github.com/coder/coder/v2/coderd/database/dbtime" | ||
"github.com/coder/coder/v2/coderd/notifications/render" | ||
"github.com/coder/coder/v2/coderd/notifications/types" | ||
"github.com/coder/coder/v2/codersdk" | ||
) | ||
var ( | ||
ErrCannotEnqueueDisabledNotification = xerrors.New("user has disabled this notification") | ||
ErrDuplicate = xerrors.New("duplicate notification") | ||
) | ||
type StoreEnqueuer struct { | ||
store Store | ||
@@ -27,10 +32,12 @@ type StoreEnqueuer struct { | ||
// helpers holds a map of template funcs which are used when rendering templates. These need to be passed in because | ||
// the template funcs will return values which are inappropriately encapsulated in this struct. | ||
helpers template.FuncMap | ||
// Used to manipulate time in tests. | ||
clock quartz.Clock | ||
} | ||
// NewStoreEnqueuer creates an Enqueuer implementation which can persist notification messages in the store. | ||
func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers template.FuncMap, log slog.Logger, clock quartz.Clock) (*StoreEnqueuer, error) { | ||
var method database.NotificationMethod | ||
if err := method.Scan(cfg.Method.String()); err != nil { | ||
return nil, xerrors.Errorf("given notification method %q is invalid", cfg.Method) | ||
@@ -41,6 +48,7 @@ func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers tem | ||
log: log, | ||
defaultMethod: method, | ||
helpers: helpers, | ||
clock: clock, | ||
}, nil | ||
} | ||
@@ -81,6 +89,7 @@ func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUI | ||
Payload: input, | ||
Targets: targets, | ||
CreatedBy: createdBy, | ||
CreatedAt: dbtime.Time(s.clock.Now().UTC()), | ||
}) | ||
if err != nil { | ||
// We have a trigger on the notification_messages table named `inhibit_enqueue_if_disabled` which prevents messages | ||
@@ -92,6 +101,13 @@ func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUI | ||
return nil, ErrCannotEnqueueDisabledNotification | ||
} | ||
// If the enqueue fails due to a dedupe hash conflict, this means that a notification has already been enqueued | ||
// today with identical properties. It's far simpler to prevent duplicate sends in this central manner, rather than | ||
// having each notification enqueue handle its own logic. | ||
if database.IsUniqueViolation(err, database.UniqueNotificationMessagesDedupeHashIndex) { | ||
return nil, ErrDuplicate | ||
johnstcn marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
} | ||
s.log.Warn(ctx, "failed to enqueue notification", slog.F("template_id", templateID), slog.F("input", input), slog.Error(err)) | ||
return nil, xerrors.Errorf("enqueue notification: %w", err) | ||
} | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -12,13 +12,15 @@ import ( | ||
"github.com/stretchr/testify/require" | ||
"golang.org/x/xerrors" | ||
"github.com/coder/quartz" | ||
"github.com/coder/serpent" | ||
dannykopping marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
"github.com/coder/coder/v2/coderd/database" | ||
"github.com/coder/coder/v2/coderd/database/dbgen" | ||
"github.com/coder/coder/v2/coderd/notifications" | ||
"github.com/coder/coder/v2/coderd/notifications/dispatch" | ||
"github.com/coder/coder/v2/coderd/notifications/types" | ||
"github.com/coder/coder/v2/testutil" | ||
) | ||
func TestBufferedUpdates(t *testing.T) { | ||
@@ -39,7 +41,7 @@ func TestBufferedUpdates(t *testing.T) { | ||
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{ | ||
database.NotificationMethodSmtp: santa, | ||
}) | ||
enq, err := notifications.NewStoreEnqueuer(cfg, interceptor, defaultHelpers(), logger.Named("notifications-enqueuer"), quartz.NewReal()) | ||
require.NoError(t, err) | ||
user := dbgen.User(t, db, database.User{}) | ||
@@ -127,7 +129,7 @@ func TestBuildPayload(t *testing.T) { | ||
} | ||
}) | ||
enq, err := notifications.NewStoreEnqueuer(defaultNotificationsConfig(database.NotificationMethodSmtp), interceptor, helpers, logger.Named("notifications-enqueuer"), quartz.NewReal()) | ||
require.NoError(t, err) | ||
// WHEN: a notification is enqueued | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -13,6 +13,8 @@ import ( | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"github.com/coder/quartz" | ||
"github.com/coder/serpent" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. Another instance of import grouping, this is the job of the tooling tbh (and it's failing), so up to you if you want to fix it 😄. (There are actually a few more of these too but I'll leave those uncommented.) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. I'll leave this for now; we need the tooling to be updated as you suggest. | ||
"github.com/coder/coder/v2/coderd/database" | ||
@@ -61,7 +63,7 @@ func TestMetrics(t *testing.T) { | ||
method: handler, | ||
}) | ||
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) | ||
require.NoError(t, err) | ||
user := createSampleUser(t, store) | ||
@@ -228,7 +230,7 @@ func TestPendingUpdatesMetric(t *testing.T) { | ||
method: handler, | ||
}) | ||
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) | ||
require.NoError(t, err) | ||
user := createSampleUser(t, store) | ||
@@ -305,7 +307,7 @@ func TestInflightDispatchesMetric(t *testing.T) { | ||
method: delayer, | ||
}) | ||
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) | ||
require.NoError(t, err) | ||
user := createSampleUser(t, store) | ||
@@ -384,7 +386,7 @@ func TestCustomMethodMetricCollection(t *testing.T) { | ||
customMethod: webhookHandler, | ||
}) | ||
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal()) | ||
require.NoError(t, err) | ||
user := createSampleUser(t, store) | ||
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.