Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 613e074

Browse files
committed
Avoid race by exposing number of pending updates
Signed-off-by: Danny Kopping <danny@coder.com>
1 parent aff9e6c · commit 613e074

File tree

2 files changed

+55
-30
lines changed

2 files changed

+55
-30
lines changed

‎coderd/notifications/manager.go

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ type Manager struct {
4444
notifier*notifier
4545
handlersmap[database.NotificationMethod]Handler
4646

47+
success,failurechandispatchResult
48+
4749
runOnce sync.Once
4850
stopOnce sync.Once
4951
stopchanany
@@ -67,6 +69,15 @@ func NewManager(cfg codersdk.NotificationsConfig, store Store, log slog.Logger)
6769
cfg:cfg,
6870
store:store,
6971

72+
// Buffer successful/failed notification dispatches in memory to reduce load on the store.
73+
//
74+
// We keep separate buffered for success/failure right now because the bulk updates are already a bit janky,
75+
// see BulkMarkNotificationMessagesSent/BulkMarkNotificationMessagesFailed. If we had the ability to batch updates,
76+
// like is offered in https://docs.sqlc.dev/en/stable/reference/query-annotations.html#batchmany, we'd have a cleaner
77+
// approach to this - but for now this will work fine.
78+
success:make(chandispatchResult,cfg.StoreSyncBufferSize),
79+
failure:make(chandispatchResult,cfg.StoreSyncBufferSize),
80+
7081
stop:make(chanany),
7182
done:make(chanany),
7283

@@ -123,23 +134,12 @@ func (m *Manager) loop(ctx context.Context) error {
123134
default:
124135
}
125136

126-
var (
127-
// Buffer successful/failed notification dispatches in memory to reduce load on the store.
128-
//
129-
// We keep separate buffered for success/failure right now because the bulk updates are already a bit janky,
130-
// see BulkMarkNotificationMessagesSent/BulkMarkNotificationMessagesFailed. If we had the ability to batch updates,
131-
// like is offered in https://docs.sqlc.dev/en/stable/reference/query-annotations.html#batchmany, we'd have a cleaner
132-
// approach to this - but for now this will work fine.
133-
success=make(chandispatchResult,m.cfg.StoreSyncBufferSize)
134-
failure=make(chandispatchResult,m.cfg.StoreSyncBufferSize)
135-
)
136-
137137
vareg errgroup.Group
138138

139139
// Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications.
140140
m.notifier=newNotifier(m.cfg,uuid.New(),m.log,m.store,m.handlers)
141141
eg.Go(func()error {
142-
returnm.notifier.run(ctx,success,failure)
142+
returnm.notifier.run(ctx,m.success,m.failure)
143143
})
144144

145145
// Periodically flush notification state changes to the store.
@@ -162,21 +162,21 @@ func (m *Manager) loop(ctx context.Context) error {
162162
// TODO: mention the above tradeoff in documentation.
163163
m.log.Warn(ctx,"exiting ungracefully",slog.Error(ctx.Err()))
164164

165-
iflen(success)+len(failure)>0 {
165+
iflen(m.success)+len(m.failure)>0 {
166166
m.log.Warn(ctx,"content canceled with pending updates in buffer, these messages will be sent again after lease expires",
167-
slog.F("success_count",len(success)),slog.F("failure_count",len(failure)))
167+
slog.F("success_count",len(m.success)),slog.F("failure_count",len(m.failure)))
168168
}
169169
returnctx.Err()
170170
case<-m.stop:
171-
iflen(success)+len(failure)>0 {
171+
iflen(m.success)+len(m.failure)>0 {
172172
m.log.Warn(ctx,"flushing buffered updates before stop",
173-
slog.F("success_count",len(success)),slog.F("failure_count",len(failure)))
174-
m.bulkUpdate(ctx,success,failure)
173+
slog.F("success_count",len(m.success)),slog.F("failure_count",len(m.failure)))
174+
m.bulkUpdate(ctx)
175175
m.log.Warn(ctx,"flushing updates done")
176176
}
177177
returnnil
178178
case<-tick.C:
179-
m.bulkUpdate(ctx,success,failure)
179+
m.bulkUpdate(ctx)
180180
}
181181
}
182182
})
@@ -188,16 +188,22 @@ func (m *Manager) loop(ctx context.Context) error {
188188
returnerr
189189
}
190190

191+
// BufferedUpdatesCount returns the number of buffered updates which are currently waiting to be flushed to the store.
192+
// The returned values are for success & failure, respectively.
193+
func (m*Manager)BufferedUpdatesCount() (successint,failureint) {
194+
returnlen(m.success),len(m.failure)
195+
}
196+
191197
// bulkUpdate updates messages in the store based on the given successful and failed message dispatch results.
192-
func (m*Manager)bulkUpdate(ctx context.Context,success,failure<-chandispatchResult) {
198+
func (m*Manager)bulkUpdate(ctx context.Context) {
193199
select {
194200
case<-ctx.Done():
195201
return
196202
default:
197203
}
198204

199-
nSuccess:=len(success)
200-
nFailure:=len(failure)
205+
nSuccess:=len(m.success)
206+
nFailure:=len(m.failure)
201207

202208
// Nothing to do.
203209
ifnSuccess+nFailure==0 {
@@ -217,12 +223,12 @@ func (m *Manager) bulkUpdate(ctx context.Context, success, failure <-chan dispat
217223
// will be processed on the next bulk update.
218224

219225
fori:=0;i<nSuccess;i++ {
220-
res:=<-success
226+
res:=<-m.success
221227
successParams.IDs=append(successParams.IDs,res.msg)
222228
successParams.SentAts=append(successParams.SentAts,res.ts)
223229
}
224230
fori:=0;i<nFailure;i++ {
225-
res:=<-failure
231+
res:=<-m.failure
226232

227233
status:=database.NotificationMessageStatusPermanentFailure
228234
ifres.retryable {

‎coderd/notifications/manager_test.go

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ import (
55
"encoding/json"
66
"sync/atomic"
77
"testing"
8+
"time"
89

10+
"github.com/coder/serpent"
911
"github.com/google/uuid"
1012
"github.com/stretchr/testify/assert"
1113
"github.com/stretchr/testify/require"
@@ -31,11 +33,14 @@ func TestBufferedUpdates(t *testing.T) {
3133
if!dbtestutil.WillUsePostgres() {
3234
t.Skip("This test requires postgres")
3335
}
36+
3437
ctx,logger,db:=setup(t)
3538
interceptor:=&bulkUpdateInterceptor{Store:db}
36-
3739
santa:=&santaHandler{}
40+
3841
cfg:=defaultNotificationsConfig(database.NotificationMethodSmtp)
42+
cfg.StoreSyncInterval=serpent.Duration(time.Hour)// Ensure we don't sync the store automatically.
43+
3944
mgr,err:=notifications.NewManager(cfg,interceptor,logger.Named("notifications-manager"))
4045
require.NoError(t,err)
4146
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
@@ -47,20 +52,34 @@ func TestBufferedUpdates(t *testing.T) {
4752
user:=dbgen.User(t,db, database.User{})
4853

4954
// given
50-
_,err=enq.Enqueue(ctx,user.ID,notifications.TemplateWorkspaceDeleted,map[string]string{"nice":"true"},"")
55+
_,err=enq.Enqueue(ctx,user.ID,notifications.TemplateWorkspaceDeleted,map[string]string{"nice":"true"},"")// Will succeed.
5156
require.NoError(t,err)
52-
_,err=enq.Enqueue(ctx,user.ID,notifications.TemplateWorkspaceDeleted,map[string]string{"nice":"true"},"")
57+
_,err=enq.Enqueue(ctx,user.ID,notifications.TemplateWorkspaceDeleted,map[string]string{"nice":"true"},"")// Will succeed.
5358
require.NoError(t,err)
54-
_,err=enq.Enqueue(ctx,user.ID,notifications.TemplateWorkspaceDeleted,map[string]string{"nice":"false"},"")
59+
_,err=enq.Enqueue(ctx,user.ID,notifications.TemplateWorkspaceDeleted,map[string]string{"nice":"false"},"")// Will fail.
5560
require.NoError(t,err)
5661

5762
// when
5863
mgr.Run(ctx)
5964

6065
// then
6166

67+
const (
68+
expectedSuccess=2
69+
expectedFailure=1
70+
)
71+
6272
// Wait for messages to be dispatched.
63-
require.Eventually(t,func()bool {returnsanta.naughty.Load()==1&&santa.nice.Load()==2 },testutil.WaitMedium,testutil.IntervalFast)
73+
require.Eventually(t,func()bool {
74+
returnsanta.naughty.Load()==expectedFailure&&
75+
santa.nice.Load()==expectedSuccess
76+
},testutil.WaitMedium,testutil.IntervalFast)
77+
78+
// Wait for the expected number of buffered updates to be accumulated.
79+
require.Eventually(t,func()bool {
80+
success,failure:=mgr.BufferedUpdatesCount()
81+
returnsuccess==expectedSuccess&&failure==expectedFailure
82+
},testutil.WaitShort,testutil.IntervalFast)
6483

6584
// Stop the manager which forces an update of buffered updates.
6685
require.NoError(t,mgr.Stop(ctx))
@@ -73,8 +92,8 @@ func TestBufferedUpdates(t *testing.T) {
7392
ct.FailNow()
7493
}
7594

76-
assert.EqualValues(ct,1,interceptor.failed.Load())
77-
assert.EqualValues(ct,2,interceptor.sent.Load())
95+
assert.EqualValues(ct,expectedFailure,interceptor.failed.Load())
96+
assert.EqualValues(ct,expectedSuccess,interceptor.sent.Load())
7897
},testutil.WaitMedium,testutil.IntervalFast)
7998
}
8099

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp