Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

fix: fix Listen/Unlisten race on Pubsub#15315

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
spikecurtis merged 1 commit intomainfromspike/15312-listen-unlisten
Nov 1, 2024

Conversation

spikecurtis
Copy link
Contributor

@spikecurtisspikecurtis commentedNov 1, 2024
edited
Loading

Fixes#15312

When we need toUnlisten() for an event, instead of immediately removing the event from thep.queues, we store a channel to signal any goroutines trying to Subscribe to the same event when we are done. OnSubscribe, if the channel is present, wait for it before callingListen to ensure the ordering is correct.

@spikecurtisGraphite App
Copy link
ContributorAuthor

This stack of pull requests is managed by Graphite.Learn more about stacking.

Join@spikecurtis and the rest of your teammates onGraphiteGraphite

@spikecurtisspikecurtis marked this pull request as ready for reviewNovember 1, 2024 09:16
Copy link
Member

@mafredrimafredri left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Solid fix! I think down the line this code could be/become hard to reason about as others look at it or make changes, but I don't have a better idea currently that doesn't have potential performance implications.

p.qMu.Lock()
defer p.qMu.Unlock()
qSet, ok := p.queues[event]
if ok && len(qSet.m) == 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

If we wanted to be really careful here we could check that the channel on qSet is still ours. It'll work without the check though, just the delete could (very) theoretically be done by another routine, which would still only be a semantic difference.

@@ -188,6 +187,19 @@ func (l pqListenerShim) NotifyChan() <-chan *pq.Notification {
return l.Notify
}

type queueSet struct {
m map[*msgQueue]struct{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Suggestion: m => q/queues. I feel this would explain better what one is looking at when viewing the code.

@spikecurtisGraphite App
Copy link
ContributorAuthor

Yeah, I'm not feeling like it's a great design, but I also couldn't come up with anything better.

2 designs I rejected:

  1. I considered having thep.queues be the source of truth with a background goroutine reconciling theListen/Unlisten, sort of like we do elsewhere in the product, but the problem is that when youSubscribe, you don't want to return until you know the pubsub is listening. So, that kind of eventually consistent model won't work.

  2. We could go back to how things were and hold thep.qMu while callingListen/Unlisten. We'd need to ensure that we keep reading from the notification channel, even if thep.qMu is locked. One way to do that is by creating an unbounded FIFO queue of notifications, so we could keep reading from the channel, even if we can't immediately process the notifications due to the lock. An unbounded queue sounds dangerous though.

@spikecurtisspikecurtisforce-pushed thespike/15312-listen-unlisten branch from79d7fab toa6ca3e1CompareNovember 1, 2024 10:16
@spikecurtisspikecurtis merged commit005ea53 intomainNov 1, 2024
27 checks passed
@spikecurtisspikecurtis deleted the spike/15312-listen-unlisten branchNovember 1, 2024 10:35
@github-actionsgithub-actionsbot locked and limited conversation to collaboratorsNov 1, 2024
@mafredri
Copy link
Member

mafredri commentedNov 1, 2024
edited
Loading

Yeah, I think you were right to reject those options, what you ended up is still better.

My idea was to have a single goroutine responsible for coordinating sub/unsub, basically anything queues related. We'd still have to solve for not doing blocking operations synchronously, though.

Could look something like this:

func (p*PGPubsub)listenLoop() {varqueuesmap[string]...// p.queues no longer needs lockingfor {r:=<-p.listenswitchreq.(type) {caselisten:if_,ok:=queues[r.event];!ok {// ~go p.pqListener.Listen(); r.err <- err}else {r.err<-nil}caseunlisten:// like listen, but unlistencasereceive:// enqueue notif}}func (p*PGPubsub)subscribeQueue(eventstring,newQ*msgQueue) (cancelfunc(),errerror) {errc:=make(chanerror,1)p.listen<-listen{"event",newQ,errc}_=<-errcreturnfunc() {p.listen<-unlisten{"event",newQ,errc}},nil}

Sign up for freeto subscribe to this conversation on GitHub. Already have an account?Sign in.
Reviewers

@mafredrimafredrimafredri approved these changes

Assignees

@spikecurtisspikecurtis

Labels
None yet
Projects
None yet
Milestone
No milestone
Development

Successfully merging this pull request may close these issues.

Listen/Unlisten race in Pubsub
2 participants
@spikecurtis@mafredri

[8]ページ先頭

©2009-2025 Movatter.jp