chore: add support for peer updates to tailnet.configMaps #11487
tailnet/configmaps.go
@@ -3,11 +3,15 @@ package tailnet
import (
    "context"
    "errors"
    "fmt"
    "net/netip"
    "sync"
    "time"

    "github.com/benbjohnson/clock"
    "github.com/google/uuid"
    "go4.org/netipx"
    "tailscale.com/ipn/ipnstate"
    "tailscale.com/net/dns"
    "tailscale.com/tailcfg"
    "tailscale.com/types/ipproto"
@@ -23,10 +27,13 @@ import (
    "github.com/coder/coder/v2/tailnet/proto"
)

const lostTimeout = 15 * time.Minute

// engineConfigurable is the subset of wgengine.Engine that we use for configuration.
//
// This allows us to test configuration code without faking the whole interface.
type engineConfigurable interface {
    UpdateStatus(*ipnstate.StatusBuilder)
    SetNetworkMap(*netmap.NetworkMap)
    Reconfig(*wgcfg.Config, *router.Config, *dns.Config, *tailcfg.Debug) error
    SetDERPMap(*tailcfg.DERPMap)
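Because engineConfigurable is a narrow interface, tests can swap in a hand-rolled fake instead of a real wgengine. Below is a minimal sketch of such a fake, assuming the four methods shown in this hunk are all that is required (the real interface may include more); fakeEngine and its fields are hypothetical test helpers, not code from this PR.

```go
package tailnet

import (
    "tailscale.com/ipn/ipnstate"
    "tailscale.com/net/dns"
    "tailscale.com/tailcfg"
    "tailscale.com/types/netmap"
    "tailscale.com/wgengine/router"
    "tailscale.com/wgengine/wgcfg"
)

// fakeEngine records what configMaps pushes into it so tests can assert on
// the resulting netmap and DERP map without running a real wgengine.
type fakeEngine struct {
    netmaps []*netmap.NetworkMap
    derps   []*tailcfg.DERPMap
}

func (f *fakeEngine) UpdateStatus(*ipnstate.StatusBuilder) {
    // Left empty here: a fuller fake would populate the builder with canned
    // peer statuses (for example, last handshake times) for the test at hand.
}

func (f *fakeEngine) SetNetworkMap(nm *netmap.NetworkMap) { f.netmaps = append(f.netmaps, nm) }

func (f *fakeEngine) SetDERPMap(dm *tailcfg.DERPMap) { f.derps = append(f.derps, dm) }

func (f *fakeEngine) Reconfig(*wgcfg.Config, *router.Config, *dns.Config, *tailcfg.Debug) error {
    return nil
}
```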
@@ -49,12 +56,16 @@ type configMaps struct {
    closing        bool
    phase          phase
    engine         engineConfigurable
    static         netmap.NetworkMap
    peers          map[uuid.UUID]*peerLifecycle
    addresses      []netip.Prefix
    derpMap        *proto.DERPMap
    logger         slog.Logger
    blockEndpoints bool

    // for testing
    clock clock.Clock
}
func newConfigMaps(logger slog.Logger, engine engineConfigurable, nodeID tailcfg.NodeID, nodeKey key.NodePrivate, discoKey key.DiscoPublic, addresses []netip.Prefix) *configMaps {
@@ -101,6 +112,7 @@ func newConfigMaps(logger slog.Logger, engine engineConfigurable, nodeID tailcfg
        },
        peers:     make(map[uuid.UUID]*peerLifecycle),
        addresses: addresses,
        clock:     clock.New(),
    }
    go c.configLoop()
    return c
@@ -165,6 +177,9 @@ func (c *configMaps) configLoop() {
func (c *configMaps) close() {
    c.L.Lock()
    defer c.L.Unlock()
    for _, lc := range c.peers {
        lc.resetTimer()
    }
    c.closing = true
    c.Broadcast()
    for c.phase != closed {
@@ -260,11 +275,208 @@ func (c *configMaps) filterLocked() *filter.Filter {
    )
}

// updatePeers handles protocol updates about peers from the coordinator. c.L MUST NOT be held.
func (c *configMaps) updatePeers(updates []*proto.CoordinateResponse_PeerUpdate) {
    status := c.status()
    c.L.Lock()
    defer c.L.Unlock()

    // Update all the lastHandshake values here. That way we don't have to
    // worry about them being up-to-date when handling updates below, and it covers
    // all peers, not just the ones we got updates about.
    for _, lc := range c.peers {
        if peerStatus, ok := status.Peer[lc.node.Key]; ok {
            lc.lastHandshake = peerStatus.LastHandshake
        }
    }

    for _, update := range updates {
        if dirty := c.updatePeerLocked(update, status); dirty {
            c.netmapDirty = true
        }
    }
    if c.netmapDirty {
        c.Broadcast()
    }
}
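For context, here is a rough sketch of how a caller in the same package might feed coordinator traffic into updatePeers. The CoordinateResponse accessor name is an assumption inferred from the proto referenced above, not something this PR introduces.

```go
// Hypothetical caller: pump peer updates from a coordinator response into
// configMaps. updatePeers snapshots engine status once per batch, applies
// each update, and broadcasts to the config loop only if the netmap changed.
func forwardCoordUpdates(c *configMaps, resp *proto.CoordinateResponse) {
    if updates := resp.GetPeerUpdates(); len(updates) > 0 {
        c.updatePeers(updates)
    }
}
```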
// status requests a status update from the engine.
func (c *configMaps) status() *ipnstate.Status {
    sb := &ipnstate.StatusBuilder{WantPeers: true}
    c.engine.UpdateStatus(sb)
    return sb.Status()
}
// updatePeerLocked processes a single update for a single peer. It is intended
// as an internal function since it returns whether or not the config is dirtied by
// the update (instead of handling it directly like updatePeers). c.L must be held.
func (c *configMaps) updatePeerLocked(update *proto.CoordinateResponse_PeerUpdate, status *ipnstate.Status) (dirty bool) {
    id, err := uuid.FromBytes(update.Id)
    if err != nil {
        c.logger.Critical(context.Background(), "received update with bad id", slog.F("id", update.Id))
        return false
    }
    logger := c.logger.With(slog.F("peer_id", id))
    lc, ok := c.peers[id]
    var node *tailcfg.Node
    if update.Kind == proto.CoordinateResponse_PeerUpdate_NODE {
        // If no preferred DERP is provided, we can't reach the node.
        if update.Node.PreferredDerp == 0 {
            logger.Warn(context.Background(), "no preferred DERP, peer update", slog.F("node_proto", update.Node))
            return false
        }
        node, err = c.protoNodeToTailcfg(update.Node)
        if err != nil {
            logger.Critical(context.Background(), "failed to convert proto node to tailcfg", slog.F("node_proto", update.Node))
            return false
        }
        logger = logger.With(slog.F("key_id", node.Key.ShortString()), slog.F("node", node))
        peerStatus, ok := status.Peer[node.Key]
        // Starting KeepAlive messages at the initialization of a connection
        // causes a race condition. If we send the handshake before the peer has
        // our node, we'll have to wait for 5 seconds before trying again.
        // Ideally, the first handshake starts when the user first initiates a
        // connection to the peer. After a successful connection we enable
        // keep alives to persist the connection and keep it from becoming idle.
        // SSH connections don't send packets while idle, so we use keep alives
        // to avoid random hangs while we set up the connection again after
        // inactivity.
        node.KeepAlive = ok && peerStatus.Active
        if c.blockEndpoints {
            node.Endpoints = nil
        }
    }
    switch {
    case !ok && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE:
        // new!
        var lastHandshake time.Time
        if ps, ok := status.Peer[node.Key]; ok {
            lastHandshake = ps.LastHandshake
        }
[Review thread]
Reviewer: Is there a situation a peer would be missing from the …
Author: Yeah, immediately after receiving a DISCONNECTED update, we remove it from configMaps. Then until the next update it'll be in the engine but not configMaps. Probably an edge case for us to DISCONNECT and then immediately get a new NODE update, though.
        c.peers[id] = &peerLifecycle{
            peerID:        id,
            node:          node,
            lastHandshake: lastHandshake,
            lost:          false,
        }
        logger.Debug(context.Background(), "adding new peer")
        return true
    case ok && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE:
        // update
        node.Created = lc.node.Created
        dirty = !lc.node.Equal(node)
        lc.node = node
        lc.lost = false
        lc.resetTimer()
        logger.Debug(context.Background(), "node update to existing peer", slog.F("dirty", dirty))
        return dirty
    case !ok:
        // disconnected or lost, but we don't have the node. No op
        logger.Debug(context.Background(), "skipping update for peer we don't recognize")
        return false
    case update.Kind == proto.CoordinateResponse_PeerUpdate_DISCONNECTED:
        lc.resetTimer()
        delete(c.peers, id)
        logger.Debug(context.Background(), "disconnected peer")
        return true
    case update.Kind == proto.CoordinateResponse_PeerUpdate_LOST:
        lc.lost = true
        lc.setLostTimer(c)
        logger.Debug(context.Background(), "marked peer lost")
        // marking a node lost doesn't change anything right now, so dirty=false
        return false
    default:
        logger.Warn(context.Background(), "unknown peer update", slog.F("kind", update.Kind))
        return false
    }
}
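To make the switch above concrete, here is a hedged sketch of the kind of NODE update it accepts. The proto field names are assumptions inferred from the code (update.Id, update.Kind, update.Node.PreferredDerp), and the values are placeholders.

```go
// Hypothetical helper showing the shape of a NODE update that passes the
// checks above: a 16-byte peer UUID and a non-zero preferred DERP region.
func exampleNodeUpdate(peerID uuid.UUID) *proto.CoordinateResponse_PeerUpdate {
    return &proto.CoordinateResponse_PeerUpdate{
        Id:   peerID[:],
        Kind: proto.CoordinateResponse_PeerUpdate_NODE,
        Node: &proto.Node{
            PreferredDerp: 999, // placeholder DERP region ID
            // Key, disco key, addresses, allowed IPs, and endpoints elided.
        },
    }
}
```

The first such update for an unknown peer adds it and dirties the netmap; a later DISCONNECTED update with the same Id removes it again.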
// peerLostTimeout is the callback that peerLifecycle uses when a peer is lost and the timeout to
// receive a handshake fires.
func (c *configMaps) peerLostTimeout(id uuid.UUID) {
    logger := c.logger.With(slog.F("peer_id", id))
    logger.Debug(context.Background(),
        "peer lost timeout")

    // First do a status update to see if the peer did a handshake while we were
    // waiting
    status := c.status()
    c.L.Lock()
    defer c.L.Unlock()

    lc, ok := c.peers[id]
    if !ok {
        logger.Debug(context.Background(),
            "timeout triggered for peer that is removed from the map")
        return
    }
    if peerStatus, ok := status.Peer[lc.node.Key]; ok {
        lc.lastHandshake = peerStatus.LastHandshake
    }
    logger = logger.With(slog.F("key_id", lc.node.Key.ShortString()))
    if !lc.lost {
        logger.Debug(context.Background(),
            "timeout triggered for peer that is no longer lost")
        return
    }
    since := c.clock.Since(lc.lastHandshake)
    if since >= lostTimeout {
        logger.Info(
            context.Background(), "removing lost peer")
        delete(c.peers, id)
        c.netmapDirty = true
        c.Broadcast()
        return
    }
    logger.Debug(context.Background(),
        "timeout triggered for peer but it had handshake in meantime")
    lc.setLostTimer(c)
}
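Because the timers run off the injected clock.Clock, the lost-peer path can be tested deterministically with benbjohnson/clock's mock. A sketch of an internal-package test helper follows; it is not from this PR and assumes the peer was already added via a NODE update and that the mock was installed as the configMaps clock before the timer is armed.

```go
// expireLostPeer marks a known peer lost, then advances the mock clock past
// lostTimeout so peerLostTimeout runs and removes it (hypothetical helper).
func expireLostPeer(uut *configMaps, peerID uuid.UUID, mClock *clock.Mock) {
    // LOST only arms the timer; it does not dirty the netmap.
    uut.updatePeers([]*proto.CoordinateResponse_PeerUpdate{{
        Id:   peerID[:],
        Kind: proto.CoordinateResponse_PeerUpdate_LOST,
    }})

    // Advancing past lostTimeout fires the AfterFunc; peerLostTimeout then
    // re-reads engine status and, absent a newer handshake, deletes the peer
    // and broadcasts a dirty netmap.
    mClock.Add(lostTimeout + time.Second)
}
```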
func (c *configMaps) protoNodeToTailcfg(p *proto.Node) (*tailcfg.Node, error) {
    node, err := ProtoToNode(p)
    if err != nil {
        return nil, err
    }
    return &tailcfg.Node{
        ID:         tailcfg.NodeID(p.GetId()),
        Created:    c.clock.Now(),
        Key:        node.Key,
        DiscoKey:   node.DiscoKey,
        Addresses:  node.Addresses,
        AllowedIPs: node.AllowedIPs,
        Endpoints:  node.Endpoints,
        DERP:       fmt.Sprintf("%s:%d", tailcfg.DerpMagicIP, node.PreferredDERP),
        Hostinfo:   (&tailcfg.Hostinfo{}).View(),
    }, nil
}
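The DERP field here uses Tailscale's convention of encoding the preferred DERP region as the "port" of a sentinel magic IP. A small illustrative helper, with region 999 as a placeholder:

```go
// derpForRegion renders the tailcfg.Node DERP value for a given region ID.
// tailcfg.DerpMagicIP is a sentinel address; the port part carries the DERP
// region ID rather than a real TCP port (hypothetical helper, not from this PR).
func derpForRegion(region int) string {
    return fmt.Sprintf("%s:%d", tailcfg.DerpMagicIP, region)
}
```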
type peerLifecycle struct {
    peerID        uuid.UUID
    node          *tailcfg.Node
    lost          bool
    lastHandshake time.Time
    timer         *clock.Timer
}

func (l *peerLifecycle) resetTimer() {
    if l.timer != nil {
        l.timer.Stop()
        l.timer = nil
    }
}

func (l *peerLifecycle) setLostTimer(c *configMaps) {
    if l.timer != nil {
        l.timer.Stop()
    }
    ttl := lostTimeout - c.clock.Since(l.lastHandshake)
    if ttl <= 0 {
        ttl = time.Nanosecond
    }
    l.timer = c.clock.AfterFunc(ttl, func() {
        c.peerLostTimeout(l.peerID)
    })
}
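A quick worked example of the TTL math above: the timer is scheduled relative to the last handshake rather than the moment the LOST update arrived, so a recently active peer gets nearly the full window while a long-idle one is re-checked almost immediately. The numbers below are illustrative only.

```go
// With lostTimeout = 15 * time.Minute:
//   last handshake 14m ago -> ttl = 15m - 14m =  1m  (timer fires in one minute)
//   last handshake 20m ago -> ttl = 15m - 20m = -5m  (clamped to 1ns, fires at once,
//                             and peerLostTimeout removes the peer unless the status
//                             snapshot shows a newer handshake)
```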
// prefixesDifferent returns true if the two slices contain different prefixes