chore: add nodeUpdater to tailnet #11539


Merged
spikecurtis merged 1 commit into main from spike/10533-add-nodeUpdater on Jan 11, 2024
tailnet/configmaps.go: 7 additions & 3 deletions
@@ -48,13 +48,17 @@ const (
 	closed
 )
 
-type configMaps struct {
+type phased struct {
 	sync.Cond
+	phase phase
+}
+
+type configMaps struct {
+	phased
 	netmapDirty  bool
 	derpMapDirty bool
 	filterDirty  bool
 	closing      bool
-	phase        phase
 
 	engine engineConfigurable
 	static netmap.NetworkMap
@@ -71,7 +75,7 @@ type configMaps struct {
 func newConfigMaps(logger slog.Logger, engine engineConfigurable, nodeID tailcfg.NodeID, nodeKey key.NodePrivate, discoKey key.DiscoPublic, addresses []netip.Prefix) *configMaps {
 	pubKey := nodeKey.Public()
 	c := &configMaps{
-		Cond:   *(sync.NewCond(&sync.Mutex{})),
+		phased: phased{Cond: *(sync.NewCond(&sync.Mutex{}))},
 		logger: logger,
 		engine: engine,
 		static: netmap.NetworkMap{
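The hunks above pull the sync.Cond and the phase field out of configMaps into a small phased struct that configMaps now embeds. The point of the extraction is reuse: the new nodeUpdater added below embeds the same struct, so helpers that only care about the phase machinery can accept a *phased. The following is a minimal, self-contained sketch of that embedding pattern; the worker type, phaseOf helper, and simplified phase constants are placeholders for illustration, not the real configMaps.

    package main

    import (
    	"fmt"
    	"sync"
    )

    // Simplified stand-ins for the phase constants defined in tailnet/configmaps.go.
    type phase int

    const (
    	idle phase = iota
    	configuring
    	closed
    )

    // phased bundles the condition variable with the lifecycle phase so any type
    // following the idle/configuring/closed pattern can embed it.
    type phased struct {
    	sync.Cond
    	phase phase
    }

    // worker stands in for configMaps or nodeUpdater; only the phase machinery
    // matters for this sketch.
    type worker struct {
    	phased
    	dirty bool
    }

    // phaseOf needs only the shared machinery, so it takes *phased, for the same
    // reason requireNeverConfigures switches from *configMaps to *phased below.
    func phaseOf(p *phased) phase {
    	p.L.Lock()
    	defer p.L.Unlock()
    	return p.phase
    }

    func main() {
    	w := &worker{phased: phased{Cond: *(sync.NewCond(&sync.Mutex{}))}}
    	// The embedded field is addressable through the outer struct, so a helper
    	// written against *phased works for every embedding type.
    	fmt.Println(phaseOf(&w.phased)) // 0, i.e. idle
    }

Because the embedded field is addressable as &uut.phased, the only change needed at existing call sites is the one made to the tests below.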
tailnet/configmaps_internal_test.go: 5 additions & 5 deletions
@@ -96,7 +96,7 @@ func TestConfigMaps_setAddresses_same(t *testing.T) {
 	uut := newConfigMaps(logger, fEng, nodeID, nodePrivateKey, discoKey.Public(), addrs)
 	defer uut.close()
 
-	requireNeverConfigures(ctx, t, uut)
+	requireNeverConfigures(ctx, t, &uut.phased)
 
 	uut.setAddresses(addrs)
 
@@ -190,7 +190,7 @@ func TestConfigMaps_updatePeers_same(t *testing.T) {
 	defer uut.close()
 
 	// Then: we don't configure
-	requireNeverConfigures(ctx, t, uut)
+	requireNeverConfigures(ctx, t, &uut.phased)
 
 	p1ID := uuid.UUID{1}
 	p1Node := newTestNode(1)
@@ -558,7 +558,7 @@ func TestConfigMaps_setBlockEndpoints_same(t *testing.T) {
 	uut.L.Unlock()
 
 	// Then: we don't configure
-	requireNeverConfigures(ctx, t, uut)
+	requireNeverConfigures(ctx, t, &uut.phased)
 
 	// When we set blockEndpoints to true
 	uut.setBlockEndpoints(true)
@@ -619,7 +619,7 @@ func TestConfigMaps_updatePeers_nonexist(t *testing.T) {
 	defer uut.close()
 
 	// Then: we don't configure
-	requireNeverConfigures(ctx, t, uut)
+	requireNeverConfigures(ctx, t, &uut.phased)
 
 	// Given: no known peers
 	go func() {
@@ -669,7 +669,7 @@ func getNodeWithID(t testing.TB, peers []*tailcfg.Node, id tailcfg.NodeID) *tail
 	return nil
 }
 
-func requireNeverConfigures(ctx context.Context, t *testing.T, uut *configMaps) {
+func requireNeverConfigures(ctx context.Context, t *testing.T, uut *phased) {
 	t.Helper()
 	waiting := make(chan struct{})
 	go func() {
tailnet/node.go: 134 additions & 0 deletions (new file)
package tailnet

import (
	"context"
	"net/netip"
	"sync"

	"golang.org/x/exp/maps"
	"golang.org/x/exp/slices"
	"tailscale.com/tailcfg"
	"tailscale.com/types/key"

	"cdr.dev/slog"
	"github.com/coder/coder/v2/coderd/database/dbtime"
)

type nodeUpdater struct {
	phased
	dirty   bool
	closing bool

	// static
	logger   slog.Logger
	id       tailcfg.NodeID
	key      key.NodePublic
	discoKey key.DiscoPublic
	callback func(n *Node)

	// dynamic
	preferredDERP        int
	derpLatency          map[string]float64
	derpForcedWebsockets map[int]string
	endpoints            []string
	addresses            []netip.Prefix
}

// updateLoop waits until the config is dirty and then calls the callback with
// the newest node. It is intended only to be called internally, and shuts down
// when close() is called.
func (u *nodeUpdater) updateLoop() {
	u.L.Lock()
	defer u.L.Unlock()
	defer func() {
		u.phase = closed
		u.Broadcast()
	}()
	for {
		for !(u.closing || u.dirty) {
			u.phase = idle
			u.Wait()
		}
		if u.closing {
			return
		}
		node := u.nodeLocked()
		u.dirty = false
		u.phase = configuring
		u.Broadcast()

		// We cannot reach nodes without DERP for discovery. Therefore, there
		// is no point in sending the node without this, and we can save
		// ourselves from churn in the tailscale/wireguard layer.
		if node.PreferredDERP == 0 {
			u.logger.Debug(context.Background(), "skipped sending node; no PreferredDERP", slog.F("node", node))
			continue
		}

		u.L.Unlock()
		u.callback(node)
		u.L.Lock()
	}
}

// close closes the nodeUpdater and stops it from calling the node callback.
func (u *nodeUpdater) close() {
	u.L.Lock()
	defer u.L.Unlock()
	u.closing = true
	u.Broadcast()
	for u.phase != closed {
		u.Wait()
	}
}

func newNodeUpdater(
	logger slog.Logger, callback func(n *Node),
	id tailcfg.NodeID, np key.NodePublic, dp key.DiscoPublic,
) *nodeUpdater {
	u := &nodeUpdater{
		phased:   phased{Cond: *(sync.NewCond(&sync.Mutex{}))},
		logger:   logger,
		id:       id,
		key:      np,
		discoKey: dp,
		callback: callback,
	}
	go u.updateLoop()
	return u
}

// nodeLocked returns the current best node information. u.L must be held.
func (u *nodeUpdater) nodeLocked() *Node {
	return &Node{
		ID:                  u.id,
		AsOf:                dbtime.Now(),
		Key:                 u.key,
		Addresses:           slices.Clone(u.addresses),
		AllowedIPs:          slices.Clone(u.addresses),
		DiscoKey:            u.discoKey,
		Endpoints:           slices.Clone(u.endpoints),
		PreferredDERP:       u.preferredDERP,
		DERPLatency:         maps.Clone(u.derpLatency),
		DERPForcedWebsocket: maps.Clone(u.derpForcedWebsockets),
	}
}

// setNetInfo processes a NetInfo update from the wireguard engine. u.L MUST
// NOT be held.
func (u *nodeUpdater) setNetInfo(ni *tailcfg.NetInfo) {
	u.L.Lock()
	defer u.L.Unlock()
	dirty := false
	if u.preferredDERP != ni.PreferredDERP {
		dirty = true
		u.preferredDERP = ni.PreferredDERP
	}
	if !maps.Equal(u.derpLatency, ni.DERPLatency) {
		dirty = true
		u.derpLatency = ni.DERPLatency
	}
	if dirty {
		u.dirty = true
		u.Broadcast()
	}
}
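Taken together, node.go gives the connection a single-goroutine owner for node state: newNodeUpdater starts updateLoop, setNetInfo marks the state dirty and broadcasts, and the callback fires outside the lock only when something actually changed and a preferred DERP region is known. The sketch below shows one plausible way to wire it up; it sits inside the tailnet package (nodeUpdater is unexported), assumes the same imports as node.go, and the send parameter plus the function name are hypothetical stand-ins, not code from the PR.

    // Illustrative wiring only; logger, ids, keys, and send are supplied by the
    // caller in this sketch rather than taken from the PR.
    func exampleNodeUpdaterWiring(
    	logger slog.Logger,
    	id tailcfg.NodeID,
    	nodeKey key.NodePublic,
    	discoKey key.DiscoPublic,
    	send func(*Node), // hypothetical sink for fresh nodes, e.g. a coordinator client
    ) {
    	updater := newNodeUpdater(logger, func(n *Node) {
    		// Runs on updateLoop's goroutine with u.L released, so blocking I/O
    		// here does not stall setNetInfo callers.
    		send(n)
    	}, id, nodeKey, discoKey)
    	defer updater.close() // waits until the updater reaches the closed phase

    	// NetInfo updates come from the wireguard engine; identical values leave
    	// the updater clean, so the callback only fires on real changes.
    	updater.setNetInfo(&tailcfg.NetInfo{
    		PreferredDERP: 1,
    		DERPLatency:   map[string]float64{"1": 0.025},
    	})
    }

Releasing u.L before invoking the callback is what lets setNetInfo be called from engine callbacks without deadlocking against a slow consumer, which the first test below exercises by blocking the callback on a channel.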
tailnet/node_internal_test.go: 110 additions & 0 deletions (new file)
package tailnet

import (
	"testing"

	"github.com/stretchr/testify/require"
	"golang.org/x/exp/maps"
	"tailscale.com/tailcfg"
	"tailscale.com/types/key"

	"cdr.dev/slog"
	"cdr.dev/slog/sloggers/slogtest"
	"github.com/coder/coder/v2/testutil"
)

func TestNodeUpdater_setNetInfo_different(t *testing.T) {
	t.Parallel()
	ctx := testutil.Context(t, testutil.WaitShort)
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
	id := tailcfg.NodeID(1)
	nodeKey := key.NewNode().Public()
	discoKey := key.NewDisco().Public()
	nodeCh := make(chan *Node)
	goCh := make(chan struct{})
	uut := newNodeUpdater(
		logger,
		func(n *Node) {
			nodeCh <- n
			<-goCh
		},
		id, nodeKey, discoKey,
	)
	defer uut.close()

	dl := map[string]float64{"1": 0.025}
	uut.setNetInfo(&tailcfg.NetInfo{
		PreferredDERP: 1,
		DERPLatency:   dl,
	})

	node := testutil.RequireRecvCtx(ctx, t, nodeCh)
	require.Equal(t, nodeKey, node.Key)
	require.Equal(t, discoKey, node.DiscoKey)
	require.Equal(t, 1, node.PreferredDERP)
	require.True(t, maps.Equal(dl, node.DERPLatency))

	// Send in second update to test getting updates in the middle of the
	// callback
	uut.setNetInfo(&tailcfg.NetInfo{
		PreferredDERP: 2,
		DERPLatency:   dl,
	})
	close(goCh) // allows callback to complete

	node = testutil.RequireRecvCtx(ctx, t, nodeCh)
	require.Equal(t, nodeKey, node.Key)
	require.Equal(t, discoKey, node.DiscoKey)
	require.Equal(t, 2, node.PreferredDERP)
	require.True(t, maps.Equal(dl, node.DERPLatency))

	done := make(chan struct{})
	go func() {
		defer close(done)
		uut.close()
	}()
	_ = testutil.RequireRecvCtx(ctx, t, done)
}

func TestNodeUpdater_setNetInfo_same(t *testing.T) {
	t.Parallel()
	ctx := testutil.Context(t, testutil.WaitShort)
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
	id := tailcfg.NodeID(1)
	nodeKey := key.NewNode().Public()
	discoKey := key.NewDisco().Public()
	nodeCh := make(chan *Node)
	goCh := make(chan struct{})
	uut := newNodeUpdater(
		logger,
		func(n *Node) {
			nodeCh <- n
			<-goCh
		},
		id, nodeKey, discoKey,
	)
	defer uut.close()

	// Then: we don't configure
	requireNeverConfigures(ctx, t, &uut.phased)

	// Given: preferred DERP and latency already set
	dl := map[string]float64{"1": 0.025}
	uut.L.Lock()
	uut.preferredDERP = 1
	uut.derpLatency = maps.Clone(dl)
	uut.L.Unlock()

	// When: new update with same info
	uut.setNetInfo(&tailcfg.NetInfo{
		PreferredDERP: 1,
		DERPLatency:   dl,
	})

	done := make(chan struct{})
	go func() {
		defer close(done)
		uut.close()
	}()
	_ = testutil.RequireRecvCtx(ctx, t, done)
}
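One path the two tests above do not cover is the PreferredDERP == 0 branch in updateLoop, where a dirty node is deliberately not sent. A hypothetical additional test, not part of this PR, might look like the sketch below; it assumes the same imports as this file plus "time", and uses a plain timer in place of any testutil receive helper.

    func TestNodeUpdater_setNetInfo_noPreferredDERP(t *testing.T) {
    	t.Parallel()
    	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
    	nodeCh := make(chan *Node, 1)
    	uut := newNodeUpdater(
    		logger,
    		func(n *Node) { nodeCh <- n },
    		tailcfg.NodeID(1), key.NewNode().Public(), key.NewDisco().Public(),
    	)
    	defer uut.close()

    	// The latency change marks the updater dirty, but PreferredDERP stays 0,
    	// so updateLoop wakes up and then skips the callback.
    	uut.setNetInfo(&tailcfg.NetInfo{
    		PreferredDERP: 0,
    		DERPLatency:   map[string]float64{"1": 0.05},
    	})

    	select {
    	case n := <-nodeCh:
    		t.Fatalf("unexpected node callback: %+v", n)
    	case <-time.After(100 * time.Millisecond):
    		// No callback within the window; the DERP-less node was skipped.
    	}
    }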
