Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit4e34b3e

Browse files
committed
chore: add nodeUpdater to tailnet
1 parent63d95ce commit4e34b3e

File tree

4 files changed

+260
-8
lines changed

4 files changed

+260
-8
lines changed

‎tailnet/configmaps.go‎

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,17 @@ const (
4848
closed
4949
)
5050

51-
typeconfigMapsstruct {
51+
typephasedstruct {
5252
sync.Cond
53+
phasephase
54+
}
55+
56+
typeconfigMapsstruct {
57+
phased
5358
netmapDirtybool
5459
derpMapDirtybool
5560
filterDirtybool
5661
closingbool
57-
phasephase
5862

5963
engineengineConfigurable
6064
static netmap.NetworkMap
@@ -71,7 +75,7 @@ type configMaps struct {
7175
funcnewConfigMaps(logger slog.Logger,engineengineConfigurable,nodeID tailcfg.NodeID,nodeKey key.NodePrivate,discoKey key.DiscoPublic,addresses []netip.Prefix)*configMaps {
7276
pubKey:=nodeKey.Public()
7377
c:=&configMaps{
74-
Cond:*(sync.NewCond(&sync.Mutex{})),
78+
phased:phased{Cond:*(sync.NewCond(&sync.Mutex{}))},
7579
logger:logger,
7680
engine:engine,
7781
static: netmap.NetworkMap{

‎tailnet/configmaps_internal_test.go‎

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func TestConfigMaps_setAddresses_same(t *testing.T) {
9696
uut:=newConfigMaps(logger,fEng,nodeID,nodePrivateKey,discoKey.Public(),addrs)
9797
deferuut.close()
9898

99-
requireNeverConfigures(ctx,t,uut)
99+
requireNeverConfigures(ctx,t,&uut.phased)
100100

101101
uut.setAddresses(addrs)
102102

@@ -190,7 +190,7 @@ func TestConfigMaps_updatePeers_same(t *testing.T) {
190190
deferuut.close()
191191

192192
// Then: we don't configure
193-
requireNeverConfigures(ctx,t,uut)
193+
requireNeverConfigures(ctx,t,&uut.phased)
194194

195195
p1ID:=uuid.MustParse("10000000-0000-0000-0000-000000000000")
196196
p1Node:=newTestNode(1)
@@ -549,7 +549,7 @@ func TestConfigMaps_setBlockEndpoints_same(t *testing.T) {
549549
uut.L.Unlock()
550550

551551
// Then: we don't configure
552-
requireNeverConfigures(ctx,t,uut)
552+
requireNeverConfigures(ctx,t,&uut.phased)
553553

554554
// When we set blockEndpoints to true
555555
uut.setBlockEndpoints(true)
@@ -610,7 +610,7 @@ func TestConfigMaps_updatePeers_nonexist(t *testing.T) {
610610
deferuut.close()
611611

612612
// Then: we don't configure
613-
requireNeverConfigures(ctx,t,uut)
613+
requireNeverConfigures(ctx,t,&uut.phased)
614614

615615
// Given: no known peers
616616
gofunc() {
@@ -660,7 +660,7 @@ func getNodeWithID(t testing.TB, peers []*tailcfg.Node, id tailcfg.NodeID) *tail
660660
returnnil
661661
}
662662

663-
funcrequireNeverConfigures(ctx context.Context,t*testing.T,uut*configMaps) {
663+
funcrequireNeverConfigures(ctx context.Context,t*testing.T,uut*phased) {
664664
t.Helper()
665665
waiting:=make(chanstruct{})
666666
gofunc() {

‎tailnet/node.go‎

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
package tailnet
2+
3+
import (
4+
"context"
5+
"net/netip"
6+
"sync"
7+
8+
"golang.org/x/exp/maps"
9+
"golang.org/x/exp/slices"
10+
"tailscale.com/tailcfg"
11+
"tailscale.com/types/key"
12+
13+
"cdr.dev/slog"
14+
"github.com/coder/coder/v2/coderd/database/dbtime"
15+
)
16+
17+
typenodeUpdaterstruct {
18+
phased
19+
dirtybool
20+
closingbool
21+
22+
// static
23+
logger slog.Logger
24+
id tailcfg.NodeID
25+
key key.NodePublic
26+
discoKey key.DiscoPublic
27+
callbackfunc(n*Node)
28+
29+
// dynamic
30+
preferredDERPint
31+
derpLatencymap[string]float64
32+
derpForcedWebsocketsmap[int]string
33+
endpoints []string
34+
addresses []netip.Prefix
35+
}
36+
37+
// updateLoop waits until the config is dirty and then calls the callback with the newest node.
38+
// It is intended only to be called internally, and shuts down when close() is called.
39+
func (u*nodeUpdater)updateLoop() {
40+
u.L.Lock()
41+
deferu.L.Unlock()
42+
deferfunc() {
43+
u.phase=closed
44+
u.Broadcast()
45+
}()
46+
for {
47+
for!(u.closing||u.dirty) {
48+
u.phase=idle
49+
u.Wait()
50+
}
51+
ifu.closing {
52+
return
53+
}
54+
node:=u.nodeLocked()
55+
u.dirty=false
56+
u.phase=configuring
57+
u.Broadcast()
58+
59+
// We cannot reach nodes without DERP for discovery. Therefore, there is no point in sending
60+
// the node without this, and we can save ourselves from churn in the tailscale/wireguard
61+
// layer.
62+
ifnode.PreferredDERP==0 {
63+
u.logger.Debug(context.Background(),"skipped sending node; no PreferredDERP",slog.F("node",node))
64+
continue
65+
}
66+
67+
func() {
68+
// this may look a little odd, but here we want this code to run
69+
// without the lock, and then relock when done
70+
u.L.Unlock()
71+
deferu.L.Lock()
72+
u.callback(node)
73+
}()
74+
}
75+
}
76+
77+
// close closes the nodeUpdate and stops it calling the node callback
78+
func (u*nodeUpdater)close() {
79+
u.L.Lock()
80+
deferu.L.Unlock()
81+
u.closing=true
82+
u.Broadcast()
83+
foru.phase!=closed {
84+
u.Wait()
85+
}
86+
}
87+
88+
funcnewNodeUpdater(
89+
logger slog.Logger,callbackfunc(n*Node),
90+
id tailcfg.NodeID,np key.NodePublic,dp key.DiscoPublic,
91+
)*nodeUpdater {
92+
u:=&nodeUpdater{
93+
phased:phased{Cond:*(sync.NewCond(&sync.Mutex{}))},
94+
logger:logger,
95+
id:id,
96+
key:np,
97+
discoKey:dp,
98+
callback:callback,
99+
}
100+
gou.updateLoop()
101+
returnu
102+
}
103+
104+
// nodeLocked returns the current best node information. u.L must be held.
105+
func (u*nodeUpdater)nodeLocked()*Node {
106+
return&Node{
107+
ID:u.id,
108+
AsOf:dbtime.Now(),
109+
Key:u.key,
110+
Addresses:slices.Clone(u.addresses),
111+
AllowedIPs:slices.Clone(u.addresses),
112+
DiscoKey:u.discoKey,
113+
Endpoints:slices.Clone(u.endpoints),
114+
PreferredDERP:u.preferredDERP,
115+
DERPLatency:maps.Clone(u.derpLatency),
116+
DERPForcedWebsocket:maps.Clone(u.derpForcedWebsockets),
117+
}
118+
}
119+
120+
// setNetInfo processes a NetInfo update from the wireguard engine. c.L MUST
121+
// NOT be held.
122+
func (u*nodeUpdater)setNetInfo(ni*tailcfg.NetInfo) {
123+
u.L.Lock()
124+
deferu.L.Unlock()
125+
dirty:=false
126+
ifu.preferredDERP!=ni.PreferredDERP {
127+
dirty=true
128+
u.preferredDERP=ni.PreferredDERP
129+
}
130+
if!maps.Equal(u.derpLatency,ni.DERPLatency) {
131+
dirty=true
132+
u.derpLatency=ni.DERPLatency
133+
}
134+
ifdirty {
135+
u.dirty=true
136+
u.Broadcast()
137+
}
138+
}

‎tailnet/node_internal_test.go‎

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package tailnet
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
"golang.org/x/exp/maps"
8+
"tailscale.com/tailcfg"
9+
"tailscale.com/types/key"
10+
11+
"cdr.dev/slog"
12+
"cdr.dev/slog/sloggers/slogtest"
13+
"github.com/coder/coder/v2/testutil"
14+
)
15+
16+
funcTestNodeUpdater_setNetInfo_different(t*testing.T) {
17+
t.Parallel()
18+
ctx:=testutil.Context(t,testutil.WaitShort)
19+
logger:=slogtest.Make(t,nil).Leveled(slog.LevelDebug)
20+
id:=tailcfg.NodeID(1)
21+
nodeKey:=key.NewNode().Public()
22+
discoKey:=key.NewDisco().Public()
23+
nodeCh:=make(chan*Node)
24+
goCh:=make(chanstruct{})
25+
uut:=newNodeUpdater(
26+
logger,
27+
func(n*Node) {
28+
nodeCh<-n
29+
<-goCh
30+
},
31+
id,nodeKey,discoKey,
32+
)
33+
deferuut.close()
34+
35+
dl:=map[string]float64{"1":0.025}
36+
uut.setNetInfo(&tailcfg.NetInfo{
37+
PreferredDERP:1,
38+
DERPLatency:dl,
39+
})
40+
41+
node:=testutil.RequireRecvCtx(ctx,t,nodeCh)
42+
require.Equal(t,nodeKey,node.Key)
43+
require.Equal(t,discoKey,node.DiscoKey)
44+
require.Equal(t,1,node.PreferredDERP)
45+
require.True(t,maps.Equal(dl,node.DERPLatency))
46+
47+
// Send in second update to test getting updates in the middle of the
48+
// callback
49+
uut.setNetInfo(&tailcfg.NetInfo{
50+
PreferredDERP:2,
51+
DERPLatency:dl,
52+
})
53+
close(goCh)// allows callback to complete
54+
55+
node=testutil.RequireRecvCtx(ctx,t,nodeCh)
56+
require.Equal(t,nodeKey,node.Key)
57+
require.Equal(t,discoKey,node.DiscoKey)
58+
require.Equal(t,2,node.PreferredDERP)
59+
require.True(t,maps.Equal(dl,node.DERPLatency))
60+
61+
done:=make(chanstruct{})
62+
gofunc() {
63+
deferclose(done)
64+
uut.close()
65+
}()
66+
_=testutil.RequireRecvCtx(ctx,t,done)
67+
}
68+
69+
funcTestNodeUpdater_setNetInfo_same(t*testing.T) {
70+
t.Parallel()
71+
ctx:=testutil.Context(t,testutil.WaitShort)
72+
logger:=slogtest.Make(t,nil).Leveled(slog.LevelDebug)
73+
id:=tailcfg.NodeID(1)
74+
nodeKey:=key.NewNode().Public()
75+
discoKey:=key.NewDisco().Public()
76+
nodeCh:=make(chan*Node)
77+
goCh:=make(chanstruct{})
78+
uut:=newNodeUpdater(
79+
logger,
80+
func(n*Node) {
81+
nodeCh<-n
82+
<-goCh
83+
},
84+
id,nodeKey,discoKey,
85+
)
86+
deferuut.close()
87+
88+
// Then: we don't configure
89+
requireNeverConfigures(ctx,t,&uut.phased)
90+
91+
// Given: preferred DERP and latency already set
92+
dl:=map[string]float64{"1":0.025}
93+
uut.L.Lock()
94+
uut.preferredDERP=1
95+
uut.derpLatency=maps.Clone(dl)
96+
uut.L.Unlock()
97+
98+
// When: new update with same info
99+
uut.setNetInfo(&tailcfg.NetInfo{
100+
PreferredDERP:1,
101+
DERPLatency:dl,
102+
})
103+
104+
done:=make(chanstruct{})
105+
gofunc() {
106+
deferclose(done)
107+
uut.close()
108+
}()
109+
_=testutil.RequireRecvCtx(ctx,t,done)
110+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp