Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit3b8d264

Browse files
committed
chore: add support for peer updates to tailnet.configMaps
1 parentc125206 commit3b8d264

File tree

4 files changed

+735
-21
lines changed

4 files changed

+735
-21
lines changed

‎go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,8 @@ require (
206206

207207
requirego.uber.org/mockv0.4.0
208208

209+
requiregithub.com/benbjohnson/clockv1.3.5// indirect
210+
209211
require (
210212
cloud.google.com/go/computev1.23.3// indirect
211213
cloud.google.com/go/loggingv1.8.1// indirect

‎go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,8 @@ github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiE
123123
github.com/aymanbagabas/go-osc52/v2v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8=
124124
github.com/aymerick/douceurv0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk=
125125
github.com/aymerick/douceurv0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4=
126+
github.com/benbjohnson/clockv1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o=
127+
github.com/benbjohnson/clockv1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
126128
github.com/beorn7/perksv1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
127129
github.com/beorn7/perksv1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
128130
github.com/bep/godartsassv1.2.0 h1:E2VvQrxAHAFwbjyOIExAMmogTItSKodoKuijNrGm5yU=

‎tailnet/configmaps.go

Lines changed: 222 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,15 @@ package tailnet
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"net/netip"
78
"sync"
9+
"time"
810

11+
"github.com/benbjohnson/clock"
912
"github.com/google/uuid"
1013
"go4.org/netipx"
14+
"tailscale.com/ipn/ipnstate"
1115
"tailscale.com/net/dns"
1216
"tailscale.com/tailcfg"
1317
"tailscale.com/types/ipproto"
@@ -23,10 +27,13 @@ import (
2327
"github.com/coder/coder/v2/tailnet/proto"
2428
)
2529

30+
constlostTimeout=15*time.Minute
31+
2632
// engineConfigurable is the subset of wgengine.Engine that we use for configuration.
2733
//
2834
// This allows us to test configuration code without faking the whole interface.
2935
typeengineConfigurableinterface {
36+
UpdateStatus(*ipnstate.StatusBuilder)
3037
SetNetworkMap(*netmap.NetworkMap)
3138
Reconfig(*wgcfg.Config,*router.Config,*dns.Config,*tailcfg.Debug)error
3239
SetDERPMap(*tailcfg.DERPMap)
@@ -49,12 +56,16 @@ type configMaps struct {
4956
closingbool
5057
phasephase
5158

52-
engineengineConfigurable
53-
static netmap.NetworkMap
54-
peersmap[uuid.UUID]*peerLifecycle
55-
addresses []netip.Prefix
56-
derpMap*proto.DERPMap
57-
logger slog.Logger
59+
engineengineConfigurable
60+
static netmap.NetworkMap
61+
peersmap[uuid.UUID]*peerLifecycle
62+
addresses []netip.Prefix
63+
derpMap*proto.DERPMap
64+
logger slog.Logger
65+
blockEndpointsbool
66+
67+
// for testing
68+
clock clock.Clock
5869
}
5970

6071
funcnewConfigMaps(logger slog.Logger,engineengineConfigurable,nodeID tailcfg.NodeID,nodeKey key.NodePrivate,discoKey key.DiscoPublic,addresses []netip.Prefix)*configMaps {
@@ -101,6 +112,7 @@ func newConfigMaps(logger slog.Logger, engine engineConfigurable, nodeID tailcfg
101112
},
102113
peers:make(map[uuid.UUID]*peerLifecycle),
103114
addresses:addresses,
115+
clock:clock.New(),
104116
}
105117
goc.configLoop()
106118
returnc
@@ -165,6 +177,9 @@ func (c *configMaps) configLoop() {
165177
func (c*configMaps)close() {
166178
c.L.Lock()
167179
deferc.L.Unlock()
180+
for_,lc:=rangec.peers {
181+
lc.resetTimer()
182+
}
168183
c.closing=true
169184
c.Broadcast()
170185
forc.phase!=closed {
@@ -260,11 +275,208 @@ func (c *configMaps) filterLocked() *filter.Filter {
260275
)
261276
}
262277

278+
// updatePeers handles protocol updates about peers from the coordinator. c.L MUST NOT be held.
279+
func (c*configMaps)updatePeers(updates []*proto.CoordinateResponse_PeerUpdate) {
280+
status:=c.status()
281+
c.L.Lock()
282+
deferc.L.Unlock()
283+
284+
// Update all the lastHandshake values here. That way we don't have to
285+
// worry about them being up-to-date when handling updates below, and it covers
286+
// all peers, not just the ones we got updates about.
287+
for_,lc:=rangec.peers {
288+
ifpeerStatus,ok:=status.Peer[lc.node.Key];ok {
289+
lc.lastHandshake=peerStatus.LastHandshake
290+
}
291+
}
292+
293+
for_,update:=rangeupdates {
294+
ifdirty:=c.updatePeerLocked(update,status);dirty {
295+
c.netmapDirty=true
296+
}
297+
}
298+
ifc.netmapDirty {
299+
c.Broadcast()
300+
}
301+
}
302+
303+
// status requests a status update from the engine.
304+
func (c*configMaps)status()*ipnstate.Status {
305+
sb:=&ipnstate.StatusBuilder{WantPeers:true}
306+
c.engine.UpdateStatus(sb)
307+
returnsb.Status()
308+
}
309+
310+
// updatePeerLocked processes a single update for a single peer. It is intended
311+
// as internal function since it returns whether or not the config is dirtied by
312+
// the update (instead of handling it directly like updatePeers). c.L must be held.
313+
func (c*configMaps)updatePeerLocked(update*proto.CoordinateResponse_PeerUpdate,status*ipnstate.Status) (dirtybool) {
314+
id,err:=uuid.FromBytes(update.Id)
315+
iferr!=nil {
316+
c.logger.Critical(context.Background(),"received update with bad id",slog.F("id",update.Id))
317+
returnfalse
318+
}
319+
logger:=c.logger.With(slog.F("peer_id",id))
320+
lc,ok:=c.peers[id]
321+
varnode*tailcfg.Node
322+
ifupdate.Kind==proto.CoordinateResponse_PeerUpdate_NODE {
323+
// If no preferred DERP is provided, we can't reach the node.
324+
ifupdate.Node.PreferredDerp==0 {
325+
logger.Warn(context.Background(),"no preferred DERP, peer update",slog.F("node_proto",update.Node))
326+
returnfalse
327+
}
328+
node,err=c.protoNodeToTailcfg(update.Node)
329+
iferr!=nil {
330+
logger.Critical(context.Background(),"failed to convert proto node to tailcfg",slog.F("node_proto",update.Node))
331+
returnfalse
332+
}
333+
logger=logger.With(slog.F("key_id",node.Key.ShortString()),slog.F("node",node))
334+
peerStatus,ok:=status.Peer[node.Key]
335+
// Starting KeepAlive messages at the initialization of a connection
336+
// causes a race condition. If we send the handshake before the peer has
337+
// our node, we'll have to wait for 5 seconds before trying again.
338+
// Ideally, the first handshake starts when the user first initiates a
339+
// connection to the peer. After a successful connection we enable
340+
// keep alives to persist the connection and keep it from becoming idle.
341+
// SSH connections don't send packets while idle, so we use keep alives
342+
// to avoid random hangs while we set up the connection again after
343+
// inactivity.
344+
node.KeepAlive=ok&&peerStatus.Active
345+
ifc.blockEndpoints {
346+
node.Endpoints=nil
347+
}
348+
}
349+
switch {
350+
case!ok&&update.Kind==proto.CoordinateResponse_PeerUpdate_NODE:
351+
// new!
352+
varlastHandshake time.Time
353+
ifps,ok:=status.Peer[node.Key];ok {
354+
lastHandshake=ps.LastHandshake
355+
}
356+
c.peers[id]=&peerLifecycle{
357+
peerID:id,
358+
node:node,
359+
lastHandshake:lastHandshake,
360+
lost:false,
361+
}
362+
logger.Debug(context.Background(),"adding new peer")
363+
returntrue
364+
caseok&&update.Kind==proto.CoordinateResponse_PeerUpdate_NODE:
365+
// update
366+
node.Created=lc.node.Created
367+
dirty=!lc.node.Equal(node)
368+
lc.node=node
369+
lc.lost=false
370+
lc.resetTimer()
371+
logger.Debug(context.Background(),"node update to existing peer",slog.F("dirty",dirty))
372+
returndirty
373+
case!ok:
374+
// disconnected or lost, but we don't have the node. No op
375+
logger.Debug(context.Background(),"skipping update for peer we don't recognize")
376+
returnfalse
377+
caseupdate.Kind==proto.CoordinateResponse_PeerUpdate_DISCONNECTED:
378+
lc.resetTimer()
379+
delete(c.peers,id)
380+
logger.Debug(context.Background(),"disconnected peer")
381+
returntrue
382+
caseupdate.Kind==proto.CoordinateResponse_PeerUpdate_LOST:
383+
lc.lost=true
384+
lc.setLostTimer(c)
385+
logger.Debug(context.Background(),"marked peer lost")
386+
// marking a node lost doesn't change anything right now, so dirty=false
387+
returnfalse
388+
default:
389+
logger.Warn(context.Background(),"unknown peer update",slog.F("kind",update.Kind))
390+
returnfalse
391+
}
392+
}
393+
394+
// peerLostTimeout is the callback that peerLifecycle uses when a peer is lost the timeout to
395+
// receive a handshake fires.
396+
func (c*configMaps)peerLostTimeout(id uuid.UUID) {
397+
logger:=c.logger.With(slog.F("peer_id",id))
398+
logger.Debug(context.Background(),
399+
"peer lost timeout")
400+
401+
// First do a status update to see if the peer did a handshake while we were
402+
// waiting
403+
status:=c.status()
404+
c.L.Lock()
405+
deferc.L.Unlock()
406+
407+
lc,ok:=c.peers[id]
408+
if!ok {
409+
logger.Debug(context.Background(),
410+
"timeout triggered for peer that is removed from the map")
411+
return
412+
}
413+
ifpeerStatus,ok:=status.Peer[lc.node.Key];ok {
414+
lc.lastHandshake=peerStatus.LastHandshake
415+
}
416+
logger=logger.With(slog.F("key_id",lc.node.Key.ShortString()))
417+
if!lc.lost {
418+
logger.Debug(context.Background(),
419+
"timeout triggered for peer that is no longer lost")
420+
return
421+
}
422+
since:=c.clock.Since(lc.lastHandshake)
423+
ifsince>=lostTimeout {
424+
logger.Info(
425+
context.Background(),"removing lost peer")
426+
delete(c.peers,id)
427+
c.netmapDirty=true
428+
c.Broadcast()
429+
return
430+
}
431+
logger.Debug(context.Background(),
432+
"timeout triggered for peer but it had handshake in meantime")
433+
lc.setLostTimer(c)
434+
}
435+
436+
func (c*configMaps)protoNodeToTailcfg(p*proto.Node) (*tailcfg.Node,error) {
437+
node,err:=ProtoToNode(p)
438+
iferr!=nil {
439+
returnnil,err
440+
}
441+
return&tailcfg.Node{
442+
ID:tailcfg.NodeID(p.GetId()),
443+
Created:c.clock.Now(),
444+
Key:node.Key,
445+
DiscoKey:node.DiscoKey,
446+
Addresses:node.Addresses,
447+
AllowedIPs:node.AllowedIPs,
448+
Endpoints:node.Endpoints,
449+
DERP:fmt.Sprintf("%s:%d",tailcfg.DerpMagicIP,node.PreferredDERP),
450+
Hostinfo: (&tailcfg.Hostinfo{}).View(),
451+
},nil
452+
}
453+
263454
typepeerLifecyclestruct {
264-
node*tailcfg.Node
265-
// TODO: implement timers to track lost peers
266-
// lastHandshake time.Time
267-
// timer time.Timer
455+
peerID uuid.UUID
456+
node*tailcfg.Node
457+
lostbool
458+
lastHandshake time.Time
459+
timer*clock.Timer
460+
}
461+
462+
func (l*peerLifecycle)resetTimer() {
463+
ifl.timer!=nil {
464+
l.timer.Stop()
465+
l.timer=nil
466+
}
467+
}
468+
469+
func (l*peerLifecycle)setLostTimer(c*configMaps) {
470+
ifl.timer!=nil {
471+
l.timer.Stop()
472+
}
473+
ttl:=lostTimeout-c.clock.Since(l.lastHandshake)
474+
ifttl<=0 {
475+
ttl=time.Nanosecond
476+
}
477+
l.timer=c.clock.AfterFunc(ttl,func() {
478+
c.peerLostTimeout(l.peerID)
479+
})
268480
}
269481

270482
// prefixesDifferent returns true if the two slices contain different prefixes

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp