Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

chore: add additional network telemetry stats & events#13800

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Merged
ethanndickson merged 17 commits into main from ethan/more-telemetry
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
Show all changes
17 commits
Select commitHold shift + click to select a range
6ee3bfa
chore: add additional network telemetry stats & events
ethanndicksonJul 8, 2024
6e0ec17
better connectionage
ethanndicksonJul 8, 2024
ae17bb3
send update events only when home derp changes
ethanndicksonJul 8, 2024
612ae5f
better remote node id
ethanndicksonJul 8, 2024
d2d9c39
use netmap callback
ethanndicksonJul 8, 2024
af36c37
fixup
ethanndicksonJul 8, 2024
3466ba6
fixup
ethanndicksonJul 8, 2024
dbffec4
test
ethanndicksonJul 8, 2024
b240507
set p2p setup
ethanndicksonJul 9, 2024
e2e4018
fixup
ethanndicksonJul 9, 2024
0bebad4
watch for conn changes
ethanndicksonJul 9, 2024
6a19849
fixup
ethanndicksonJul 9, 2024
11f2a2b
use latest p2p setup time
ethanndicksonJul 9, 2024
fe7252a
ensure latest derpmap
ethanndicksonJul 9, 2024
4b76942
nicer ping peer
ethanndicksonJul 9, 2024
6214e2a
fix race
ethanndicksonJul 9, 2024
60e88f7
relay -> udp addr
ethanndicksonJul 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletioncli/ssh.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -437,7 +437,7 @@ func (r *RootCmd) ssh() *serpent.Command {
}

err = sshSession.Wait()
conn.SendDisconnectedTelemetry("ssh")
conn.SendDisconnectedTelemetry()
if err != nil {
if exitErr := (&gossh.ExitError{}); errors.As(err, &exitErr) {
// Clear the error since it's not useful beyond
Expand Down
4 changes: 2 additions & 2 deletionscoderd/telemetry/telemetry.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -1240,7 +1240,7 @@ type NetworkEvent struct {
NodeIDSelf uint64 `json:"node_id_self"`
NodeIDRemote uint64 `json:"node_id_remote"`
P2PEndpoint NetworkEventP2PEndpoint `json:"p2p_endpoint"`
HomeDERPstring `json:"home_derp"`
HomeDERPint `json:"home_derp"`
DERPMap DERPMap `json:"derp_map"`
LatestNetcheck Netcheck `json:"latest_netcheck"`

Expand DownExpand Up@@ -1286,7 +1286,7 @@ func NetworkEventFromProto(proto *tailnetproto.TelemetryEvent) (NetworkEvent, er
NodeIDSelf: proto.NodeIdSelf,
NodeIDRemote: proto.NodeIdRemote,
P2PEndpoint: p2pEndpointFromProto(proto.P2PEndpoint),
HomeDERP: proto.HomeDerp,
HomeDERP:int(proto.HomeDerp),
DERPMap: derpMapFromProto(proto.DerpMap),
LatestNetcheck: netcheckFromProto(proto.LatestNetcheck),

Expand Down
4 changes: 0 additions & 4 deletionscodersdk/workspacesdk/agentconn.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -380,7 +380,3 @@ func (c *AgentConn) apiClient() *http.Client {
// GetPeerDiagnostics returns the peer diagnostics that the underlying tailnet
// Conn reports for the agent this AgentConn targets (c.opts.AgentID).
func (c *AgentConn) GetPeerDiagnostics() tailnet.PeerDiagnostics {
return c.Conn.GetPeerDiagnostics(c.opts.AgentID)
}

func (c *AgentConn) SendDisconnectedTelemetry(application string) {
c.Conn.SendDisconnectedTelemetry(c.agentAddress(), application)
}
12 changes: 6 additions & 6 deletionscodersdk/workspacesdk/connector_internal_test.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -228,12 +228,12 @@ func TestTailnetAPIConnector_TelemetryUnimplemented(t *testing.T) {
return uut.client != nil
}, testutil.WaitShort, testutil.IntervalFast)

fakeDRPCClient.telemeteryErorr = drpcerr.WithCode(xerrors.New("Unimplemented"), 0)
fakeDRPCClient.telemetryError = drpcerr.WithCode(xerrors.New("Unimplemented"), 0)
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
require.False(t, uut.telemetryUnavailable.Load())
require.Equal(t, int64(1), atomic.LoadInt64(&fakeDRPCClient.postTelemetryCalls))

fakeDRPCClient.telemeteryErorr = drpcerr.WithCode(xerrors.New("Unimplemented"), drpcerr.Unimplemented)
fakeDRPCClient.telemetryError = drpcerr.WithCode(xerrors.New("Unimplemented"), drpcerr.Unimplemented)
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
require.True(t, uut.telemetryUnavailable.Load())
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
Expand DownExpand Up@@ -268,12 +268,12 @@ func TestTailnetAPIConnector_TelemetryNotRecognised(t *testing.T) {
return uut.client != nil
}, testutil.WaitShort, testutil.IntervalFast)

fakeDRPCClient.telemeteryErorr = drpc.ProtocolError.New("Protocol Error")
fakeDRPCClient.telemetryError = drpc.ProtocolError.New("Protocol Error")
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
require.False(t, uut.telemetryUnavailable.Load())
require.Equal(t, int64(1), atomic.LoadInt64(&fakeDRPCClient.postTelemetryCalls))

fakeDRPCClient.telemeteryErorr = drpc.ProtocolError.New("unknown rpc: /coder.tailnet.v2.Tailnet/PostTelemetry")
fakeDRPCClient.telemetryError = drpc.ProtocolError.New("unknown rpc: /coder.tailnet.v2.Tailnet/PostTelemetry")
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
require.True(t, uut.telemetryUnavailable.Load())
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
Expand DownExpand Up@@ -301,7 +301,7 @@ func newFakeTailnetConn() *fakeTailnetConn {

type fakeDRPCClient struct {
postTelemetryCalls int64
telemeteryErorr error
telemetryError error
fakeDRPPCMapStream
}

Expand DownExpand Up@@ -331,7 +331,7 @@ func (*fakeDRPCClient) DRPCConn() drpc.Conn {
// PostTelemetry implements proto.DRPCTailnetClient.
func (f *fakeDRPCClient) PostTelemetry(_ context.Context, _ *proto.TelemetryRequest) (*proto.TelemetryResponse, error) {
atomic.AddInt64(&f.postTelemetryCalls, 1)
return nil, f.telemeteryErorr
return nil, f.telemetryError
}

// StreamDERPMaps implements proto.DRPCTailnetClient.
Expand Down
130 changes: 77 additions & 53 deletionstailnet/conn.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -31,6 +31,7 @@ import (
"tailscale.com/types/key"
tslogger "tailscale.com/types/logger"
"tailscale.com/types/netlogtype"
"tailscale.com/types/netmap"
"tailscale.com/wgengine"
"tailscale.com/wgengine/capture"
"tailscale.com/wgengine/magicsock"
Expand DownExpand Up@@ -262,17 +263,8 @@ func NewConn(options *Options) (conn *Conn, err error) {
)
nodeUp.setAddresses(options.Addresses)
nodeUp.setBlockEndpoints(options.BlockEndpoints)
wireguardEngine.SetStatusCallback(nodeUp.setStatus)
magicConn.SetDERPForcedWebsocketCallback(nodeUp.setDERPForcedWebsocket)
if telemetryStore != nil {
wireguardEngine.SetNetInfoCallback(func(ni *tailcfg.NetInfo) {
nodeUp.setNetInfo(ni)
telemetryStore.setNetInfo(ni)
})
} else {
wireguardEngine.SetNetInfoCallback(nodeUp.setNetInfo)
}

ctx, ctxCancel := context.WithCancel(context.Background())
server := &Conn{
id: uuid.New(),
closed: make(chan struct{}),
Expand All@@ -290,13 +282,32 @@ func NewConn(options *Options) (conn *Conn, err error) {
configMaps: cfgMaps,
nodeUpdater: nodeUp,
telemetrySink: options.TelemetrySink,
telemeteryStore: telemetryStore,
telemetryStore: telemetryStore,
createdAt: time.Now(),
watchCtx: ctx,
watchCancel: ctxCancel,
}
defer func() {
if err != nil {
_ = server.Close()
}
}()
if server.telemetryStore != nil {
server.wireguardEngine.SetNetInfoCallback(func(ni *tailcfg.NetInfo) {
server.telemetryStore.setNetInfo(ni)
nodeUp.setNetInfo(ni)
server.telemetryStore.pingPeer(server)
})
server.wireguardEngine.AddNetworkMapCallback(func(nm *netmap.NetworkMap) {
Copy link
MemberAuthor

@ethanndicksonethanndicksonJul 8, 2024
edited
Loading

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use this callback to ensure we always have the latest Tailscale node data for a given peer.

server.telemetryStore.updateNetworkMap(nm)
server.telemetryStore.pingPeer(server)
})
go server.watchConnChange()
} else {
server.wireguardEngine.SetNetInfoCallback(nodeUp.setNetInfo)
}
server.wireguardEngine.SetStatusCallback(nodeUp.setStatus)
server.magicConn.SetDERPForcedWebsocketCallback(nodeUp.setDERPForcedWebsocket)

netStack.GetTCPHandlerForFlow = server.forwardTCP

Expand DownExpand Up@@ -351,11 +362,15 @@ type Conn struct {
wireguardEngine wgengine.Engine
listeners map[listenKey]*listener
clientType proto.TelemetryEvent_ClientType
createdAt time.Time

telemetrySink TelemetrySink
// telemeteryStore will be nil if telemetrySink is nil.
telemeteryStore *TelemetryStore
telemetryWg sync.WaitGroup
// telemetryStore will be nil if telemetrySink is nil.
telemetryStore *TelemetryStore
telemetryWg sync.WaitGroup

watchCtx context.Context
watchCancel func()

trafficStats *connstats.Statistics
}
Expand DownExpand Up@@ -390,8 +405,8 @@ func (c *Conn) SetNodeCallback(callback func(node *Node)) {

// SetDERPMap updates the DERPMap of a connection.
func (c *Conn) SetDERPMap(derpMap *tailcfg.DERPMap) {
if c.configMaps.setDERPMap(derpMap) && c.telemeteryStore != nil {
c.telemeteryStore.updateDerpMap(derpMap)
if c.configMaps.setDERPMap(derpMap) && c.telemetryStore != nil {
c.telemetryStore.updateDerpMap(derpMap)
}
}

Expand DownExpand Up@@ -540,6 +555,7 @@ func (c *Conn) Closed() <-chan struct{} {
// Close shuts down the Wireguard connection.
func (c *Conn) Close() error {
c.logger.Info(context.Background(), "closing tailnet Conn")
c.watchCancel()
c.telemetryWg.Wait()
c.configMaps.close()
c.nodeUpdater.close()
Expand DownExpand Up@@ -709,54 +725,34 @@ func (c *Conn) MagicsockServeHTTPDebug(w http.ResponseWriter, r *http.Request) {
c.magicConn.ServeHTTPDebug(w, r)
}

// SendConnectedTelemetry should be called when connection to a peer with the given IP is established.
func (c *Conn) SendConnectedTelemetry(ip netip.Addr, application string) {
if c.telemetrySink == nil {
return
}
c.telemetryStore.markConnected(&ip, application)
e := c.newTelemetryEvent()
e.Status = proto.TelemetryEvent_CONNECTED
e.Application = application
pip, ok := c.wireguardEngine.PeerForIP(ip)
if ok {
e.NodeIdRemote = uint64(pip.Node.ID)
}
c.telemetryWg.Add(1)
go func() {
defer c.telemetryWg.Done()
c.telemetrySink.SendTelemetryEvent(e)
}()
c.sendTelemetryBackground(e)
}

func (c *Conn) SendDisconnectedTelemetry(ip netip.Addr, application string) {
func (c *Conn) SendDisconnectedTelemetry() {
if c.telemetrySink == nil {
return
}
e := c.newTelemetryEvent()
e.Status = proto.TelemetryEvent_DISCONNECTED
e.Application = application
pip, ok := c.wireguardEngine.PeerForIP(ip)
if ok {
e.NodeIdRemote = uint64(pip.Node.ID)
}
c.telemetryWg.Add(1)
go func() {
defer c.telemetryWg.Done()
c.telemetrySink.SendTelemetryEvent(e)
}()
c.sendTelemetryBackground(e)
}

func (c *Conn) SendSpeedtestTelemetry(throughputMbits float64) {
if c.telemetrySink == nil {
return
}
e := c.newTelemetryEvent()
e.Status = proto.TelemetryEvent_CONNECTED
e.ThroughputMbits = wrapperspb.Float(float32(throughputMbits))
c.telemetryWg.Add(1)
go func() {
defer c.telemetryWg.Done()
c.telemetrySink.SendTelemetryEvent(e)
}()
e.Status = proto.TelemetryEvent_CONNECTED
c.sendTelemetryBackground(e)
}

// nolint:revive
Expand All@@ -769,31 +765,59 @@ func (c *Conn) sendPingTelemetry(pr *ipnstate.PingResult) {
latency := durationpb.New(time.Duration(pr.LatencySeconds * float64(time.Second)))
if pr.Endpoint != "" {
e.P2PLatency = latency
e.P2PEndpoint = c.telemeteryStore.toEndpoint(pr.Endpoint)
e.P2PEndpoint = c.telemetryStore.toEndpoint(pr.Endpoint)
} else {
e.DerpLatency = latency
}
e.Status = proto.TelemetryEvent_CONNECTED
c.telemetryWg.Add(1)
go func() {
defer c.telemetryWg.Done()
c.telemetrySink.SendTelemetryEvent(e)
}()
c.sendTelemetryBackground(e)
}

// The returned telemetry event will not have its status set.
func (c *Conn) newTelemetryEvent() *proto.TelemetryEvent {
// Infallible
id, _ := c.id.MarshalBinary()
event := c.telemeteryStore.newEvent()
event := c.telemetryStore.newEvent()
event.ClientType = c.clientType
event.Id = id
selfNode := c.Node()
event.NodeIdSelf = uint64(selfNode.ID)
event.HomeDerp = strconv.Itoa(selfNode.PreferredDERP)
event.ConnectionAge = durationpb.New(time.Since(c.createdAt))
return event
}

// sendTelemetryBackground hands e to c.telemetrySink on a new goroutine, so the
// caller does not wait for SendTelemetryEvent to return.
//
// The goroutine is registered with c.telemetryWg before it is started and
// marks itself done when the send returns; Close calls c.telemetryWg.Wait(),
// so in-flight sends are waited for on shutdown.
//
// NOTE(review): c.telemetrySink is used here without a nil check. The
// Send*Telemetry callers visible in this diff return early when it is nil;
// confirm the same holds for any other caller.
func (c *Conn) sendTelemetryBackground(e *proto.TelemetryEvent) {
// Add must happen before the goroutine starts so a concurrent Wait cannot miss it.
c.telemetryWg.Add(1)
go func() {
defer c.telemetryWg.Done()
c.telemetrySink.SendTelemetryEvent(e)
}()
}

// Watch for changes in the connection type (P2P<->DERP) and send telemetry events.
func (c *Conn) watchConnChange() {
ticker := time.NewTicker(time.Millisecond * 50)
Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we have to manually poll for connection changes like this, P2PSetup can be up to ~50ms off its true value.

defer ticker.Stop()
for {
select {
case <-c.watchCtx.Done():
return
case <-ticker.C:
}
status := c.Status()
peers := status.Peers()
if len(peers) > 1 {
// Not a CLI<->agent connection, stop watching
return
} else if len(peers) == 0 {
continue
}
peer := status.Peer[peers[0]]
// If the connection type has changed, send a telemetry event with the latest ping stats
if c.telemetryStore.changedConntype(peer.CurAddr) {
c.telemetryStore.pingPeer(c)
}
}
}

// PeerDiagnostics is a checklist of human-readable conditions necessary to establish an encrypted
// tunnel to a peer via a Conn
type PeerDiagnostics struct {
Expand Down
8 changes: 4 additions & 4 deletionstailnet/proto/tailnet.pb.go
View file
Open in desktop

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletiontailnet/proto/tailnet.proto
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -169,7 +169,7 @@ message TelemetryEvent {
uint64 node_id_self = 7;
uint64 node_id_remote = 8;
P2PEndpoint p2p_endpoint = 9;
string home_derp = 10;
int32 home_derp = 10;
DERPMap derp_map = 11;
Netcheck latest_netcheck = 12;

Expand Down
Loading
Loading

[8]ページ先頭

©2009-2025 Movatter.jp