@@ -129,6 +129,10 @@ func newPGCoordInternal(
 	// signals when first heartbeat has been sent, so it's safe to start binding.
 	fHB := make(chan struct{})
 
+	// we need to arrange for the querier to stop _after_ the tunneler and binder, since we delete
+	// the coordinator when the querier stops (via the heartbeats). If the tunneler and binder are
+	// still running, they could run afoul of foreign key constraints.
+	querierCtx, querierCancel := context.WithCancel(dbauthz.As(context.Background(), pgCoordSubject))
 	c := &pgCoord{
 		ctx:    ctx,
 		cancel: cancel,
@@ -142,9 +146,17 @@ func newPGCoordInternal(
 		tunneler:   newTunneler(ctx, logger, id, store, sCh, fHB),
 		tunnelerCh: sCh,
 		id:         id,
-		querier:    newQuerier(ctx, logger, id, ps, store, id, cCh, ccCh, numQuerierWorkers, fHB),
+		querier:    newQuerier(querierCtx, logger, id, ps, store, id, cCh, ccCh, numQuerierWorkers, fHB),
 		closed:     make(chan struct{}),
 	}
+	go func() {
+		// when the main context is canceled, or the coordinator is closed, the binder and tunneler
+		// always eventually stop. Once they stop, it's safe to cancel the querier context, which
+		// has the effect of deleting the coordinator from the database and ceasing heartbeats.
+		c.binder.workerWG.Wait()
+		c.tunneler.workerWG.Wait()
+		querierCancel()
+	}()
 	logger.Info(ctx, "starting coordinator")
 	return c, nil
 }
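
A minimal, self-contained sketch of the shutdown ordering these two hunks set up (the names here are illustrative stand-ins, not the real pgCoord types): the querier gets its own context that is canceled only after the dependent workers have exited.

package main

import (
	"context"
	"fmt"
	"sync"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	// The "querier" gets its own context so it can outlive the workers.
	querierCtx, querierCancel := context.WithCancel(context.Background())

	var workerWG sync.WaitGroup
	workerWG.Add(2) // stand-ins for the binder and tunneler workers
	for i := 0; i < 2; i++ {
		go func() {
			defer workerWG.Done()
			<-ctx.Done() // workers stop when the main context is canceled
		}()
	}

	go func() {
		// Cancel the querier only after every worker has exited, so no
		// worker can write rows that reference the deleted coordinator.
		workerWG.Wait()
		querierCancel()
	}()

	cancel()            // simulate coordinator shutdown
	<-querierCtx.Done() // observed strictly after both workers return
	fmt.Println("querier canceled after workers exited")
}
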
@@ -255,6 +267,8 @@ type tunneler struct {
 	mu     sync.Mutex
 	latest map[uuid.UUID]map[uuid.UUID]tunnel
 	workQ  *workQ[tKey]
+
+	workerWG sync.WaitGroup
 }
 
 func newTunneler(ctx context.Context,
@@ -274,6 +288,9 @@ func newTunneler(ctx context.Context,
 		workQ:       newWorkQ[tKey](ctx),
 	}
 	go s.handle()
+	// add to the waitgroup immediately to avoid any races waiting for it before
+	// the workers start.
+	s.workerWG.Add(numTunnelerWorkers)
 	go func() {
 		<-startWorkers
 		for i := 0; i < numTunnelerWorkers; i++ {
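
The race this comment guards against is worth seeing in isolation. A minimal sketch, nothing pgcoord-specific: Add must run before the goroutine that launches the workers, otherwise a concurrent Wait can observe a zero counter and return before any worker has started.

package main

import (
	"fmt"
	"sync"
)

const numWorkers = 4

func main() {
	var wg sync.WaitGroup
	startWorkers := make(chan struct{})

	// Add in the constructor, not inside the launching goroutine: if Add
	// ran after <-startWorkers, a caller's Wait could win the race, see a
	// zero counter, and return before any worker existed.
	wg.Add(numWorkers)
	go func() {
		<-startWorkers
		for i := 0; i < numWorkers; i++ {
			go func(n int) {
				defer wg.Done()
				fmt.Println("worker", n, "running")
			}(i)
		}
	}()

	close(startWorkers)
	wg.Wait() // guaranteed to block until all numWorkers call Done
	fmt.Println("all workers accounted for")
}

The same pattern appears again in newBinder below; the deferred Done added to each worker ensures the WaitGroup drains even when a worker exits via context cancellation.
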
@@ -297,6 +314,7 @@ func (t *tunneler) handle() {
 }
 
 func (t *tunneler) worker() {
+	defer t.workerWG.Done()
 	eb := backoff.NewExponentialBackOff()
 	eb.MaxElapsedTime = 0 // retry indefinitely
 	eb.MaxInterval = dbMaxBackoff
@@ -435,6 +453,8 @@ type binder struct {
 	mu     sync.Mutex
 	latest map[bKey]binding
 	workQ  *workQ[bKey]
+
+	workerWG sync.WaitGroup
 }
 
 func newBinder(ctx context.Context,
@@ -454,6 +474,9 @@ func newBinder(ctx context.Context,
 		workQ:  newWorkQ[bKey](ctx),
 	}
 	go b.handleBindings()
+	// add to the waitgroup immediately to avoid any races waiting for it before
+	// the workers start.
+	b.workerWG.Add(numBinderWorkers)
 	go func() {
 		<-startWorkers
 		for i := 0; i < numBinderWorkers; i++ {
@@ -477,6 +500,7 @@ func (b *binder) handleBindings() {
 }
 
 func (b *binder) worker() {
+	defer b.workerWG.Done()
 	eb := backoff.NewExponentialBackOff()
 	eb.MaxElapsedTime = 0 // retry indefinitely
 	eb.MaxInterval = dbMaxBackoff
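
For context on the lines around the new defer: each worker retries its database write with an exponential backoff that never gives up. A rough sketch of that retry shape, assuming github.com/cenkalti/backoff/v4 (which the NewExponentialBackOff call suggests); the interval value here is an assumed stand-in for dbMaxBackoff, and the failing operation is invented for illustration.

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	eb := backoff.NewExponentialBackOff()
	eb.MaxElapsedTime = 0        // retry indefinitely, as the workers do
	eb.MaxInterval = time.Second // stand-in for dbMaxBackoff; real value assumed

	attempts := 0
	err := backoff.Retry(func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient database error") // retried with growing delay
		}
		return nil
	}, eb)
	fmt.Printf("err=%v after %d attempts\n", err, attempts)
}
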