@@ -3,11 +3,15 @@ package tailnet
 import (
 	"context"
 	"errors"
+	"fmt"
 	"net/netip"
 	"sync"
+	"time"
 
+	"github.com/benbjohnson/clock"
 	"github.com/google/uuid"
 	"go4.org/netipx"
+	"tailscale.com/ipn/ipnstate"
 	"tailscale.com/net/dns"
 	"tailscale.com/tailcfg"
 	"tailscale.com/types/ipproto"
@@ -23,10 +27,13 @@ import (
 	"github.com/coder/coder/v2/tailnet/proto"
 )
 
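+// lostTimeout is how long we wait, measured from a peer's last handshake,
+// before removing a peer the coordinator has marked lost.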
+const lostTimeout = 15 * time.Minute
+
 // engineConfigurable is the subset of wgengine.Engine that we use for configuration.
 //
 // This allows us to test configuration code without faking the whole interface.
 type engineConfigurable interface {
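+	// UpdateStatus fills in the StatusBuilder, which we use to read per-peer
+	// last handshake times and activity.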
+	UpdateStatus(*ipnstate.StatusBuilder)
 	SetNetworkMap(*netmap.NetworkMap)
 	Reconfig(*wgcfg.Config, *router.Config, *dns.Config, *tailcfg.Debug) error
 	SetDERPMap(*tailcfg.DERPMap)
@@ -49,12 +56,16 @@ type configMaps struct {
 	closing bool
 	phase   phase
 
-	engine    engineConfigurable
-	static    netmap.NetworkMap
-	peers     map[uuid.UUID]*peerLifecycle
-	addresses []netip.Prefix
-	derpMap   *proto.DERPMap
-	logger    slog.Logger
+	engine         engineConfigurable
+	static         netmap.NetworkMap
+	peers          map[uuid.UUID]*peerLifecycle
+	addresses      []netip.Prefix
+	derpMap        *proto.DERPMap
+	logger         slog.Logger
+	blockEndpoints bool
+
+	// for testing
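+	// clock defaults to the real clock; tests may substitute a mock (e.g.
+	// clock.NewMock from github.com/benbjohnson/clock) to fire the lost-peer
+	// timers deterministically.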
+	clock clock.Clock
 }
 
 func newConfigMaps(logger slog.Logger, engine engineConfigurable, nodeID tailcfg.NodeID, nodeKey key.NodePrivate, discoKey key.DiscoPublic, addresses []netip.Prefix) *configMaps {
@@ -101,6 +112,7 @@ func newConfigMaps(logger slog.Logger, engine engineConfigurable, nodeID tailcfg
 		},
 		peers:     make(map[uuid.UUID]*peerLifecycle),
 		addresses: addresses,
+		clock:     clock.New(),
 	}
 	go c.configLoop()
 	return c
@@ -165,6 +177,9 @@ func (c *configMaps) configLoop() {
 func (c *configMaps) close() {
 	c.L.Lock()
 	defer c.L.Unlock()
+	for _, lc := range c.peers {
+		lc.resetTimer()
+	}
 	c.closing = true
 	c.Broadcast()
 	for c.phase != closed {
@@ -260,11 +275,208 @@ func (c *configMaps) filterLocked() *filter.Filter {
 	)
 }
 
+// updatePeers handles protocol updates about peers from the coordinator. c.L MUST NOT be held.
+func (c *configMaps) updatePeers(updates []*proto.CoordinateResponse_PeerUpdate) {
+	status := c.status()
+	c.L.Lock()
+	defer c.L.Unlock()
+
+	// Update all the lastHandshake values here. That way we don't have to
+	// worry about them being up-to-date when handling updates below, and it
+	// covers all peers, not just the ones we got updates about.
+	for _, lc := range c.peers {
+		if peerStatus, ok := status.Peer[lc.node.Key]; ok {
+			lc.lastHandshake = peerStatus.LastHandshake
+		}
+	}
+
+	for _, update := range updates {
+		if dirty := c.updatePeerLocked(update, status); dirty {
+			c.netmapDirty = true
+		}
+	}
+	if c.netmapDirty {
+		c.Broadcast()
+	}
+}
+
+// status requests a status update from the engine.
+func (c *configMaps) status() *ipnstate.Status {
+	sb := &ipnstate.StatusBuilder{WantPeers: true}
+	c.engine.UpdateStatus(sb)
+	return sb.Status()
+}
+
+// updatePeerLocked processes a single update for a single peer. It is intended
+// as an internal function, since it returns whether or not the config is dirtied
+// by the update (instead of handling it directly like updatePeers). c.L must be held.
+func (c *configMaps) updatePeerLocked(update *proto.CoordinateResponse_PeerUpdate, status *ipnstate.Status) (dirty bool) {
+	id, err := uuid.FromBytes(update.Id)
+	if err != nil {
+		c.logger.Critical(context.Background(), "received update with bad id", slog.F("id", update.Id))
+		return false
+	}
+	logger := c.logger.With(slog.F("peer_id", id))
+	lc, ok := c.peers[id]
+	var node *tailcfg.Node
+	if update.Kind == proto.CoordinateResponse_PeerUpdate_NODE {
+		// If no preferred DERP is provided, we can't reach the node.
+		if update.Node.PreferredDerp == 0 {
+			logger.Warn(context.Background(), "no preferred DERP, peer update", slog.F("node_proto", update.Node))
+			return false
+		}
+		node, err = c.protoNodeToTailcfg(update.Node)
+		if err != nil {
+			logger.Critical(context.Background(), "failed to convert proto node to tailcfg", slog.F("node_proto", update.Node))
+			return false
+		}
+		logger = logger.With(slog.F("key_id", node.Key.ShortString()), slog.F("node", node))
+		peerStatus, ok := status.Peer[node.Key]
+		// Starting KeepAlive messages at the initialization of a connection
+		// causes a race condition. If we send the handshake before the peer has
+		// our node, we'll have to wait for 5 seconds before trying again.
+		// Ideally, the first handshake starts when the user first initiates a
+		// connection to the peer. After a successful connection we enable
+		// keep alives to persist the connection and keep it from becoming idle.
+		// SSH connections don't send packets while idle, so we use keep alives
+		// to avoid random hangs while we set up the connection again after
+		// inactivity.
+		node.KeepAlive = ok && peerStatus.Active
+		if c.blockEndpoints {
+			node.Endpoints = nil
+		}
+	}
+	switch {
+	case !ok && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE:
+		// new!
+		var lastHandshake time.Time
+		if ps, ok := status.Peer[node.Key]; ok {
+			lastHandshake = ps.LastHandshake
+		}
+		c.peers[id] = &peerLifecycle{
+			peerID:        id,
+			node:          node,
+			lastHandshake: lastHandshake,
+			lost:          false,
+		}
+		logger.Debug(context.Background(), "adding new peer")
+		return true
+	case ok && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE:
+		// update
+		node.Created = lc.node.Created
+		dirty = !lc.node.Equal(node)
+		lc.node = node
+		lc.lost = false
+		lc.resetTimer()
+		logger.Debug(context.Background(), "node update to existing peer", slog.F("dirty", dirty))
+		return dirty
+	case !ok:
+		// disconnected or lost, but we don't have the node. No op.
+		logger.Debug(context.Background(), "skipping update for peer we don't recognize")
+		return false
+	case update.Kind == proto.CoordinateResponse_PeerUpdate_DISCONNECTED:
+		lc.resetTimer()
+		delete(c.peers, id)
+		logger.Debug(context.Background(), "disconnected peer")
+		return true
+	case update.Kind == proto.CoordinateResponse_PeerUpdate_LOST:
+		lc.lost = true
+		lc.setLostTimer(c)
+		logger.Debug(context.Background(), "marked peer lost")
+		// marking a node lost doesn't change anything right now, so dirty=false
+		return false
+	default:
+		logger.Warn(context.Background(), "unknown peer update", slog.F("kind", update.Kind))
+		return false
+	}
+}
+
+// peerLostTimeout is the callback that peerLifecycle uses when a peer is lost
+// and the timeout to receive a handshake fires.
+func (c *configMaps) peerLostTimeout(id uuid.UUID) {
+	logger := c.logger.With(slog.F("peer_id", id))
+	logger.Debug(context.Background(), "peer lost timeout")
+
+	// First do a status update to see if the peer did a handshake while we
+	// were waiting.
+	status := c.status()
+	c.L.Lock()
+	defer c.L.Unlock()
+
+	lc, ok := c.peers[id]
+	if !ok {
+		logger.Debug(context.Background(),
+			"timeout triggered for peer that is removed from the map")
+		return
+	}
+	if peerStatus, ok := status.Peer[lc.node.Key]; ok {
+		lc.lastHandshake = peerStatus.LastHandshake
+	}
+	logger = logger.With(slog.F("key_id", lc.node.Key.ShortString()))
+	if !lc.lost {
+		logger.Debug(context.Background(),
+			"timeout triggered for peer that is no longer lost")
+		return
+	}
+	since := c.clock.Since(lc.lastHandshake)
+	if since >= lostTimeout {
+		logger.Info(context.Background(), "removing lost peer")
+		delete(c.peers, id)
+		c.netmapDirty = true
+		c.Broadcast()
+		return
+	}
+	logger.Debug(context.Background(),
+		"timeout triggered for peer but it had handshake in meantime")
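+	// The last handshake is more recent than lostTimeout, so restart the
+	// timer relative to the new handshake time.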
+	lc.setLostTimer(c)
+}
+
+func (c *configMaps) protoNodeToTailcfg(p *proto.Node) (*tailcfg.Node, error) {
+	node, err := ProtoToNode(p)
+	if err != nil {
+		return nil, err
+	}
+	return &tailcfg.Node{
+		ID:         tailcfg.NodeID(p.GetId()),
+		Created:    c.clock.Now(),
+		Key:        node.Key,
+		DiscoKey:   node.DiscoKey,
+		Addresses:  node.Addresses,
+		AllowedIPs: node.AllowedIPs,
+		Endpoints:  node.Endpoints,
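+		// DERP is the peer's home DERP region, encoded as the DERP magic IP
+		// with the region ID as the port.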
+		DERP:       fmt.Sprintf("%s:%d", tailcfg.DerpMagicIP, node.PreferredDERP),
+		Hostinfo:   (&tailcfg.Hostinfo{}).View(),
+	}, nil
+}
+
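+// peerLifecycle tracks the state we keep about a single peer, including the
+// timer used to eventually remove a lost peer that never handshakes again.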
 type peerLifecycle struct {
-	node *tailcfg.Node
-	// TODO: implement timers to track lost peers
-	// lastHandshake time.Time
-	// timer time.Timer
+	peerID        uuid.UUID
+	node          *tailcfg.Node
+	lost          bool
+	lastHandshake time.Time
+	timer         *clock.Timer
+}
+
+func (l *peerLifecycle) resetTimer() {
+	if l.timer != nil {
+		l.timer.Stop()
+		l.timer = nil
+	}
+}
+
+func (l *peerLifecycle) setLostTimer(c *configMaps) {
+	if l.timer != nil {
+		l.timer.Stop()
+	}
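+	// Fire lostTimeout after the last handshake; if that moment has already
+	// passed, fire almost immediately.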
+	ttl := lostTimeout - c.clock.Since(l.lastHandshake)
+	if ttl <= 0 {
+		ttl = time.Nanosecond
+	}
+	l.timer = c.clock.AfterFunc(ttl, func() {
+		c.peerLostTimeout(l.peerID)
+	})
 }
 
 // prefixesDifferent returns true if the two slices contain different prefixes