Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd257f81

Browse files
authored
feat: implement DERP streaming on tailnet Client API (#11302)
Implements DERPMap streaming from client API.In a subsequent PR I plan to remove the implementation in coderd/agentapi in favor of the tailnet one
1 parent055a160 commitd257f81

File tree

3 files changed

+72
-19
lines changed

3 files changed

+72
-19
lines changed

‎coderd/coderd.go‎

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,11 @@ func New(options *Options) *API {
479479
}
480480
}
481481
api.TailnetClientService,err=tailnet.NewClientService(
482-
api.Logger.Named("tailnetclient"),&api.TailnetCoordinator)
482+
api.Logger.Named("tailnetclient"),
483+
&api.TailnetCoordinator,
484+
api.Options.DERPMapUpdateFrequency,
485+
api.DERPMap,
486+
)
483487
iferr!=nil {
484488
api.Logger.Fatal(api.ctx,"failed to initialize tailnet client service",slog.Error(err))
485489
}

‎tailnet/service.go‎

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@ import (
77
"strconv"
88
"strings"
99
"sync/atomic"
10+
"time"
1011

1112
"github.com/google/uuid"
1213
"github.com/hashicorp/yamux"
1314
"storj.io/drpc/drpcmux"
1415
"storj.io/drpc/drpcserver"
16+
"tailscale.com/tailcfg"
1517

1618
"cdr.dev/slog"
1719
"github.com/coder/coder/v2/tailnet/proto"
@@ -92,10 +94,22 @@ type ClientService struct {
9294

9395
// NewClientService returns a ClientService based on the given Coordinator pointer. The pointer is
9496
// loaded on each processed connection.
95-
funcNewClientService(logger slog.Logger,coordPtr*atomic.Pointer[Coordinator]) (*ClientService,error) {
97+
funcNewClientService(
98+
logger slog.Logger,
99+
coordPtr*atomic.Pointer[Coordinator],
100+
derpMapUpdateFrequency time.Duration,
101+
derpMapFnfunc()*tailcfg.DERPMap,
102+
) (
103+
*ClientService,error,
104+
) {
96105
s:=&ClientService{logger:logger,coordPtr:coordPtr}
97106
mux:=drpcmux.New()
98-
drpcService:=NewDRPCService(logger,coordPtr)
107+
drpcService:=&DRPCService{
108+
CoordPtr:coordPtr,
109+
Logger:logger,
110+
DerpMapUpdateFrequency:derpMapUpdateFrequency,
111+
DerpMapFn:derpMapFn,
112+
}
99113
err:=proto.DRPCRegisterClient(mux,drpcService)
100114
iferr!=nil {
101115
returnnil,xerrors.Errorf("register DRPC service: %w",err)
@@ -145,20 +159,37 @@ func (s *ClientService) ServeClient(ctx context.Context, version string, conn ne
145159

146160
// DRPCService is the dRPC-based, version 2.x of the tailnet API and implements proto.DRPCClientServer
147161
typeDRPCServicestruct {
148-
coordPtr*atomic.Pointer[Coordinator]
149-
logger slog.Logger
162+
CoordPtr*atomic.Pointer[Coordinator]
163+
Logger slog.Logger
164+
DerpMapUpdateFrequency time.Duration
165+
DerpMapFnfunc()*tailcfg.DERPMap
150166
}
151167

152-
funcNewDRPCService(logger slog.Logger,coordPtr*atomic.Pointer[Coordinator])*DRPCService {
153-
return&DRPCService{
154-
coordPtr:coordPtr,
155-
logger:logger,
156-
}
157-
}
168+
func (s*DRPCService)StreamDERPMaps(_*proto.StreamDERPMapsRequest,stream proto.DRPCClient_StreamDERPMapsStream)error {
169+
deferstream.Close()
170+
171+
ticker:=time.NewTicker(s.DerpMapUpdateFrequency)
172+
deferticker.Stop()
158173

159-
func (*DRPCService)StreamDERPMaps(*proto.StreamDERPMapsRequest, proto.DRPCClient_StreamDERPMapsStream)error {
160-
// TODO integrate with Dean's PR implementation
161-
returnxerrors.New("unimplemented")
174+
varlastDERPMap*tailcfg.DERPMap
175+
for {
176+
derpMap:=s.DerpMapFn()
177+
iflastDERPMap==nil||!CompareDERPMaps(lastDERPMap,derpMap) {
178+
protoDERPMap:=DERPMapToProto(derpMap)
179+
err:=stream.Send(protoDERPMap)
180+
iferr!=nil {
181+
returnxerrors.Errorf("send derp map: %w",err)
182+
}
183+
lastDERPMap=derpMap
184+
}
185+
186+
ticker.Reset(s.DerpMapUpdateFrequency)
187+
select {
188+
case<-stream.Context().Done():
189+
returnnil
190+
case<-ticker.C:
191+
}
192+
}
162193
}
163194

164195
func (s*DRPCService)CoordinateTailnet(stream proto.DRPCClient_CoordinateTailnetStream)error {
@@ -168,9 +199,9 @@ func (s *DRPCService) CoordinateTailnet(stream proto.DRPCClient_CoordinateTailne
168199
_=stream.Close()
169200
returnxerrors.New("no Stream ID")
170201
}
171-
logger:=s.logger.With(slog.F("peer_id",streamID),slog.F("name",streamID.Name))
202+
logger:=s.Logger.With(slog.F("peer_id",streamID),slog.F("name",streamID.Name))
172203
logger.Debug(ctx,"starting tailnet Coordinate")
173-
coord:=*(s.coordPtr.Load())
204+
coord:=*(s.CoordPtr.Load())
174205
reqs,resps:=coord.Coordinate(ctx,streamID.ID,streamID.Name,streamID.Auth)
175206
c:=communicator{
176207
logger:logger,

‎tailnet/service_test.go‎

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ import (
88
"net/http"
99
"sync/atomic"
1010
"testing"
11+
"time"
1112

1213
"golang.org/x/xerrors"
14+
"tailscale.com/tailcfg"
1315

1416
"github.com/google/uuid"
1517

@@ -94,7 +96,11 @@ func TestClientService_ServeClient_V2(t *testing.T) {
9496
coordPtr:= atomic.Pointer[tailnet.Coordinator]{}
9597
coordPtr.Store(&coord)
9698
logger:=slogtest.Make(t,nil).Leveled(slog.LevelDebug)
97-
uut,err:=tailnet.NewClientService(logger,&coordPtr)
99+
derpMap:=&tailcfg.DERPMap{Regions:map[int]*tailcfg.DERPRegion{999: {RegionCode:"test"}}}
100+
uut,err:=tailnet.NewClientService(
101+
logger,&coordPtr,
102+
time.Millisecond,func()*tailcfg.DERPMap {returnderpMap },
103+
)
98104
require.NoError(t,err)
99105

100106
ctx:=testutil.Context(t,testutil.WaitShort)
@@ -112,6 +118,8 @@ func TestClientService_ServeClient_V2(t *testing.T) {
112118

113119
client,err:=tailnet.NewDRPCClient(c)
114120
require.NoError(t,err)
121+
122+
// Coordinate
115123
stream,err:=client.CoordinateTailnet(ctx)
116124
require.NoError(t,err)
117125
deferstream.Close()
@@ -145,7 +153,17 @@ func TestClientService_ServeClient_V2(t *testing.T) {
145153
err=stream.Close()
146154
require.NoError(t,err)
147155

148-
// stream ^^ is just one RPC; we need to close the Conn to end the session.
156+
// DERP Map
157+
dms,err:=client.StreamDERPMaps(ctx,&proto.StreamDERPMapsRequest{})
158+
require.NoError(t,err)
159+
160+
gotDermMap,err:=dms.Recv()
161+
require.NoError(t,err)
162+
require.Equal(t,"test",gotDermMap.GetRegions()[999].GetRegionCode())
163+
err=dms.Close()
164+
require.NoError(t,err)
165+
166+
// RPCs closed; we need to close the Conn to end the session.
149167
err=c.Close()
150168
require.NoError(t,err)
151169
err=testutil.RequireRecvCtx(ctx,t,errCh)
@@ -159,7 +177,7 @@ func TestClientService_ServeClient_V1(t *testing.T) {
159177
coordPtr:= atomic.Pointer[tailnet.Coordinator]{}
160178
coordPtr.Store(&coord)
161179
logger:=slogtest.Make(t,nil).Leveled(slog.LevelDebug)
162-
uut,err:=tailnet.NewClientService(logger,&coordPtr)
180+
uut,err:=tailnet.NewClientService(logger,&coordPtr,0,nil)
163181
require.NoError(t,err)
164182

165183
ctx:=testutil.Context(t,testutil.WaitShort)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp