Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit409c522

Browse files
committed
chore: refactor DERP setting loop
1 parentf0a4de5 commit409c522

File tree

5 files changed

+183
-22
lines changed

5 files changed

+183
-22
lines changed

‎codersdk/workspacesdk/connector.go

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"nhooyr.io/websocket"
1919
"storj.io/drpc"
2020
"storj.io/drpc/drpcerr"
21-
"tailscale.com/tailcfg"
2221

2322
"cdr.dev/slog"
2423
"github.com/coder/coder/v2/buildinfo"
@@ -37,7 +36,7 @@ var tailnetConnectorGracefulTimeout = time.Second
3736
// @typescript-ignore tailnetConn
3837
typetailnetConninterface {
3938
tailnet.Coordinatee
40-
SetDERPMap(derpMap*tailcfg.DERPMap)
39+
tailnet.DERPMapSetter
4140
}
4241

4342
// tailnetAPIConnector dials the tailnet API (v2+) and then uses the API with a tailnet.Conn to
@@ -65,7 +64,7 @@ type tailnetAPIConnector struct {
6564
coordinateURLstring
6665
clock quartz.Clock
6766
dialOptions*websocket.DialOptions
68-
conntailnetConn
67+
derpCtrltailnet.DERPController
6968
coordCtrl tailnet.CoordinationController
7069
customDialFnfunc() (proto.DRPCTailnetClient,error)
7170

@@ -91,7 +90,6 @@ func newTailnetAPIConnector(ctx context.Context, logger slog.Logger, agentID uui
9190
coordinateURL:coordinateURL,
9291
clock:clock,
9392
dialOptions:dialOptions,
94-
conn:nil,
9593
connected:make(chanerror,1),
9694
closed:make(chanstruct{}),
9795
}
@@ -112,7 +110,7 @@ func (tac *tailnetAPIConnector) manageGracefulTimeout() {
112110

113111
// Runs a tailnetAPIConnector using the provided connection
114112
func (tac*tailnetAPIConnector)runConnector(conntailnetConn) {
115-
tac.conn=conn
113+
tac.derpCtrl=tailnet.NewBasicDERPController(tac.logger,conn)
116114
tac.coordCtrl=tailnet.NewSingleDestController(tac.logger,conn,tac.agentID)
117115
tac.gracefulCtx,tac.cancelGracefulCtx=context.WithCancel(context.Background())
118116
gotac.manageGracefulTimeout()
@@ -294,7 +292,9 @@ func (tac *tailnetAPIConnector) coordinate(client proto.DRPCTailnetClient) {
294292
}
295293

296294
func (tac*tailnetAPIConnector)derpMap(client proto.DRPCTailnetClient)error {
297-
s,err:=client.StreamDERPMaps(tac.ctx,&proto.StreamDERPMapsRequest{})
295+
s:=&tailnet.DERPFromDRPCWrapper{}
296+
varerrerror
297+
s.Client,err=client.StreamDERPMaps(tac.ctx,&proto.StreamDERPMapsRequest{})
298298
iferr!=nil {
299299
returnxerrors.Errorf("failed to connect to StreamDERPMaps RPC: %w",err)
300300
}
@@ -304,21 +304,15 @@ func (tac *tailnetAPIConnector) derpMap(client proto.DRPCTailnetClient) error {
304304
tac.logger.Debug(tac.ctx,"error closing StreamDERPMaps RPC",slog.Error(cErr))
305305
}
306306
}()
307-
for {
308-
dmp,err:=s.Recv()
309-
iferr!=nil {
310-
ifxerrors.Is(err,context.Canceled)||xerrors.Is(err,context.DeadlineExceeded) {
311-
returnnil
312-
}
313-
if!xerrors.Is(err,io.EOF) {
314-
tac.logger.Error(tac.ctx,"error receiving DERP Map",slog.Error(err))
315-
}
316-
returnerr
317-
}
318-
tac.logger.Debug(tac.ctx,"got new DERP Map",slog.F("derp_map",dmp))
319-
dm:=tailnet.DERPMapFromProto(dmp)
320-
tac.conn.SetDERPMap(dm)
307+
cw:=tac.derpCtrl.New(s)
308+
err=<-cw.Wait()
309+
ifxerrors.Is(err,context.Canceled)||xerrors.Is(err,context.DeadlineExceeded) {
310+
returnnil
311+
}
312+
iferr!=nil&&!xerrors.Is(err,io.EOF) {
313+
tac.logger.Error(tac.ctx,"error receiving DERP Map",slog.Error(err))
321314
}
315+
returnerr
322316
}
323317

324318
func (tac*tailnetAPIConnector)refreshToken(ctx context.Context,client proto.DRPCTailnetClient) {

‎codersdk/workspacesdk/connector_internal_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,6 @@ func TestTailnetAPIConnector_TelemetryUnimplemented(t *testing.T) {
440440
coordinateURL:"",
441441
clock:quartz.NewReal(),
442442
dialOptions:&websocket.DialOptions{},
443-
conn:nil,
444443
connected:make(chanerror,1),
445444
closed:make(chanstruct{}),
446445
customDialFn:func() (proto.DRPCTailnetClient,error) {
@@ -481,7 +480,6 @@ func TestTailnetAPIConnector_TelemetryNotRecognised(t *testing.T) {
481480
coordinateURL:"",
482481
clock:quartz.NewReal(),
483482
dialOptions:&websocket.DialOptions{},
484-
conn:nil,
485483
connected:make(chanerror,1),
486484
closed:make(chanstruct{}),
487485
customDialFn:func() (proto.DRPCTailnetClient,error) {

‎tailnet/controllers.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,3 +359,84 @@ func NewInMemoryCoordinatorClient(
359359
)
360360
returnc
361361
}
362+
363+
typeDERPMapSetterinterface {
364+
SetDERPMap(derpMap*tailcfg.DERPMap)
365+
}
366+
367+
typebasicDERPControllerstruct {
368+
logger slog.Logger
369+
setterDERPMapSetter
370+
}
371+
372+
func (b*basicDERPController)New(clientDERPClient)CloserWaiter {
373+
l:=&derpSetLoop{
374+
logger:b.logger,
375+
setter:b.setter,
376+
client:client,
377+
errChan:make(chanerror,1),
378+
recvLoopDone:make(chanstruct{}),
379+
}
380+
gol.recvLoop()
381+
returnl
382+
}
383+
384+
funcNewBasicDERPController(logger slog.Logger,setterDERPMapSetter)DERPController {
385+
return&basicDERPController{
386+
logger:logger,
387+
setter:setter,
388+
}
389+
}
390+
391+
typederpSetLoopstruct {
392+
logger slog.Logger
393+
setterDERPMapSetter
394+
clientDERPClient
395+
396+
sync.Mutex
397+
closedbool
398+
errChanchanerror
399+
recvLoopDonechanstruct{}
400+
}
401+
402+
func (l*derpSetLoop)Close(ctx context.Context)error {
403+
l.Lock()
404+
deferl.Unlock()
405+
ifl.closed {
406+
select {
407+
case<-ctx.Done():
408+
returnctx.Err()
409+
case<-l.recvLoopDone:
410+
returnnil
411+
}
412+
}
413+
l.closed=true
414+
cErr:=l.client.Close()
415+
select {
416+
case<-ctx.Done():
417+
returnctx.Err()
418+
case<-l.recvLoopDone:
419+
returncErr
420+
}
421+
}
422+
423+
func (l*derpSetLoop)Wait()<-chanerror {
424+
returnl.errChan
425+
}
426+
427+
func (l*derpSetLoop)recvLoop() {
428+
deferclose(l.recvLoopDone)
429+
for {
430+
dm,err:=l.client.Recv()
431+
iferr!=nil {
432+
l.logger.Debug(context.Background(),"failed to receive DERP message",slog.Error(err))
433+
select {
434+
casel.errChan<-err:
435+
default:
436+
}
437+
return
438+
}
439+
l.logger.Debug(context.Background(),"got new DERP Map",slog.F("derp_map",dm))
440+
l.setter.SetDERPMap(dm)
441+
}
442+
}

‎tailnet/controllers_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/google/uuid"
1313
"github.com/stretchr/testify/require"
1414
"go.uber.org/mock/gomock"
15+
"golang.org/x/xerrors"
1516
"tailscale.com/tailcfg"
1617
"tailscale.com/types/key"
1718

@@ -281,3 +282,72 @@ func (f *fakeCoordinatee) SetNodeCallback(callback func(*tailnet.Node)) {
281282
deferf.Unlock()
282283
f.callback=callback
283284
}
285+
286+
funcTestNewBasicDERPController_Mainline(t*testing.T) {
287+
t.Parallel()
288+
fs:=make(chan*tailcfg.DERPMap)
289+
logger:=slogtest.Make(t,nil).Leveled(slog.LevelDebug)
290+
uut:=tailnet.NewBasicDERPController(logger,fakeSetter(fs))
291+
fc:=fakeDERPClient{
292+
ch:make(chan*tailcfg.DERPMap),
293+
}
294+
c:=uut.New(fc)
295+
ctx:=testutil.Context(t,testutil.WaitShort)
296+
expectDM:=&tailcfg.DERPMap{}
297+
testutil.RequireSendCtx(ctx,t,fc.ch,expectDM)
298+
gotDM:=testutil.RequireRecvCtx(ctx,t,fs)
299+
require.Equal(t,expectDM,gotDM)
300+
err:=c.Close(ctx)
301+
require.NoError(t,err)
302+
err=testutil.RequireRecvCtx(ctx,t,c.Wait())
303+
require.ErrorIs(t,err,io.EOF)
304+
// ensure Close is idempotent
305+
err=c.Close(ctx)
306+
require.NoError(t,err)
307+
}
308+
309+
funcTestNewBasicDERPController_RecvErr(t*testing.T) {
310+
t.Parallel()
311+
fs:=make(chan*tailcfg.DERPMap)
312+
logger:=slogtest.Make(t,nil).Leveled(slog.LevelDebug)
313+
uut:=tailnet.NewBasicDERPController(logger,fakeSetter(fs))
314+
expectedErr:=xerrors.New("a bad thing happened")
315+
fc:=fakeDERPClient{
316+
ch:make(chan*tailcfg.DERPMap),
317+
err:expectedErr,
318+
}
319+
c:=uut.New(fc)
320+
ctx:=testutil.Context(t,testutil.WaitShort)
321+
err:=testutil.RequireRecvCtx(ctx,t,c.Wait())
322+
require.ErrorIs(t,err,expectedErr)
323+
// ensure Close is idempotent
324+
err=c.Close(ctx)
325+
require.NoError(t,err)
326+
}
327+
328+
typefakeSetterchan*tailcfg.DERPMap
329+
330+
func (sfakeSetter)SetDERPMap(derpMap*tailcfg.DERPMap) {
331+
s<-derpMap
332+
}
333+
334+
typefakeDERPClientstruct {
335+
chchan*tailcfg.DERPMap
336+
errerror
337+
}
338+
339+
func (ffakeDERPClient)Close()error {
340+
close(f.ch)
341+
returnnil
342+
}
343+
344+
func (ffakeDERPClient)Recv() (*tailcfg.DERPMap,error) {
345+
iff.err!=nil {
346+
returnnil,f.err
347+
}
348+
dm,ok:=<-f.ch
349+
ifok {
350+
returndm,nil
351+
}
352+
returnnil,io.EOF
353+
}

‎tailnet/convert.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,3 +298,21 @@ func WorkspaceStatusToProto(status codersdk.WorkspaceStatus) proto.Workspace_Sta
298298
returnproto.Workspace_UNKNOWN
299299
}
300300
}
301+
302+
typeDERPFromDRPCWrapperstruct {
303+
Client proto.DRPCTailnet_StreamDERPMapsClient
304+
}
305+
306+
func (w*DERPFromDRPCWrapper)Close()error {
307+
returnw.Client.Close()
308+
}
309+
310+
func (w*DERPFromDRPCWrapper)Recv() (*tailcfg.DERPMap,error) {
311+
p,err:=w.Client.Recv()
312+
iferr!=nil {
313+
returnnil,err
314+
}
315+
returnDERPMapFromProto(p),nil
316+
}
317+
318+
var_DERPClient=&DERPFromDRPCWrapper{}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp