Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit4be37dc

Browse files
committed
tests
1 parente3711ef commit4be37dc

File tree

5 files changed

+265
-7
lines changed

5 files changed

+265
-7
lines changed

‎codersdk/workspacesdk/connector.go‎

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ type tailnetAPIConnector struct {
6363
coordinateURLstring
6464
dialOptions*websocket.DialOptions
6565
conntailnetConn
66+
customDialFnfunc() (proto.DRPCTailnetClient,error)
6667

6768
clientMu sync.RWMutex
6869
client proto.DRPCTailnetClient
@@ -71,9 +72,9 @@ type tailnetAPIConnector struct {
7172
isFirstbool
7273
closedchanstruct{}
7374

74-
//Set to true if we get a response from the server that it doesn't support
75+
//Only set to true if we get a response from the server that it doesn't support
7576
// network telemetry.
76-
telemetryDisabled atomic.Bool
77+
telemetryUnavailable atomic.Bool
7778
}
7879

7980
// Create a new tailnetAPIConnector without running it
@@ -133,6 +134,9 @@ var permanentErrorStatuses = []int{
133134
}
134135

135136
func (tac*tailnetAPIConnector)dial() (proto.DRPCTailnetClient,error) {
137+
iftac.customDialFn!=nil {
138+
returntac.customDialFn()
139+
}
136140
tac.logger.Debug(tac.ctx,"dialing Coder tailnet v2+ API")
137141
// nolint:bodyclose
138142
ws,res,err:=websocket.Dial(tac.ctx,tac.coordinateURL,tac.dialOptions)
@@ -277,7 +281,7 @@ func (tac *tailnetAPIConnector) SendTelemetryEvent(event *proto.TelemetryEvent)
277281
// We hold the lock for the entire telemetry request, but this would only block
278282
// a coordinate retry, and closing the connection.
279283
defertac.clientMu.RUnlock()
280-
iftac.client==nil||tac.telemetryDisabled.Load() {
284+
iftac.client==nil||tac.telemetryUnavailable.Load() {
281285
return
282286
}
283287
ctx,cancel:=context.WithTimeout(tac.ctx,5*time.Second)
@@ -287,6 +291,6 @@ func (tac *tailnetAPIConnector) SendTelemetryEvent(event *proto.TelemetryEvent)
287291
})
288292
ifdrpcerr.Code(err)==drpcerr.Unimplemented||drpc.ProtocolError.Has(err)&&strings.Contains(err.Error(),"unknown rpc: ") {
289293
tac.logger.Debug(tac.ctx,"attempted to send telemetry to a server that doesn't support it",slog.Error(err))
290-
tac.telemetryDisabled.Store(true)
294+
tac.telemetryUnavailable.Store(true)
291295
}
292296
}

‎codersdk/workspacesdk/connector_internal_test.go‎

Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ import (
1313
"github.com/hashicorp/yamux"
1414
"github.com/stretchr/testify/assert"
1515
"github.com/stretchr/testify/require"
16+
"golang.org/x/xerrors"
1617
"nhooyr.io/websocket"
18+
"storj.io/drpc"
19+
"storj.io/drpc/drpcerr"
1720
"tailscale.com/tailcfg"
1821

1922
"cdr.dev/slog"
@@ -139,6 +142,140 @@ func TestTailnetAPIConnector_UplevelVersion(t *testing.T) {
139142
require.NotEmpty(t,sdkErr.Helper)
140143
}
141144

145+
funcTestTailnetAPIConnector_TelemetrySuccess(t*testing.T) {
146+
t.Parallel()
147+
ctx:=testutil.Context(t,testutil.WaitShort)
148+
logger:=slogtest.Make(t,nil).Leveled(slog.LevelDebug)
149+
agentID:= uuid.UUID{0x55}
150+
clientID:= uuid.UUID{0x66}
151+
fCoord:=tailnettest.NewFakeCoordinator()
152+
varcoord tailnet.Coordinator=fCoord
153+
coordPtr:= atomic.Pointer[tailnet.Coordinator]{}
154+
coordPtr.Store(&coord)
155+
derpMapCh:=make(chan*tailcfg.DERPMap)
156+
deferclose(derpMapCh)
157+
eventCh:=make(chan []*proto.TelemetryEvent,1)
158+
svc,err:=tailnet.NewClientService(tailnet.ClientServiceOptions{
159+
Logger:logger,
160+
CoordPtr:&coordPtr,
161+
DERPMapUpdateFrequency:time.Millisecond,
162+
DERPMapFn:func()*tailcfg.DERPMap {return<-derpMapCh },
163+
NetworkTelemetryHandler:func(batch []*proto.TelemetryEvent) {
164+
eventCh<-batch
165+
},
166+
})
167+
require.NoError(t,err)
168+
169+
svr:=httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter,r*http.Request) {
170+
sws,err:=websocket.Accept(w,r,nil)
171+
if!assert.NoError(t,err) {
172+
return
173+
}
174+
ctx,nc:=codersdk.WebsocketNetConn(r.Context(),sws,websocket.MessageBinary)
175+
err=svc.ServeConnV2(ctx,nc, tailnet.StreamID{
176+
Name:"client",
177+
ID:clientID,
178+
Auth: tailnet.ClientCoordinateeAuth{AgentID:agentID},
179+
})
180+
assert.NoError(t,err)
181+
}))
182+
183+
fConn:=newFakeTailnetConn()
184+
185+
uut:=newTailnetAPIConnector(ctx,logger,agentID,svr.URL,&websocket.DialOptions{})
186+
uut.runConnector(fConn)
187+
require.Eventually(t,func()bool {
188+
uut.clientMu.Lock()
189+
deferuut.clientMu.Unlock()
190+
returnuut.client!=nil
191+
},testutil.WaitShort,testutil.IntervalFast)
192+
193+
uut.SendTelemetryEvent(&proto.TelemetryEvent{
194+
Id: []byte("test event"),
195+
})
196+
197+
testEvents:=testutil.RequireRecvCtx(ctx,t,eventCh)
198+
199+
require.Len(t,testEvents,1)
200+
require.Equal(t, []byte("test event"),testEvents[0].Id)
201+
}
202+
203+
// Server doesn't support telemetry / server unimplemented telemetry
204+
205+
funcTestTailnetAPIConnector_TelemetryUnimplemented(t*testing.T) {
206+
t.Parallel()
207+
ctx:=testutil.Context(t,testutil.WaitShort)
208+
logger:=slogtest.Make(t,nil).Leveled(slog.LevelDebug)
209+
agentID:= uuid.UUID{0x55}
210+
fConn:=newFakeTailnetConn()
211+
212+
fakeDRPCClient:=newFakeDRPCClient()
213+
uut:=&tailnetAPIConnector{
214+
ctx:ctx,
215+
logger:logger,
216+
agentID:agentID,
217+
coordinateURL:"",
218+
dialOptions:&websocket.DialOptions{},
219+
conn:nil,
220+
connected:make(chanerror,1),
221+
closed:make(chanstruct{}),
222+
customDialFn:func() (proto.DRPCTailnetClient,error) {
223+
returnfakeDRPCClient,nil
224+
},
225+
}
226+
uut.runConnector(fConn)
227+
require.Eventually(t,func()bool {
228+
uut.clientMu.Lock()
229+
deferuut.clientMu.Unlock()
230+
returnuut.client!=nil
231+
},testutil.WaitShort,testutil.IntervalFast)
232+
233+
fakeDRPCClient.telemeteryErorr=drpcerr.WithCode(xerrors.New("Unimplemented"),0)
234+
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
235+
require.False(t,uut.telemetryUnavailable.Load())
236+
237+
fakeDRPCClient.telemeteryErorr=drpcerr.WithCode(xerrors.New("Unimplemented"),drpcerr.Unimplemented)
238+
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
239+
require.True(t,uut.telemetryUnavailable.Load())
240+
}
241+
242+
funcTestTailnetAPIConnector_TelemetryNotRecognised(t*testing.T) {
243+
t.Parallel()
244+
ctx:=testutil.Context(t,testutil.WaitShort)
245+
logger:=slogtest.Make(t,nil).Leveled(slog.LevelDebug)
246+
agentID:= uuid.UUID{0x55}
247+
fConn:=newFakeTailnetConn()
248+
249+
fakeDRPCClient:=newFakeDRPCClient()
250+
uut:=&tailnetAPIConnector{
251+
ctx:ctx,
252+
logger:logger,
253+
agentID:agentID,
254+
coordinateURL:"",
255+
dialOptions:&websocket.DialOptions{},
256+
conn:nil,
257+
connected:make(chanerror,1),
258+
closed:make(chanstruct{}),
259+
customDialFn:func() (proto.DRPCTailnetClient,error) {
260+
returnfakeDRPCClient,nil
261+
},
262+
}
263+
uut.runConnector(fConn)
264+
require.Eventually(t,func()bool {
265+
uut.clientMu.Lock()
266+
deferuut.clientMu.Unlock()
267+
returnuut.client!=nil
268+
},testutil.WaitShort,testutil.IntervalFast)
269+
270+
fakeDRPCClient.telemeteryErorr=drpc.ProtocolError.New("Protocol Error")
271+
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
272+
require.False(t,uut.telemetryUnavailable.Load())
273+
274+
fakeDRPCClient.telemeteryErorr=drpc.ProtocolError.New("unknown rpc: /coder.tailnet.v2.Tailnet/PostTelemetry")
275+
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
276+
require.True(t,uut.telemetryUnavailable.Load())
277+
}
278+
142279
typefakeTailnetConnstruct{}
143280

144281
func (*fakeTailnetConn)UpdatePeers([]*proto.CoordinateResponse_PeerUpdate)error {
@@ -157,3 +294,120 @@ func (*fakeTailnetConn) SetTunnelDestination(uuid.UUID) {}
157294
funcnewFakeTailnetConn()*fakeTailnetConn {
158295
return&fakeTailnetConn{}
159296
}
297+
298+
typefakeDRPCClientstruct {
299+
telemeteryErorrerror
300+
fakeDRPPCMapStream
301+
}
302+
303+
var_ proto.DRPCTailnetClient=&fakeDRPCClient{}
304+
305+
funcnewFakeDRPCClient()*fakeDRPCClient {
306+
return&fakeDRPCClient{
307+
fakeDRPPCMapStream:fakeDRPPCMapStream{
308+
fakeDRPCStream:fakeDRPCStream{
309+
ch:make(chanstruct{}),
310+
},
311+
},
312+
}
313+
}
314+
315+
// Coordinate implements proto.DRPCTailnetClient.
316+
func (f*fakeDRPCClient)Coordinate(_ context.Context) (proto.DRPCTailnet_CoordinateClient,error) {
317+
return&f.fakeDRPCStream,nil
318+
}
319+
320+
// DRPCConn implements proto.DRPCTailnetClient.
321+
func (*fakeDRPCClient)DRPCConn() drpc.Conn {
322+
return&fakeDRPCConn{}
323+
}
324+
325+
// PostTelemetry implements proto.DRPCTailnetClient.
326+
func (f*fakeDRPCClient)PostTelemetry(_ context.Context,in*proto.TelemetryRequest) (*proto.TelemetryResponse,error) {
327+
returnnil,f.telemeteryErorr
328+
}
329+
330+
// StreamDERPMaps implements proto.DRPCTailnetClient.
331+
func (f*fakeDRPCClient)StreamDERPMaps(_ context.Context,_*proto.StreamDERPMapsRequest) (proto.DRPCTailnet_StreamDERPMapsClient,error) {
332+
return&f.fakeDRPPCMapStream,nil
333+
}
334+
335+
typefakeDRPCConnstruct{}
336+
337+
var_ drpc.Conn=&fakeDRPCConn{}
338+
339+
// Close implements drpc.Conn.
340+
func (*fakeDRPCConn)Close()error {
341+
returnnil
342+
}
343+
344+
// Closed implements drpc.Conn.
345+
func (*fakeDRPCConn)Closed()<-chanstruct{} {
346+
returnnil
347+
}
348+
349+
// Invoke implements drpc.Conn.
350+
func (*fakeDRPCConn)Invoke(_ context.Context,_string,_ drpc.Encoding,_ drpc.Message,_ drpc.Message)error {
351+
returnnil
352+
}
353+
354+
// NewStream implements drpc.Conn.
355+
func (*fakeDRPCConn)NewStream(_ context.Context,_string,_ drpc.Encoding) (drpc.Stream,error) {
356+
returnnil,nil
357+
}
358+
359+
typefakeDRPCStreamstruct {
360+
chchanstruct{}
361+
}
362+
363+
var_ proto.DRPCTailnet_CoordinateClient=&fakeDRPCStream{}
364+
365+
// Close implements proto.DRPCTailnet_CoordinateClient.
366+
func (f*fakeDRPCStream)Close()error {
367+
close(f.ch)
368+
returnnil
369+
}
370+
371+
// CloseSend implements proto.DRPCTailnet_CoordinateClient.
372+
func (*fakeDRPCStream)CloseSend()error {
373+
returnnil
374+
}
375+
376+
// Context implements proto.DRPCTailnet_CoordinateClient.
377+
func (*fakeDRPCStream)Context() context.Context {
378+
returnnil
379+
}
380+
381+
// MsgRecv implements proto.DRPCTailnet_CoordinateClient.
382+
func (*fakeDRPCStream)MsgRecv(_ drpc.Message,_ drpc.Encoding)error {
383+
returnnil
384+
}
385+
386+
// MsgSend implements proto.DRPCTailnet_CoordinateClient.
387+
func (*fakeDRPCStream)MsgSend(_ drpc.Message,_ drpc.Encoding)error {
388+
returnnil
389+
}
390+
391+
// Recv implements proto.DRPCTailnet_CoordinateClient.
392+
func (f*fakeDRPCStream)Recv() (*proto.CoordinateResponse,error) {
393+
<-f.ch
394+
return&proto.CoordinateResponse{},nil
395+
}
396+
397+
// Send implements proto.DRPCTailnet_CoordinateClient.
398+
func (f*fakeDRPCStream)Send(*proto.CoordinateRequest)error {
399+
<-f.ch
400+
returnnil
401+
}
402+
403+
typefakeDRPPCMapStreamstruct {
404+
fakeDRPCStream
405+
}
406+
407+
var_ proto.DRPCTailnet_StreamDERPMapsClient=&fakeDRPPCMapStream{}
408+
409+
// Recv implements proto.DRPCTailnet_StreamDERPMapsClient.
410+
func (f*fakeDRPPCMapStream)Recv() (*proto.DERPMap,error) {
411+
<-f.fakeDRPCStream.ch
412+
return&proto.DERPMap{},nil
413+
}

‎tailnet/conn.go‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ func NewConn(options *Options) (conn *Conn, err error) {
144144
varerrerror
145145
telemetryStore,err=newTelemetryStore()
146146
iferr!=nil {
147-
returnnil,xerrors.Errorf("create telemetry logsink: %w",err)
147+
returnnil,xerrors.Errorf("create telemetry logstore: %w",err)
148148
}
149149
logger=logger.appendLogger(slog.Make(telemetryStore).Leveled(slog.LevelDebug))
150150
}

‎tailnet/telemetry.go‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,8 @@ func (m multiLogger) With(fields ...slog.Field) multiLogger {
8686
returnmultiLogger{loggers:loggers}
8787
}
8888

89-
//A logger sink that extracts (anonymized) IP addresses from logs for building
90-
//network telemetry events
89+
//Responsible for storing and anonymizing networking telemetry state.
90+
//Implements slog.Sink and io.Writer to store logs from `tailscale`.
9191
typeTelemetryStorestruct {
9292
// Always self-referential
9393
sink slog.Sink
File renamed without changes.

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp