Commit b7d0071

feat: modify coordinators to send errors and peers to log them
1 parent 6e0e29a, commit b7d0071

File tree

12 files changed (+127, -49 lines)

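Before the per-file diffs, a minimal sketch (not part of the commit; the response type and constant below are local stand-ins for proto.CoordinateResponse and CloseErrCoordinatorClose) of the pattern this commit introduces: before a peer's response channel is closed, the coordinator enqueues one final response whose Error field names the reason, using a non-blocking send so a slow peer cannot stall shutdown, and the peer-side response loop logs that reason.

package main

import "log"

// coordinateResponse is a local stand-in for proto.CoordinateResponse,
// which gains an Error string field in this commit.
type coordinateResponse struct {
	Error string
}

// closeErrCoordinatorClose mirrors the CloseErrCoordinatorClose constant.
const closeErrCoordinatorClose = "coordinator closed"

// closePeer mimics core.removePeerLocked: try to report why the peer is
// being closed, but never block on a full channel, then close it.
func closePeer(resps chan *coordinateResponse) {
	select {
	case resps <- &coordinateResponse{Error: closeErrCoordinatorClose}:
	default: // peer is slow; drop the explanation rather than block
	}
	close(resps)
}

// respLoop mimics BasicCoordination.respLoop: log any protocol error the
// coordinator reports, then keep handling responses as before.
func respLoop(resps <-chan *coordinateResponse) {
	for resp := range resps {
		if resp.Error != "" {
			log.Printf("coordination protocol error: %s", resp.Error)
		}
		// ...apply peer updates in the real code...
	}
}

func main() {
	resps := make(chan *coordinateResponse, 1)
	closePeer(resps)
	respLoop(resps)
}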

enterprise/tailnet/connio.go

Lines changed: 5 additions & 1 deletion

@@ -113,6 +113,7 @@ func (c *connIO) recvLoop() {
 		select {
 		case <-c.coordCtx.Done():
 			c.logger.Debug(c.coordCtx, "exiting io recvLoop; coordinator exit")
+			_ = c.Enqueue(&proto.CoordinateResponse{Error: agpl.CloseErrCoordinatorClose})
 			return
 		case <-c.peerCtx.Done():
 			c.logger.Debug(c.peerCtx, "exiting io recvLoop; peer context canceled")
@@ -123,6 +124,9 @@ func (c *connIO) recvLoop() {
 			return
 		}
 		if err := c.handleRequest(req); err != nil {
+			if !xerrors.Is(err, errDisconnect) {
+				_ = c.Enqueue(&proto.CoordinateResponse{Error: err.Error()})
+			}
 			return
 		}
 	}
@@ -136,7 +140,7 @@ func (c *connIO) handleRequest(req *proto.CoordinateRequest) error {
 	err := c.auth.Authorize(c.peerCtx, req)
 	if err != nil {
 		c.logger.Warn(c.peerCtx, "unauthorized request", slog.Error(err))
-		return xerrors.Errorf("authorize request: %w", err)
+		return agpl.AuthorizationError{Wrapped: err}
 	}

 	if req.UpdateSelf != nil {

enterprise/tailnet/multiagent_test.go

Lines changed: 3 additions & 1 deletion

@@ -4,6 +4,8 @@ import (
 	"context"
 	"testing"

+	agpl "github.com/coder/coder/v2/tailnet"
+
 	"github.com/stretchr/testify/require"

 	"cdr.dev/slog"
@@ -77,7 +79,7 @@ func TestPGCoordinator_MultiAgent_CoordClose(t *testing.T) {
 	err = coord1.Close()
 	require.NoError(t, err)

-	ma1.AssertEventuallyResponsesClosed()
+	ma1.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
 }

 // TestPGCoordinator_MultiAgent_UnsubscribeRace tests a single coordinator with

enterprise/tailnet/pgcoord.go

Lines changed: 4 additions & 0 deletions

@@ -37,6 +37,7 @@ const (
 	numHandshakerWorkers = 5
 	dbMaxBackoff         = 10 * time.Second
 	cleanupPeriod        = time.Hour
+	CloseErrUnhealthy    = "coordinator unhealthy"
 )

 // pgCoord is a postgres-backed coordinator
@@ -235,6 +236,7 @@ func (c *pgCoord) Coordinate(
 		c.logger.Info(ctx, "closed incoming coordinate call while unhealthy",
 			slog.F("peer_id", id),
 		)
+		resps <- &proto.CoordinateResponse{Error: CloseErrUnhealthy}
 		close(resps)
 		return reqs, resps
 	}
@@ -882,6 +884,7 @@ func (q *querier) newConn(c *connIO) {
 	q.mu.Lock()
 	defer q.mu.Unlock()
 	if !q.healthy {
+		_ = c.Enqueue(&proto.CoordinateResponse{Error: CloseErrUnhealthy})
 		err := c.Close()
 		// This can only happen during a narrow window where we were healthy
 		// when pgCoord checked before accepting the connection, but now are
@@ -1271,6 +1274,7 @@ func (q *querier) unhealthyCloseAll() {
 	for _, mpr := range q.mappers {
 		// close connections async so that we don't block the querier routine that responds to updates
 		go func(c *connIO) {
+			_ = c.Enqueue(&proto.CoordinateResponse{Error: CloseErrUnhealthy})
 			err := c.Close()
 			if err != nil {
 				q.logger.Debug(q.ctx, "error closing conn while unhealthy", slog.Error(err))

enterprise/tailnet/pgcoord_internal_test.go

Lines changed: 3 additions & 1 deletion

@@ -427,7 +427,9 @@ func TestPGCoordinatorUnhealthy(t *testing.T) {

 	pID := uuid.UUID{5}
 	_, resps := coordinator.Coordinate(ctx, pID, "test", agpl.AgentCoordinateeAuth{ID: pID})
-	resp := testutil.TryReceive(ctx, t, resps)
+	resp := testutil.RequireReceive(ctx, t, resps)
+	require.Equal(t, CloseErrUnhealthy, resp.Error)
+	resp = testutil.TryReceive(ctx, t, resps)
 	require.Nil(t, resp, "channel should be closed")

 	// give the coordinator some time to process any pending work. We are

enterprise/tailnet/pgcoord_test.go

Lines changed: 16 additions & 15 deletions

@@ -118,15 +118,15 @@ func TestPGCoordinatorSingle_AgentInvalidIP(t *testing.T) {

 	agent := agpltest.NewAgent(ctx, t, coordinator, "agent")
 	defer agent.Close(ctx)
+	prefix := agpl.TailscaleServicePrefix.RandomPrefix()
 	agent.UpdateNode(&proto.Node{
-		Addresses: []string{
-			agpl.TailscaleServicePrefix.RandomPrefix().String(),
-		},
+		Addresses:     []string{prefix.String()},
 		PreferredDerp: 10,
 	})

 	// The agent connection should be closed immediately after sending an invalid addr
-	agent.AssertEventuallyResponsesClosed()
+	agent.AssertEventuallyResponsesClosed(
+		agpl.AuthorizationError{Wrapped: agpl.InvalidNodeAddressError{Addr: prefix.Addr().String()}}.Error())
 	assertEventuallyLost(ctx, t, store, agent.ID)
 }

@@ -153,7 +153,8 @@ func TestPGCoordinatorSingle_AgentInvalidIPBits(t *testing.T) {
 	})

 	// The agent connection should be closed immediately after sending an invalid addr
-	agent.AssertEventuallyResponsesClosed()
+	agent.AssertEventuallyResponsesClosed(
+		agpl.AuthorizationError{Wrapped: agpl.InvalidAddressBitsError{Bits: 64}}.Error())
 	assertEventuallyLost(ctx, t, store, agent.ID)
 }

@@ -493,19 +494,19 @@ func TestPGCoordinatorDual_Mainline(t *testing.T) {
 	require.NoError(t, err)

 	// this closes agent2, client22, client21
-	agent2.AssertEventuallyResponsesClosed()
-	client22.AssertEventuallyResponsesClosed()
-	client21.AssertEventuallyResponsesClosed()
+	agent2.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
+	client22.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
+	client21.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
 	assertEventuallyLost(ctx, t, store, agent2.ID)
 	assertEventuallyLost(ctx, t, store, client21.ID)
 	assertEventuallyLost(ctx, t, store, client22.ID)

 	err = coord1.Close()
 	require.NoError(t, err)
 	// this closes agent1, client12, client11
-	agent1.AssertEventuallyResponsesClosed()
-	client12.AssertEventuallyResponsesClosed()
-	client11.AssertEventuallyResponsesClosed()
+	agent1.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
+	client12.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
+	client11.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
 	assertEventuallyLost(ctx, t, store, agent1.ID)
 	assertEventuallyLost(ctx, t, store, client11.ID)
 	assertEventuallyLost(ctx, t, store, client12.ID)
@@ -636,12 +637,12 @@ func TestPGCoordinator_Unhealthy(t *testing.T) {
 		}
 	}
 	// connected agent should be disconnected
-	agent1.AssertEventuallyResponsesClosed()
+	agent1.AssertEventuallyResponsesClosed(tailnet.CloseErrUnhealthy)

 	// new agent should immediately disconnect
 	agent2 := agpltest.NewAgent(ctx, t, uut, "agent2")
 	defer agent2.Close(ctx)
-	agent2.AssertEventuallyResponsesClosed()
+	agent2.AssertEventuallyResponsesClosed(tailnet.CloseErrUnhealthy)

 	// next heartbeats succeed, so we are healthy
 	for i := 0; i < 2; i++ {
@@ -836,7 +837,7 @@ func TestPGCoordinatorDual_FailedHeartbeat(t *testing.T) {
 	// we eventually disconnect from the coordinator.
 	err = sdb1.Close()
 	require.NoError(t, err)
-	p1.AssertEventuallyResponsesClosed()
+	p1.AssertEventuallyResponsesClosed(tailnet.CloseErrUnhealthy)
 	p2.AssertEventuallyLost(p1.ID)
 	// This basically checks that peer2 had no update
 	// performed on their status since we are connected
@@ -891,7 +892,7 @@ func TestPGCoordinatorDual_PeerReconnect(t *testing.T) {
 	// never send a DISCONNECTED update.
 	err = c1.Close()
 	require.NoError(t, err)
-	p1.AssertEventuallyResponsesClosed()
+	p1.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
 	p2.AssertEventuallyLost(p1.ID)
 	// This basically checks that peer2 had no update
 	// performed on their status since we are connected

tailnet/controllers.go

Lines changed: 5 additions & 0 deletions

@@ -284,6 +284,11 @@ func (c *BasicCoordination) respLoop() {
 			return
 		}

+		if resp.Error != "" {
+			c.logger.Error(context.Background(),
+				"coordination protocol error", slog.F("error", resp.Error))
+		}
+
 		err = c.coordinatee.UpdatePeers(resp.GetPeerUpdates())
 		if err != nil {
 			c.logger.Debug(context.Background(), "failed to update peers", slog.Error(err))

tailnet/coordinator.go

Lines changed: 47 additions & 14 deletions

@@ -24,7 +24,9 @@ const (
 	// dropping updates
 	ResponseBufferSize = 512
 	// RequestBufferSize is the max number of requests to buffer per connection
-	RequestBufferSize = 32
+	RequestBufferSize        = 32
+	CloseErrOverwritten      = "peer ID overwritten by new connection"
+	CloseErrCoordinatorClose = "coordinator closed"
 )

 // Coordinator exchanges nodes with agents to establish connections.
@@ -97,6 +99,18 @@ var (
 	ErrAlreadyRemoved = xerrors.New("already removed")
 )

+type AuthorizationError struct {
+	Wrapped error
+}
+
+func (e AuthorizationError) Error() string {
+	return fmt.Sprintf("authorization: %s", e.Wrapped.Error())
+}
+
+func (e AuthorizationError) Unwrap() error {
+	return e.Wrapped
+}
+
 // NewCoordinator constructs a new in-memory connection coordinator. This
 // coordinator is incompatible with multiple Coder replicas as all node data is
 // in-memory.
@@ -161,8 +175,12 @@ func (c *coordinator) Coordinate(
 	c.wg.Add(1)
 	go func() {
 		defer c.wg.Done()
-		p.reqLoop(ctx, logger, c.core.handleRequest)
-		err := c.core.lostPeer(p)
+		loopErr := p.reqLoop(ctx, logger, c.core.handleRequest)
+		closeErrStr := ""
+		if loopErr != nil {
+			closeErrStr = loopErr.Error()
+		}
+		err := c.core.lostPeer(p, closeErrStr)
 		if xerrors.Is(err, ErrClosed) || xerrors.Is(err, ErrAlreadyRemoved) {
 			return
 		}
@@ -227,7 +245,7 @@ func (c *core) handleRequest(ctx context.Context, p *peer, req *proto.Coordinate
 	}

 	if err := pr.auth.Authorize(ctx, req); err != nil {
-		return xerrors.Errorf("authorize request: %w", err)
+		return AuthorizationError{Wrapped: err}
 	}

 	if req.UpdateSelf != nil {
@@ -270,7 +288,7 @@ func (c *core) handleRequest(ctx context.Context, p *peer, req *proto.Coordinate
 		}
 	}
 	if req.Disconnect != nil {
-		c.removePeerLocked(p.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "graceful disconnect")
+		c.removePeerLocked(p.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "graceful disconnect", "")
 	}
 	if rfhs := req.ReadyForHandshake; rfhs != nil {
 		err := c.handleReadyForHandshakeLocked(pr, rfhs)
@@ -344,7 +362,7 @@ func (c *core) updateTunnelPeersLocked(id uuid.UUID, n *proto.Node, k proto.Coor
 		err := other.updateMappingLocked(id, n, k, reason)
 		if err != nil {
 			other.logger.Error(context.Background(), "failed to update mapping", slog.Error(err))
-			c.removePeerLocked(other.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
+			c.removePeerLocked(other.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update", "failed to update tunnel peer mapping")
 		}
 	}
 }
@@ -360,7 +378,8 @@ func (c *core) addTunnelLocked(src *peer, dstID uuid.UUID) error {
 		err := src.updateMappingLocked(dstID, dst.node, proto.CoordinateResponse_PeerUpdate_NODE, "add tunnel")
 		if err != nil {
 			src.logger.Error(context.Background(), "failed update of tunnel src", slog.Error(err))
-			c.removePeerLocked(src.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
+			c.removePeerLocked(src.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update",
+				"failed to update tunnel dest mapping")
 			// if the source fails, then the tunnel is also removed and there is no reason to continue
 			// processing.
 			return err
@@ -370,7 +389,8 @@ func (c *core) addTunnelLocked(src *peer, dstID uuid.UUID) error {
 		err := dst.updateMappingLocked(src.id, src.node, proto.CoordinateResponse_PeerUpdate_NODE, "add tunnel")
 		if err != nil {
 			dst.logger.Error(context.Background(), "failed update of tunnel dst", slog.Error(err))
-			c.removePeerLocked(dst.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
+			c.removePeerLocked(dst.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update",
+				"failed to update tunnel src mapping")
 		}
 	}
 }
@@ -381,7 +401,7 @@ func (c *core) removeTunnelLocked(src *peer, dstID uuid.UUID) error {
 	err := src.updateMappingLocked(dstID, nil, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "remove tunnel")
 	if err != nil {
 		src.logger.Error(context.Background(), "failed to update", slog.Error(err))
-		c.removePeerLocked(src.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
+		c.removePeerLocked(src.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update", "failed to remove tunnel dest mapping")
 		// removing the peer also removes all other tunnels and notifies destinations, so it's safe to
 		// return here.
 		return err
@@ -391,7 +411,7 @@ func (c *core) removeTunnelLocked(src *peer, dstID uuid.UUID) error {
 		err = dst.updateMappingLocked(src.id, nil, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "remove tunnel")
 		if err != nil {
 			dst.logger.Error(context.Background(), "failed to update", slog.Error(err))
-			c.removePeerLocked(dst.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
+			c.removePeerLocked(dst.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update", "failed to remove tunnel src mapping")
 			// don't return here because we still want to remove the tunnel, and an error at the
 			// destination doesn't count as an error removing the tunnel at the source.
 		}
@@ -413,6 +433,11 @@ func (c *core) initPeer(p *peer) error {
 	if old, ok := c.peers[p.id]; ok {
 		// rare and interesting enough to log at Info, but it isn't an error per se
 		old.logger.Info(context.Background(), "overwritten by new connection")
+		select {
+		case old.resps <- &proto.CoordinateResponse{Error: CloseErrOverwritten}:
+		default:
+			// pass
+		}
 		close(old.resps)
 		p.overwrites = old.overwrites + 1
 	}
@@ -433,7 +458,7 @@ func (c *core) initPeer(p *peer) error {

 // removePeer removes and cleans up a lost peer. It updates all peers it shares a tunnel with, deletes
 // all tunnels from which the removed peer is the source.
-func (c *core) lostPeer(p *peer) error {
+func (c *core) lostPeer(p *peer, closeErr string) error {
 	c.mutex.Lock()
 	defer c.mutex.Unlock()
 	c.logger.Debug(context.Background(), "lostPeer", slog.F("peer_id", p.id))
@@ -443,18 +468,25 @@ func (c *core) lostPeer(p *peer) error {
 	if existing, ok := c.peers[p.id]; !ok || existing != p {
 		return ErrAlreadyRemoved
 	}
-	c.removePeerLocked(p.id, proto.CoordinateResponse_PeerUpdate_LOST, "lost")
+	c.removePeerLocked(p.id, proto.CoordinateResponse_PeerUpdate_LOST, "lost", closeErr)
 	return nil
 }

-func (c *core) removePeerLocked(id uuid.UUID, kind proto.CoordinateResponse_PeerUpdate_Kind, reason string) {
+func (c *core) removePeerLocked(id uuid.UUID, kind proto.CoordinateResponse_PeerUpdate_Kind, reason, closeErr string) {
 	p, ok := c.peers[id]
 	if !ok {
 		c.logger.Critical(context.Background(), "removed non-existent peer %s", id)
 		return
 	}
 	c.updateTunnelPeersLocked(id, nil, kind, reason)
 	c.tunnels.removeAll(id)
+	if closeErr != "" {
+		select {
+		case p.resps <- &proto.CoordinateResponse{Error: closeErr}:
+		default:
+			// blocked, pass.
+		}
+	}
 	close(p.resps)
 	delete(c.peers, id)
 }
@@ -487,7 +519,8 @@ func (c *core) close() error {
 	for id := range c.peers {
 		// when closing, mark them as LOST so that we don't disrupt in-progress
 		// connections.
-		c.removePeerLocked(id, proto.CoordinateResponse_PeerUpdate_LOST, "coordinator close")
+		c.removePeerLocked(id, proto.CoordinateResponse_PeerUpdate_LOST, "coordinator close",
+			CloseErrCoordinatorClose)
 	}
 	return nil
 }