- Notifications
You must be signed in to change notification settings - Fork928
chore: add additional network telemetry stats & events#13800
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Changes fromall commits
6ee3bfa
6e0ec17
ae17bb3
612ae5f
d2d9c39
af36c37
3466ba6
dbffec4
b240507
e2e4018
0bebad4
6a19849
11f2a2b
fe7252a
4b76942
6214e2a
60e88f7
File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -31,6 +31,7 @@ import ( | ||
"tailscale.com/types/key" | ||
tslogger "tailscale.com/types/logger" | ||
"tailscale.com/types/netlogtype" | ||
"tailscale.com/types/netmap" | ||
"tailscale.com/wgengine" | ||
"tailscale.com/wgengine/capture" | ||
"tailscale.com/wgengine/magicsock" | ||
@@ -262,17 +263,8 @@ func NewConn(options *Options) (conn *Conn, err error) { | ||
) | ||
nodeUp.setAddresses(options.Addresses) | ||
nodeUp.setBlockEndpoints(options.BlockEndpoints) | ||
ctx, ctxCancel := context.WithCancel(context.Background()) | ||
server := &Conn{ | ||
id: uuid.New(), | ||
closed: make(chan struct{}), | ||
@@ -290,13 +282,32 @@ func NewConn(options *Options) (conn *Conn, err error) { | ||
configMaps: cfgMaps, | ||
nodeUpdater: nodeUp, | ||
telemetrySink: options.TelemetrySink, | ||
telemetryStore: telemetryStore, | ||
createdAt: time.Now(), | ||
watchCtx: ctx, | ||
watchCancel: ctxCancel, | ||
} | ||
defer func() { | ||
if err != nil { | ||
_ = server.Close() | ||
} | ||
}() | ||
if server.telemetryStore != nil { | ||
server.wireguardEngine.SetNetInfoCallback(func(ni *tailcfg.NetInfo) { | ||
server.telemetryStore.setNetInfo(ni) | ||
nodeUp.setNetInfo(ni) | ||
server.telemetryStore.pingPeer(server) | ||
}) | ||
server.wireguardEngine.AddNetworkMapCallback(func(nm *netmap.NetworkMap) { | ||
MemberAuthor
| ||
server.telemetryStore.updateNetworkMap(nm) | ||
server.telemetryStore.pingPeer(server) | ||
}) | ||
go server.watchConnChange() | ||
} else { | ||
server.wireguardEngine.SetNetInfoCallback(nodeUp.setNetInfo) | ||
} | ||
server.wireguardEngine.SetStatusCallback(nodeUp.setStatus) | ||
server.magicConn.SetDERPForcedWebsocketCallback(nodeUp.setDERPForcedWebsocket) | ||
netStack.GetTCPHandlerForFlow = server.forwardTCP | ||
@@ -351,11 +362,15 @@ type Conn struct { | ||
wireguardEngine wgengine.Engine | ||
listeners map[listenKey]*listener | ||
clientType proto.TelemetryEvent_ClientType | ||
createdAt time.Time | ||
telemetrySink TelemetrySink | ||
// telemetryStore will be nil if telemetrySink is nil. | ||
telemetryStore *TelemetryStore | ||
telemetryWg sync.WaitGroup | ||
watchCtx context.Context | ||
watchCancel func() | ||
trafficStats *connstats.Statistics | ||
} | ||
@@ -390,8 +405,8 @@ func (c *Conn) SetNodeCallback(callback func(node *Node)) { | ||
// SetDERPMap updates the DERPMap of a connection. | ||
func (c *Conn) SetDERPMap(derpMap *tailcfg.DERPMap) { | ||
if c.configMaps.setDERPMap(derpMap) && c.telemetryStore != nil { | ||
c.telemetryStore.updateDerpMap(derpMap) | ||
} | ||
} | ||
@@ -540,6 +555,7 @@ func (c *Conn) Closed() <-chan struct{} { | ||
// Close shuts down the Wireguard connection. | ||
func (c *Conn) Close() error { | ||
c.logger.Info(context.Background(), "closing tailnet Conn") | ||
c.watchCancel() | ||
c.telemetryWg.Wait() | ||
c.configMaps.close() | ||
c.nodeUpdater.close() | ||
@@ -709,54 +725,34 @@ func (c *Conn) MagicsockServeHTTPDebug(w http.ResponseWriter, r *http.Request) { | ||
c.magicConn.ServeHTTPDebug(w, r) | ||
} | ||
// SendConnectedTelemetry should be called when connection to a peer with the given IP is established. | ||
func (c *Conn) SendConnectedTelemetry(ip netip.Addr, application string) { | ||
if c.telemetrySink == nil { | ||
return | ||
} | ||
c.telemetryStore.markConnected(&ip, application) | ||
e := c.newTelemetryEvent() | ||
e.Status = proto.TelemetryEvent_CONNECTED | ||
c.sendTelemetryBackground(e) | ||
} | ||
func (c *Conn) SendDisconnectedTelemetry() { | ||
if c.telemetrySink == nil { | ||
return | ||
} | ||
e := c.newTelemetryEvent() | ||
e.Status = proto.TelemetryEvent_DISCONNECTED | ||
c.sendTelemetryBackground(e) | ||
} | ||
func (c *Conn) SendSpeedtestTelemetry(throughputMbits float64) { | ||
if c.telemetrySink == nil { | ||
return | ||
} | ||
e := c.newTelemetryEvent() | ||
e.ThroughputMbits = wrapperspb.Float(float32(throughputMbits)) | ||
e.Status = proto.TelemetryEvent_CONNECTED | ||
c.sendTelemetryBackground(e) | ||
} | ||
// nolint:revive | ||
@@ -769,31 +765,59 @@ func (c *Conn) sendPingTelemetry(pr *ipnstate.PingResult) { | ||
latency := durationpb.New(time.Duration(pr.LatencySeconds * float64(time.Second))) | ||
if pr.Endpoint != "" { | ||
e.P2PLatency = latency | ||
e.P2PEndpoint = c.telemetryStore.toEndpoint(pr.Endpoint) | ||
} else { | ||
e.DerpLatency = latency | ||
} | ||
e.Status = proto.TelemetryEvent_CONNECTED | ||
c.sendTelemetryBackground(e) | ||
} | ||
// The returned telemetry event will not have it's status set. | ||
func (c *Conn) newTelemetryEvent() *proto.TelemetryEvent { | ||
// Infallible | ||
id, _ := c.id.MarshalBinary() | ||
event := c.telemetryStore.newEvent() | ||
event.ClientType = c.clientType | ||
event.Id = id | ||
event.ConnectionAge = durationpb.New(time.Since(c.createdAt)) | ||
return event | ||
} | ||
func (c *Conn) sendTelemetryBackground(e *proto.TelemetryEvent) { | ||
c.telemetryWg.Add(1) | ||
go func() { | ||
defer c.telemetryWg.Done() | ||
c.telemetrySink.SendTelemetryEvent(e) | ||
}() | ||
} | ||
// Watch for changes in the connection type (P2P<->DERP) and send telemetry events. | ||
func (c *Conn) watchConnChange() { | ||
ticker := time.NewTicker(time.Millisecond * 50) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. Because we have to manually check if the connection has changed like this, P2PSetup can be ~50ms off it's true value. | ||
defer ticker.Stop() | ||
for { | ||
select { | ||
case <-c.watchCtx.Done(): | ||
return | ||
case <-ticker.C: | ||
} | ||
status := c.Status() | ||
peers := status.Peers() | ||
if len(peers) > 1 { | ||
// Not a CLI<->agent connection, stop watching | ||
return | ||
} else if len(peers) == 0 { | ||
continue | ||
} | ||
peer := status.Peer[peers[0]] | ||
// If the connection type has changed, send a telemetry event with the latest ping stats | ||
if c.telemetryStore.changedConntype(peer.CurAddr) { | ||
c.telemetryStore.pingPeer(c) | ||
} | ||
} | ||
} | ||
// PeerDiagnostics is a checklist of human-readable conditions necessary to establish an encrypted | ||
// tunnel to a peer via a Conn | ||
type PeerDiagnostics struct { | ||
Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.