- Notifications
You must be signed in to change notification settings - Fork927
fix: fix Listen/Unlisten race on Pubsub#15315
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Conversation
This stack of pull requests is managed by Graphite.Learn more about stacking. Join@spikecurtis and the rest of your teammates on |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Solid fix! I think down the line this code could be/become hard to reason about as others look at it or make changes, but I don't have a better idea currently that doesn't have potential performance implications.
p.qMu.Lock() | ||
defer p.qMu.Unlock() | ||
qSet, ok := p.queues[event] | ||
if ok && len(qSet.m) == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
If we wanted to be really careful here we could check that the channel on qSet is still ours. It'll work without the check though, just the delete could (very) theoretically be done by another routine, which would still only be a semantic difference.
@@ -188,6 +187,19 @@ func (l pqListenerShim) NotifyChan() <-chan *pq.Notification { | |||
return l.Notify | |||
} | |||
type queueSet struct { | |||
m map[*msgQueue]struct{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Suggestion: m => q/queues. I feel this would explain better what one is looking at when viewing the code.
Yeah, I'm not feeling like it's a great design, but I also couldn't come up with anything better. 2 designs I rejected:
|
79d7fab
toa6ca3e1
Compare005ea53
intomainUh oh!
There was an error while loading.Please reload this page.
mafredri commentedNov 1, 2024 • edited
Loading Uh oh!
There was an error while loading.Please reload this page.
edited
Uh oh!
There was an error while loading.Please reload this page.
Yeah, I think you were right to reject those options, what you ended up is still better. My idea was to have a single goroutine responsible for coordinating sub/unsub, basically anything queues related. We'd still have to solve for not doing blocking operations synchronously, though. Could look something like this: func (p*PGPubsub)listenLoop() {varqueuesmap[string]...// p.queues no longer needs lockingfor {r:=<-p.listenswitchreq.(type) {caselisten:if_,ok:=queues[r.event];!ok {// ~go p.pqListener.Listen(); r.err <- err}else {r.err<-nil}caseunlisten:// like listen, but unlistencasereceive:// enqueue notif}}func (p*PGPubsub)subscribeQueue(eventstring,newQ*msgQueue) (cancelfunc(),errerror) {errc:=make(chanerror,1)p.listen<-listen{"event",newQ,errc}_=<-errcreturnfunc() {p.listen<-unlisten{"event",newQ,errc}},nil} |
Uh oh!
There was an error while loading.Please reload this page.
Fixes#15312
When we need to
Unlisten()
for an event, instead of immediately removing the event from thep.queues
, we store a channel to signal any goroutines trying to Subscribe to the same event when we are done. OnSubscribe
, if the channel is present, wait for it before callingListen
to ensure the ordering is correct.