- Notifications
You must be signed in to change notification settings - Fork1k
chore: add nodeUpdater to tailnet#11539
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Merged
Uh oh!
There was an error while loading.Please reload this page.
Merged
Changes fromall commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Jump to file
Failed to load files.
Loading
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
There are no files selected for viewing
10 changes: 7 additions & 3 deletionstailnet/configmaps.go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.Learn more about bidirectional Unicode characters
10 changes: 5 additions & 5 deletionstailnet/configmaps_internal_test.go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.Learn more about bidirectional Unicode characters
134 changes: 134 additions & 0 deletionstailnet/node.go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
package tailnet | ||
import ( | ||
"context" | ||
"net/netip" | ||
"sync" | ||
"golang.org/x/exp/maps" | ||
"golang.org/x/exp/slices" | ||
"tailscale.com/tailcfg" | ||
"tailscale.com/types/key" | ||
"cdr.dev/slog" | ||
"github.com/coder/coder/v2/coderd/database/dbtime" | ||
) | ||
type nodeUpdater struct { | ||
phased | ||
dirty bool | ||
closing bool | ||
// static | ||
logger slog.Logger | ||
id tailcfg.NodeID | ||
key key.NodePublic | ||
discoKey key.DiscoPublic | ||
callback func(n *Node) | ||
// dynamic | ||
preferredDERP int | ||
derpLatency map[string]float64 | ||
derpForcedWebsockets map[int]string | ||
endpoints []string | ||
addresses []netip.Prefix | ||
} | ||
// updateLoop waits until the config is dirty and then calls the callback with the newest node. | ||
// It is intended only to be called internally, and shuts down when close() is called. | ||
func (u *nodeUpdater) updateLoop() { | ||
u.L.Lock() | ||
defer u.L.Unlock() | ||
defer func() { | ||
u.phase = closed | ||
u.Broadcast() | ||
}() | ||
for { | ||
for !(u.closing || u.dirty) { | ||
u.phase = idle | ||
u.Wait() | ||
} | ||
if u.closing { | ||
return | ||
} | ||
node := u.nodeLocked() | ||
u.dirty = false | ||
u.phase = configuring | ||
u.Broadcast() | ||
// We cannot reach nodes without DERP for discovery. Therefore, there is no point in sending | ||
// the node without this, and we can save ourselves from churn in the tailscale/wireguard | ||
// layer. | ||
if node.PreferredDERP == 0 { | ||
u.logger.Debug(context.Background(), "skipped sending node; no PreferredDERP", slog.F("node", node)) | ||
continue | ||
} | ||
u.L.Unlock() | ||
u.callback(node) | ||
u.L.Lock() | ||
} | ||
} | ||
// close closes the nodeUpdate and stops it calling the node callback | ||
func (u *nodeUpdater) close() { | ||
u.L.Lock() | ||
defer u.L.Unlock() | ||
u.closing = true | ||
u.Broadcast() | ||
for u.phase != closed { | ||
u.Wait() | ||
} | ||
} | ||
func newNodeUpdater( | ||
logger slog.Logger, callback func(n *Node), | ||
id tailcfg.NodeID, np key.NodePublic, dp key.DiscoPublic, | ||
) *nodeUpdater { | ||
u := &nodeUpdater{ | ||
phased: phased{Cond: *(sync.NewCond(&sync.Mutex{}))}, | ||
logger: logger, | ||
id: id, | ||
key: np, | ||
discoKey: dp, | ||
callback: callback, | ||
} | ||
go u.updateLoop() | ||
return u | ||
} | ||
// nodeLocked returns the current best node information. u.L must be held. | ||
func (u *nodeUpdater) nodeLocked() *Node { | ||
return &Node{ | ||
ID: u.id, | ||
AsOf: dbtime.Now(), | ||
Key: u.key, | ||
Addresses: slices.Clone(u.addresses), | ||
AllowedIPs: slices.Clone(u.addresses), | ||
DiscoKey: u.discoKey, | ||
Endpoints: slices.Clone(u.endpoints), | ||
PreferredDERP: u.preferredDERP, | ||
DERPLatency: maps.Clone(u.derpLatency), | ||
DERPForcedWebsocket: maps.Clone(u.derpForcedWebsockets), | ||
} | ||
} | ||
// setNetInfo processes a NetInfo update from the wireguard engine. c.L MUST | ||
// NOT be held. | ||
func (u *nodeUpdater) setNetInfo(ni *tailcfg.NetInfo) { | ||
u.L.Lock() | ||
defer u.L.Unlock() | ||
dirty := false | ||
if u.preferredDERP != ni.PreferredDERP { | ||
dirty = true | ||
u.preferredDERP = ni.PreferredDERP | ||
} | ||
if !maps.Equal(u.derpLatency, ni.DERPLatency) { | ||
dirty = true | ||
u.derpLatency = ni.DERPLatency | ||
} | ||
if dirty { | ||
u.dirty = true | ||
u.Broadcast() | ||
} | ||
} |
110 changes: 110 additions & 0 deletionstailnet/node_internal_test.go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
package tailnet | ||
import ( | ||
"testing" | ||
"github.com/stretchr/testify/require" | ||
"golang.org/x/exp/maps" | ||
"tailscale.com/tailcfg" | ||
"tailscale.com/types/key" | ||
"cdr.dev/slog" | ||
"cdr.dev/slog/sloggers/slogtest" | ||
"github.com/coder/coder/v2/testutil" | ||
) | ||
func TestNodeUpdater_setNetInfo_different(t *testing.T) { | ||
t.Parallel() | ||
ctx := testutil.Context(t, testutil.WaitShort) | ||
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) | ||
id := tailcfg.NodeID(1) | ||
nodeKey := key.NewNode().Public() | ||
discoKey := key.NewDisco().Public() | ||
nodeCh := make(chan *Node) | ||
goCh := make(chan struct{}) | ||
uut := newNodeUpdater( | ||
logger, | ||
func(n *Node) { | ||
nodeCh <- n | ||
<-goCh | ||
}, | ||
id, nodeKey, discoKey, | ||
) | ||
defer uut.close() | ||
dl := map[string]float64{"1": 0.025} | ||
uut.setNetInfo(&tailcfg.NetInfo{ | ||
PreferredDERP: 1, | ||
DERPLatency: dl, | ||
}) | ||
node := testutil.RequireRecvCtx(ctx, t, nodeCh) | ||
require.Equal(t, nodeKey, node.Key) | ||
require.Equal(t, discoKey, node.DiscoKey) | ||
require.Equal(t, 1, node.PreferredDERP) | ||
require.True(t, maps.Equal(dl, node.DERPLatency)) | ||
// Send in second update to test getting updates in the middle of the | ||
// callback | ||
uut.setNetInfo(&tailcfg.NetInfo{ | ||
PreferredDERP: 2, | ||
DERPLatency: dl, | ||
}) | ||
close(goCh) // allows callback to complete | ||
node = testutil.RequireRecvCtx(ctx, t, nodeCh) | ||
require.Equal(t, nodeKey, node.Key) | ||
require.Equal(t, discoKey, node.DiscoKey) | ||
require.Equal(t, 2, node.PreferredDERP) | ||
require.True(t, maps.Equal(dl, node.DERPLatency)) | ||
done := make(chan struct{}) | ||
go func() { | ||
defer close(done) | ||
uut.close() | ||
}() | ||
_ = testutil.RequireRecvCtx(ctx, t, done) | ||
} | ||
func TestNodeUpdater_setNetInfo_same(t *testing.T) { | ||
t.Parallel() | ||
ctx := testutil.Context(t, testutil.WaitShort) | ||
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) | ||
id := tailcfg.NodeID(1) | ||
nodeKey := key.NewNode().Public() | ||
discoKey := key.NewDisco().Public() | ||
nodeCh := make(chan *Node) | ||
goCh := make(chan struct{}) | ||
uut := newNodeUpdater( | ||
logger, | ||
func(n *Node) { | ||
nodeCh <- n | ||
<-goCh | ||
}, | ||
id, nodeKey, discoKey, | ||
) | ||
defer uut.close() | ||
// Then: we don't configure | ||
requireNeverConfigures(ctx, t, &uut.phased) | ||
// Given: preferred DERP and latency already set | ||
dl := map[string]float64{"1": 0.025} | ||
uut.L.Lock() | ||
uut.preferredDERP = 1 | ||
uut.derpLatency = maps.Clone(dl) | ||
uut.L.Unlock() | ||
// When: new update with same info | ||
uut.setNetInfo(&tailcfg.NetInfo{ | ||
PreferredDERP: 1, | ||
DERPLatency: dl, | ||
}) | ||
done := make(chan struct{}) | ||
go func() { | ||
defer close(done) | ||
uut.close() | ||
}() | ||
_ = testutil.RequireRecvCtx(ctx, t, done) | ||
} |
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.