Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 3844f8a

Browse files
committed
chore: add support for peer updates to tailnet.configMaps
1 parent 6f37b9b · commit 3844f8a

File tree

4 files changed

+719
-21
lines changed

4 files changed

+719
-21
lines changed

‎go.mod‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,8 @@ require (
206206

207207
requirego.uber.org/mockv0.4.0
208208

209+
requiregithub.com/benbjohnson/clockv1.3.5// indirect
210+
209211
require (
210212
cloud.google.com/go/computev1.23.3// indirect
211213
cloud.google.com/go/loggingv1.8.1// indirect

‎go.sum‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,8 @@ github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiE
123123
github.com/aymanbagabas/go-osc52/v2v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8=
124124
github.com/aymerick/douceurv0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk=
125125
github.com/aymerick/douceurv0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4=
126+
github.com/benbjohnson/clockv1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o=
127+
github.com/benbjohnson/clockv1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
126128
github.com/beorn7/perksv1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
127129
github.com/beorn7/perksv1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
128130
github.com/bep/godartsassv1.2.0 h1:E2VvQrxAHAFwbjyOIExAMmogTItSKodoKuijNrGm5yU=

‎tailnet/configmaps.go‎

Lines changed: 215 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,15 @@ package tailnet
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"net/netip"
78
"sync"
9+
"time"
810

11+
"github.com/benbjohnson/clock"
912
"github.com/google/uuid"
1013
"go4.org/netipx"
14+
"tailscale.com/ipn/ipnstate"
1115
"tailscale.com/net/dns"
1216
"tailscale.com/tailcfg"
1317
"tailscale.com/types/ipproto"
@@ -23,10 +27,13 @@ import (
2327
"github.com/coder/coder/v2/tailnet/proto"
2428
)
2529

30+
constlostTimeout=15*time.Minute
31+
2632
// engineConfigurable is the subset of wgengine.Engine that we use for configuration.
2733
//
2834
// This allows us to test configuration code without faking the whole interface.
2935
typeengineConfigurableinterface {
36+
UpdateStatus(*ipnstate.StatusBuilder)
3037
SetNetworkMap(*netmap.NetworkMap)
3138
Reconfig(*wgcfg.Config,*router.Config,*dns.Config,*tailcfg.Debug)error
3239
SetDERPMap(*tailcfg.DERPMap)
@@ -49,12 +56,16 @@ type configMaps struct {
4956
closingbool
5057
phasephase
5158

52-
engineengineConfigurable
53-
static netmap.NetworkMap
54-
peersmap[uuid.UUID]*peerLifecycle
55-
addresses []netip.Prefix
56-
derpMap*proto.DERPMap
57-
logger slog.Logger
59+
engineengineConfigurable
60+
static netmap.NetworkMap
61+
peersmap[uuid.UUID]*peerLifecycle
62+
addresses []netip.Prefix
63+
derpMap*proto.DERPMap
64+
logger slog.Logger
65+
blockEndpointsbool
66+
67+
// for testing
68+
clock clock.Clock
5869
}
5970

6071
funcnewConfigMaps(logger slog.Logger,engineengineConfigurable,nodeID tailcfg.NodeID,nodeKey key.NodePrivate,discoKey key.DiscoPublic,addresses []netip.Prefix)*configMaps {
@@ -101,6 +112,7 @@ func newConfigMaps(logger slog.Logger, engine engineConfigurable, nodeID tailcfg
101112
},
102113
peers:make(map[uuid.UUID]*peerLifecycle),
103114
addresses:addresses,
115+
clock:clock.New(),
104116
}
105117
goc.configLoop()
106118
returnc
@@ -165,6 +177,9 @@ func (c *configMaps) configLoop() {
165177
func (c*configMaps)close() {
166178
c.L.Lock()
167179
deferc.L.Unlock()
180+
for_,lc:=rangec.peers {
181+
lc.resetTimer()
182+
}
168183
c.closing=true
169184
c.Broadcast()
170185
forc.phase!=closed {
@@ -248,11 +263,201 @@ func (c *configMaps) filterLocked() *filter.Filter {
248263
)
249264
}
250265

266+
func (c*configMaps)updatePeers(updates []*proto.CoordinateResponse_PeerUpdate) {
267+
status:=c.status()
268+
c.L.Lock()
269+
deferc.L.Unlock()
270+
271+
// Update all the lastHandshake values here. That way we don't have to
272+
// worry about them being up-to-date when handling updates below, and it covers
273+
// all peers, not just the ones we got updates about.
274+
for_,lc:=rangec.peers {
275+
ifpeerStatus,ok:=status.Peer[lc.node.Key];ok {
276+
lc.lastHandshake=peerStatus.LastHandshake
277+
}
278+
}
279+
280+
for_,update:=rangeupdates {
281+
ifdirty:=c.updatePeerLocked(update,status);dirty {
282+
c.netmapDirty=true
283+
}
284+
}
285+
ifc.netmapDirty {
286+
c.Broadcast()
287+
}
288+
}
289+
290+
func (c*configMaps)status()*ipnstate.Status {
291+
sb:=&ipnstate.StatusBuilder{WantPeers:true}
292+
c.engine.UpdateStatus(sb)
293+
returnsb.Status()
294+
}
295+
296+
// updatePeerLocked handles a single peer update against the current engine
// status and reports whether the netmap changed as a result (dirty). The
// caller must hold c.L; status is a snapshot taken before the lock was
// acquired by updatePeers.
func (c *configMaps) updatePeerLocked(update *proto.CoordinateResponse_PeerUpdate, status *ipnstate.Status) (dirty bool) {
	id, err := uuid.FromBytes(update.Id)
	if err != nil {
		// A malformed ID is a programming error on the coordinator side; log
		// loudly but drop the update rather than crash.
		c.logger.Critical(context.Background(), "received update with bad id", slog.F("id", update.Id))
		return false
	}
	logger := c.logger.With(slog.F("peer_id", id))
	lc, ok := c.peers[id]
	var node *tailcfg.Node
	if update.Kind == proto.CoordinateResponse_PeerUpdate_NODE {
		// If no preferred DERP is provided, we can't reach the node.
		if update.Node.PreferredDerp == 0 {
			logger.Warn(context.Background(), "no preferred DERP, peer update", slog.F("node_proto", update.Node))
			return false
		}
		node, err = c.protoNodeToTailcfg(update.Node)
		if err != nil {
			logger.Critical(context.Background(), "failed to convert proto node to tailcfg", slog.F("node_proto", update.Node))
			return false
		}
		logger = logger.With(slog.F("key_id", node.Key.ShortString()), slog.F("node", node))
		// NOTE: this `ok` shadows the peer-map `ok` above, only within this
		// if-block; it reports whether the engine already knows this key.
		peerStatus, ok := status.Peer[node.Key]
		// Starting KeepAlive messages at the initialization of a connection
		// causes a race condition. If we send the handshake before the peer has
		// our node, we'll have to wait for 5 seconds before trying again.
		// Ideally, the first handshake starts when the user first initiates a
		// connection to the peer. After a successful connection we enable
		// keep alives to persist the connection and keep it from becoming idle.
		// SSH connections don't send packets while idle, so we use keep alives
		// to avoid random hangs while we set up the connection again after
		// inactivity.
		node.KeepAlive = ok && peerStatus.Active
		if c.blockEndpoints {
			node.Endpoints = nil
		}
	}
	// `ok` here is again the peer-map lookup: do we already track this peer?
	switch {
	case !ok && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE:
		// new!
		var lastHandshake time.Time
		if ps, ok := status.Peer[node.Key]; ok {
			lastHandshake = ps.LastHandshake
		}
		c.peers[id] = &peerLifecycle{
			peerID:        id,
			node:          node,
			lastHandshake: lastHandshake,
			lost:          false,
		}
		logger.Debug(context.Background(), "adding new peer")
		return true
	case ok && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE:
		// update: preserve the original Created time so Equal() compares
		// meaningfully, and clear any lost state/timer since the peer is back.
		node.Created = lc.node.Created
		dirty = !lc.node.Equal(node)
		lc.node = node
		lc.lost = false
		lc.resetTimer()
		logger.Debug(context.Background(), "node update to existing peer", slog.F("dirty", dirty))
		return dirty
	case !ok:
		// disconnected or lost, but we don't have the node. No op
		logger.Debug(context.Background(), "skipping update for peer we don't recognize")
		return false
	case update.Kind == proto.CoordinateResponse_PeerUpdate_DISCONNECTED:
		lc.resetTimer()
		delete(c.peers, id)
		logger.Debug(context.Background(), "disconnected peer")
		return true
	case update.Kind == proto.CoordinateResponse_PeerUpdate_LOST:
		// Lost peers are kept in the map but get a timer; peerLostTimeout
		// removes them if no handshake happens within lostTimeout.
		lc.lost = true
		lc.setLostTimer(c)
		logger.Debug(context.Background(), "marked peer lost")
		// marking a node lost doesn't change anything right now, so dirty=false
		return false
	default:
		logger.Warn(context.Background(), "unknown peer update", slog.F("kind", update.Kind))
		return false
	}
}
376+
377+
// peerLostTimeout fires when a lost peer's timer expires. It re-checks the
// engine status in case the peer completed a handshake while the timer was
// pending; if the peer is still lost and past lostTimeout it is removed from
// the netmap, otherwise the timer is re-armed from the newer handshake time.
func (c *configMaps) peerLostTimeout(id uuid.UUID) {
	logger := c.logger.With(slog.F("peer_id", id))
	logger.Debug(context.Background(),
		"peer lost timeout")

	// First do a status update to see if the peer did a handshake while we were
	// waiting
	status := c.status()
	c.L.Lock()
	defer c.L.Unlock()

	lc, ok := c.peers[id]
	if !ok {
		// Peer was disconnected (or otherwise removed) before the timer fired.
		logger.Debug(context.Background(),
			"timeout triggered for peer that is removed from the map")
		return
	}
	if peerStatus, ok := status.Peer[lc.node.Key]; ok {
		lc.lastHandshake = peerStatus.LastHandshake
	}
	logger = logger.With(slog.F("key_id", lc.node.Key.ShortString()))
	if !lc.lost {
		// A NODE update arrived while the timer was pending and cleared the
		// lost flag; nothing to do.
		logger.Debug(context.Background(),
			"timeout triggered for peer that is no longer lost")
		return
	}
	since := c.clock.Since(lc.lastHandshake)
	if since >= lostTimeout {
		logger.Info(
			context.Background(), "removing lost peer")
		delete(c.peers, id)
		c.netmapDirty = true
		c.Broadcast()
		return
	}
	// Handshake happened recently enough to keep the peer around; re-arm the
	// timer for the remaining time-to-live.
	logger.Debug(context.Background(),
		"timeout triggered for peer but it had handshake in meantime")
	lc.setLostTimer(c)
}
416+
417+
func (c*configMaps)protoNodeToTailcfg(p*proto.Node) (*tailcfg.Node,error) {
418+
node,err:=ProtoToNode(p)
419+
iferr!=nil {
420+
returnnil,err
421+
}
422+
return&tailcfg.Node{
423+
ID:tailcfg.NodeID(p.GetId()),
424+
Created:c.clock.Now(),
425+
Key:node.Key,
426+
DiscoKey:node.DiscoKey,
427+
Addresses:node.Addresses,
428+
AllowedIPs:node.AllowedIPs,
429+
Endpoints:node.Endpoints,
430+
DERP:fmt.Sprintf("%s:%d",tailcfg.DerpMagicIP,node.PreferredDERP),
431+
Hostinfo: (&tailcfg.Hostinfo{}).View(),
432+
},nil
433+
}
434+
251435
// peerLifecycle tracks the state we keep for each peer in the netmap,
// including the bookkeeping needed to expire peers marked lost.
type peerLifecycle struct {
	// peerID is the coordinator's UUID for this peer.
	peerID uuid.UUID
	// node is the tailcfg form of the peer as last received/converted.
	node *tailcfg.Node
	// lost is set when the coordinator reports the peer LOST; cleared again
	// on a subsequent NODE update.
	lost bool
	// lastHandshake mirrors the engine status's LastHandshake for this peer.
	lastHandshake time.Time
	// timer, when non-nil, is the pending lost-peer expiry (see setLostTimer).
	timer *clock.Timer
}
442+
443+
func (l*peerLifecycle)resetTimer() {
444+
ifl.timer!=nil {
445+
l.timer.Stop()
446+
l.timer=nil
447+
}
448+
}
449+
450+
func (l*peerLifecycle)setLostTimer(c*configMaps) {
451+
ifl.timer!=nil {
452+
l.timer.Stop()
453+
}
454+
ttl:=lostTimeout-c.clock.Since(l.lastHandshake)
455+
ifttl<=0 {
456+
ttl=time.Nanosecond
457+
}
458+
l.timer=c.clock.AfterFunc(ttl,func() {
459+
c.peerLostTimeout(l.peerID)
460+
})
256461
}
257462

258463
// prefixesDifferent returns true if the two slices contain different prefixes

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp