@@ -44,6 +44,8 @@ type Manager struct {
44
44
notifier * notifier
45
45
handlers map [database.NotificationMethod ]Handler
46
46
47
+ success ,failure chan dispatchResult
48
+
47
49
runOnce sync.Once
48
50
stopOnce sync.Once
49
51
stop chan any
@@ -67,6 +69,15 @@ func NewManager(cfg codersdk.NotificationsConfig, store Store, log slog.Logger)
67
69
cfg :cfg ,
68
70
store :store ,
69
71
72
+ // Buffer successful/failed notification dispatches in memory to reduce load on the store.
73
+ //
74
+ // We keep separate buffered for success/failure right now because the bulk updates are already a bit janky,
75
+ // see BulkMarkNotificationMessagesSent/BulkMarkNotificationMessagesFailed. If we had the ability to batch updates,
76
+ // like is offered in https://docs.sqlc.dev/en/stable/reference/query-annotations.html#batchmany, we'd have a cleaner
77
+ // approach to this - but for now this will work fine.
78
+ success :make (chan dispatchResult ,cfg .StoreSyncBufferSize ),
79
+ failure :make (chan dispatchResult ,cfg .StoreSyncBufferSize ),
80
+
70
81
stop :make (chan any ),
71
82
done :make (chan any ),
72
83
@@ -123,23 +134,12 @@ func (m *Manager) loop(ctx context.Context) error {
123
134
default :
124
135
}
125
136
126
- var (
127
- // Buffer successful/failed notification dispatches in memory to reduce load on the store.
128
- //
129
- // We keep separate buffered for success/failure right now because the bulk updates are already a bit janky,
130
- // see BulkMarkNotificationMessagesSent/BulkMarkNotificationMessagesFailed. If we had the ability to batch updates,
131
- // like is offered in https://docs.sqlc.dev/en/stable/reference/query-annotations.html#batchmany, we'd have a cleaner
132
- // approach to this - but for now this will work fine.
133
- success = make (chan dispatchResult ,m .cfg .StoreSyncBufferSize )
134
- failure = make (chan dispatchResult ,m .cfg .StoreSyncBufferSize )
135
- )
136
-
137
137
var eg errgroup.Group
138
138
139
139
// Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications.
140
140
m .notifier = newNotifier (m .cfg ,uuid .New (),m .log ,m .store ,m .handlers )
141
141
eg .Go (func ()error {
142
- return m .notifier .run (ctx ,success ,failure )
142
+ return m .notifier .run (ctx ,m . success ,m . failure )
143
143
})
144
144
145
145
// Periodically flush notification state changes to the store.
@@ -162,21 +162,21 @@ func (m *Manager) loop(ctx context.Context) error {
162
162
// TODO: mention the above tradeoff in documentation.
163
163
m .log .Warn (ctx ,"exiting ungracefully" ,slog .Error (ctx .Err ()))
164
164
165
- if len (success )+ len (failure )> 0 {
165
+ if len (m . success )+ len (m . failure )> 0 {
166
166
m .log .Warn (ctx ,"content canceled with pending updates in buffer, these messages will be sent again after lease expires" ,
167
- slog .F ("success_count" ,len (success )),slog .F ("failure_count" ,len (failure )))
167
+ slog .F ("success_count" ,len (m . success )),slog .F ("failure_count" ,len (m . failure )))
168
168
}
169
169
return ctx .Err ()
170
170
case <- m .stop :
171
- if len (success )+ len (failure )> 0 {
171
+ if len (m . success )+ len (m . failure )> 0 {
172
172
m .log .Warn (ctx ,"flushing buffered updates before stop" ,
173
- slog .F ("success_count" ,len (success )),slog .F ("failure_count" ,len (failure )))
174
- m .bulkUpdate (ctx , success , failure )
173
+ slog .F ("success_count" ,len (m . success )),slog .F ("failure_count" ,len (m . failure )))
174
+ m .bulkUpdate (ctx )
175
175
m .log .Warn (ctx ,"flushing updates done" )
176
176
}
177
177
return nil
178
178
case <- tick .C :
179
- m .bulkUpdate (ctx , success , failure )
179
+ m .bulkUpdate (ctx )
180
180
}
181
181
}
182
182
})
@@ -188,16 +188,22 @@ func (m *Manager) loop(ctx context.Context) error {
188
188
return err
189
189
}
190
190
191
+ // BufferedUpdatesCount returns the number of buffered updates which are currently waiting to be flushed to the store.
192
+ // The returned values are for success & failure, respectively.
193
+ func (m * Manager )BufferedUpdatesCount () (success int ,failure int ) {
194
+ return len (m .success ),len (m .failure )
195
+ }
196
+
191
197
// bulkUpdate updates messages in the store based on the given successful and failed message dispatch results.
192
- func (m * Manager )bulkUpdate (ctx context.Context , success , failure <- chan dispatchResult ) {
198
+ func (m * Manager )bulkUpdate (ctx context.Context ) {
193
199
select {
194
200
case <- ctx .Done ():
195
201
return
196
202
default :
197
203
}
198
204
199
- nSuccess := len (success )
200
- nFailure := len (failure )
205
+ nSuccess := len (m . success )
206
+ nFailure := len (m . failure )
201
207
202
208
// Nothing to do.
203
209
if nSuccess + nFailure == 0 {
@@ -217,12 +223,12 @@ func (m *Manager) bulkUpdate(ctx context.Context, success, failure <-chan dispat
217
223
// will be processed on the next bulk update.
218
224
219
225
for i := 0 ;i < nSuccess ;i ++ {
220
- res := <- success
226
+ res := <- m . success
221
227
successParams .IDs = append (successParams .IDs ,res .msg )
222
228
successParams .SentAts = append (successParams .SentAts ,res .ts )
223
229
}
224
230
for i := 0 ;i < nFailure ;i ++ {
225
- res := <- failure
231
+ res := <- m . failure
226
232
227
233
status := database .NotificationMessageStatusPermanentFailure
228
234
if res .retryable {