fix: fix pgcoord to delete coordinator row last #12155

Merged
spikecurtis merged 1 commit into main from spike/12141-flake-write-binding on Feb 15, 2024
26 changes: 25 additions & 1 deletion in enterprise/tailnet/pgcoord.go
@@ -129,6 +129,10 @@ func newPGCoordInternal(
// signals when first heartbeat has been sent, so it's safe to start binding.
fHB := make(chan struct{})

// we need to arrange for the querier to stop _after_ the tunneler and binder, since we delete
// the coordinator when the querier stops (via the heartbeats). If the tunneler and binder are
// still running, they could run afoul of foreign key constraints.
querierCtx, querierCancel := context.WithCancel(dbauthz.As(context.Background(), pgCoordSubject))
c := &pgCoord{
ctx: ctx,
cancel: cancel,
@@ -142,9 +146,17 @@ func newPGCoordInternal(
tunneler: newTunneler(ctx, logger, id, store, sCh, fHB),
tunnelerCh: sCh,
id: id,
querier: newQuerier(ctx, logger, id, ps, store, id, cCh, ccCh, numQuerierWorkers, fHB),
querier: newQuerier(querierCtx, logger, id, ps, store, id, cCh, ccCh, numQuerierWorkers, fHB),
closed: make(chan struct{}),
}
go func() {
// when the main context is canceled, or the coordinator closed, the binder and tunneler
// always eventually stop. Once they stop it's safe to cancel the querier context, which
// has the effect of deleting the coordinator from the database and ceasing heartbeats.
c.binder.workerWG.Wait()
c.tunneler.workerWG.Wait()
querierCancel()
}()
logger.Info(ctx, "starting coordinator")
return c, nil
}
@@ -255,6 +267,8 @@ type tunneler struct {
mu sync.Mutex
latest map[uuid.UUID]map[uuid.UUID]tunnel
workQ *workQ[tKey]

workerWG sync.WaitGroup
}

func newTunneler(ctx context.Context,
@@ -274,6 +288,9 @@ func newTunneler(ctx context.Context,
workQ: newWorkQ[tKey](ctx),
}
go s.handle()
// add to the waitgroup immediately to avoid any races waiting for it before
// the workers start.
s.workerWG.Add(numTunnelerWorkers)
go func() {
<-startWorkers
for i := 0; i < numTunnelerWorkers; i++ {
@@ -297,6 +314,7 @@ func (t *tunneler) handle() {
}

func (t *tunneler) worker() {
defer t.workerWG.Done()
eb := backoff.NewExponentialBackOff()
eb.MaxElapsedTime = 0 // retry indefinitely
eb.MaxInterval = dbMaxBackoff
@@ -435,6 +453,8 @@ type binder struct {
mu sync.Mutex
latest map[bKey]binding
workQ *workQ[bKey]

workerWG sync.WaitGroup
}

func newBinder(ctx context.Context,
@@ -454,6 +474,9 @@ func newBinder(ctx context.Context,
workQ: newWorkQ[bKey](ctx),
}
go b.handleBindings()
// add to the waitgroup immediately to avoid any races waiting for it before
// the workers start.
b.workerWG.Add(numBinderWorkers)
Member commented:
Is there a chance that <-startWorkers below (i.e. fHB) doesn't get closed (e.g. some error during startup), and thus these waitgroups never resolve?

(I didn't dig into how or where fHB is closed, as it's not obvious from this PR.)

Contributor (Author) replied:

It gets closed unconditionally after we send the first heartbeat (success or fail).
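
For illustration only, a minimal sketch of that guarantee (hypothetical names, not the actual pgcoord heartbeat code): closing the signal channel in a defer means every path, success or failure, releases the gated workers.

import "context"

// heartbeatStore is a hypothetical stand-in for the database layer.
type heartbeatStore interface {
	UpsertCoordinator(ctx context.Context) error
}

// sendFirstHeartbeat closes firstHB exactly once, so any goroutine blocked
// on <-firstHB (i.e. <-startWorkers) is always released.
func sendFirstHeartbeat(ctx context.Context, store heartbeatStore, firstHB chan<- struct{}) {
	// the deferred close runs whether the upsert succeeds or errors
	defer close(firstHB)
	if err := store.UpsertCoordinator(ctx); err != nil {
		// errors are handled by the normal heartbeat retry loop; the
		// binder and tunneler workers are still allowed to start
		return
	}
}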

go func() {
<-startWorkers
for i := 0; i < numBinderWorkers; i++ {
@@ -477,6 +500,7 @@ func (b *binder) handleBindings() {
}

func (b *binder) worker() {
defer b.workerWG.Done()
eb := backoff.NewExponentialBackOff()
eb.MaxElapsedTime = 0 // retry indefinitely
eb.MaxInterval = dbMaxBackoff
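
Taken together, the change amounts to a shutdown-ordering pattern: register workers on a WaitGroup before they are spawned, wait for them to finish, and only then cancel the querier's context, which is what ultimately deletes the coordinator row. Below is a standalone sketch of that pattern with hypothetical names; it is not the actual pgcoord wiring, which applies the same idea to both the binder and the tunneler.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	// main context: canceling it tells the binder/tunneler-style workers to stop.
	ctx, cancel := context.WithCancel(context.Background())

	// separate querier context: it must be canceled only after the other
	// workers have fully stopped, because stopping the querier deletes the
	// coordinator row that those workers' writes reference via foreign keys.
	querierCtx, querierCancel := context.WithCancel(context.Background())

	startWorkers := make(chan struct{}) // closed once the first heartbeat is sent

	var workerWG sync.WaitGroup
	// Add before spawning the goroutine, so a Wait() racing with startup
	// cannot return before the worker is registered.
	workerWG.Add(1)
	go func() {
		defer workerWG.Done()
		<-startWorkers // gate on the first heartbeat
		<-ctx.Done()   // stand-in for writing bindings/tunnels until canceled
	}()

	// cancel the querier only after every worker has stopped
	go func() {
		workerWG.Wait()
		querierCancel()
	}()

	go func() {
		<-querierCtx.Done()
		fmt.Println("querier stopped: safe to delete coordinator row")
	}()

	close(startWorkers)               // first heartbeat sent (success or fail)
	cancel()                          // shut down: workers first, querier last
	workerWG.Wait()                   // workers have exited
	time.Sleep(50 * time.Millisecond) // let the querier goroutine print
}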
