Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

fix: avoid deleting peers on graceful close#14165

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
sreya merged 14 commits intomainfromjon/pgc
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletionscoderd/database/dbauthz/dbauthz.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -3160,6 +3160,13 @@ func (q *querier) UpdateReplica(ctx context.Context, arg database.UpdateReplicaP
return q.db.UpdateReplica(ctx, arg)
}

func (q *querier) UpdateTailnetPeerStatusByCoordinator(ctx context.Context, arg database.UpdateTailnetPeerStatusByCoordinatorParams) error {
if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceTailnetCoordinator); err != nil {
return err
}
return q.db.UpdateTailnetPeerStatusByCoordinator(ctx, arg)
}

func (q *querier) UpdateTemplateACLByID(ctx context.Context, arg database.UpdateTemplateACLByIDParams) error {
fetch := func(ctx context.Context, arg database.UpdateTemplateACLByIDParams) (database.Template, error) {
return q.db.GetTemplateByID(ctx, arg.ID)
Expand Down
5 changes: 5 additions & 0 deletionscoderd/database/dbauthz/dbauthz_test.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -2053,6 +2053,11 @@ func (s *MethodTestSuite) TestTailnetFunctions() {
Asserts(rbac.ResourceTailnetCoordinator, policy.ActionCreate).
Errors(dbmem.ErrUnimplemented)
}))
s.Run("UpdateTailnetPeerStatusByCoordinator", s.Subtest(func(_ database.Store, check *expects) {
check.Args(database.UpdateTailnetPeerStatusByCoordinatorParams{}).
Asserts(rbac.ResourceTailnetCoordinator, policy.ActionUpdate).
Errors(dbmem.ErrUnimplemented)
}))
}

