Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 4fc0479

Browse files
authored
fix: avoid deleting peers on graceful close (#14165)
* fix: avoid deleting peers on graceful close
- Fixes an issue where a coordinator deletes all its peers on shutdown. This can cause disconnects whenever a coderd is redeployed.
1 parent 6f1951e, commit 4fc0479

File tree

13 files changed

+328
-102
lines changed

13 files changed

+328
-102
lines changed

‎coderd/database/dbauthz/dbauthz.go

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -3324,6 +3324,13 @@ func (q *querier) UpdateReplica(ctx context.Context, arg database.UpdateReplicaP
33243324
returnq.db.UpdateReplica(ctx,arg)
33253325
}
33263326

3327+
func (q*querier)UpdateTailnetPeerStatusByCoordinator(ctx context.Context,arg database.UpdateTailnetPeerStatusByCoordinatorParams)error {
3328+
iferr:=q.authorizeContext(ctx,policy.ActionUpdate,rbac.ResourceTailnetCoordinator);err!=nil {
3329+
returnerr
3330+
}
3331+
returnq.db.UpdateTailnetPeerStatusByCoordinator(ctx,arg)
3332+
}
3333+
33273334
func (q*querier)UpdateTemplateACLByID(ctx context.Context,arg database.UpdateTemplateACLByIDParams)error {
33283335
fetch:=func(ctx context.Context,arg database.UpdateTemplateACLByIDParams) (database.Template,error) {
33293336
returnq.db.GetTemplateByID(ctx,arg.ID)

‎coderd/database/dbauthz/dbauthz_test.go

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2152,6 +2152,11 @@ func (s *MethodTestSuite) TestTailnetFunctions() {
21522152
Asserts(rbac.ResourceTailnetCoordinator,policy.ActionCreate).
21532153
Errors(dbmem.ErrUnimplemented)
21542154
}))
2155+
s.Run("UpdateTailnetPeerStatusByCoordinator",s.Subtest(func(_ database.Store,check*expects) {
2156+
check.Args(database.UpdateTailnetPeerStatusByCoordinatorParams{}).
2157+
Asserts(rbac.ResourceTailnetCoordinator,policy.ActionUpdate).
2158+
Errors(dbmem.ErrUnimplemented)
2159+
}))
21552160
}
21562161

21572162
func (s*MethodTestSuite)TestDBCrypt() {

‎coderd/database/dbmem/dbmem.go

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -7917,6 +7917,10 @@ func (q *FakeQuerier) UpdateReplica(_ context.Context, arg database.UpdateReplic
79177917
return database.Replica{},sql.ErrNoRows
79187918
}
79197919

7920+
func (*FakeQuerier)UpdateTailnetPeerStatusByCoordinator(context.Context, database.UpdateTailnetPeerStatusByCoordinatorParams)error {
7921+
returnErrUnimplemented
7922+
}
7923+
79207924
func (q*FakeQuerier)UpdateTemplateACLByID(_ context.Context,arg database.UpdateTemplateACLByIDParams)error {
79217925
iferr:=validateDatabaseType(arg);err!=nil {
79227926
returnerr

‎coderd/database/dbmetrics/dbmetrics.go

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎coderd/database/dbmock/dbmock.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎coderd/database/dbtestutil/db.go

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -35,6 +35,7 @@ type options struct {
3535
dumpOnFailurebool
3636
returnSQLDBfunc(*sql.DB)
3737
logger slog.Logger
38+
urlstring
3839
}
3940

4041
typeOptionfunc(*options)
@@ -59,6 +60,12 @@ func WithLogger(logger slog.Logger) Option {
5960
}
6061
}
6162

63+
funcWithURL(ustring)Option {
64+
returnfunc(o*options) {
65+
o.url=u
66+
}
67+
}
68+
6269
funcwithReturnSQLDB(ffunc(*sql.DB))Option {
6370
returnfunc(o*options) {
6471
o.returnSQLDB=f
@@ -92,6 +99,9 @@ func NewDB(t testing.TB, opts ...Option) (database.Store, pubsub.Pubsub) {
9299
ps:=pubsub.NewInMemory()
93100
ifWillUsePostgres() {
94101
connectionURL:=os.Getenv("CODER_PG_CONNECTION_URL")
102+
ifconnectionURL==""&&o.url!="" {
103+
connectionURL=o.url
104+
}
95105
ifconnectionURL=="" {
96106
var (
97107
errerror

‎coderd/database/querier.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎coderd/database/queries.sql.go

Lines changed: 19 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎coderd/database/queries/tailnet.sql

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -149,6 +149,14 @@ DO UPDATE SET
149149
updated_at= now() attime zone'utc'
150150
RETURNING*;
151151

152+
-- name: UpdateTailnetPeerStatusByCoordinator :exec
153+
UPDATE
154+
tailnet_peers
155+
SET
156+
status= $2
157+
WHERE
158+
coordinator_id= $1;
159+
152160
-- name: DeleteTailnetPeer :one
153161
DELETE
154162
FROM tailnet_peers

‎enterprise/tailnet/pgcoord.go

Lines changed: 31 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -144,10 +144,6 @@ func newPGCoordInternal(
144144
// signals when first heartbeat has been sent, so it's safe to start binding.
145145
fHB:=make(chanstruct{})
146146

147-
// we need to arrange for the querier to stop _after_ the tunneler and binder, since we delete
148-
// the coordinator when the querier stops (via the heartbeats). If the tunneler and binder are
149-
// still running, they could run afoul of foreign key constraints.
150-
querierCtx,querierCancel:=context.WithCancel(dbauthz.As(context.Background(),pgCoordSubject))
151147
c:=&pgCoord{
152148
ctx:ctx,
153149
cancel:cancel,
@@ -163,18 +159,9 @@ func newPGCoordInternal(
163159
handshaker:newHandshaker(ctx,logger,id,ps,rfhCh,fHB),
164160
handshakerCh:rfhCh,
165161
id:id,
166-
querier:newQuerier(querierCtx,logger,id,ps,store,id,cCh,ccCh,numQuerierWorkers,fHB,clk),
162+
querier:newQuerier(ctx,logger,id,ps,store,id,cCh,ccCh,numQuerierWorkers,fHB,clk),
167163
closed:make(chanstruct{}),
168164
}
169-
gofunc() {
170-
// when the main context is canceled, or the coordinator closed, the binder, tunneler, and
171-
// handshaker always eventually stop. Once they stop it's safe to cancel the querier context, which
172-
// has the effect of deleting the coordinator from the database and ceasing heartbeats.
173-
c.binder.workerWG.Wait()
174-
c.tunneler.workerWG.Wait()
175-
c.handshaker.workerWG.Wait()
176-
querierCancel()
177-
}()
178165
logger.Info(ctx,"starting coordinator")
179166
returnc,nil
180167
}
@@ -239,6 +226,9 @@ func (c *pgCoord) Close() error {
239226
c.cancel()
240227
c.closeOnce.Do(func() {close(c.closed) })
241228
c.querier.wait()
229+
c.binder.wait()
230+
c.tunneler.workerWG.Wait()
231+
c.handshaker.workerWG.Wait()
242232
returnnil
243233
}
244234

@@ -485,6 +475,7 @@ type binder struct {
485475
workQ*workQ[bKey]
486476

487477
workerWG sync.WaitGroup
478+
closechanstruct{}
488479
}
489480

490481
funcnewBinder(ctx context.Context,
@@ -502,6 +493,7 @@ func newBinder(ctx context.Context,
502493
bindings:bindings,
503494
latest:make(map[bKey]binding),
504495
workQ:newWorkQ[bKey](ctx),
496+
close:make(chanstruct{}),
505497
}
506498
gob.handleBindings()
507499
// add to the waitgroup immediately to avoid any races waiting for it before
@@ -513,14 +505,34 @@ func newBinder(ctx context.Context,
513505
gob.worker()
514506
}
515507
}()
508+
509+
gofunc() {
510+
deferclose(b.close)
511+
<-b.ctx.Done()
512+
b.logger.Debug(b.ctx,"binder exiting, waiting for workers")
513+
514+
b.workerWG.Wait()
515+
516+
b.logger.Debug(b.ctx,"updating peers to lost")
517+
518+
ctx,cancel:=context.WithTimeout(context.Background(),time.Second*15)
519+
defercancel()
520+
err:=b.store.UpdateTailnetPeerStatusByCoordinator(ctx, database.UpdateTailnetPeerStatusByCoordinatorParams{
521+
CoordinatorID:b.coordinatorID,
522+
Status:database.TailnetStatusLost,
523+
})
524+
iferr!=nil {
525+
b.logger.Error(b.ctx,"update peer status to lost",slog.Error(err))
526+
}
527+
}()
516528
returnb
517529
}
518530

519531
func (b*binder)handleBindings() {
520532
for {
521533
select {
522534
case<-b.ctx.Done():
523-
b.logger.Debug(b.ctx,"binder exiting",slog.Error(b.ctx.Err()))
535+
b.logger.Debug(b.ctx,"binder exiting")
524536
return
525537
casebnd:=<-b.bindings:
526538
b.storeBinding(bnd)
@@ -632,6 +644,10 @@ func (b *binder) retrieveBinding(bk bKey) binding {
632644
returnbnd
633645
}
634646

647+
func (b*binder)wait() {
648+
<-b.close
649+
}
650+
635651
// mapper tracks data sent to a peer, and sends updates based on changes read from the database.
636652
typemapperstruct {
637653
ctx context.Context
@@ -1646,7 +1662,6 @@ func (h *heartbeats) sendBeats() {
16461662
// send an initial heartbeat so that other coordinators can start using our bindings right away.
16471663
h.sendBeat()
16481664
close(h.firstHeartbeat)// signal binder it can start writing
1649-
deferh.sendDelete()
16501665
tkr:=h.clock.TickerFunc(h.ctx,HeartbeatPeriod,func()error {
16511666
h.sendBeat()
16521667
returnnil
@@ -1677,17 +1692,6 @@ func (h *heartbeats) sendBeat() {
16771692
h.failedHeartbeats=0
16781693
}
16791694

1680-
func (h*heartbeats)sendDelete() {
1681-
// here we don't want to use the main context, since it will have been canceled
1682-
ctx:=dbauthz.As(context.Background(),pgCoordSubject)
1683-
err:=h.store.DeleteCoordinator(ctx,h.self)
1684-
iferr!=nil {
1685-
h.logger.Error(h.ctx,"failed to send coordinator delete",slog.Error(err))
1686-
return
1687-
}
1688-
h.logger.Debug(h.ctx,"deleted coordinator")
1689-
}
1690-
16911695
func (h*heartbeats)cleanupLoop() {
16921696
deferh.wg.Done()
16931697
h.cleanup()

‎enterprise/tailnet/pgcoord_internal_test.go

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -396,17 +396,14 @@ func TestPGCoordinatorUnhealthy(t *testing.T) {
396396
UpsertTailnetCoordinator(gomock.Any(),gomock.Any()).
397397
Times(3).
398398
Return(database.TailnetCoordinator{},xerrors.New("badness"))
399-
mStore.EXPECT().
400-
DeleteCoordinator(gomock.Any(),gomock.Any()).
401-
Times(1).
402-
Return(nil)
403399
// But, in particular we DO NOT want the coordinator to call DeleteTailnetPeer, as this is
404400
// unnecessary and can spam the database. c.f. https://github.com/coder/coder/issues/12923
405401

406402
// these cleanup queries run, but we don't care for this test
407403
mStore.EXPECT().CleanTailnetCoordinators(gomock.Any()).AnyTimes().Return(nil)
408404
mStore.EXPECT().CleanTailnetLostPeers(gomock.Any()).AnyTimes().Return(nil)
409405
mStore.EXPECT().CleanTailnetTunnels(gomock.Any()).AnyTimes().Return(nil)
406+
mStore.EXPECT().UpdateTailnetPeerStatusByCoordinator(gomock.Any(),gomock.Any())
410407

411408
coordinator,err:=newPGCoordInternal(ctx,logger,ps,mStore,mClock)
412409
require.NoError(t,err)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp