@@ -13,7 +13,10 @@ import (
13
13
"github.com/hashicorp/yamux"
14
14
"github.com/stretchr/testify/assert"
15
15
"github.com/stretchr/testify/require"
16
+ "golang.org/x/xerrors"
16
17
"nhooyr.io/websocket"
18
+ "storj.io/drpc"
19
+ "storj.io/drpc/drpcerr"
17
20
"tailscale.com/tailcfg"
18
21
19
22
"cdr.dev/slog"
@@ -139,6 +142,140 @@ func TestTailnetAPIConnector_UplevelVersion(t *testing.T) {
139
142
require .NotEmpty (t ,sdkErr .Helper )
140
143
}
141
144
145
+ func TestTailnetAPIConnector_TelemetrySuccess (t * testing.T ) {
146
+ t .Parallel ()
147
+ ctx := testutil .Context (t ,testutil .WaitShort )
148
+ logger := slogtest .Make (t ,nil ).Leveled (slog .LevelDebug )
149
+ agentID := uuid.UUID {0x55 }
150
+ clientID := uuid.UUID {0x66 }
151
+ fCoord := tailnettest .NewFakeCoordinator ()
152
+ var coord tailnet.Coordinator = fCoord
153
+ coordPtr := atomic.Pointer [tailnet.Coordinator ]{}
154
+ coordPtr .Store (& coord )
155
+ derpMapCh := make (chan * tailcfg.DERPMap )
156
+ defer close (derpMapCh )
157
+ eventCh := make (chan []* proto.TelemetryEvent ,1 )
158
+ svc ,err := tailnet .NewClientService (tailnet.ClientServiceOptions {
159
+ Logger :logger ,
160
+ CoordPtr :& coordPtr ,
161
+ DERPMapUpdateFrequency :time .Millisecond ,
162
+ DERPMapFn :func ()* tailcfg.DERPMap {return <- derpMapCh },
163
+ NetworkTelemetryHandler :func (batch []* proto.TelemetryEvent ) {
164
+ eventCh <- batch
165
+ },
166
+ })
167
+ require .NoError (t ,err )
168
+
169
+ svr := httptest .NewServer (http .HandlerFunc (func (w http.ResponseWriter ,r * http.Request ) {
170
+ sws ,err := websocket .Accept (w ,r ,nil )
171
+ if ! assert .NoError (t ,err ) {
172
+ return
173
+ }
174
+ ctx ,nc := codersdk .WebsocketNetConn (r .Context (),sws ,websocket .MessageBinary )
175
+ err = svc .ServeConnV2 (ctx ,nc , tailnet.StreamID {
176
+ Name :"client" ,
177
+ ID :clientID ,
178
+ Auth : tailnet.ClientCoordinateeAuth {AgentID :agentID },
179
+ })
180
+ assert .NoError (t ,err )
181
+ }))
182
+
183
+ fConn := newFakeTailnetConn ()
184
+
185
+ uut := newTailnetAPIConnector (ctx ,logger ,agentID ,svr .URL ,& websocket.DialOptions {})
186
+ uut .runConnector (fConn )
187
+ require .Eventually (t ,func ()bool {
188
+ uut .clientMu .Lock ()
189
+ defer uut .clientMu .Unlock ()
190
+ return uut .client != nil
191
+ },testutil .WaitShort ,testutil .IntervalFast )
192
+
193
+ uut .SendTelemetryEvent (& proto.TelemetryEvent {
194
+ Id : []byte ("test event" ),
195
+ })
196
+
197
+ testEvents := testutil .RequireRecvCtx (ctx ,t ,eventCh )
198
+
199
+ require .Len (t ,testEvents ,1 )
200
+ require .Equal (t , []byte ("test event" ),testEvents [0 ].Id )
201
+ }
202
+
203
+ // Server doesn't support telemetry / server unimplemented telemetry
204
+
205
+ func TestTailnetAPIConnector_TelemetryUnimplemented (t * testing.T ) {
206
+ t .Parallel ()
207
+ ctx := testutil .Context (t ,testutil .WaitShort )
208
+ logger := slogtest .Make (t ,nil ).Leveled (slog .LevelDebug )
209
+ agentID := uuid.UUID {0x55 }
210
+ fConn := newFakeTailnetConn ()
211
+
212
+ fakeDRPCClient := newFakeDRPCClient ()
213
+ uut := & tailnetAPIConnector {
214
+ ctx :ctx ,
215
+ logger :logger ,
216
+ agentID :agentID ,
217
+ coordinateURL :"" ,
218
+ dialOptions :& websocket.DialOptions {},
219
+ conn :nil ,
220
+ connected :make (chan error ,1 ),
221
+ closed :make (chan struct {}),
222
+ customDialFn :func () (proto.DRPCTailnetClient ,error ) {
223
+ return fakeDRPCClient ,nil
224
+ },
225
+ }
226
+ uut .runConnector (fConn )
227
+ require .Eventually (t ,func ()bool {
228
+ uut .clientMu .Lock ()
229
+ defer uut .clientMu .Unlock ()
230
+ return uut .client != nil
231
+ },testutil .WaitShort ,testutil .IntervalFast )
232
+
233
+ fakeDRPCClient .telemeteryErorr = drpcerr .WithCode (xerrors .New ("Unimplemented" ),0 )
234
+ uut .SendTelemetryEvent (& proto.TelemetryEvent {})
235
+ require .False (t ,uut .telemetryUnavailable .Load ())
236
+
237
+ fakeDRPCClient .telemeteryErorr = drpcerr .WithCode (xerrors .New ("Unimplemented" ),drpcerr .Unimplemented )
238
+ uut .SendTelemetryEvent (& proto.TelemetryEvent {})
239
+ require .True (t ,uut .telemetryUnavailable .Load ())
240
+ }
241
+
242
+ func TestTailnetAPIConnector_TelemetryNotRecognised (t * testing.T ) {
243
+ t .Parallel ()
244
+ ctx := testutil .Context (t ,testutil .WaitShort )
245
+ logger := slogtest .Make (t ,nil ).Leveled (slog .LevelDebug )
246
+ agentID := uuid.UUID {0x55 }
247
+ fConn := newFakeTailnetConn ()
248
+
249
+ fakeDRPCClient := newFakeDRPCClient ()
250
+ uut := & tailnetAPIConnector {
251
+ ctx :ctx ,
252
+ logger :logger ,
253
+ agentID :agentID ,
254
+ coordinateURL :"" ,
255
+ dialOptions :& websocket.DialOptions {},
256
+ conn :nil ,
257
+ connected :make (chan error ,1 ),
258
+ closed :make (chan struct {}),
259
+ customDialFn :func () (proto.DRPCTailnetClient ,error ) {
260
+ return fakeDRPCClient ,nil
261
+ },
262
+ }
263
+ uut .runConnector (fConn )
264
+ require .Eventually (t ,func ()bool {
265
+ uut .clientMu .Lock ()
266
+ defer uut .clientMu .Unlock ()
267
+ return uut .client != nil
268
+ },testutil .WaitShort ,testutil .IntervalFast )
269
+
270
+ fakeDRPCClient .telemeteryErorr = drpc .ProtocolError .New ("Protocol Error" )
271
+ uut .SendTelemetryEvent (& proto.TelemetryEvent {})
272
+ require .False (t ,uut .telemetryUnavailable .Load ())
273
+
274
+ fakeDRPCClient .telemeteryErorr = drpc .ProtocolError .New ("unknown rpc: /coder.tailnet.v2.Tailnet/PostTelemetry" )
275
+ uut .SendTelemetryEvent (& proto.TelemetryEvent {})
276
+ require .True (t ,uut .telemetryUnavailable .Load ())
277
+ }
278
+
142
279
type fakeTailnetConn struct {}
143
280
144
281
func (* fakeTailnetConn )UpdatePeers ([]* proto.CoordinateResponse_PeerUpdate )error {
@@ -157,3 +294,120 @@ func (*fakeTailnetConn) SetTunnelDestination(uuid.UUID) {}
157
294
func newFakeTailnetConn ()* fakeTailnetConn {
158
295
return & fakeTailnetConn {}
159
296
}
297
+
298
+ type fakeDRPCClient struct {
299
+ telemeteryErorr error
300
+ fakeDRPPCMapStream
301
+ }
302
+
303
+ var _ proto.DRPCTailnetClient = & fakeDRPCClient {}
304
+
305
+ func newFakeDRPCClient ()* fakeDRPCClient {
306
+ return & fakeDRPCClient {
307
+ fakeDRPPCMapStream :fakeDRPPCMapStream {
308
+ fakeDRPCStream :fakeDRPCStream {
309
+ ch :make (chan struct {}),
310
+ },
311
+ },
312
+ }
313
+ }
314
+
315
+ // Coordinate implements proto.DRPCTailnetClient.
316
+ func (f * fakeDRPCClient )Coordinate (_ context.Context ) (proto.DRPCTailnet_CoordinateClient ,error ) {
317
+ return & f .fakeDRPCStream ,nil
318
+ }
319
+
320
+ // DRPCConn implements proto.DRPCTailnetClient.
321
+ func (* fakeDRPCClient )DRPCConn () drpc.Conn {
322
+ return & fakeDRPCConn {}
323
+ }
324
+
325
+ // PostTelemetry implements proto.DRPCTailnetClient.
326
+ func (f * fakeDRPCClient )PostTelemetry (_ context.Context ,in * proto.TelemetryRequest ) (* proto.TelemetryResponse ,error ) {
327
+ return nil ,f .telemeteryErorr
328
+ }
329
+
330
+ // StreamDERPMaps implements proto.DRPCTailnetClient.
331
+ func (f * fakeDRPCClient )StreamDERPMaps (_ context.Context ,_ * proto.StreamDERPMapsRequest ) (proto.DRPCTailnet_StreamDERPMapsClient ,error ) {
332
+ return & f .fakeDRPPCMapStream ,nil
333
+ }
334
+
335
+ type fakeDRPCConn struct {}
336
+
337
+ var _ drpc.Conn = & fakeDRPCConn {}
338
+
339
+ // Close implements drpc.Conn.
340
+ func (* fakeDRPCConn )Close ()error {
341
+ return nil
342
+ }
343
+
344
+ // Closed implements drpc.Conn.
345
+ func (* fakeDRPCConn )Closed ()<- chan struct {} {
346
+ return nil
347
+ }
348
+
349
+ // Invoke implements drpc.Conn.
350
+ func (* fakeDRPCConn )Invoke (_ context.Context ,_ string ,_ drpc.Encoding ,_ drpc.Message ,_ drpc.Message )error {
351
+ return nil
352
+ }
353
+
354
+ // NewStream implements drpc.Conn.
355
+ func (* fakeDRPCConn )NewStream (_ context.Context ,_ string ,_ drpc.Encoding ) (drpc.Stream ,error ) {
356
+ return nil ,nil
357
+ }
358
+
359
+ type fakeDRPCStream struct {
360
+ ch chan struct {}
361
+ }
362
+
363
+ var _ proto.DRPCTailnet_CoordinateClient = & fakeDRPCStream {}
364
+
365
+ // Close implements proto.DRPCTailnet_CoordinateClient.
366
+ func (f * fakeDRPCStream )Close ()error {
367
+ close (f .ch )
368
+ return nil
369
+ }
370
+
371
+ // CloseSend implements proto.DRPCTailnet_CoordinateClient.
372
+ func (* fakeDRPCStream )CloseSend ()error {
373
+ return nil
374
+ }
375
+
376
+ // Context implements proto.DRPCTailnet_CoordinateClient.
377
+ func (* fakeDRPCStream )Context () context.Context {
378
+ return nil
379
+ }
380
+
381
+ // MsgRecv implements proto.DRPCTailnet_CoordinateClient.
382
+ func (* fakeDRPCStream )MsgRecv (_ drpc.Message ,_ drpc.Encoding )error {
383
+ return nil
384
+ }
385
+
386
+ // MsgSend implements proto.DRPCTailnet_CoordinateClient.
387
+ func (* fakeDRPCStream )MsgSend (_ drpc.Message ,_ drpc.Encoding )error {
388
+ return nil
389
+ }
390
+
391
+ // Recv implements proto.DRPCTailnet_CoordinateClient.
392
+ func (f * fakeDRPCStream )Recv () (* proto.CoordinateResponse ,error ) {
393
+ <- f .ch
394
+ return & proto.CoordinateResponse {},nil
395
+ }
396
+
397
+ // Send implements proto.DRPCTailnet_CoordinateClient.
398
+ func (f * fakeDRPCStream )Send (* proto.CoordinateRequest )error {
399
+ <- f .ch
400
+ return nil
401
+ }
402
+
403
+ type fakeDRPPCMapStream struct {
404
+ fakeDRPCStream
405
+ }
406
+
407
+ var _ proto.DRPCTailnet_StreamDERPMapsClient = & fakeDRPPCMapStream {}
408
+
409
+ // Recv implements proto.DRPCTailnet_StreamDERPMapsClient.
410
+ func (f * fakeDRPPCMapStream )Recv () (* proto.DERPMap ,error ) {
411
+ <- f .fakeDRPCStream .ch
412
+ return & proto.DERPMap {},nil
413
+ }