Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit0b51bd5

Browse files
committed
chore: refactor sending telemetry
1 parentcfb06e9 commit0b51bd5

File tree

4 files changed

+254
-186
lines changed

4 files changed

+254
-186
lines changed

‎codersdk/workspacesdk/connector.go

Lines changed: 5 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,12 @@ import (
88
"net/http"
99
"net/url"
1010
"slices"
11-
"strings"
1211
"sync"
13-
"sync/atomic"
1412
"time"
1513

1614
"github.com/google/uuid"
1715
"golang.org/x/xerrors"
1816
"nhooyr.io/websocket"
19-
"storj.io/drpc"
20-
"storj.io/drpc/drpcerr"
2117

2218
"cdr.dev/slog"
2319
"github.com/coder/coder/v2/buildinfo"
@@ -66,19 +62,12 @@ type tailnetAPIConnector struct {
6662
dialOptions*websocket.DialOptions
6763
derpCtrl tailnet.DERPController
6864
coordCtrl tailnet.CoordinationController
69-
customDialFnfunc() (proto.DRPCTailnetClient,error)
70-
71-
clientMu sync.RWMutex
72-
client proto.DRPCTailnetClient
65+
telCtrl*tailnet.BasicTelemetryController
7366

7467
connectedchanerror
7568
resumeToken*proto.RefreshResumeTokenResponse
7669
isFirstbool
7770
closedchanstruct{}
78-
79-
// Only set to true if we get a response from the server that it doesn't support
80-
// network telemetry.
81-
telemetryUnavailable atomic.Bool
8271
}
8372

8473
// Create a new tailnetAPIConnector without running it
@@ -92,6 +81,7 @@ func newTailnetAPIConnector(ctx context.Context, logger slog.Logger, agentID uui
9281
dialOptions:dialOptions,
9382
connected:make(chanerror,1),
9483
closed:make(chanstruct{}),
84+
telCtrl:tailnet.NewBasicTelemetryController(logger),
9585
}
9686
}
9787

@@ -124,9 +114,6 @@ func (tac *tailnetAPIConnector) runConnector(conn tailnetConn) {
124114
iferr!=nil {
125115
continue
126116
}
127-
tac.clientMu.Lock()
128-
tac.client=tailnetClient
129-
tac.clientMu.Unlock()
130117
tac.logger.Debug(tac.ctx,"obtained tailnet API v2+ client")
131118
tac.runConnectorOnce(tailnetClient)
132119
tac.logger.Debug(tac.ctx,"tailnet API v2+ connection lost")
@@ -141,9 +128,6 @@ var permanentErrorStatuses = []int{
141128
}
142129

143130
func (tac*tailnetAPIConnector)dial() (proto.DRPCTailnetClient,error) {
144-
iftac.customDialFn!=nil {
145-
returntac.customDialFn()
146-
}
147131
tac.logger.Debug(tac.ctx,"dialing Coder tailnet v2+ API")
148132

149133
u,err:=url.Parse(tac.coordinateURL)
@@ -228,6 +212,8 @@ func (tac *tailnetAPIConnector) runConnectorOnce(client proto.DRPCTailnetClient)
228212
}
229213
}()
230214

215+
tac.telCtrl.New(client)// synchronous, doesn't need a goroutine
216+
231217
refreshTokenCtx,refreshTokenCancel:=context.WithCancel(tac.ctx)
232218
wg:= sync.WaitGroup{}
233219
wg.Add(3)
@@ -245,10 +231,7 @@ func (tac *tailnetAPIConnector) runConnectorOnce(client proto.DRPCTailnetClient)
245231
// we do NOT want to gracefully disconnect on the coordinate() routine. So, we'll just
246232
// close the underlying connection. This will trigger a retry of the control plane in
247233
// run().
248-
tac.clientMu.Lock()
249234
client.DRPCConn().Close()
250-
tac.client=nil
251-
tac.clientMu.Unlock()
252235
// Note that derpMap() logs it own errors, we don't bother here.
253236
}
254237
}()
@@ -351,20 +334,5 @@ func (tac *tailnetAPIConnector) refreshToken(ctx context.Context, client proto.D
351334
}
352335

353336
func (tac*tailnetAPIConnector)SendTelemetryEvent(event*proto.TelemetryEvent) {
354-
tac.clientMu.RLock()
355-
// We hold the lock for the entire telemetry request, but this would only block
356-
// a coordinate retry, and closing the connection.
357-
defertac.clientMu.RUnlock()
358-
iftac.client==nil||tac.telemetryUnavailable.Load() {
359-
return
360-
}
361-
ctx,cancel:=context.WithTimeout(tac.ctx,5*time.Second)
362-
defercancel()
363-
_,err:=tac.client.PostTelemetry(ctx,&proto.TelemetryRequest{
364-
Events: []*proto.TelemetryEvent{event},
365-
})
366-
ifdrpcerr.Code(err)==drpcerr.Unimplemented||drpc.ProtocolError.Has(err)&&strings.Contains(err.Error(),"unknown rpc: ") {
367-
tac.logger.Debug(tac.ctx,"attempted to send telemetry to a server that doesn't support it",slog.Error(err))
368-
tac.telemetryUnavailable.Store(true)
369-
}
337+
tac.telCtrl.SendTelemetryEvent(event)
370338
}

‎codersdk/workspacesdk/connector_internal_test.go

Lines changed: 10 additions & 149 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,8 @@ import (
1313
"github.com/hashicorp/yamux"
1414
"github.com/stretchr/testify/assert"
1515
"github.com/stretchr/testify/require"
16-
"golang.org/x/xerrors"
17-
"google.golang.org/protobuf/types/known/durationpb"
18-
"google.golang.org/protobuf/types/known/timestamppb"
1916
"nhooyr.io/websocket"
2017
"storj.io/drpc"
21-
"storj.io/drpc/drpcerr"
2218
"tailscale.com/tailcfg"
2319

2420
"cdr.dev/slog"
@@ -385,7 +381,12 @@ func TestTailnetAPIConnector_TelemetrySuccess(t *testing.T) {
385381
DERPMapUpdateFrequency:time.Millisecond,
386382
DERPMapFn:func()*tailcfg.DERPMap {return<-derpMapCh },
387383
NetworkTelemetryHandler:func(batch []*proto.TelemetryEvent) {
388-
testutil.RequireSendCtx(ctx,t,eventCh,batch)
384+
select {
385+
case<-ctx.Done():
386+
t.Error("timeout sending telemetry event")
387+
caseeventCh<-batch:
388+
t.Log("sent telemetry batch")
389+
}
389390
},
390391
ResumeTokenProvider:tailnet.NewInsecureTestResumeTokenProvider(),
391392
})
@@ -409,11 +410,10 @@ func TestTailnetAPIConnector_TelemetrySuccess(t *testing.T) {
409410

410411
uut:=newTailnetAPIConnector(ctx,logger,agentID,svr.URL,quartz.NewReal(),&websocket.DialOptions{})
411412
uut.runConnector(fConn)
412-
require.Eventually(t,func()bool {
413-
uut.clientMu.Lock()
414-
deferuut.clientMu.Unlock()
415-
returnuut.client!=nil
416-
},testutil.WaitShort,testutil.IntervalFast)
413+
// Coordinate calls happen _after_ telemetry is connected up, so we use this
414+
// to ensure telemetry is connected before sending our event
415+
cc:=testutil.RequireRecvCtx(ctx,t,fCoord.CoordinateCalls)
416+
deferclose(cc.Resps)
417417

418418
uut.SendTelemetryEvent(&proto.TelemetryEvent{
419419
Id: []byte("test event"),
@@ -425,86 +425,6 @@ func TestTailnetAPIConnector_TelemetrySuccess(t *testing.T) {
425425
require.Equal(t, []byte("test event"),testEvents[0].Id)
426426
}
427427

428-
funcTestTailnetAPIConnector_TelemetryUnimplemented(t*testing.T) {
429-
t.Parallel()
430-
ctx:=testutil.Context(t,testutil.WaitShort)
431-
logger:=slogtest.Make(t,nil).Leveled(slog.LevelDebug)
432-
agentID:= uuid.UUID{0x55}
433-
fConn:=newFakeTailnetConn()
434-
435-
fakeDRPCClient:=newFakeDRPCClient()
436-
uut:=&tailnetAPIConnector{
437-
ctx:ctx,
438-
logger:logger,
439-
agentID:agentID,
440-
coordinateURL:"",
441-
clock:quartz.NewReal(),
442-
dialOptions:&websocket.DialOptions{},
443-
connected:make(chanerror,1),
444-
closed:make(chanstruct{}),
445-
customDialFn:func() (proto.DRPCTailnetClient,error) {
446-
returnfakeDRPCClient,nil
447-
},
448-
}
449-
uut.runConnector(fConn)
450-
require.Eventually(t,func()bool {
451-
uut.clientMu.Lock()
452-
deferuut.clientMu.Unlock()
453-
returnuut.client!=nil
454-
},testutil.WaitShort,testutil.IntervalFast)
455-
456-
fakeDRPCClient.telemetryError=drpcerr.WithCode(xerrors.New("Unimplemented"),0)
457-
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
458-
require.False(t,uut.telemetryUnavailable.Load())
459-
require.Equal(t,int64(1),atomic.LoadInt64(&fakeDRPCClient.postTelemetryCalls))
460-
461-
fakeDRPCClient.telemetryError=drpcerr.WithCode(xerrors.New("Unimplemented"),drpcerr.Unimplemented)
462-
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
463-
require.True(t,uut.telemetryUnavailable.Load())
464-
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
465-
require.Equal(t,int64(2),atomic.LoadInt64(&fakeDRPCClient.postTelemetryCalls))
466-
}
467-
468-
funcTestTailnetAPIConnector_TelemetryNotRecognised(t*testing.T) {
469-
t.Parallel()
470-
ctx:=testutil.Context(t,testutil.WaitShort)
471-
logger:=slogtest.Make(t,nil).Leveled(slog.LevelDebug)
472-
agentID:= uuid.UUID{0x55}
473-
fConn:=newFakeTailnetConn()
474-
475-
fakeDRPCClient:=newFakeDRPCClient()
476-
uut:=&tailnetAPIConnector{
477-
ctx:ctx,
478-
logger:logger,
479-
agentID:agentID,
480-
coordinateURL:"",
481-
clock:quartz.NewReal(),
482-
dialOptions:&websocket.DialOptions{},
483-
connected:make(chanerror,1),
484-
closed:make(chanstruct{}),
485-
customDialFn:func() (proto.DRPCTailnetClient,error) {
486-
returnfakeDRPCClient,nil
487-
},
488-
}
489-
uut.runConnector(fConn)
490-
require.Eventually(t,func()bool {
491-
uut.clientMu.Lock()
492-
deferuut.clientMu.Unlock()
493-
returnuut.client!=nil
494-
},testutil.WaitShort,testutil.IntervalFast)
495-
496-
fakeDRPCClient.telemetryError=drpc.ProtocolError.New("Protocol Error")
497-
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
498-
require.False(t,uut.telemetryUnavailable.Load())
499-
require.Equal(t,int64(1),atomic.LoadInt64(&fakeDRPCClient.postTelemetryCalls))
500-
501-
fakeDRPCClient.telemetryError=drpc.ProtocolError.New("unknown rpc: /coder.tailnet.v2.Tailnet/PostTelemetry")
502-
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
503-
require.True(t,uut.telemetryUnavailable.Load())
504-
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
505-
require.Equal(t,int64(2),atomic.LoadInt64(&fakeDRPCClient.postTelemetryCalls))
506-
}
507-
508428
typefakeTailnetConnstruct{}
509429

510430
func (*fakeTailnetConn)UpdatePeers([]*proto.CoordinateResponse_PeerUpdate)error {
@@ -524,65 +444,6 @@ func newFakeTailnetConn() *fakeTailnetConn {
524444
return&fakeTailnetConn{}
525445
}
526446

527-
typefakeDRPCClientstruct {
528-
postTelemetryCallsint64
529-
refreshTokenFnfunc(context.Context,*proto.RefreshResumeTokenRequest) (*proto.RefreshResumeTokenResponse,error)
530-
telemetryErrorerror
531-
fakeDRPPCMapStream
532-
}
533-
534-
var_ proto.DRPCTailnetClient=&fakeDRPCClient{}
535-
536-
funcnewFakeDRPCClient()*fakeDRPCClient {
537-
return&fakeDRPCClient{
538-
postTelemetryCalls:0,
539-
fakeDRPPCMapStream:fakeDRPPCMapStream{
540-
fakeDRPCStream:fakeDRPCStream{
541-
ch:make(chanstruct{}),
542-
},
543-
},
544-
}
545-
}
546-
547-
// Coordinate implements proto.DRPCTailnetClient.
548-
func (f*fakeDRPCClient)Coordinate(_ context.Context) (proto.DRPCTailnet_CoordinateClient,error) {
549-
return&f.fakeDRPCStream,nil
550-
}
551-
552-
// DRPCConn implements proto.DRPCTailnetClient.
553-
func (*fakeDRPCClient)DRPCConn() drpc.Conn {
554-
return&fakeDRPCConn{}
555-
}
556-
557-
// PostTelemetry implements proto.DRPCTailnetClient.
558-
func (f*fakeDRPCClient)PostTelemetry(_ context.Context,_*proto.TelemetryRequest) (*proto.TelemetryResponse,error) {
559-
atomic.AddInt64(&f.postTelemetryCalls,1)
560-
returnnil,f.telemetryError
561-
}
562-
563-
// StreamDERPMaps implements proto.DRPCTailnetClient.
564-
func (f*fakeDRPCClient)StreamDERPMaps(_ context.Context,_*proto.StreamDERPMapsRequest) (proto.DRPCTailnet_StreamDERPMapsClient,error) {
565-
return&f.fakeDRPPCMapStream,nil
566-
}
567-
568-
// RefreshResumeToken implements proto.DRPCTailnetClient.
569-
func (f*fakeDRPCClient)RefreshResumeToken(_ context.Context,_*proto.RefreshResumeTokenRequest) (*proto.RefreshResumeTokenResponse,error) {
570-
iff.refreshTokenFn!=nil {
571-
returnf.refreshTokenFn(context.Background(),nil)
572-
}
573-
574-
return&proto.RefreshResumeTokenResponse{
575-
Token:"test",
576-
RefreshIn:durationpb.New(30*time.Minute),
577-
ExpiresAt:timestamppb.New(time.Now().Add(time.Hour)),
578-
},nil
579-
}
580-
581-
// WorkspaceUpdates implements proto.DRPCTailnetClient.
582-
func (*fakeDRPCClient)WorkspaceUpdates(context.Context,*proto.WorkspaceUpdatesRequest) (proto.DRPCTailnet_WorkspaceUpdatesClient,error) {
583-
panic("unimplemented")
584-
}
585-
586447
typefakeDRPCConnstruct{}
587448

588449
var_ drpc.Conn=&fakeDRPCConn{}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp