@@ -13,12 +13,8 @@ import (
1313"github.com/hashicorp/yamux"
1414"github.com/stretchr/testify/assert"
1515"github.com/stretchr/testify/require"
16- "golang.org/x/xerrors"
17- "google.golang.org/protobuf/types/known/durationpb"
18- "google.golang.org/protobuf/types/known/timestamppb"
1916"nhooyr.io/websocket"
2017"storj.io/drpc"
21- "storj.io/drpc/drpcerr"
2218"tailscale.com/tailcfg"
2319
2420"cdr.dev/slog"
@@ -385,7 +381,12 @@ func TestTailnetAPIConnector_TelemetrySuccess(t *testing.T) {
385381DERPMapUpdateFrequency :time .Millisecond ,
386382DERPMapFn :func ()* tailcfg.DERPMap {return <- derpMapCh },
387383NetworkTelemetryHandler :func (batch []* proto.TelemetryEvent ) {
388- testutil .RequireSendCtx (ctx ,t ,eventCh ,batch )
384+ select {
385+ case <- ctx .Done ():
386+ t .Error ("timeout sending telemetry event" )
387+ case eventCh <- batch :
388+ t .Log ("sent telemetry batch" )
389+ }
389390},
390391ResumeTokenProvider :tailnet .NewInsecureTestResumeTokenProvider (),
391392})
@@ -409,11 +410,10 @@ func TestTailnetAPIConnector_TelemetrySuccess(t *testing.T) {
409410
410411uut := newTailnetAPIConnector (ctx ,logger ,agentID ,svr .URL ,quartz .NewReal (),& websocket.DialOptions {})
411412uut .runConnector (fConn )
412- require .Eventually (t ,func ()bool {
413- uut .clientMu .Lock ()
414- defer uut .clientMu .Unlock ()
415- return uut .client != nil
416- },testutil .WaitShort ,testutil .IntervalFast )
413+ // Coordinate calls happen _after_ telemetry is connected up, so we use this
414+ // to ensure telemetry is connected before sending our event
415+ cc := testutil .RequireRecvCtx (ctx ,t ,fCoord .CoordinateCalls )
416+ defer close (cc .Resps )
417417
418418uut .SendTelemetryEvent (& proto.TelemetryEvent {
419419Id : []byte ("test event" ),
@@ -425,86 +425,6 @@ func TestTailnetAPIConnector_TelemetrySuccess(t *testing.T) {
425425require .Equal (t , []byte ("test event" ),testEvents [0 ].Id )
426426}
427427
428- func TestTailnetAPIConnector_TelemetryUnimplemented (t * testing.T ) {
429- t .Parallel ()
430- ctx := testutil .Context (t ,testutil .WaitShort )
431- logger := slogtest .Make (t ,nil ).Leveled (slog .LevelDebug )
432- agentID := uuid.UUID {0x55 }
433- fConn := newFakeTailnetConn ()
434-
435- fakeDRPCClient := newFakeDRPCClient ()
436- uut := & tailnetAPIConnector {
437- ctx :ctx ,
438- logger :logger ,
439- agentID :agentID ,
440- coordinateURL :"" ,
441- clock :quartz .NewReal (),
442- dialOptions :& websocket.DialOptions {},
443- connected :make (chan error ,1 ),
444- closed :make (chan struct {}),
445- customDialFn :func () (proto.DRPCTailnetClient ,error ) {
446- return fakeDRPCClient ,nil
447- },
448- }
449- uut .runConnector (fConn )
450- require .Eventually (t ,func ()bool {
451- uut .clientMu .Lock ()
452- defer uut .clientMu .Unlock ()
453- return uut .client != nil
454- },testutil .WaitShort ,testutil .IntervalFast )
455-
456- fakeDRPCClient .telemetryError = drpcerr .WithCode (xerrors .New ("Unimplemented" ),0 )
457- uut .SendTelemetryEvent (& proto.TelemetryEvent {})
458- require .False (t ,uut .telemetryUnavailable .Load ())
459- require .Equal (t ,int64 (1 ),atomic .LoadInt64 (& fakeDRPCClient .postTelemetryCalls ))
460-
461- fakeDRPCClient .telemetryError = drpcerr .WithCode (xerrors .New ("Unimplemented" ),drpcerr .Unimplemented )
462- uut .SendTelemetryEvent (& proto.TelemetryEvent {})
463- require .True (t ,uut .telemetryUnavailable .Load ())
464- uut .SendTelemetryEvent (& proto.TelemetryEvent {})
465- require .Equal (t ,int64 (2 ),atomic .LoadInt64 (& fakeDRPCClient .postTelemetryCalls ))
466- }
467-
468- func TestTailnetAPIConnector_TelemetryNotRecognised (t * testing.T ) {
469- t .Parallel ()
470- ctx := testutil .Context (t ,testutil .WaitShort )
471- logger := slogtest .Make (t ,nil ).Leveled (slog .LevelDebug )
472- agentID := uuid.UUID {0x55 }
473- fConn := newFakeTailnetConn ()
474-
475- fakeDRPCClient := newFakeDRPCClient ()
476- uut := & tailnetAPIConnector {
477- ctx :ctx ,
478- logger :logger ,
479- agentID :agentID ,
480- coordinateURL :"" ,
481- clock :quartz .NewReal (),
482- dialOptions :& websocket.DialOptions {},
483- connected :make (chan error ,1 ),
484- closed :make (chan struct {}),
485- customDialFn :func () (proto.DRPCTailnetClient ,error ) {
486- return fakeDRPCClient ,nil
487- },
488- }
489- uut .runConnector (fConn )
490- require .Eventually (t ,func ()bool {
491- uut .clientMu .Lock ()
492- defer uut .clientMu .Unlock ()
493- return uut .client != nil
494- },testutil .WaitShort ,testutil .IntervalFast )
495-
496- fakeDRPCClient .telemetryError = drpc .ProtocolError .New ("Protocol Error" )
497- uut .SendTelemetryEvent (& proto.TelemetryEvent {})
498- require .False (t ,uut .telemetryUnavailable .Load ())
499- require .Equal (t ,int64 (1 ),atomic .LoadInt64 (& fakeDRPCClient .postTelemetryCalls ))
500-
501- fakeDRPCClient .telemetryError = drpc .ProtocolError .New ("unknown rpc: /coder.tailnet.v2.Tailnet/PostTelemetry" )
502- uut .SendTelemetryEvent (& proto.TelemetryEvent {})
503- require .True (t ,uut .telemetryUnavailable .Load ())
504- uut .SendTelemetryEvent (& proto.TelemetryEvent {})
505- require .Equal (t ,int64 (2 ),atomic .LoadInt64 (& fakeDRPCClient .postTelemetryCalls ))
506- }
507-
508428type fakeTailnetConn struct {}
509429
510430func (* fakeTailnetConn )UpdatePeers ([]* proto.CoordinateResponse_PeerUpdate )error {
@@ -524,65 +444,6 @@ func newFakeTailnetConn() *fakeTailnetConn {
524444return & fakeTailnetConn {}
525445}
526446
527- type fakeDRPCClient struct {
528- postTelemetryCalls int64
529- refreshTokenFn func (context.Context ,* proto.RefreshResumeTokenRequest ) (* proto.RefreshResumeTokenResponse ,error )
530- telemetryError error
531- fakeDRPPCMapStream
532- }
533-
534- var _ proto.DRPCTailnetClient = & fakeDRPCClient {}
535-
536- func newFakeDRPCClient ()* fakeDRPCClient {
537- return & fakeDRPCClient {
538- postTelemetryCalls :0 ,
539- fakeDRPPCMapStream :fakeDRPPCMapStream {
540- fakeDRPCStream :fakeDRPCStream {
541- ch :make (chan struct {}),
542- },
543- },
544- }
545- }
546-
547- // Coordinate implements proto.DRPCTailnetClient.
548- func (f * fakeDRPCClient )Coordinate (_ context.Context ) (proto.DRPCTailnet_CoordinateClient ,error ) {
549- return & f .fakeDRPCStream ,nil
550- }
551-
552- // DRPCConn implements proto.DRPCTailnetClient.
553- func (* fakeDRPCClient )DRPCConn () drpc.Conn {
554- return & fakeDRPCConn {}
555- }
556-
557- // PostTelemetry implements proto.DRPCTailnetClient.
558- func (f * fakeDRPCClient )PostTelemetry (_ context.Context ,_ * proto.TelemetryRequest ) (* proto.TelemetryResponse ,error ) {
559- atomic .AddInt64 (& f .postTelemetryCalls ,1 )
560- return nil ,f .telemetryError
561- }
562-
563- // StreamDERPMaps implements proto.DRPCTailnetClient.
564- func (f * fakeDRPCClient )StreamDERPMaps (_ context.Context ,_ * proto.StreamDERPMapsRequest ) (proto.DRPCTailnet_StreamDERPMapsClient ,error ) {
565- return & f .fakeDRPPCMapStream ,nil
566- }
567-
568- // RefreshResumeToken implements proto.DRPCTailnetClient.
569- func (f * fakeDRPCClient )RefreshResumeToken (_ context.Context ,_ * proto.RefreshResumeTokenRequest ) (* proto.RefreshResumeTokenResponse ,error ) {
570- if f .refreshTokenFn != nil {
571- return f .refreshTokenFn (context .Background (),nil )
572- }
573-
574- return & proto.RefreshResumeTokenResponse {
575- Token :"test" ,
576- RefreshIn :durationpb .New (30 * time .Minute ),
577- ExpiresAt :timestamppb .New (time .Now ().Add (time .Hour )),
578- },nil
579- }
580-
581- // WorkspaceUpdates implements proto.DRPCTailnetClient.
582- func (* fakeDRPCClient )WorkspaceUpdates (context.Context ,* proto.WorkspaceUpdatesRequest ) (proto.DRPCTailnet_WorkspaceUpdatesClient ,error ) {
583- panic ("unimplemented" )
584- }
585-
586447type fakeDRPCConn struct {}
587448
588449var _ drpc.Conn = & fakeDRPCConn {}