@@ -31,6 +31,7 @@ import (
31
31
"tailscale.com/types/key"
32
32
tslogger"tailscale.com/types/logger"
33
33
"tailscale.com/types/netlogtype"
34
+ "tailscale.com/types/netmap"
34
35
"tailscale.com/wgengine"
35
36
"tailscale.com/wgengine/capture"
36
37
"tailscale.com/wgengine/magicsock"
@@ -262,17 +263,8 @@ func NewConn(options *Options) (conn *Conn, err error) {
262
263
)
263
264
nodeUp .setAddresses (options .Addresses )
264
265
nodeUp .setBlockEndpoints (options .BlockEndpoints )
265
- wireguardEngine .SetStatusCallback (nodeUp .setStatus )
266
- magicConn .SetDERPForcedWebsocketCallback (nodeUp .setDERPForcedWebsocket )
267
- if telemetryStore != nil {
268
- wireguardEngine .SetNetInfoCallback (func (ni * tailcfg.NetInfo ) {
269
- nodeUp .setNetInfo (ni )
270
- telemetryStore .setNetInfo (ni )
271
- })
272
- }else {
273
- wireguardEngine .SetNetInfoCallback (nodeUp .setNetInfo )
274
- }
275
266
267
+ ctx ,ctxCancel := context .WithCancel (context .Background ())
276
268
server := & Conn {
277
269
id :uuid .New (),
278
270
closed :make (chan struct {}),
@@ -290,13 +282,32 @@ func NewConn(options *Options) (conn *Conn, err error) {
290
282
configMaps :cfgMaps ,
291
283
nodeUpdater :nodeUp ,
292
284
telemetrySink :options .TelemetrySink ,
293
- telemeteryStore :telemetryStore ,
285
+ telemetryStore :telemetryStore ,
286
+ createdAt :time .Now (),
287
+ watchCtx :ctx ,
288
+ watchCancel :ctxCancel ,
294
289
}
295
290
defer func () {
296
291
if err != nil {
297
292
_ = server .Close ()
298
293
}
299
294
}()
295
+ if server .telemetryStore != nil {
296
+ server .wireguardEngine .SetNetInfoCallback (func (ni * tailcfg.NetInfo ) {
297
+ server .telemetryStore .setNetInfo (ni )
298
+ nodeUp .setNetInfo (ni )
299
+ server .telemetryStore .pingPeer (server )
300
+ })
301
+ server .wireguardEngine .AddNetworkMapCallback (func (nm * netmap.NetworkMap ) {
302
+ server .telemetryStore .updateNetworkMap (nm )
303
+ server .telemetryStore .pingPeer (server )
304
+ })
305
+ go server .watchConnChange ()
306
+ }else {
307
+ server .wireguardEngine .SetNetInfoCallback (nodeUp .setNetInfo )
308
+ }
309
+ server .wireguardEngine .SetStatusCallback (nodeUp .setStatus )
310
+ server .magicConn .SetDERPForcedWebsocketCallback (nodeUp .setDERPForcedWebsocket )
300
311
301
312
netStack .GetTCPHandlerForFlow = server .forwardTCP
302
313
@@ -351,11 +362,15 @@ type Conn struct {
351
362
wireguardEngine wgengine.Engine
352
363
listeners map [listenKey ]* listener
353
364
clientType proto.TelemetryEvent_ClientType
365
+ createdAt time.Time
354
366
355
367
telemetrySink TelemetrySink
356
- // telemeteryStore will be nil if telemetrySink is nil.
357
- telemeteryStore * TelemetryStore
358
- telemetryWg sync.WaitGroup
368
+ // telemetryStore will be nil if telemetrySink is nil.
369
+ telemetryStore * TelemetryStore
370
+ telemetryWg sync.WaitGroup
371
+
372
+ watchCtx context.Context
373
+ watchCancel func ()
359
374
360
375
trafficStats * connstats.Statistics
361
376
}
@@ -390,8 +405,8 @@ func (c *Conn) SetNodeCallback(callback func(node *Node)) {
390
405
391
406
// SetDERPMap updates the DERPMap of a connection.
392
407
func (c * Conn )SetDERPMap (derpMap * tailcfg.DERPMap ) {
393
- if c .configMaps .setDERPMap (derpMap )&& c .telemeteryStore != nil {
394
- c .telemeteryStore .updateDerpMap (derpMap )
408
+ if c .configMaps .setDERPMap (derpMap )&& c .telemetryStore != nil {
409
+ c .telemetryStore .updateDerpMap (derpMap )
395
410
}
396
411
}
397
412
@@ -540,6 +555,7 @@ func (c *Conn) Closed() <-chan struct{} {
540
555
// Close shuts down the Wireguard connection.
541
556
func (c * Conn )Close ()error {
542
557
c .logger .Info (context .Background (),"closing tailnet Conn" )
558
+ c .watchCancel ()
543
559
c .telemetryWg .Wait ()
544
560
c .configMaps .close ()
545
561
c .nodeUpdater .close ()
@@ -709,54 +725,34 @@ func (c *Conn) MagicsockServeHTTPDebug(w http.ResponseWriter, r *http.Request) {
709
725
c .magicConn .ServeHTTPDebug (w ,r )
710
726
}
711
727
728
+ // SendConnectedTelemetry should be called when connection to a peer with the given IP is established.
712
729
func (c * Conn )SendConnectedTelemetry (ip netip.Addr ,application string ) {
713
730
if c .telemetrySink == nil {
714
731
return
715
732
}
733
+ c .telemetryStore .markConnected (& ip ,application )
716
734
e := c .newTelemetryEvent ()
717
735
e .Status = proto .TelemetryEvent_CONNECTED
718
- e .Application = application
719
- pip ,ok := c .wireguardEngine .PeerForIP (ip )
720
- if ok {
721
- e .NodeIdRemote = uint64 (pip .Node .ID )
722
- }
723
- c .telemetryWg .Add (1 )
724
- go func () {
725
- defer c .telemetryWg .Done ()
726
- c .telemetrySink .SendTelemetryEvent (e )
727
- }()
736
+ c .sendTelemetryBackground (e )
728
737
}
729
738
730
- func (c * Conn )SendDisconnectedTelemetry (ip netip. Addr , application string ) {
739
+ func (c * Conn )SendDisconnectedTelemetry () {
731
740
if c .telemetrySink == nil {
732
741
return
733
742
}
734
743
e := c .newTelemetryEvent ()
735
744
e .Status = proto .TelemetryEvent_DISCONNECTED
736
- e .Application = application
737
- pip ,ok := c .wireguardEngine .PeerForIP (ip )
738
- if ok {
739
- e .NodeIdRemote = uint64 (pip .Node .ID )
740
- }
741
- c .telemetryWg .Add (1 )
742
- go func () {
743
- defer c .telemetryWg .Done ()
744
- c .telemetrySink .SendTelemetryEvent (e )
745
- }()
745
+ c .sendTelemetryBackground (e )
746
746
}
747
747
748
748
func (c * Conn )SendSpeedtestTelemetry (throughputMbits float64 ) {
749
749
if c .telemetrySink == nil {
750
750
return
751
751
}
752
752
e := c .newTelemetryEvent ()
753
- e .Status = proto .TelemetryEvent_CONNECTED
754
753
e .ThroughputMbits = wrapperspb .Float (float32 (throughputMbits ))
755
- c .telemetryWg .Add (1 )
756
- go func () {
757
- defer c .telemetryWg .Done ()
758
- c .telemetrySink .SendTelemetryEvent (e )
759
- }()
754
+ e .Status = proto .TelemetryEvent_CONNECTED
755
+ c .sendTelemetryBackground (e )
760
756
}
761
757
762
758
// nolint:revive
@@ -769,31 +765,59 @@ func (c *Conn) sendPingTelemetry(pr *ipnstate.PingResult) {
769
765
latency := durationpb .New (time .Duration (pr .LatencySeconds * float64 (time .Second )))
770
766
if pr .Endpoint != "" {
771
767
e .P2PLatency = latency
772
- e .P2PEndpoint = c .telemeteryStore .toEndpoint (pr .Endpoint )
768
+ e .P2PEndpoint = c .telemetryStore .toEndpoint (pr .Endpoint )
773
769
}else {
774
770
e .DerpLatency = latency
775
771
}
776
772
e .Status = proto .TelemetryEvent_CONNECTED
777
- c .telemetryWg .Add (1 )
778
- go func () {
779
- defer c .telemetryWg .Done ()
780
- c .telemetrySink .SendTelemetryEvent (e )
781
- }()
773
+ c .sendTelemetryBackground (e )
782
774
}
783
775
784
776
// The returned telemetry event will not have it's status set.
785
777
func (c * Conn )newTelemetryEvent ()* proto.TelemetryEvent {
786
778
// Infallible
787
779
id ,_ := c .id .MarshalBinary ()
788
- event := c .telemeteryStore .newEvent ()
780
+ event := c .telemetryStore .newEvent ()
789
781
event .ClientType = c .clientType
790
782
event .Id = id
791
- selfNode := c .Node ()
792
- event .NodeIdSelf = uint64 (selfNode .ID )
793
- event .HomeDerp = strconv .Itoa (selfNode .PreferredDERP )
783
+ event .ConnectionAge = durationpb .New (time .Since (c .createdAt ))
794
784
return event
795
785
}
796
786
787
+ func (c * Conn )sendTelemetryBackground (e * proto.TelemetryEvent ) {
788
+ c .telemetryWg .Add (1 )
789
+ go func () {
790
+ defer c .telemetryWg .Done ()
791
+ c .telemetrySink .SendTelemetryEvent (e )
792
+ }()
793
+ }
794
+
795
+ // Watch for changes in the connection type (P2P<->DERP) and send telemetry events.
796
+ func (c * Conn )watchConnChange () {
797
+ ticker := time .NewTicker (time .Millisecond * 50 )
798
+ defer ticker .Stop ()
799
+ for {
800
+ select {
801
+ case <- c .watchCtx .Done ():
802
+ return
803
+ case <- ticker .C :
804
+ }
805
+ status := c .Status ()
806
+ peers := status .Peers ()
807
+ if len (peers )> 1 {
808
+ // Not a CLI<->agent connection, stop watching
809
+ return
810
+ }else if len (peers )== 0 {
811
+ continue
812
+ }
813
+ peer := status .Peer [peers [0 ]]
814
+ // If the connection type has changed, send a telemetry event with the latest ping stats
815
+ if c .telemetryStore .changedConntype (peer .CurAddr ) {
816
+ c .telemetryStore .pingPeer (c )
817
+ }
818
+ }
819
+ }
820
+
797
821
// PeerDiagnostics is a checklist of human-readable conditions necessary to establish an encrypted
798
822
// tunnel to a peer via a Conn
799
823
type PeerDiagnostics struct {