Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

chore: add additional network telemetry stats & events#13800

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
ethanndickson merged 17 commits intomainfromethan/more-telemetry
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from1 commit
Commits
Show all changes
17 commits
Select commitHold shift + click to select a range
6ee3bfa
chore: add additional network telemetry stats & events
ethanndicksonJul 8, 2024
6e0ec17
better connectionage
ethanndicksonJul 8, 2024
ae17bb3
send update events only when home derp changes
ethanndicksonJul 8, 2024
612ae5f
better remote node id
ethanndicksonJul 8, 2024
d2d9c39
use netmap callback
ethanndicksonJul 8, 2024
af36c37
fixup
ethanndicksonJul 8, 2024
3466ba6
fixup
ethanndicksonJul 8, 2024
dbffec4
test
ethanndicksonJul 8, 2024
b240507
set p2p setup
ethanndicksonJul 9, 2024
e2e4018
fixup
ethanndicksonJul 9, 2024
0bebad4
watch for conn changes
ethanndicksonJul 9, 2024
6a19849
fixup
ethanndicksonJul 9, 2024
11f2a2b
use latest p2p setup time
ethanndicksonJul 9, 2024
fe7252a
ensure latest derpmap
ethanndicksonJul 9, 2024
4b76942
nicer ping peer
ethanndicksonJul 9, 2024
6214e2a
fix race
ethanndicksonJul 9, 2024
60e88f7
relay -> udp addr
ethanndicksonJul 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
NextNext commit
chore: add additional network telemetry stats & events
  • Loading branch information
@ethanndickson
ethanndickson committedJul 8, 2024
commit6ee3bfa3c6b919a7f55c8dc98146a570450ecbb7
3 changes: 2 additions & 1 deletioncli/ssh.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -37,6 +37,7 @@ import (
"github.com/coder/coder/v2/codersdk/workspacesdk"
"github.com/coder/coder/v2/cryptorand"
"github.com/coder/coder/v2/pty"
"github.com/coder/coder/v2/tailnet"
"github.com/coder/retry"
"github.com/coder/serpent"
)
Expand DownExpand Up@@ -437,7 +438,7 @@ func (r *RootCmd) ssh() *serpent.Command {
}

err = sshSession.Wait()
conn.SendDisconnectedTelemetry("ssh")
conn.SendDisconnectedTelemetry(tailnet.TelemetryApplicationSSH)
if err != nil {
if exitErr := (&gossh.ExitError{}); errors.As(err, &exitErr) {
// Clear the error since it's not useful beyond
Expand Down
4 changes: 2 additions & 2 deletionscoderd/telemetry/telemetry.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -1240,7 +1240,7 @@ type NetworkEvent struct {
NodeIDSelf uint64 `json:"node_id_self"`
NodeIDRemote uint64 `json:"node_id_remote"`
P2PEndpoint NetworkEventP2PEndpoint `json:"p2p_endpoint"`
HomeDERPstring `json:"home_derp"`
HomeDERPint `json:"home_derp"`
DERPMap DERPMap `json:"derp_map"`
LatestNetcheck Netcheck `json:"latest_netcheck"`

Expand DownExpand Up@@ -1286,7 +1286,7 @@ func NetworkEventFromProto(proto *tailnetproto.TelemetryEvent) (NetworkEvent, er
NodeIDSelf: proto.NodeIdSelf,
NodeIDRemote: proto.NodeIdRemote,
P2PEndpoint: p2pEndpointFromProto(proto.P2PEndpoint),
HomeDERP: proto.HomeDerp,
HomeDERP:int(proto.HomeDerp),
DERPMap: derpMapFromProto(proto.DerpMap),
LatestNetcheck: netcheckFromProto(proto.LatestNetcheck),

Expand Down
12 changes: 6 additions & 6 deletionscodersdk/workspacesdk/connector_internal_test.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -228,12 +228,12 @@ func TestTailnetAPIConnector_TelemetryUnimplemented(t *testing.T) {
return uut.client != nil
}, testutil.WaitShort, testutil.IntervalFast)

fakeDRPCClient.telemeteryErorr = drpcerr.WithCode(xerrors.New("Unimplemented"), 0)
fakeDRPCClient.telemetryError = drpcerr.WithCode(xerrors.New("Unimplemented"), 0)
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
require.False(t, uut.telemetryUnavailable.Load())
require.Equal(t, int64(1), atomic.LoadInt64(&fakeDRPCClient.postTelemetryCalls))

fakeDRPCClient.telemeteryErorr = drpcerr.WithCode(xerrors.New("Unimplemented"), drpcerr.Unimplemented)
fakeDRPCClient.telemetryError = drpcerr.WithCode(xerrors.New("Unimplemented"), drpcerr.Unimplemented)
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
require.True(t, uut.telemetryUnavailable.Load())
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
Expand DownExpand Up@@ -268,12 +268,12 @@ func TestTailnetAPIConnector_TelemetryNotRecognised(t *testing.T) {
return uut.client != nil
}, testutil.WaitShort, testutil.IntervalFast)

fakeDRPCClient.telemeteryErorr = drpc.ProtocolError.New("Protocol Error")
fakeDRPCClient.telemetryError = drpc.ProtocolError.New("Protocol Error")
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
require.False(t, uut.telemetryUnavailable.Load())
require.Equal(t, int64(1), atomic.LoadInt64(&fakeDRPCClient.postTelemetryCalls))

fakeDRPCClient.telemeteryErorr = drpc.ProtocolError.New("unknown rpc: /coder.tailnet.v2.Tailnet/PostTelemetry")
fakeDRPCClient.telemetryError = drpc.ProtocolError.New("unknown rpc: /coder.tailnet.v2.Tailnet/PostTelemetry")
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
require.True(t, uut.telemetryUnavailable.Load())
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
Expand DownExpand Up@@ -301,7 +301,7 @@ func newFakeTailnetConn() *fakeTailnetConn {

type fakeDRPCClient struct {
postTelemetryCalls int64
telemeteryErorr error
telemetryError error
fakeDRPPCMapStream
}

Expand DownExpand Up@@ -331,7 +331,7 @@ func (*fakeDRPCClient) DRPCConn() drpc.Conn {
// PostTelemetry implements proto.DRPCTailnetClient.
func (f *fakeDRPCClient) PostTelemetry(_ context.Context, _ *proto.TelemetryRequest) (*proto.TelemetryResponse, error) {
atomic.AddInt64(&f.postTelemetryCalls, 1)
return nil, f.telemeteryErorr
return nil, f.telemetryError
}

// StreamDERPMaps implements proto.DRPCTailnetClient.
Expand Down
52 changes: 40 additions & 12 deletionstailnet/conn.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -290,13 +290,15 @@ func NewConn(options *Options) (conn *Conn, err error) {
configMaps: cfgMaps,
nodeUpdater: nodeUp,
telemetrySink: options.TelemetrySink,
telemeteryStore: telemetryStore,
telemetryStore: telemetryStore,
createdAt: time.Now(),
}
defer func() {
if err != nil {
_ = server.Close()
}
}()
server.SetNodeCallback(nil)

netStack.GetTCPHandlerForFlow = server.forwardTCP

Expand DownExpand Up@@ -351,11 +353,12 @@ type Conn struct {
wireguardEngine wgengine.Engine
listeners map[listenKey]*listener
clientType proto.TelemetryEvent_ClientType
createdAt time.Time

telemetrySink TelemetrySink
//telemeteryStore will be nil if telemetrySink is nil.
telemeteryStore *TelemetryStore
telemetryWgsync.WaitGroup
//telemetryStore will be nil if telemetrySink is nil.
telemetryStore *TelemetryStore
telemetryWg sync.WaitGroup

trafficStats *connstats.Statistics
}
Expand DownExpand Up@@ -384,14 +387,25 @@ func (c *Conn) SetAddresses(ips []netip.Prefix) error {
return nil
}

// Sets the callback for when the node is updated.
// If telemetry is enabled, the callback will first update the telemetry store,
// send the updated telemetry, and then call the provided callback.
func (c *Conn) SetNodeCallback(callback func(node *Node)) {
c.nodeUpdater.setCallback(callback)
if c.telemetryStore != nil {
c.nodeUpdater.setCallback(func(node *Node) {
c.telemetryStore.updateByNode(node)
c.sendUpdatedTelemetry()
callback(node)
})
} else {
c.nodeUpdater.setCallback(callback)
}
}

// SetDERPMap updates the DERPMap of a connection.
func (c *Conn) SetDERPMap(derpMap *tailcfg.DERPMap) {
if c.configMaps.setDERPMap(derpMap) && c.telemeteryStore != nil {
c.telemeteryStore.updateDerpMap(derpMap)
if c.configMaps.setDERPMap(derpMap) && c.telemetryStore != nil {
c.telemetryStore.updateDerpMap(derpMap)
}
}

Expand DownExpand Up@@ -715,6 +729,7 @@ func (c *Conn) SendConnectedTelemetry(ip netip.Addr, application string) {
}
e := c.newTelemetryEvent()
e.Status = proto.TelemetryEvent_CONNECTED
e.ConnectionSetup = durationpb.New(time.Since(c.createdAt))
e.Application = application
pip, ok := c.wireguardEngine.PeerForIP(ip)
if ok {
Expand All@@ -727,6 +742,21 @@ func (c *Conn) SendConnectedTelemetry(ip netip.Addr, application string) {
}()
}

// Called whenever the Node is updated.
// Expects that the telemetry store has the latest node information.
func (c *Conn) sendUpdatedTelemetry() {
if c.telemetrySink == nil {
return
}
e := c.newTelemetryEvent()
e.Status = proto.TelemetryEvent_CONNECTED
c.telemetryWg.Add(1)
go func() {
defer c.telemetryWg.Done()
c.telemetrySink.SendTelemetryEvent(e)
}()
}

func (c *Conn) SendDisconnectedTelemetry(ip netip.Addr, application string) {
if c.telemetrySink == nil {
return
Expand DownExpand Up@@ -769,7 +799,7 @@ func (c *Conn) sendPingTelemetry(pr *ipnstate.PingResult) {
latency := durationpb.New(time.Duration(pr.LatencySeconds * float64(time.Second)))
if pr.Endpoint != "" {
e.P2PLatency = latency
e.P2PEndpoint = c.telemeteryStore.toEndpoint(pr.Endpoint)
e.P2PEndpoint = c.telemetryStore.toEndpoint(pr.Endpoint)
} else {
e.DerpLatency = latency
}
Expand All@@ -785,12 +815,10 @@ func (c *Conn) sendPingTelemetry(pr *ipnstate.PingResult) {
func (c *Conn) newTelemetryEvent() *proto.TelemetryEvent {
// Infallible
id, _ := c.id.MarshalBinary()
event := c.telemeteryStore.newEvent()
event := c.telemetryStore.newEvent()
event.ClientType = c.clientType
event.Id = id
selfNode := c.Node()
event.NodeIdSelf = uint64(selfNode.ID)
event.HomeDerp = strconv.Itoa(selfNode.PreferredDERP)
event.ConnectionAge = durationpb.New(time.Since(c.createdAt))
return event
}

Expand Down
8 changes: 4 additions & 4 deletionstailnet/proto/tailnet.pb.go
View file
Open in desktop

Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.

2 changes: 1 addition & 1 deletiontailnet/proto/tailnet.proto
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -169,7 +169,7 @@ message TelemetryEvent {
uint64 node_id_self = 7;
uint64 node_id_remote = 8;
P2PEndpoint p2p_endpoint = 9;
string home_derp = 10;
int32 home_derp = 10;
DERPMap derp_map = 11;
Netcheck latest_netcheck = 12;

Expand Down
24 changes: 20 additions & 4 deletionstailnet/telemetry.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -31,6 +31,8 @@ type TelemetryStore struct {

cleanDerpMap *tailcfg.DERPMap
cleanNetCheck *proto.Netcheck
nodeID uint64
homeDerp int32
}

func newTelemetryStore() (*TelemetryStore, error) {
Expand All@@ -53,11 +55,11 @@ func (b *TelemetryStore) newEvent() *proto.TelemetryEvent {
Time: timestamppb.Now(),
DerpMap: DERPMapToProto(b.cleanDerpMap),
LatestNetcheck: b.cleanNetCheck,
NodeIdSelf: b.nodeID,
HomeDerp: b.homeDerp,

// TODO(ethanndickson):
ConnectionAge: &durationpb.Duration{},
ConnectionSetup: &durationpb.Duration{},
P2PSetup: &durationpb.Duration{},
P2PSetup: &durationpb.Duration{},
}
}

Expand DownExpand Up@@ -85,11 +87,24 @@ func (b *TelemetryStore) updateDerpMap(cur *tailcfg.DERPMap) {
b.cleanDerpMap = cleanMap
}

func (b *TelemetryStore) updateByNode(n *Node) {
b.mu.Lock()
defer b.mu.Unlock()

b.nodeID = uint64(n.ID)
b.homeDerp = int32(n.PreferredDERP)
}

// Store an anonymized proto.Netcheck given a tailscale NetInfo.
func (b *TelemetryStore) setNetInfo(ni *tailcfg.NetInfo) {
func (b *TelemetryStore) setNetInfo(ni *tailcfg.NetInfo)bool{
b.mu.Lock()
defer b.mu.Unlock()

derpHomeChanged := false
if b.cleanNetCheck != nil {
derpHomeChanged = b.cleanNetCheck.PreferredDERP != int64(ni.PreferredDERP)
}

b.cleanNetCheck = &proto.Netcheck{
UDP: ni.UDP,
IPv6: ni.IPv6,
Expand DownExpand Up@@ -127,6 +142,7 @@ func (b *TelemetryStore) setNetInfo(ni *tailcfg.NetInfo) {
for rid, seconds := range ni.DERPLatencyV6 {
b.cleanNetCheck.RegionV6Latency[int64(rid)] = durationpb.New(time.Duration(seconds * float64(time.Second)))
}
return derpHomeChanged
}

func (b *TelemetryStore) toEndpoint(ipport string) *proto.TelemetryEvent_P2PEndpoint {
Expand Down

[8]ページ先頭

©2009-2025 Movatter.jp