Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 16121ac

Browse files
committed
fix: stop sending DeleteTailnetPeer when coordinator is unhealthy
1 parent b6359b0 commit 16121ac

File tree

2 files changed

+74
-1
lines changed

2 files changed

+74
-1
lines changed

‎enterprise/tailnet/pgcoord.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,17 @@ func (c *pgCoord) Coordinate(
231231
logger:=c.logger.With(slog.F("peer_id",id))
232232
reqs:=make(chan*proto.CoordinateRequest,agpl.RequestBufferSize)
233233
resps:=make(chan*proto.CoordinateResponse,agpl.ResponseBufferSize)
234+
if!c.querier.isHealthy() {
235+
// If the coordinator is unhealthy, we don't want to hook this Coordinate call up to the
236+
// binder, as that can cause an unnecessary call to DeleteTailnetPeer when the connIO is
237+
// closed. Instead, we just close the response channel and bail out.
238+
// c.f. https://github.com/coder/coder/issues/12923
239+
c.logger.Info(ctx,"closed incoming coordinate call while unhealthy",
240+
slog.F("peer_id",id),
241+
)
242+
close(resps)
243+
returnreqs,resps
244+
}
234245
cIO:=newConnIO(c.ctx,ctx,logger,c.bindings,c.tunnelerCh,reqs,resps,id,name,a)
235246
err:=agpl.SendCtx(c.ctx,c.newConnections,cIO)
236247
iferr!=nil {
@@ -842,7 +853,12 @@ func (q *querier) newConn(c *connIO) {
842853
deferq.mu.Unlock()
843854
if!q.healthy {
844855
err:=c.Close()
845-
q.logger.Info(q.ctx,"closed incoming connection while unhealthy",
856+
// This can only happen during a narrow window where we were healthy
857+
// when pgCoord checked before accepting the connection, but now are
858+
// unhealthy now that we get around to processing it. Seeing a small
859+
// number of these logs is not worrying, but a large number probably
860+
// indicates something is amiss.
861+
q.logger.Warn(q.ctx,"closed incoming connection while unhealthy",
846862
slog.Error(err),
847863
slog.F("peer_id",c.UniqueID()),
848864
)
@@ -865,6 +881,12 @@ func (q *querier) newConn(c *connIO) {
865881
})
866882
}
867883

884+
func (q*querier)isHealthy()bool {
885+
q.mu.Lock()
886+
deferq.mu.Unlock()
887+
returnq.healthy
888+
}
889+
868890
func (q*querier)cleanupConn(c*connIO) {
869891
logger:=q.logger.With(slog.F("peer_id",c.UniqueID()))
870892
q.mu.Lock()

‎enterprise/tailnet/pgcoord_internal_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/google/uuid"
1414
"github.com/stretchr/testify/require"
1515
"go.uber.org/mock/gomock"
16+
"golang.org/x/xerrors"
1617
gProto"google.golang.org/protobuf/proto"
1718

1819
"cdr.dev/slog"
@@ -21,6 +22,8 @@ import (
2122
"github.com/coder/coder/v2/coderd/database"
2223
"github.com/coder/coder/v2/coderd/database/dbmock"
2324
"github.com/coder/coder/v2/coderd/database/dbtestutil"
25+
"github.com/coder/coder/v2/coderd/database/pubsub"
26+
agpl"github.com/coder/coder/v2/tailnet"
2427
"github.com/coder/coder/v2/tailnet/proto"
2528
"github.com/coder/coder/v2/testutil"
2629
)
@@ -291,3 +294,51 @@ func TestGetDebug(t *testing.T) {
291294
require.Equal(t,peerID,debug.Tunnels[0].SrcID)
292295
require.Equal(t,dstID,debug.Tunnels[0].DstID)
293296
}
297+
298+
// TestPGCoordinatorUnhealthy tests that when the coordinator fails to send heartbeats and is
299+
// unhealthy it disconnects any peers and does not send any extraneous database queries.
300+
funcTestPGCoordinatorUnhealthy(t*testing.T) {
301+
t.Parallel()
302+
ctx:=testutil.Context(t,testutil.WaitShort)
303+
logger:=slogtest.Make(t,&slogtest.Options{IgnoreErrors:true}).Leveled(slog.LevelDebug)
304+
305+
ctrl:=gomock.NewController(t)
306+
mStore:=dbmock.NewMockStore(ctrl)
307+
ps:=pubsub.NewInMemory()
308+
309+
// after 3 failed heartbeats, the coordinator is unhealthy
310+
mStore.EXPECT().
311+
UpsertTailnetCoordinator(gomock.Any(),gomock.Any()).
312+
MinTimes(3).
313+
Return(database.TailnetCoordinator{},xerrors.New("badness"))
314+
mStore.EXPECT().
315+
DeleteCoordinator(gomock.Any(),gomock.Any()).
316+
Times(1).
317+
Return(nil)
318+
// But, in particular we DO NOT want the coordinator to call DeleteTailnetPeer, as this is
319+
// unnecessary and can spam the database. c.f. https://github.com/coder/coder/issues/12923
320+
321+
// these cleanup queries run, but we don't care for this test
322+
mStore.EXPECT().CleanTailnetCoordinators(gomock.Any()).AnyTimes().Return(nil)
323+
mStore.EXPECT().CleanTailnetLostPeers(gomock.Any()).AnyTimes().Return(nil)
324+
mStore.EXPECT().CleanTailnetTunnels(gomock.Any()).AnyTimes().Return(nil)
325+
326+
coordinator,err:=newPGCoordInternal(ctx,logger,ps,mStore)
327+
require.NoError(t,err)
328+
329+
require.Eventually(t,func()bool {
330+
return!coordinator.querier.isHealthy()
331+
},testutil.WaitShort,testutil.IntervalFast)
332+
333+
pID:= uuid.UUID{5}
334+
_,resps:=coordinator.Coordinate(ctx,pID,"test", agpl.AgentCoordinateeAuth{ID:pID})
335+
resp:=testutil.RequireRecvCtx(ctx,t,resps)
336+
require.Nil(t,resp,"channel should be closed")
337+
338+
// give the coordinator some time to process any pending work. We are
339+
// testing here that a database call is absent, so we don't want to race to
340+
// shut down the test.
341+
time.Sleep(testutil.IntervalMedium)
342+
_=coordinator.Close()
343+
require.Eventually(t,ctrl.Satisfied,testutil.WaitShort,testutil.IntervalFast)
344+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp