@@ -11,7 +11,6 @@ import (
1111"sync/atomic"
1212"time"
1313
14- "github.com/google/uuid"
1514"github.com/lib/pq"
1615"github.com/prometheus/client_golang/prometheus"
1716"golang.org/x/xerrors"
@@ -188,6 +187,19 @@ func (l pqListenerShim) NotifyChan() <-chan *pq.Notification {
188187return l .Notify
189188}
190189
// queueSet holds the subscriber queues for a single event name (PGPubsub.queues maps each
// event to one queueSet), together with the state used to coordinate an in-flight Unlisten
// for that event.
190+ type queueSet struct {
// m is used as a set of subscribed queues: the key is the queue, the value carries no data.
// Every access visible in this file is made while holding PGPubsub.qMu.
191+ m map [* msgQueue ]struct {}
192+ // unlistenInProgress will be non-nil if another goroutine is unlistening for the event this
193+ // queueSet corresponds to. If non-nil, that goroutine will close the channel when it is done.
// NOTE(review): the cancel path visible here assigns this field when the set becomes empty
// and closes the channel after Unlisten returns, but does not appear to reset the field to
// nil. A non-nil value may therefore refer to an already-closed channel, in which case a
// receive returns immediately. Confirm against the full file.
194+ unlistenInProgress chan struct {}
195+ }
196+
// newQueueSet returns an empty queueSet. The map m is allocated so callers can insert into
// it right away; unlistenInProgress is left at its zero value (nil).
197+ func newQueueSet ()* queueSet {
198+ return & queueSet {
199+ m :make (map [* msgQueue ]struct {}),
200+ }
201+ }
202+
191203// PGPubsub is a pubsub implementation using PostgreSQL.
192204type PGPubsub struct {
193205logger slog.Logger
@@ -196,7 +208,7 @@ type PGPubsub struct {
196208db * sql.DB
197209
198210qMu sync.Mutex
199- queues map [string ]map [uuid. UUID ] * msgQueue
211+ queues map [string ]* queueSet
200212
201213// making the close state its own mutex domain simplifies closing logic so
202214// that we don't have to hold the qMu --- which could block processing
@@ -243,6 +255,48 @@ func (p *PGPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(),
243255}
244256}()
245257
258+ var (
259+ unlistenInProgress <- chan struct {}
260+ // MUST hold the p.qMu lock to manipulate this!
261+ qs * queueSet
262+ )
263+ func () {
264+ p .qMu .Lock ()
265+ defer p .qMu .Unlock ()
266+
267+ var ok bool
268+ if qs ,ok = p .queues [event ];! ok {
269+ qs = newQueueSet ()
270+ p .queues [event ]= qs
271+ }
272+ qs .m [newQ ]= struct {}{}
273+ unlistenInProgress = qs .unlistenInProgress
274+ }()
275+ // NOTE there cannot be any `return` statements between here and the next +-+, otherwise the
276+ // assumptions the defer makes could be violated
277+ if unlistenInProgress != nil {
278+ // We have to wait here because we don't want our `Listen` call to happen before the other
279+ // goroutine calls `Unlisten`. That would result in this subscription not getting any
280+ // events. c.f. https://github.com/coder/coder/issues/15312
281+ p .logger .Debug (context .Background (),"waiting for Unlisten in progress" ,slog .F ("event" ,event ))
282+ <- unlistenInProgress
283+ p .logger .Debug (context .Background (),"unlistening complete" ,slog .F ("event" ,event ))
284+ }
285+ // +-+ (see above)
286+ defer func () {
287+ if err != nil {
288+ p .qMu .Lock ()
289+ defer p .qMu .Unlock ()
290+ delete (qs .m ,newQ )
291+ if len (qs .m )== 0 {
292+ // we know that newQ was in the queueSet since we last unlocked, so there cannot
293+ // have been any _new_ goroutines trying to Unlisten(). Therefore, if the queueSet
294+ // is now empty, it's safe to delete.
295+ delete (p .queues ,event )
296+ }
297+ }
298+ }()
299+
246300// The pgListener waits for the response to `LISTEN` on a mainloop that also dispatches
247301// notifies. We need to avoid holding the mutex while this happens, since holding the mutex
248302// blocks reading notifications and can deadlock the pgListener.
@@ -258,32 +312,40 @@ func (p *PGPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(),
258312if err != nil {
259313return nil ,xerrors .Errorf ("listen: %w" ,err )
260314}
261- p .qMu .Lock ()
262- defer p .qMu .Unlock ()
263315
264- var eventQs map [uuid.UUID ]* msgQueue
265- var ok bool
266- if eventQs ,ok = p .queues [event ];! ok {
267- eventQs = make (map [uuid.UUID ]* msgQueue )
268- p .queues [event ]= eventQs
269- }
270- id := uuid .New ()
271- eventQs [id ]= newQ
272316return func () {
273- p .qMu .Lock ()
274- listeners := p .queues [event ]
275- q := listeners [id ]
276- q .close ()
277- delete (listeners ,id )
278- if len (listeners )== 0 {
279- delete (p .queues ,event )
280- }
281- listenerCount := len (listeners )
282- p .qMu .Unlock ()
283- // as above, we must not hold the lock while calling into pgListener
317+ var unlistening chan struct {}
318+ func () {
319+ p .qMu .Lock ()
320+ defer p .qMu .Unlock ()
321+ newQ .close ()
322+ qSet ,ok := p .queues [event ]
323+ if ! ok {
324+ p .logger .Critical (context .Background (),"event was removed before cancel" ,slog .F ("event" ,event ))
325+ return
326+ }
327+ delete (qSet .m ,newQ )
328+ if len (qSet .m )== 0 {
329+ unlistening = make (chan struct {})
330+ qSet .unlistenInProgress = unlistening
331+ }
332+ }()
284333
285- if listenerCount == 0 {
334+ // as above, we must not hold the lock while calling into pgListener
335+ if unlistening != nil {
286336uErr := p .pgListener .Unlisten (event )
337+ close (unlistening )
338+ // we can now delete the queueSet if it is empty.
339+ func () {
340+ p .qMu .Lock ()
341+ defer p .qMu .Unlock ()
342+ qSet ,ok := p .queues [event ]
343+ if ok && len (qSet .m )== 0 {
344+ p .logger .Debug (context .Background (),"removing queueSet" ,slog .F ("event" ,event ))
345+ delete (p .queues ,event )
346+ }
347+ }()
348+
287349p .closeMu .Lock ()
288350defer p .closeMu .Unlock ()
289351if uErr != nil && ! p .closedListener {
@@ -361,21 +423,21 @@ func (p *PGPubsub) listenReceive(notif *pq.Notification) {
361423
362424p .qMu .Lock ()
363425defer p .qMu .Unlock ()
364- queues ,ok := p .queues [notif .Channel ]
426+ qSet ,ok := p .queues [notif .Channel ]
365427if ! ok {
366428return
367429}
368430extra := []byte (notif .Extra )
369- for _ , q := range queues {
431+ for q := range qSet . m {
370432q .enqueue (extra )
371433}
372434}
373435
374436func (p * PGPubsub )recordReconnect () {
375437p .qMu .Lock ()
376438defer p .qMu .Unlock ()
377- for _ ,listeners := range p .queues {
378- for _ , q := range listeners {
439+ for _ ,qSet := range p .queues {
440+ for q := range qSet . m {
379441q .dropped ()
380442}
381443}
@@ -590,8 +652,8 @@ func (p *PGPubsub) Collect(metrics chan<- prometheus.Metric) {
590652p .qMu .Lock ()
591653events := len (p .queues )
592654subs := 0
593- for _ ,subscriberMap := range p .queues {
594- subs += len (subscriberMap )
655+ for _ ,qSet := range p .queues {
656+ subs += len (qSet . m )
595657}
596658p .qMu .Unlock ()
597659metrics <- prometheus .MustNewConstMetric (currentSubscribersDesc ,prometheus .GaugeValue ,float64 (subs ))
@@ -629,7 +691,7 @@ func newWithoutListener(logger slog.Logger, db *sql.DB) *PGPubsub {
629691logger :logger ,
630692listenDone :make (chan struct {}),
631693db :db ,
632- queues :make (map [string ]map [uuid. UUID ] * msgQueue ),
694+ queues :make (map [string ]* queueSet ),
633695latencyMeasurer :NewLatencyMeasurer (logger .Named ("latency-measurer" )),
634696
635697publishesTotal :prometheus .NewCounterVec (prometheus.CounterOpts {