Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit79b2e92

Browse files
committed
fix: stop holding Pubsub mutex while calling pq.Listener
1 parentbae0a74 commit79b2e92

File tree

3 files changed

+221
-139
lines changed

3 files changed

+221
-139
lines changed

‎coderd/database/pubsub/pubsub.go

Lines changed: 104 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"database/sql"
66
"errors"
7+
"io"
78
"net"
89
"sync"
910
"time"
@@ -164,16 +165,36 @@ func (q *msgQueue) dropped() {
164165
q.cond.Broadcast()
165166
}
166167

168+
// pqListener is an interface that represents a *pq.Listener for testing
169+
typepqListenerinterface {
170+
io.Closer
171+
Listen(string)error
172+
Unlisten(string)error
173+
NotifyChan()<-chan*pq.Notification
174+
}
175+
176+
typepqListenerShimstruct {
177+
*pq.Listener
178+
}
179+
180+
func (lpqListenerShim)NotifyChan()<-chan*pq.Notification {
181+
returnl.Notify
182+
}
183+
167184
// PGPubsub is a pubsub implementation using PostgreSQL.
168185
typePGPubsubstruct {
169-
ctx context.Context
170-
cancel context.CancelFunc
171-
logger slog.Logger
172-
listenDonechanstruct{}
173-
pgListener*pq.Listener
174-
db*sql.DB
175-
mut sync.Mutex
176-
queuesmap[string]map[uuid.UUID]*msgQueue
186+
logger slog.Logger
187+
listenDonechanstruct{}
188+
pgListenerpqListener
189+
db*sql.DB
190+
191+
qMu sync.Mutex
192+
queuesmap[string]map[uuid.UUID]*msgQueue
193+
194+
// making the close state its own mutex domain simplifies closing logic so
195+
// that we don't have to hold the qMu --- which could block processing
196+
// notifications while the pqListener is closing.
197+
closeMu sync.Mutex
177198
closedListenerbool
178199
closeListenerErrerror
179200

@@ -192,16 +213,14 @@ const BufferSize = 2048
192213

193214
// Subscribe calls the listener when an event matching the name is received.
194215
func (p*PGPubsub)Subscribe(eventstring,listenerListener) (cancelfunc(),errerror) {
195-
returnp.subscribeQueue(event,newMsgQueue(p.ctx,listener,nil))
216+
returnp.subscribeQueue(event,newMsgQueue(context.Background(),listener,nil))
196217
}
197218

198219
func (p*PGPubsub)SubscribeWithErr(eventstring,listenerListenerWithErr) (cancelfunc(),errerror) {
199-
returnp.subscribeQueue(event,newMsgQueue(p.ctx,nil,listener))
220+
returnp.subscribeQueue(event,newMsgQueue(context.Background(),nil,listener))
200221
}
201222

202223
func (p*PGPubsub)subscribeQueue(eventstring,newQ*msgQueue) (cancelfunc(),errerror) {
203-
p.mut.Lock()
204-
deferp.mut.Unlock()
205224
deferfunc() {
206225
iferr!=nil {
207226
// if we hit an error, we need to close the queue so we don't
@@ -213,9 +232,13 @@ func (p *PGPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(),
213232
}
214233
}()
215234

235+
// The pgListener waits for the response to `LISTEN` on a mainloop that also dispatches
236+
// notifies. We need to avoid holding the mutex while this happens, since holding the mutex
237+
// blocks reading notifications and can deadlock the pgListener.
238+
// c.f. https://github.com/coder/coder/issues/11950
216239
err=p.pgListener.Listen(event)
217240
iferr==nil {
218-
p.logger.Debug(p.ctx,"started listening to event channel",slog.F("event",event))
241+
p.logger.Debug(context.Background(),"started listening to event channel",slog.F("event",event))
219242
}
220243
iferrors.Is(err,pq.ErrChannelAlreadyOpen) {
221244
// It's ok if it's already open!
@@ -224,6 +247,8 @@ func (p *PGPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(),
224247
iferr!=nil {
225248
returnnil,xerrors.Errorf("listen: %w",err)
226249
}
250+
p.qMu.Lock()
251+
deferp.qMu.Unlock()
227252

228253
vareventQsmap[uuid.UUID]*msgQueue
229254
varokbool
@@ -234,30 +259,36 @@ func (p *PGPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(),
234259
id:=uuid.New()
235260
eventQs[id]=newQ
236261
returnfunc() {
237-
p.mut.Lock()
238-
deferp.mut.Unlock()
262+
p.qMu.Lock()
239263
listeners:=p.queues[event]
240264
q:=listeners[id]
241265
q.close()
242266
delete(listeners,id)
267+
iflen(listeners)==0 {
268+
delete(p.queues,event)
269+
}
270+
p.qMu.Unlock()
271+
// as above, we must not hold the lock while calling into pgListener
243272

244273
iflen(listeners)==0 {
245274
uErr:=p.pgListener.Unlisten(event)
275+
p.closeMu.Lock()
276+
deferp.closeMu.Unlock()
246277
ifuErr!=nil&&!p.closedListener {
247-
p.logger.Warn(p.ctx,"failed to unlisten",slog.Error(uErr),slog.F("event",event))
278+
p.logger.Warn(context.Background(),"failed to unlisten",slog.Error(uErr),slog.F("event",event))
248279
}else {
249-
p.logger.Debug(p.ctx,"stopped listening to event channel",slog.F("event",event))
280+
p.logger.Debug(context.Background(),"stopped listening to event channel",slog.F("event",event))
250281
}
251282
}
252283
},nil
253284
}
254285

255286
func (p*PGPubsub)Publish(eventstring,message []byte)error {
256-
p.logger.Debug(p.ctx,"publish",slog.F("event",event),slog.F("message_len",len(message)))
287+
p.logger.Debug(context.Background(),"publish",slog.F("event",event),slog.F("message_len",len(message)))
257288
// This is safe because we are calling pq.QuoteLiteral. pg_notify doesn't
258289
// support the first parameter being a prepared statement.
259290
//nolint:gosec
260-
_,err:=p.db.ExecContext(p.ctx,`select pg_notify(`+pq.QuoteLiteral(event)+`, $1)`,message)
291+
_,err:=p.db.ExecContext(context.Background(),`select pg_notify(`+pq.QuoteLiteral(event)+`, $1)`,message)
261292
iferr!=nil {
262293
p.publishesTotal.WithLabelValues("false").Inc()
263294
returnxerrors.Errorf("exec pg_notify: %w",err)
@@ -269,53 +300,38 @@ func (p *PGPubsub) Publish(event string, message []byte) error {
269300

270301
// Close closes the pubsub instance.
271302
func (p*PGPubsub)Close()error {
272-
p.logger.Info(p.ctx,"pubsub is closing")
273-
p.cancel()
303+
p.logger.Info(context.Background(),"pubsub is closing")
274304
err:=p.closeListener()
275305
<-p.listenDone
276-
p.logger.Debug(p.ctx,"pubsub closed")
306+
p.logger.Debug(context.Background(),"pubsub closed")
277307
returnerr
278308
}
279309

280310
// closeListener closes the pgListener, unless it has already been closed.
281311
func (p*PGPubsub)closeListener()error {
282-
p.mut.Lock()
283-
deferp.mut.Unlock()
312+
p.closeMu.Lock()
313+
deferp.closeMu.Unlock()
284314
ifp.closedListener {
285315
returnp.closeListenerErr
286316
}
287-
p.closeListenerErr=p.pgListener.Close()
288317
p.closedListener=true
318+
p.closeListenerErr=p.pgListener.Close()
319+
289320
returnp.closeListenerErr
290321
}
291322

292323
// listen begins receiving messages on the pq listener.
293324
func (p*PGPubsub)listen() {
294325
deferfunc() {
295-
p.logger.Info(p.ctx,"pubsub listen stopped receiving notify")
296-
cErr:=p.closeListener()
297-
ifcErr!=nil {
298-
p.logger.Error(p.ctx,"failed to close listener")
299-
}
326+
p.logger.Info(context.Background(),"pubsub listen stopped receiving notify")
300327
close(p.listenDone)
301328
}()
302329

303-
var (
304-
notif*pq.Notification
305-
okbool
306-
)
307-
for {
308-
select {
309-
case<-p.ctx.Done():
310-
return
311-
casenotif,ok=<-p.pgListener.Notify:
312-
if!ok {
313-
return
314-
}
315-
}
330+
notify:=p.pgListener.NotifyChan()
331+
fornotif:=rangenotify {
316332
// A nil notification can be dispatched on reconnect.
317333
ifnotif==nil {
318-
p.logger.Debug(p.ctx,"notifying subscribers of a reconnection")
334+
p.logger.Debug(context.Background(),"notifying subscribers of a reconnection")
319335
p.recordReconnect()
320336
continue
321337
}
@@ -331,8 +347,8 @@ func (p *PGPubsub) listenReceive(notif *pq.Notification) {
331347
p.messagesTotal.WithLabelValues(sizeLabel).Inc()
332348
p.receivedBytesTotal.Add(float64(len(notif.Extra)))
333349

334-
p.mut.Lock()
335-
deferp.mut.Unlock()
350+
p.qMu.Lock()
351+
deferp.qMu.Unlock()
336352
queues,ok:=p.queues[notif.Channel]
337353
if!ok {
338354
return
@@ -344,8 +360,8 @@ func (p *PGPubsub) listenReceive(notif *pq.Notification) {
344360
}
345361

346362
func (p*PGPubsub)recordReconnect() {
347-
p.mut.Lock()
348-
deferp.mut.Unlock()
363+
p.qMu.Lock()
364+
deferp.qMu.Unlock()
349365
for_,listeners:=rangep.queues {
350366
for_,q:=rangelisteners {
351367
q.dropped()
@@ -409,30 +425,32 @@ func (p *PGPubsub) startListener(ctx context.Context, connectURL string) error {
409425
d: net.Dialer{},
410426
}
411427
)
412-
p.pgListener=pq.NewDialListener(dialer,connectURL,time.Second,time.Minute,func(t pq.ListenerEventType,errerror) {
413-
switcht {
414-
casepq.ListenerEventConnected:
415-
p.logger.Info(ctx,"pubsub connected to postgres")
416-
p.connected.Set(1.0)
417-
casepq.ListenerEventDisconnected:
418-
p.logger.Error(ctx,"pubsub disconnected from postgres",slog.Error(err))
419-
p.connected.Set(0)
420-
casepq.ListenerEventReconnected:
421-
p.logger.Info(ctx,"pubsub reconnected to postgres")
422-
p.connected.Set(1)
423-
casepq.ListenerEventConnectionAttemptFailed:
424-
p.logger.Error(ctx,"pubsub failed to connect to postgres",slog.Error(err))
425-
}
426-
// This callback gets events whenever the connection state changes.
427-
// Don't send if the errChannel has already been closed.
428-
select {
429-
case<-errCh:
430-
return
431-
default:
432-
errCh<-err
433-
close(errCh)
434-
}
435-
})
428+
p.pgListener=pqListenerShim{
429+
Listener:pq.NewDialListener(dialer,connectURL,time.Second,time.Minute,func(t pq.ListenerEventType,errerror) {
430+
switcht {
431+
casepq.ListenerEventConnected:
432+
p.logger.Info(ctx,"pubsub connected to postgres")
433+
p.connected.Set(1.0)
434+
casepq.ListenerEventDisconnected:
435+
p.logger.Error(ctx,"pubsub disconnected from postgres",slog.Error(err))
436+
p.connected.Set(0)
437+
casepq.ListenerEventReconnected:
438+
p.logger.Info(ctx,"pubsub reconnected to postgres")
439+
p.connected.Set(1)
440+
casepq.ListenerEventConnectionAttemptFailed:
441+
p.logger.Error(ctx,"pubsub failed to connect to postgres",slog.Error(err))
442+
}
443+
// This callback gets events whenever the connection state changes.
444+
// Don't send if the errChannel has already been closed.
445+
select {
446+
case<-errCh:
447+
return
448+
default:
449+
errCh<-err
450+
close(errCh)
451+
}
452+
}),
453+
}
436454
select {
437455
caseerr:=<-errCh:
438456
iferr!=nil {
@@ -501,24 +519,31 @@ func (p *PGPubsub) Collect(metrics chan<- prometheus.Metric) {
501519
p.connected.Collect(metrics)
502520

503521
// implicit metrics
504-
p.mut.Lock()
522+
p.qMu.Lock()
505523
events:=len(p.queues)
506524
subs:=0
507525
for_,subscriberMap:=rangep.queues {
508526
subs+=len(subscriberMap)
509527
}
510-
p.mut.Unlock()
528+
p.qMu.Unlock()
511529
metrics<-prometheus.MustNewConstMetric(currentSubscribersDesc,prometheus.GaugeValue,float64(subs))
512530
metrics<-prometheus.MustNewConstMetric(currentEventsDesc,prometheus.GaugeValue,float64(events))
513531
}
514532

515533
// New creates a new Pubsub implementation using a PostgreSQL connection.
516534
funcNew(startCtx context.Context,logger slog.Logger,database*sql.DB,connectURLstring) (*PGPubsub,error) {
517-
// Start a new context that will be canceled when the pubsub is closed.
518-
ctx,cancel:=context.WithCancel(context.Background())
519-
p:=&PGPubsub{
520-
ctx:ctx,
521-
cancel:cancel,
535+
p:=newWithoutListener(logger,database)
536+
iferr:=p.startListener(startCtx,connectURL);err!=nil {
537+
returnnil,err
538+
}
539+
gop.listen()
540+
logger.Info(startCtx,"pubsub has started")
541+
returnp,nil
542+
}
543+
544+
// newWithoutListener creates a new PGPubsub without creating the pqListener.
545+
funcnewWithoutListener(logger slog.Logger,database*sql.DB)*PGPubsub {
546+
return&PGPubsub{
522547
logger:logger,
523548
listenDone:make(chanstruct{}),
524549
db:database,
@@ -567,10 +592,4 @@ func New(startCtx context.Context, logger slog.Logger, database *sql.DB, connect
567592
Help:"Whether we are connected (1) or not connected (0) to postgres",
568593
}),
569594
}
570-
iferr:=p.startListener(startCtx,connectURL);err!=nil {
571-
returnnil,err
572-
}
573-
gop.listen()
574-
logger.Info(ctx,"pubsub has started")
575-
returnp,nil
576595
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp