chore: add support for peer updates to tailnet.configMaps #11487


Merged

spikecurtis merged 1 commit into main from spike/10533-configmaps-peerupdates on Jan 11, 2024
2 changes: 2 additions & 0 deletions go.mod
@@ -206,6 +206,8 @@ require (

require go.uber.org/mock v0.4.0

require github.com/benbjohnson/clock v1.3.5 // indirect

require (
cloud.google.com/go/compute v1.23.3 // indirect
cloud.google.com/go/logging v1.8.1 // indirect
2 changes: 2 additions & 0 deletions go.sum
@@ -123,6 +123,8 @@ github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiE
github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8=
github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk=
github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4=
github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o=
github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bep/godartsass v1.2.0 h1:E2VvQrxAHAFwbjyOIExAMmogTItSKodoKuijNrGm5yU=
232 changes: 222 additions & 10 deletions tailnet/configmaps.go
@@ -3,11 +3,15 @@ package tailnet
import (
"context"
"errors"
"fmt"
"net/netip"
"sync"
"time"

"github.com/benbjohnson/clock"
"github.com/google/uuid"
"go4.org/netipx"
"tailscale.com/ipn/ipnstate"
"tailscale.com/net/dns"
"tailscale.com/tailcfg"
"tailscale.com/types/ipproto"
@@ -23,10 +27,13 @@
"github.com/coder/coder/v2/tailnet/proto"
)

const lostTimeout = 15 * time.Minute

// engineConfigurable is the subset of wgengine.Engine that we use for configuration.
//
// This allows us to test configuration code without faking the whole interface.
type engineConfigurable interface {
UpdateStatus(*ipnstate.StatusBuilder)
SetNetworkMap(*netmap.NetworkMap)
Reconfig(*wgcfg.Config, *router.Config, *dns.Config, *tailcfg.Debug) error
SetDERPMap(*tailcfg.DERPMap)
@@ -49,12 +56,16 @@ type configMaps struct {
closing bool
phase phase

engine engineConfigurable
static netmap.NetworkMap
peers map[uuid.UUID]*peerLifecycle
addresses []netip.Prefix
derpMap *proto.DERPMap
logger slog.Logger
engine engineConfigurable
static netmap.NetworkMap
peers map[uuid.UUID]*peerLifecycle
addresses []netip.Prefix
derpMap *proto.DERPMap
logger slog.Logger
blockEndpoints bool

// for testing
clock clock.Clock
}

func newConfigMaps(logger slog.Logger, engine engineConfigurable, nodeID tailcfg.NodeID, nodeKey key.NodePrivate, discoKey key.DiscoPublic, addresses []netip.Prefix) *configMaps {
@@ -101,6 +112,7 @@ func newConfigMaps(logger slog.Logger, engine engineConfigurable, nodeID tailcfg
},
peers: make(map[uuid.UUID]*peerLifecycle),
addresses: addresses,
clock: clock.New(),
}
go c.configLoop()
return c
@@ -165,6 +177,9 @@ func (c *configMaps) configLoop() {
func (c *configMaps) close() {
c.L.Lock()
defer c.L.Unlock()
for _, lc := range c.peers {
lc.resetTimer()
}
c.closing = true
c.Broadcast()
for c.phase != closed {
@@ -260,11 +275,208 @@ func (c *configMaps) filterLocked() *filter.Filter {
)
}

// updatePeers handles protocol updates about peers from the coordinator. c.L MUST NOT be held.
func (c *configMaps) updatePeers(updates []*proto.CoordinateResponse_PeerUpdate) {
status := c.status()
c.L.Lock()
defer c.L.Unlock()

// Update all the lastHandshake values here. That way we don't have to
// worry about them being up-to-date when handling updates below, and it covers
// all peers, not just the ones we got updates about.
for _, lc := range c.peers {
if peerStatus, ok := status.Peer[lc.node.Key]; ok {
lc.lastHandshake = peerStatus.LastHandshake
}
}

for _, update := range updates {
if dirty := c.updatePeerLocked(update, status); dirty {
c.netmapDirty = true
}
}
if c.netmapDirty {
c.Broadcast()
}
}

// status requests a status update from the engine.
func (c *configMaps) status() *ipnstate.Status {
sb := &ipnstate.StatusBuilder{WantPeers: true}
c.engine.UpdateStatus(sb)
return sb.Status()
}

// updatePeerLocked processes a single update for a single peer. It is intended
// as an internal function since it returns whether or not the config is dirtied by
// the update (instead of handling it directly like updatePeers). c.L must be held.
func (c *configMaps) updatePeerLocked(update *proto.CoordinateResponse_PeerUpdate, status *ipnstate.Status) (dirty bool) {
id, err := uuid.FromBytes(update.Id)
if err != nil {
c.logger.Critical(context.Background(), "received update with bad id", slog.F("id", update.Id))
return false
}
logger := c.logger.With(slog.F("peer_id", id))
lc, ok := c.peers[id]
var node *tailcfg.Node
if update.Kind == proto.CoordinateResponse_PeerUpdate_NODE {
// If no preferred DERP is provided, we can't reach the node.
if update.Node.PreferredDerp == 0 {
logger.Warn(context.Background(), "no preferred DERP, peer update", slog.F("node_proto", update.Node))
return false
}
node, err = c.protoNodeToTailcfg(update.Node)
if err != nil {
logger.Critical(context.Background(), "failed to convert proto node to tailcfg", slog.F("node_proto", update.Node))
return false
}
logger = logger.With(slog.F("key_id", node.Key.ShortString()), slog.F("node", node))
peerStatus, ok := status.Peer[node.Key]
// Starting KeepAlive messages at the initialization of a connection
// causes a race condition. If we send the handshake before the peer has
// our node, we'll have to wait for 5 seconds before trying again.
// Ideally, the first handshake starts when the user first initiates a
// connection to the peer. After a successful connection we enable
// keep alives to persist the connection and keep it from becoming idle.
// SSH connections don't send packets while idle, so we use keep alives
// to avoid random hangs while we set up the connection again after
// inactivity.
node.KeepAlive = ok && peerStatus.Active
if c.blockEndpoints {
node.Endpoints = nil
}
}
switch {
case !ok && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE:
// new!
var lastHandshake time.Time
if ps, ok := status.Peer[node.Key]; ok {
lastHandshake = ps.LastHandshake
}
Reviewer (Contributor):

Is there a situation where a peer would be missing from the configMaps peer list but still be in the engine's peer list?

Author (spikecurtis):

Yeah, immediately after receiving a DISCONNECTED update, we remove it from configMaps. Then until the next update it'll be in the engine but not configMaps. Probably an edge case for us to DISCONNECT and then immediately get a new NODE update, though.

c.peers[id] = &peerLifecycle{
peerID: id,
node: node,
lastHandshake: lastHandshake,
lost: false,
}
logger.Debug(context.Background(), "adding new peer")
return true
case ok && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE:
// update
node.Created = lc.node.Created
dirty = !lc.node.Equal(node)
lc.node = node
lc.lost = false
lc.resetTimer()
logger.Debug(context.Background(), "node update to existing peer", slog.F("dirty", dirty))
return dirty
case !ok:
// disconnected or lost, but we don't have the node. No op
logger.Debug(context.Background(), "skipping update for peer we don't recognize")
return false
case update.Kind == proto.CoordinateResponse_PeerUpdate_DISCONNECTED:
lc.resetTimer()
delete(c.peers, id)
logger.Debug(context.Background(), "disconnected peer")
return true
case update.Kind == proto.CoordinateResponse_PeerUpdate_LOST:
lc.lost = true
lc.setLostTimer(c)
logger.Debug(context.Background(), "marked peer lost")
// marking a node lost doesn't change anything right now, so dirty=false
return false
default:
logger.Warn(context.Background(), "unknown peer update", slog.F("kind", update.Kind))
return false
}
}

// peerLostTimeout is the callback that peerLifecycle uses when a peer is lost and the timeout to
// receive a handshake fires.
func (c *configMaps) peerLostTimeout(id uuid.UUID) {
logger := c.logger.With(slog.F("peer_id", id))
logger.Debug(context.Background(),
"peer lost timeout")

// First do a status update to see if the peer did a handshake while we were
// waiting
status := c.status()
c.L.Lock()
defer c.L.Unlock()

lc, ok := c.peers[id]
if !ok {
logger.Debug(context.Background(),
"timeout triggered for peer that is removed from the map")
return
}
if peerStatus, ok := status.Peer[lc.node.Key]; ok {
lc.lastHandshake = peerStatus.LastHandshake
}
logger = logger.With(slog.F("key_id", lc.node.Key.ShortString()))
if !lc.lost {
logger.Debug(context.Background(),
"timeout triggered for peer that is no longer lost")
return
}
since := c.clock.Since(lc.lastHandshake)
if since >= lostTimeout {
logger.Info(
context.Background(), "removing lost peer")
delete(c.peers, id)
c.netmapDirty = true
c.Broadcast()
return
}
logger.Debug(context.Background(),
"timeout triggered for peer but it had handshake in meantime")
lc.setLostTimer(c)
}

func (c *configMaps) protoNodeToTailcfg(p *proto.Node) (*tailcfg.Node, error) {
node, err := ProtoToNode(p)
if err != nil {
return nil, err
}
return &tailcfg.Node{
ID: tailcfg.NodeID(p.GetId()),
Created: c.clock.Now(),
Key: node.Key,
DiscoKey: node.DiscoKey,
Addresses: node.Addresses,
AllowedIPs: node.AllowedIPs,
Endpoints: node.Endpoints,
DERP: fmt.Sprintf("%s:%d", tailcfg.DerpMagicIP, node.PreferredDERP),
Hostinfo: (&tailcfg.Hostinfo{}).View(),
}, nil
}

type peerLifecycle struct {
node *tailcfg.Node
// TODO: implement timers to track lost peers
// lastHandshake time.Time
// timer time.Timer
peerID uuid.UUID
node *tailcfg.Node
lost bool
lastHandshake time.Time
timer *clock.Timer
}

func (l *peerLifecycle) resetTimer() {
if l.timer != nil {
l.timer.Stop()
l.timer = nil
}
}

func (l *peerLifecycle) setLostTimer(c *configMaps) {
if l.timer != nil {
l.timer.Stop()
}
ttl := lostTimeout - c.clock.Since(l.lastHandshake)
if ttl <= 0 {
ttl = time.Nanosecond
}
l.timer = c.clock.AfterFunc(ttl, func() {
c.peerLostTimeout(l.peerID)
})
}

// prefixesDifferent returns true if the two slices contain different prefixes
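
The new clock field ("for testing") together with the lostTimeout constant is what makes the lost-peer timer testable without waiting fifteen minutes of wall-clock time. Below is a small, self-contained sketch of that timer pattern using only the public github.com/benbjohnson/clock API; the lostPeer type and function names are illustrative stand-ins, not the PR's actual types.

package main

import (
	"fmt"
	"time"

	"github.com/benbjohnson/clock"
)

const lostTimeout = 15 * time.Minute

// lostPeer is an illustrative stand-in for peerLifecycle: it remembers the
// last handshake and arms a timer that fires once the peer has been silent
// for lostTimeout.
type lostPeer struct {
	lastHandshake time.Time
	timer         *clock.Timer
}

func (p *lostPeer) setLostTimer(clk clock.Clock, onTimeout func()) {
	if p.timer != nil {
		p.timer.Stop()
	}
	// Same TTL computation as the diff: time remaining until the peer has
	// been silent for lostTimeout, clamped so an overdue timer fires at once.
	ttl := lostTimeout - clk.Since(p.lastHandshake)
	if ttl <= 0 {
		ttl = time.Nanosecond
	}
	p.timer = clk.AfterFunc(ttl, onTimeout)
}

func main() {
	// A mock clock lets the 15-minute timeout be exercised without waiting;
	// production code would pass clock.New() instead.
	clk := clock.NewMock()
	p := &lostPeer{lastHandshake: clk.Now()}
	p.setLostTimer(clk, func() { fmt.Println("peer lost timeout fired") })

	// Advancing the mock past the deadline fires the AfterFunc callback.
	clk.Add(lostTimeout + time.Second)
}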
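Likewise, the engineConfigurable comment explains that the subset interface exists so configuration code can be tested without faking all of wgengine.Engine. A hypothetical minimal fake covering just the methods visible in this diff (other methods may be elided from the hunk) could simply record what configMaps pushes into it; it assumes the tailscale.com packages already imported by configmaps.go (ipnstate, netmap, wgcfg, router, dns, tailcfg).

// fakeEngine is an illustrative sketch only; it is not part of the PR and
// implements just the engineConfigurable methods shown in this diff.
type fakeEngine struct {
	netMap  *netmap.NetworkMap
	wgCfg   *wgcfg.Config
	derpMap *tailcfg.DERPMap
}

func (f *fakeEngine) UpdateStatus(sb *ipnstate.StatusBuilder) {
	// A fuller fake would populate sb with canned peer statuses so that
	// updatePeers sees the lastHandshake values the test wants.
}

func (f *fakeEngine) SetNetworkMap(nm *netmap.NetworkMap) { f.netMap = nm }

func (f *fakeEngine) Reconfig(cfg *wgcfg.Config, _ *router.Config, _ *dns.Config, _ *tailcfg.Debug) error {
	f.wgCfg = cfg
	return nil
}

func (f *fakeEngine) SetDERPMap(m *tailcfg.DERPMap) { f.derpMap = m }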