func (s *MethodTestSuite) TestDBCrypt() {
Expand Down
4 changes: 4 additions & 0 deletionscoderd/database/dbmem/dbmem.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -7759,6 +7759,10 @@ func (q *FakeQuerier) UpdateReplica(_ context.Context, arg database.UpdateReplic
return database.Replica{}, sql.ErrNoRows
}

func (*FakeQuerier) UpdateTailnetPeerStatusByCoordinator(context.Context, database.UpdateTailnetPeerStatusByCoordinatorParams) error {
return ErrUnimplemented
}

func (q *FakeQuerier) UpdateTemplateACLByID(_ context.Context, arg database.UpdateTemplateACLByIDParams) error {
if err := validateDatabaseType(arg); err != nil {
return err
Expand Down
7 changes: 7 additions & 0 deletionscoderd/database/dbmetrics/dbmetrics.go
View file
Open in desktop

Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.

14 changes: 14 additions & 0 deletionscoderd/database/dbmock/dbmock.go
View file
Open in desktop

Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.

10 changes: 10 additions & 0 deletionscoderd/database/dbtestutil/db.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -35,6 +35,7 @@ type options struct {
dumpOnFailure bool
returnSQLDB func(*sql.DB)
logger slog.Logger
url string
}

type Option func(*options)
Expand All@@ -59,6 +60,12 @@ func WithLogger(logger slog.Logger) Option {
}
}

func WithURL(u string) Option {
return func(o *options) {
o.url = u
}
}

func withReturnSQLDB(f func(*sql.DB)) Option {
return func(o *options) {
o.returnSQLDB = f
Expand DownExpand Up@@ -92,6 +99,9 @@ func NewDB(t testing.TB, opts ...Option) (database.Store, pubsub.Pubsub) {
ps := pubsub.NewInMemory()
if WillUsePostgres() {
connectionURL := os.Getenv("CODER_PG_CONNECTION_URL")
if connectionURL == "" && o.url != "" {
connectionURL = o.url
}
if connectionURL == "" {
var (
err error
Expand Down
1 change: 1 addition & 0 deletionscoderd/database/querier.go
View file
Open in desktop

Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.

19 changes: 19 additions & 0 deletionscoderd/database/queries.sql.go
View file
Open in desktop

Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.

8 changes: 8 additions & 0 deletionscoderd/database/queries/tailnet.sql
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -149,6 +149,14 @@ DO UPDATE SET
updated_at = now() at time zone 'utc'
RETURNING *;

-- name: UpdateTailnetPeerStatusByCoordinator :exec
UPDATE
tailnet_peers
SET
status = $2
WHERE
coordinator_id = $1;

-- name: DeleteTailnetPeer :one
DELETE
FROM tailnet_peers
Expand Down
58 changes: 31 additions & 27 deletionsenterprise/tailnet/pgcoord.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -144,10 +144,6 @@ func newPGCoordInternal(
// signals when first heartbeat has been sent, so it's safe to start binding.
fHB := make(chan struct{})

// we need to arrange for the querier to stop _after_ the tunneler and binder, since we delete
// the coordinator when the querier stops (via the heartbeats). If the tunneler and binder are
// still running, they could run afoul of foreign key constraints.
querierCtx, querierCancel := context.WithCancel(dbauthz.As(context.Background(), pgCoordSubject))
c := &pgCoord{
ctx: ctx,
cancel: cancel,
Expand All@@ -163,18 +159,9 @@ func newPGCoordInternal(
handshaker: newHandshaker(ctx, logger, id, ps, rfhCh, fHB),
handshakerCh: rfhCh,
id: id,
querier: newQuerier(querierCtx, logger, id, ps, store, id, cCh, ccCh, numQuerierWorkers, fHB, clk),
querier: newQuerier(ctx, logger, id, ps, store, id, cCh, ccCh, numQuerierWorkers, fHB, clk),
closed: make(chan struct{}),
}
go func() {
// when the main context is canceled, or the coordinator closed, the binder, tunneler, and
// handshaker always eventually stop. Once they stop it's safe to cancel the querier context, which
// has the effect of deleting the coordinator from the database and ceasing heartbeats.
c.binder.workerWG.Wait()
c.tunneler.workerWG.Wait()
c.handshaker.workerWG.Wait()
querierCancel()
}()
logger.Info(ctx, "starting coordinator")
return c, nil
}
Expand DownExpand Up@@ -239,6 +226,9 @@ func (c *pgCoord) Close() error {
c.cancel()
c.closeOnce.Do(func() { close(c.closed) })
c.querier.wait()
c.binder.wait()
c.tunneler.workerWG.Wait()
c.handshaker.workerWG.Wait()
return nil
}

Expand DownExpand Up@@ -485,6 +475,7 @@ type binder struct {
workQ *workQ[bKey]

workerWG sync.WaitGroup
close chan struct{}
}

func newBinder(ctx context.Context,
Expand All@@ -502,6 +493,7 @@ func newBinder(ctx context.Context,
bindings: bindings,
latest: make(map[bKey]binding),
workQ: newWorkQ[bKey](ctx),
close: make(chan struct{}),
}
go b.handleBindings()
// add to the waitgroup immediately to avoid any races waiting for it before
Expand All@@ -513,14 +505,34 @@ func newBinder(ctx context.Context,
go b.worker()
}
}()

go func() {
defer close(b.close)
<-b.ctx.Done()
b.logger.Debug(b.ctx, "binder exiting, waiting for workers")

b.workerWG.Wait()

b.logger.Debug(b.ctx, "updating peers to lost")

ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
defer cancel()
err := b.store.UpdateTailnetPeerStatusByCoordinator(ctx, database.UpdateTailnetPeerStatusByCoordinatorParams{
CoordinatorID: b.coordinatorID,
Status: database.TailnetStatusLost,
})
if err != nil {
b.logger.Error(b.ctx, "update peer status to lost", slog.Error(err))
}
}()
return b
}

func (b *binder) handleBindings() {
for {
select {
case <-b.ctx.Done():
b.logger.Debug(b.ctx, "binder exiting", slog.Error(b.ctx.Err()))
b.logger.Debug(b.ctx, "binder exiting")
return
case bnd := <-b.bindings:
b.storeBinding(bnd)
Expand DownExpand Up@@ -632,6 +644,10 @@ func (b *binder) retrieveBinding(bk bKey) binding {
return bnd
}

func (b *binder) wait() {
<-b.close
}

// mapper tracks data sent to a peer, and sends updates based on changes read from the database.
type mapper struct {
ctx context.Context
Expand DownExpand Up@@ -1646,7 +1662,6 @@ func (h *heartbeats) sendBeats() {
// send an initial heartbeat so that other coordinators can start using our bindings right away.
h.sendBeat()
close(h.firstHeartbeat) // signal binder it can start writing
defer h.sendDelete()
tkr := h.clock.TickerFunc(h.ctx, HeartbeatPeriod, func() error {
h.sendBeat()
return nil
Expand DownExpand Up@@ -1677,17 +1692,6 @@ func (h *heartbeats) sendBeat() {
h.failedHeartbeats = 0
}

func (h *heartbeats) sendDelete() {
// here we don't want to use the main context, since it will have been canceled
ctx := dbauthz.As(context.Background(), pgCoordSubject)
err := h.store.DeleteCoordinator(ctx, h.self)
if err != nil {
h.logger.Error(h.ctx, "failed to send coordinator delete", slog.Error(err))
return
}
h.logger.Debug(h.ctx, "deleted coordinator")
}

func (h *heartbeats) cleanupLoop() {
defer h.wg.Done()
h.cleanup()
Expand Down
5 changes: 1 addition & 4 deletionsenterprise/tailnet/pgcoord_internal_test.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -396,17 +396,14 @@ func TestPGCoordinatorUnhealthy(t *testing.T) {
UpsertTailnetCoordinator(gomock.Any(), gomock.Any()).
Times(3).
Return(database.TailnetCoordinator{}, xerrors.New("badness"))
mStore.EXPECT().
DeleteCoordinator(gomock.Any(), gomock.Any()).
Times(1).
Return(nil)
// But, in particular we DO NOT want the coordinator to call DeleteTailnetPeer, as this is
// unnecessary and can spam the database. c.f. https://github.com/coder/coder/issues/12923

// these cleanup queries run, but we don't care for this test
mStore.EXPECT().CleanTailnetCoordinators(gomock.Any()).AnyTimes().Return(nil)
mStore.EXPECT().CleanTailnetLostPeers(gomock.Any()).AnyTimes().Return(nil)
mStore.EXPECT().CleanTailnetTunnels(gomock.Any()).AnyTimes().Return(nil)
mStore.EXPECT().UpdateTailnetPeerStatusByCoordinator(gomock.Any(), gomock.Any())

coordinator, err := newPGCoordInternal(ctx, logger, ps, mStore, mClock)
require.NoError(t, err)
Expand Down
Loading
Loading

[8]ページ先頭

©2009-2025 Movatter.jp