Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd52fab0

Browse files
committed
chore: add nodeUpdater to tailnet
1 parent15eef5e commitd52fab0

File tree

4 files changed

+256
-8
lines changed

4 files changed

+256
-8
lines changed

‎tailnet/configmaps.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,17 @@ const (
4848
closed
4949
)
5050

51-
typeconfigMapsstruct {
51+
typephasedstruct {
5252
sync.Cond
53+
phasephase
54+
}
55+
56+
typeconfigMapsstruct {
57+
phased
5358
netmapDirtybool
5459
derpMapDirtybool
5560
filterDirtybool
5661
closingbool
57-
phasephase
5862

5963
engineengineConfigurable
6064
static netmap.NetworkMap
@@ -71,7 +75,7 @@ type configMaps struct {
7175
funcnewConfigMaps(logger slog.Logger,engineengineConfigurable,nodeID tailcfg.NodeID,nodeKey key.NodePrivate,discoKey key.DiscoPublic,addresses []netip.Prefix)*configMaps {
7276
pubKey:=nodeKey.Public()
7377
c:=&configMaps{
74-
Cond:*(sync.NewCond(&sync.Mutex{})),
78+
phased:phased{Cond:*(sync.NewCond(&sync.Mutex{}))},
7579
logger:logger,
7680
engine:engine,
7781
static: netmap.NetworkMap{

‎tailnet/configmaps_internal_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func TestConfigMaps_setAddresses_same(t *testing.T) {
9696
uut:=newConfigMaps(logger,fEng,nodeID,nodePrivateKey,discoKey.Public(),addrs)
9797
deferuut.close()
9898

99-
requireNeverConfigures(ctx,t,uut)
99+
requireNeverConfigures(ctx,t,&uut.phased)
100100

101101
uut.setAddresses(addrs)
102102

@@ -190,7 +190,7 @@ func TestConfigMaps_updatePeers_same(t *testing.T) {
190190
deferuut.close()
191191

192192
// Then: we don't configure
193-
requireNeverConfigures(ctx,t,uut)
193+
requireNeverConfigures(ctx,t,&uut.phased)
194194

195195
p1ID:=uuid.MustParse("10000000-0000-0000-0000-000000000000")
196196
p1Node:=newTestNode(1)
@@ -558,7 +558,7 @@ func TestConfigMaps_setBlockEndpoints_same(t *testing.T) {
558558
uut.L.Unlock()
559559

560560
// Then: we don't configure
561-
requireNeverConfigures(ctx,t,uut)
561+
requireNeverConfigures(ctx,t,&uut.phased)
562562

563563
// When we set blockEndpoints to true
564564
uut.setBlockEndpoints(true)
@@ -619,7 +619,7 @@ func TestConfigMaps_updatePeers_nonexist(t *testing.T) {
619619
deferuut.close()
620620

621621
// Then: we don't configure
622-
requireNeverConfigures(ctx,t,uut)
622+
requireNeverConfigures(ctx,t,&uut.phased)
623623

624624
// Given: no known peers
625625
gofunc() {
@@ -669,7 +669,7 @@ func getNodeWithID(t testing.TB, peers []*tailcfg.Node, id tailcfg.NodeID) *tail
669669
returnnil
670670
}
671671

672-
funcrequireNeverConfigures(ctx context.Context,t*testing.T,uut*configMaps) {
672+
funcrequireNeverConfigures(ctx context.Context,t*testing.T,uut*phased) {
673673
t.Helper()
674674
waiting:=make(chanstruct{})
675675
gofunc() {

‎tailnet/node.go

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package tailnet
2+
3+
import (
4+
"context"
5+
"net/netip"
6+
"sync"
7+
8+
"golang.org/x/exp/maps"
9+
"golang.org/x/exp/slices"
10+
"tailscale.com/tailcfg"
11+
"tailscale.com/types/key"
12+
13+
"cdr.dev/slog"
14+
"github.com/coder/coder/v2/coderd/database/dbtime"
15+
)
16+
17+
typenodeUpdaterstruct {
18+
phased
19+
dirtybool
20+
closingbool
21+
22+
// static
23+
logger slog.Logger
24+
id tailcfg.NodeID
25+
key key.NodePublic
26+
discoKey key.DiscoPublic
27+
callbackfunc(n*Node)
28+
29+
// dynamic
30+
preferredDERPint
31+
derpLatencymap[string]float64
32+
derpForcedWebsocketsmap[int]string
33+
endpoints []string
34+
addresses []netip.Prefix
35+
}
36+
37+
// updateLoop waits until the config is dirty and then calls the callback with the newest node.
38+
// It is intended only to be called internally, and shuts down when close() is called.
39+
func (u*nodeUpdater)updateLoop() {
40+
u.L.Lock()
41+
deferu.L.Unlock()
42+
deferfunc() {
43+
u.phase=closed
44+
u.Broadcast()
45+
}()
46+
for {
47+
for!(u.closing||u.dirty) {
48+
u.phase=idle
49+
u.Wait()
50+
}
51+
ifu.closing {
52+
return
53+
}
54+
node:=u.nodeLocked()
55+
u.dirty=false
56+
u.phase=configuring
57+
u.Broadcast()
58+
59+
// We cannot reach nodes without DERP for discovery. Therefore, there is no point in sending
60+
// the node without this, and we can save ourselves from churn in the tailscale/wireguard
61+
// layer.
62+
ifnode.PreferredDERP==0 {
63+
u.logger.Debug(context.Background(),"skipped sending node; no PreferredDERP",slog.F("node",node))
64+
continue
65+
}
66+
67+
u.L.Unlock()
68+
u.callback(node)
69+
u.L.Lock()
70+
}
71+
}
72+
73+
// close closes the nodeUpdate and stops it calling the node callback
74+
func (u*nodeUpdater)close() {
75+
u.L.Lock()
76+
deferu.L.Unlock()
77+
u.closing=true
78+
u.Broadcast()
79+
foru.phase!=closed {
80+
u.Wait()
81+
}
82+
}
83+
84+
funcnewNodeUpdater(
85+
logger slog.Logger,callbackfunc(n*Node),
86+
id tailcfg.NodeID,np key.NodePublic,dp key.DiscoPublic,
87+
)*nodeUpdater {
88+
u:=&nodeUpdater{
89+
phased:phased{Cond:*(sync.NewCond(&sync.Mutex{}))},
90+
logger:logger,
91+
id:id,
92+
key:np,
93+
discoKey:dp,
94+
callback:callback,
95+
}
96+
gou.updateLoop()
97+
returnu
98+
}
99+
100+
// nodeLocked returns the current best node information. u.L must be held.
101+
func (u*nodeUpdater)nodeLocked()*Node {
102+
return&Node{
103+
ID:u.id,
104+
AsOf:dbtime.Now(),
105+
Key:u.key,
106+
Addresses:slices.Clone(u.addresses),
107+
AllowedIPs:slices.Clone(u.addresses),
108+
DiscoKey:u.discoKey,
109+
Endpoints:slices.Clone(u.endpoints),
110+
PreferredDERP:u.preferredDERP,
111+
DERPLatency:maps.Clone(u.derpLatency),
112+
DERPForcedWebsocket:maps.Clone(u.derpForcedWebsockets),
113+
}
114+
}
115+
116+
// setNetInfo processes a NetInfo update from the wireguard engine. c.L MUST
117+
// NOT be held.
118+
func (u*nodeUpdater)setNetInfo(ni*tailcfg.NetInfo) {
119+
u.L.Lock()
120+
deferu.L.Unlock()
121+
dirty:=false
122+
ifu.preferredDERP!=ni.PreferredDERP {
123+
dirty=true
124+
u.preferredDERP=ni.PreferredDERP
125+
}
126+
if!maps.Equal(u.derpLatency,ni.DERPLatency) {
127+
dirty=true
128+
u.derpLatency=ni.DERPLatency
129+
}
130+
ifdirty {
131+
u.dirty=true
132+
u.Broadcast()
133+
}
134+
}

‎tailnet/node_internal_test.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package tailnet
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
"golang.org/x/exp/maps"
8+
"tailscale.com/tailcfg"
9+
"tailscale.com/types/key"
10+
11+
"cdr.dev/slog"
12+
"cdr.dev/slog/sloggers/slogtest"
13+
"github.com/coder/coder/v2/testutil"
14+
)
15+
16+
funcTestNodeUpdater_setNetInfo_different(t*testing.T) {
17+
t.Parallel()
18+
ctx:=testutil.Context(t,testutil.WaitShort)
19+
logger:=slogtest.Make(t,nil).Leveled(slog.LevelDebug)
20+
id:=tailcfg.NodeID(1)
21+
nodeKey:=key.NewNode().Public()
22+
discoKey:=key.NewDisco().Public()
23+
nodeCh:=make(chan*Node)
24+
goCh:=make(chanstruct{})
25+
uut:=newNodeUpdater(
26+
logger,
27+
func(n*Node) {
28+
nodeCh<-n
29+
<-goCh
30+
},
31+
id,nodeKey,discoKey,
32+
)
33+
deferuut.close()
34+
35+
dl:=map[string]float64{"1":0.025}
36+
uut.setNetInfo(&tailcfg.NetInfo{
37+
PreferredDERP:1,
38+
DERPLatency:dl,
39+
})
40+
41+
node:=testutil.RequireRecvCtx(ctx,t,nodeCh)
42+
require.Equal(t,nodeKey,node.Key)
43+
require.Equal(t,discoKey,node.DiscoKey)
44+
require.Equal(t,1,node.PreferredDERP)
45+
require.True(t,maps.Equal(dl,node.DERPLatency))
46+
47+
// Send in second update to test getting updates in the middle of the
48+
// callback
49+
uut.setNetInfo(&tailcfg.NetInfo{
50+
PreferredDERP:2,
51+
DERPLatency:dl,
52+
})
53+
close(goCh)// allows callback to complete
54+
55+
node=testutil.RequireRecvCtx(ctx,t,nodeCh)
56+
require.Equal(t,nodeKey,node.Key)
57+
require.Equal(t,discoKey,node.DiscoKey)
58+
require.Equal(t,2,node.PreferredDERP)
59+
require.True(t,maps.Equal(dl,node.DERPLatency))
60+
61+
done:=make(chanstruct{})
62+
gofunc() {
63+
deferclose(done)
64+
uut.close()
65+
}()
66+
_=testutil.RequireRecvCtx(ctx,t,done)
67+
}
68+
69+
funcTestNodeUpdater_setNetInfo_same(t*testing.T) {
70+
t.Parallel()
71+
ctx:=testutil.Context(t,testutil.WaitShort)
72+
logger:=slogtest.Make(t,nil).Leveled(slog.LevelDebug)
73+
id:=tailcfg.NodeID(1)
74+
nodeKey:=key.NewNode().Public()
75+
discoKey:=key.NewDisco().Public()
76+
nodeCh:=make(chan*Node)
77+
goCh:=make(chanstruct{})
78+
uut:=newNodeUpdater(
79+
logger,
80+
func(n*Node) {
81+
nodeCh<-n
82+
<-goCh
83+
},
84+
id,nodeKey,discoKey,
85+
)
86+
deferuut.close()
87+
88+
// Then: we don't configure
89+
requireNeverConfigures(ctx,t,&uut.phased)
90+
91+
// Given: preferred DERP and latency already set
92+
dl:=map[string]float64{"1":0.025}
93+
uut.L.Lock()
94+
uut.preferredDERP=1
95+
uut.derpLatency=maps.Clone(dl)
96+
uut.L.Unlock()
97+
98+
// When: new update with same info
99+
uut.setNetInfo(&tailcfg.NetInfo{
100+
PreferredDERP:1,
101+
DERPLatency:dl,
102+
})
103+
104+
done:=make(chanstruct{})
105+
gofunc() {
106+
deferclose(done)
107+
uut.close()
108+
}()
109+
_=testutil.RequireRecvCtx(ctx,t,done)
110+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp