Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit3aa6b73

Browse files
committed
fix: stop sending DeleteTailnetPeer when coordinator is unhealthy
1 parentb6359b0 commit3aa6b73

File tree

2 files changed

+70
-1
lines changed

2 files changed

+70
-1
lines changed

‎enterprise/tailnet/pgcoord.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,17 @@ func (c *pgCoord) Coordinate(
231231
logger:=c.logger.With(slog.F("peer_id",id))
232232
reqs:=make(chan*proto.CoordinateRequest,agpl.RequestBufferSize)
233233
resps:=make(chan*proto.CoordinateResponse,agpl.ResponseBufferSize)
234+
if!c.querier.isHealthy() {
235+
// If the coordinator is unhealthy, we don't want to hook this Coordinate call up to the
236+
// binder, as that can cause an unnecessary call to DeleteTailnetPeer when the connIO is
237+
// closed. Instead, we just close the response channel and bail out.
238+
// c.f. https://github.com/coder/coder/issues/12923
239+
c.logger.Info(ctx,"closed incoming coordinate call while unhealthy",
240+
slog.F("peer_id",id),
241+
)
242+
close(resps)
243+
returnreqs,resps
244+
}
234245
cIO:=newConnIO(c.ctx,ctx,logger,c.bindings,c.tunnelerCh,reqs,resps,id,name,a)
235246
err:=agpl.SendCtx(c.ctx,c.newConnections,cIO)
236247
iferr!=nil {
@@ -842,7 +853,7 @@ func (q *querier) newConn(c *connIO) {
842853
deferq.mu.Unlock()
843854
if!q.healthy {
844855
err:=c.Close()
845-
q.logger.Info(q.ctx,"closed incoming connection while unhealthy",
856+
q.logger.Warn(q.ctx,"closed incoming connection while unhealthy",
846857
slog.Error(err),
847858
slog.F("peer_id",c.UniqueID()),
848859
)
@@ -865,6 +876,12 @@ func (q *querier) newConn(c *connIO) {
865876
})
866877
}
867878

879+
func (q*querier)isHealthy()bool {
880+
q.mu.Lock()
881+
deferq.mu.Unlock()
882+
returnq.healthy
883+
}
884+
868885
func (q*querier)cleanupConn(c*connIO) {
869886
logger:=q.logger.With(slog.F("peer_id",c.UniqueID()))
870887
q.mu.Lock()

‎enterprise/tailnet/pgcoord_internal_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ import (
1010
"testing"
1111
"time"
1212

13+
"github.com/coder/coder/v2/coderd/database/pubsub"
14+
agpl"github.com/coder/coder/v2/tailnet"
15+
"golang.org/x/xerrors"
16+
1317
"github.com/google/uuid"
1418
"github.com/stretchr/testify/require"
1519
"go.uber.org/mock/gomock"
@@ -291,3 +295,51 @@ func TestGetDebug(t *testing.T) {
291295
require.Equal(t,peerID,debug.Tunnels[0].SrcID)
292296
require.Equal(t,dstID,debug.Tunnels[0].DstID)
293297
}
298+
299+
// TestPGCoordinatorUnhealthy tests that when the coordinator fails to send heartbeats and is
300+
// unhealthy it disconnects any peers and does not send any extraneous database queries.
301+
funcTestPGCoordinatorUnhealthy(t*testing.T) {
302+
t.Parallel()
303+
ctx:=testutil.Context(t,testutil.WaitShort)
304+
logger:=slogtest.Make(t,&slogtest.Options{IgnoreErrors:true}).Leveled(slog.LevelDebug)
305+
306+
ctrl:=gomock.NewController(t)
307+
mStore:=dbmock.NewMockStore(ctrl)
308+
ps:=pubsub.NewInMemory()
309+
310+
// after 3 failed heartbeats, the coordinator is unhealthy
311+
mStore.EXPECT().
312+
UpsertTailnetCoordinator(gomock.Any(),gomock.Any()).
313+
MinTimes(3).
314+
Return(database.TailnetCoordinator{},xerrors.New("badness"))
315+
mStore.EXPECT().
316+
DeleteCoordinator(gomock.Any(),gomock.Any()).
317+
Times(1).
318+
Return(nil)
319+
// But, in particular we DO NOT want the coordinator to call DeleteTailnetPeer, as this is
320+
// unnecessary and can spam the database. c.f. https://github.com/coder/coder/issues/12923
321+
322+
// these cleanup queries run, but we don't care for this test
323+
mStore.EXPECT().CleanTailnetCoordinators(gomock.Any()).AnyTimes().Return(nil)
324+
mStore.EXPECT().CleanTailnetLostPeers(gomock.Any()).AnyTimes().Return(nil)
325+
mStore.EXPECT().CleanTailnetTunnels(gomock.Any()).AnyTimes().Return(nil)
326+
327+
coordinator,err:=newPGCoordInternal(ctx,logger,ps,mStore)
328+
require.NoError(t,err)
329+
330+
require.Eventually(t,func()bool {
331+
return!coordinator.querier.isHealthy()
332+
},testutil.WaitShort,testutil.IntervalFast)
333+
334+
pID:= uuid.UUID{5}
335+
_,resps:=coordinator.Coordinate(ctx,pID,"test", agpl.AgentCoordinateeAuth{ID:pID})
336+
resp:=testutil.RequireRecvCtx(ctx,t,resps)
337+
require.Nil(t,resp,"channel should be closed")
338+
339+
// give the coordinator some time to process any pending work. We are
340+
// testing here that a database call is absent, so we don't want to race to
341+
// shut down the test.
342+
time.Sleep(testutil.IntervalMedium)
343+
_=coordinator.Close()
344+
require.Eventually(t,ctrl.Satisfied,testutil.WaitShort,testutil.IntervalFast)
345+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp