Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit387723a

Browse files
authored
fix: close pg PubSub listener to avoid race (#11640)
Fixes flake as seen here:https://github.com/coder/coder/runs/20528529187
1 parent72d9ec0 commit387723a

File tree

2 files changed

+26
-10
lines changed

2 files changed

+26
-10
lines changed

‎coderd/database/pubsub/pubsub.go

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -162,13 +162,15 @@ func (q *msgQueue) dropped() {
162162

163163
// Pubsub implementation using PostgreSQL.
164164
typepgPubsubstruct {
165-
ctx context.Context
166-
cancel context.CancelFunc
167-
listenDonechanstruct{}
168-
pgListener*pq.Listener
169-
db*sql.DB
170-
mut sync.Mutex
171-
queuesmap[string]map[uuid.UUID]*msgQueue
165+
ctx context.Context
166+
cancel context.CancelFunc
167+
listenDonechanstruct{}
168+
pgListener*pq.Listener
169+
db*sql.DB
170+
mut sync.Mutex
171+
queuesmap[string]map[uuid.UUID]*msgQueue
172+
closedListenerbool
173+
closeListenerErrerror
172174
}
173175

174176
// BufferSize is the maximum number of unhandled messages we will buffer
@@ -240,15 +242,29 @@ func (p *pgPubsub) Publish(event string, message []byte) error {
240242
// Close closes the pubsub instance.
241243
func (p*pgPubsub)Close()error {
242244
p.cancel()
243-
err:=p.pgListener.Close()
245+
err:=p.closeListener()
244246
<-p.listenDone
245247
returnerr
246248
}
247249

250+
// closeListener closes the pgListener, unless it has already been closed.
251+
func (p*pgPubsub)closeListener()error {
252+
p.mut.Lock()
253+
deferp.mut.Unlock()
254+
ifp.closedListener {
255+
returnp.closeListenerErr
256+
}
257+
p.closeListenerErr=p.pgListener.Close()
258+
p.closedListener=true
259+
returnp.closeListenerErr
260+
}
261+
248262
// listen begins receiving messages on the pq listener.
249263
func (p*pgPubsub)listen() {
250-
deferclose(p.listenDone)
251-
deferp.pgListener.Close()
264+
deferfunc() {
265+
_=p.closeListener()
266+
close(p.listenDone)
267+
}()
252268

253269
var (
254270
notif*pq.Notification
-3 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp