Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commite8db21c

Browse files
chore: add additional network telemetry stats & events (#13800)
1 parent38035da commite8db21c

File tree

9 files changed

+279
-79
lines changed

9 files changed

+279
-79
lines changed

‎cli/ssh.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ func (r *RootCmd) ssh() *serpent.Command {
437437
}
438438

439439
err=sshSession.Wait()
440-
conn.SendDisconnectedTelemetry("ssh")
440+
conn.SendDisconnectedTelemetry()
441441
iferr!=nil {
442442
ifexitErr:= (&gossh.ExitError{});errors.As(err,&exitErr) {
443443
// Clear the error since it's not useful beyond

‎coderd/telemetry/telemetry.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1240,7 +1240,7 @@ type NetworkEvent struct {
12401240
NodeIDSelfuint64`json:"node_id_self"`
12411241
NodeIDRemoteuint64`json:"node_id_remote"`
12421242
P2PEndpointNetworkEventP2PEndpoint`json:"p2p_endpoint"`
1243-
HomeDERPstring`json:"home_derp"`
1243+
HomeDERPint`json:"home_derp"`
12441244
DERPMapDERPMap`json:"derp_map"`
12451245
LatestNetcheckNetcheck`json:"latest_netcheck"`
12461246

@@ -1286,7 +1286,7 @@ func NetworkEventFromProto(proto *tailnetproto.TelemetryEvent) (NetworkEvent, er
12861286
NodeIDSelf:proto.NodeIdSelf,
12871287
NodeIDRemote:proto.NodeIdRemote,
12881288
P2PEndpoint:p2pEndpointFromProto(proto.P2PEndpoint),
1289-
HomeDERP:proto.HomeDerp,
1289+
HomeDERP:int(proto.HomeDerp),
12901290
DERPMap:derpMapFromProto(proto.DerpMap),
12911291
LatestNetcheck:netcheckFromProto(proto.LatestNetcheck),
12921292

‎codersdk/workspacesdk/agentconn.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,3 @@ func (c *AgentConn) apiClient() *http.Client {
380380
func (c*AgentConn)GetPeerDiagnostics() tailnet.PeerDiagnostics {
381381
returnc.Conn.GetPeerDiagnostics(c.opts.AgentID)
382382
}
383-
384-
func (c*AgentConn)SendDisconnectedTelemetry(applicationstring) {
385-
c.Conn.SendDisconnectedTelemetry(c.agentAddress(),application)
386-
}

‎codersdk/workspacesdk/connector_internal_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -228,12 +228,12 @@ func TestTailnetAPIConnector_TelemetryUnimplemented(t *testing.T) {
228228
returnuut.client!=nil
229229
},testutil.WaitShort,testutil.IntervalFast)
230230

231-
fakeDRPCClient.telemeteryErorr=drpcerr.WithCode(xerrors.New("Unimplemented"),0)
231+
fakeDRPCClient.telemetryError=drpcerr.WithCode(xerrors.New("Unimplemented"),0)
232232
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
233233
require.False(t,uut.telemetryUnavailable.Load())
234234
require.Equal(t,int64(1),atomic.LoadInt64(&fakeDRPCClient.postTelemetryCalls))
235235

236-
fakeDRPCClient.telemeteryErorr=drpcerr.WithCode(xerrors.New("Unimplemented"),drpcerr.Unimplemented)
236+
fakeDRPCClient.telemetryError=drpcerr.WithCode(xerrors.New("Unimplemented"),drpcerr.Unimplemented)
237237
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
238238
require.True(t,uut.telemetryUnavailable.Load())
239239
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
@@ -268,12 +268,12 @@ func TestTailnetAPIConnector_TelemetryNotRecognised(t *testing.T) {
268268
returnuut.client!=nil
269269
},testutil.WaitShort,testutil.IntervalFast)
270270

271-
fakeDRPCClient.telemeteryErorr=drpc.ProtocolError.New("Protocol Error")
271+
fakeDRPCClient.telemetryError=drpc.ProtocolError.New("Protocol Error")
272272
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
273273
require.False(t,uut.telemetryUnavailable.Load())
274274
require.Equal(t,int64(1),atomic.LoadInt64(&fakeDRPCClient.postTelemetryCalls))
275275

276-
fakeDRPCClient.telemeteryErorr=drpc.ProtocolError.New("unknown rpc: /coder.tailnet.v2.Tailnet/PostTelemetry")
276+
fakeDRPCClient.telemetryError=drpc.ProtocolError.New("unknown rpc: /coder.tailnet.v2.Tailnet/PostTelemetry")
277277
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
278278
require.True(t,uut.telemetryUnavailable.Load())
279279
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
@@ -301,7 +301,7 @@ func newFakeTailnetConn() *fakeTailnetConn {
301301

302302
typefakeDRPCClientstruct {
303303
postTelemetryCallsint64
304-
telemeteryErorrerror
304+
telemetryErrorerror
305305
fakeDRPPCMapStream
306306
}
307307

@@ -331,7 +331,7 @@ func (*fakeDRPCClient) DRPCConn() drpc.Conn {
331331
// PostTelemetry implements proto.DRPCTailnetClient.
332332
func (f*fakeDRPCClient)PostTelemetry(_ context.Context,_*proto.TelemetryRequest) (*proto.TelemetryResponse,error) {
333333
atomic.AddInt64(&f.postTelemetryCalls,1)
334-
returnnil,f.telemeteryErorr
334+
returnnil,f.telemetryError
335335
}
336336

337337
// StreamDERPMaps implements proto.DRPCTailnetClient.

‎tailnet/conn.go

Lines changed: 77 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"tailscale.com/types/key"
3232
tslogger"tailscale.com/types/logger"
3333
"tailscale.com/types/netlogtype"
34+
"tailscale.com/types/netmap"
3435
"tailscale.com/wgengine"
3536
"tailscale.com/wgengine/capture"
3637
"tailscale.com/wgengine/magicsock"
@@ -262,17 +263,8 @@ func NewConn(options *Options) (conn *Conn, err error) {
262263
)
263264
nodeUp.setAddresses(options.Addresses)
264265
nodeUp.setBlockEndpoints(options.BlockEndpoints)
265-
wireguardEngine.SetStatusCallback(nodeUp.setStatus)
266-
magicConn.SetDERPForcedWebsocketCallback(nodeUp.setDERPForcedWebsocket)
267-
iftelemetryStore!=nil {
268-
wireguardEngine.SetNetInfoCallback(func(ni*tailcfg.NetInfo) {
269-
nodeUp.setNetInfo(ni)
270-
telemetryStore.setNetInfo(ni)
271-
})
272-
}else {
273-
wireguardEngine.SetNetInfoCallback(nodeUp.setNetInfo)
274-
}
275266

267+
ctx,ctxCancel:=context.WithCancel(context.Background())
276268
server:=&Conn{
277269
id:uuid.New(),
278270
closed:make(chanstruct{}),
@@ -290,13 +282,32 @@ func NewConn(options *Options) (conn *Conn, err error) {
290282
configMaps:cfgMaps,
291283
nodeUpdater:nodeUp,
292284
telemetrySink:options.TelemetrySink,
293-
telemeteryStore:telemetryStore,
285+
telemetryStore:telemetryStore,
286+
createdAt:time.Now(),
287+
watchCtx:ctx,
288+
watchCancel:ctxCancel,
294289
}
295290
deferfunc() {
296291
iferr!=nil {
297292
_=server.Close()
298293
}
299294
}()
295+
ifserver.telemetryStore!=nil {
296+
server.wireguardEngine.SetNetInfoCallback(func(ni*tailcfg.NetInfo) {
297+
server.telemetryStore.setNetInfo(ni)
298+
nodeUp.setNetInfo(ni)
299+
server.telemetryStore.pingPeer(server)
300+
})
301+
server.wireguardEngine.AddNetworkMapCallback(func(nm*netmap.NetworkMap) {
302+
server.telemetryStore.updateNetworkMap(nm)
303+
server.telemetryStore.pingPeer(server)
304+
})
305+
goserver.watchConnChange()
306+
}else {
307+
server.wireguardEngine.SetNetInfoCallback(nodeUp.setNetInfo)
308+
}
309+
server.wireguardEngine.SetStatusCallback(nodeUp.setStatus)
310+
server.magicConn.SetDERPForcedWebsocketCallback(nodeUp.setDERPForcedWebsocket)
300311

301312
netStack.GetTCPHandlerForFlow=server.forwardTCP
302313

@@ -351,11 +362,15 @@ type Conn struct {
351362
wireguardEngine wgengine.Engine
352363
listenersmap[listenKey]*listener
353364
clientType proto.TelemetryEvent_ClientType
365+
createdAt time.Time
354366

355367
telemetrySinkTelemetrySink
356-
// telemeteryStore will be nil if telemetrySink is nil.
357-
telemeteryStore*TelemetryStore
358-
telemetryWg sync.WaitGroup
368+
// telemetryStore will be nil if telemetrySink is nil.
369+
telemetryStore*TelemetryStore
370+
telemetryWg sync.WaitGroup
371+
372+
watchCtx context.Context
373+
watchCancelfunc()
359374

360375
trafficStats*connstats.Statistics
361376
}
@@ -390,8 +405,8 @@ func (c *Conn) SetNodeCallback(callback func(node *Node)) {
390405

391406
// SetDERPMap updates the DERPMap of a connection.
392407
func (c*Conn)SetDERPMap(derpMap*tailcfg.DERPMap) {
393-
ifc.configMaps.setDERPMap(derpMap)&&c.telemeteryStore!=nil {
394-
c.telemeteryStore.updateDerpMap(derpMap)
408+
ifc.configMaps.setDERPMap(derpMap)&&c.telemetryStore!=nil {
409+
c.telemetryStore.updateDerpMap(derpMap)
395410
}
396411
}
397412

@@ -540,6 +555,7 @@ func (c *Conn) Closed() <-chan struct{} {
540555
// Close shuts down the Wireguard connection.
541556
func (c*Conn)Close()error {
542557
c.logger.Info(context.Background(),"closing tailnet Conn")
558+
c.watchCancel()
543559
c.telemetryWg.Wait()
544560
c.configMaps.close()
545561
c.nodeUpdater.close()
@@ -709,54 +725,34 @@ func (c *Conn) MagicsockServeHTTPDebug(w http.ResponseWriter, r *http.Request) {
709725
c.magicConn.ServeHTTPDebug(w,r)
710726
}
711727

728+
// SendConnectedTelemetry should be called when connection to a peer with the given IP is established.
712729
func (c*Conn)SendConnectedTelemetry(ip netip.Addr,applicationstring) {
713730
ifc.telemetrySink==nil {
714731
return
715732
}
733+
c.telemetryStore.markConnected(&ip,application)
716734
e:=c.newTelemetryEvent()
717735
e.Status=proto.TelemetryEvent_CONNECTED
718-
e.Application=application
719-
pip,ok:=c.wireguardEngine.PeerForIP(ip)
720-
ifok {
721-
e.NodeIdRemote=uint64(pip.Node.ID)
722-
}
723-
c.telemetryWg.Add(1)
724-
gofunc() {
725-
deferc.telemetryWg.Done()
726-
c.telemetrySink.SendTelemetryEvent(e)
727-
}()
736+
c.sendTelemetryBackground(e)
728737
}
729738

730-
func (c*Conn)SendDisconnectedTelemetry(ip netip.Addr,applicationstring) {
739+
func (c*Conn)SendDisconnectedTelemetry() {
731740
ifc.telemetrySink==nil {
732741
return
733742
}
734743
e:=c.newTelemetryEvent()
735744
e.Status=proto.TelemetryEvent_DISCONNECTED
736-
e.Application=application
737-
pip,ok:=c.wireguardEngine.PeerForIP(ip)
738-
ifok {
739-
e.NodeIdRemote=uint64(pip.Node.ID)
740-
}
741-
c.telemetryWg.Add(1)
742-
gofunc() {
743-
deferc.telemetryWg.Done()
744-
c.telemetrySink.SendTelemetryEvent(e)
745-
}()
745+
c.sendTelemetryBackground(e)
746746
}
747747

748748
func (c*Conn)SendSpeedtestTelemetry(throughputMbitsfloat64) {
749749
ifc.telemetrySink==nil {
750750
return
751751
}
752752
e:=c.newTelemetryEvent()
753-
e.Status=proto.TelemetryEvent_CONNECTED
754753
e.ThroughputMbits=wrapperspb.Float(float32(throughputMbits))
755-
c.telemetryWg.Add(1)
756-
gofunc() {
757-
deferc.telemetryWg.Done()
758-
c.telemetrySink.SendTelemetryEvent(e)
759-
}()
754+
e.Status=proto.TelemetryEvent_CONNECTED
755+
c.sendTelemetryBackground(e)
760756
}
761757

762758
// nolint:revive
@@ -769,31 +765,59 @@ func (c *Conn) sendPingTelemetry(pr *ipnstate.PingResult) {
769765
latency:=durationpb.New(time.Duration(pr.LatencySeconds*float64(time.Second)))
770766
ifpr.Endpoint!="" {
771767
e.P2PLatency=latency
772-
e.P2PEndpoint=c.telemeteryStore.toEndpoint(pr.Endpoint)
768+
e.P2PEndpoint=c.telemetryStore.toEndpoint(pr.Endpoint)
773769
}else {
774770
e.DerpLatency=latency
775771
}
776772
e.Status=proto.TelemetryEvent_CONNECTED
777-
c.telemetryWg.Add(1)
778-
gofunc() {
779-
deferc.telemetryWg.Done()
780-
c.telemetrySink.SendTelemetryEvent(e)
781-
}()
773+
c.sendTelemetryBackground(e)
782774
}
783775

784776
// The returned telemetry event will not have it's status set.
785777
func (c*Conn)newTelemetryEvent()*proto.TelemetryEvent {
786778
// Infallible
787779
id,_:=c.id.MarshalBinary()
788-
event:=c.telemeteryStore.newEvent()
780+
event:=c.telemetryStore.newEvent()
789781
event.ClientType=c.clientType
790782
event.Id=id
791-
selfNode:=c.Node()
792-
event.NodeIdSelf=uint64(selfNode.ID)
793-
event.HomeDerp=strconv.Itoa(selfNode.PreferredDERP)
783+
event.ConnectionAge=durationpb.New(time.Since(c.createdAt))
794784
returnevent
795785
}
796786

787+
func (c*Conn)sendTelemetryBackground(e*proto.TelemetryEvent) {
788+
c.telemetryWg.Add(1)
789+
gofunc() {
790+
deferc.telemetryWg.Done()
791+
c.telemetrySink.SendTelemetryEvent(e)
792+
}()
793+
}
794+
795+
// Watch for changes in the connection type (P2P<->DERP) and send telemetry events.
796+
func (c*Conn)watchConnChange() {
797+
ticker:=time.NewTicker(time.Millisecond*50)
798+
deferticker.Stop()
799+
for {
800+
select {
801+
case<-c.watchCtx.Done():
802+
return
803+
case<-ticker.C:
804+
}
805+
status:=c.Status()
806+
peers:=status.Peers()
807+
iflen(peers)>1 {
808+
// Not a CLI<->agent connection, stop watching
809+
return
810+
}elseiflen(peers)==0 {
811+
continue
812+
}
813+
peer:=status.Peer[peers[0]]
814+
// If the connection type has changed, send a telemetry event with the latest ping stats
815+
ifc.telemetryStore.changedConntype(peer.CurAddr) {
816+
c.telemetryStore.pingPeer(c)
817+
}
818+
}
819+
}
820+
797821
// PeerDiagnostics is a checklist of human-readable conditions necessary to establish an encrypted
798822
// tunnel to a peer via a Conn
799823
typePeerDiagnosticsstruct {

‎tailnet/proto/tailnet.pb.go

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more aboutcustomizing how changed files appear on GitHub.

‎tailnet/proto/tailnet.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ message TelemetryEvent {
169169
uint64node_id_self=7;
170170
uint64node_id_remote=8;
171171
P2PEndpointp2p_endpoint=9;
172-
stringhome_derp=10;
172+
int32home_derp=10;
173173
DERPMapderp_map=11;
174174
Netchecklatest_netcheck=12;
175175

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp