Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitcc0751d

Browse files
committed
feat: agent uses Tailnet v2 API for DERPMap updates
1 parent3e0e7f8 commitcc0751d

File tree

8 files changed

+106
-199
lines changed

8 files changed

+106
-199
lines changed

‎agent/agent.go

Lines changed: 34 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ type Options struct {
8989
typeClientinterface {
9090
Manifest(ctx context.Context) (agentsdk.Manifest,error)
9191
Listen(ctx context.Context) (drpc.Conn,error)
92-
DERPMapUpdates(ctx context.Context) (<-chan agentsdk.DERPMapUpdate, io.Closer,error)
9392
ReportStats(ctx context.Context,log slog.Logger,statsChan<-chan*agentsdk.Stats,setIntervalfunc(time.Duration)) (io.Closer,error)
9493
PostLifecycle(ctx context.Context,state agentsdk.PostLifecycleRequest)error
9594
PostAppHealth(ctx context.Context,req agentsdk.PostAppHealthsRequest)error
@@ -822,10 +821,22 @@ func (a *agent) run(ctx context.Context) error {
822821
network.SetBlockEndpoints(manifest.DisableDirectConnections)
823822
}
824823

824+
// Listen returns the dRPC connection we use for both Coordinator and DERPMap updates
825+
conn,err:=a.client.Listen(ctx)
826+
iferr!=nil {
827+
returnerr
828+
}
829+
deferfunc() {
830+
cErr:=conn.Close()
831+
ifcErr!=nil {
832+
a.logger.Debug(ctx,"error closing drpc connection",slog.Error(err))
833+
}
834+
}()
835+
825836
eg,egCtx:=errgroup.WithContext(ctx)
826837
eg.Go(func()error {
827838
a.logger.Debug(egCtx,"running tailnet connection coordinator")
828-
err:=a.runCoordinator(egCtx,network)
839+
err:=a.runCoordinator(egCtx,conn,network)
829840
iferr!=nil {
830841
returnxerrors.Errorf("run coordinator: %w",err)
831842
}
@@ -834,7 +845,7 @@ func (a *agent) run(ctx context.Context) error {
834845

835846
eg.Go(func()error {
836847
a.logger.Debug(egCtx,"running derp map subscriber")
837-
err:=a.runDERPMapSubscriber(egCtx,network)
848+
err:=a.runDERPMapSubscriber(egCtx,conn,network)
838849
iferr!=nil {
839850
returnxerrors.Errorf("run derp map subscriber: %w",err)
840851
}
@@ -1056,21 +1067,8 @@ func (a *agent) createTailnet(ctx context.Context, agentID uuid.UUID, derpMap *t
10561067

10571068
// runCoordinator runs a coordinator and returns whether a reconnect
10581069
// should occur.
1059-
func (a*agent)runCoordinator(ctx context.Context,network*tailnet.Conn)error {
1060-
ctx,cancel:=context.WithCancel(ctx)
1061-
defercancel()
1062-
1063-
conn,err:=a.client.Listen(ctx)
1064-
iferr!=nil {
1065-
returnerr
1066-
}
1067-
deferfunc() {
1068-
cErr:=conn.Close()
1069-
ifcErr!=nil {
1070-
a.logger.Debug(ctx,"error closing drpc connection",slog.Error(err))
1071-
}
1072-
}()
1073-
1070+
func (a*agent)runCoordinator(ctx context.Context,conn drpc.Conn,network*tailnet.Conn)error {
1071+
defera.logger.Debug(ctx,"disconnected from coordination RPC")
10741072
tClient:=tailnetproto.NewDRPCTailnetClient(conn)
10751073
coordinate,err:=tClient.Coordinate(ctx)
10761074
iferr!=nil {
@@ -1082,7 +1080,7 @@ func (a *agent) runCoordinator(ctx context.Context, network *tailnet.Conn) error
10821080
a.logger.Debug(ctx,"error closing Coordinate client",slog.Error(err))
10831081
}
10841082
}()
1085-
a.logger.Info(ctx,"connected to coordinationendpoint")
1083+
a.logger.Info(ctx,"connected to coordinationRPC")
10861084
coordination:=tailnet.NewRemoteCoordination(a.logger,coordinate,network,uuid.Nil)
10871085
select {
10881086
case<-ctx.Done():
@@ -1093,30 +1091,29 @@ func (a *agent) runCoordinator(ctx context.Context, network *tailnet.Conn) error
10931091
}
10941092

10951093
// runDERPMapSubscriber runs a coordinator and returns if a reconnect should occur.
1096-
func (a*agent)runDERPMapSubscriber(ctx context.Context,network*tailnet.Conn)error {
1094+
func (a*agent)runDERPMapSubscriber(ctx context.Context,conn drpc.Conn,network*tailnet.Conn)error {
1095+
defera.logger.Debug(ctx,"disconnected from derp map RPC")
10971096
ctx,cancel:=context.WithCancel(ctx)
10981097
defercancel()
1099-
1100-
updates,closer,err:=a.client.DERPMapUpdates(ctx)
1098+
tClient:=tailnetproto.NewDRPCTailnetClient(conn)
1099+
stream,err:=tClient.StreamDERPMaps(ctx,&tailnetproto.StreamDERPMapsRequest{})
11011100
iferr!=nil {
1102-
returnerr
1101+
returnxerrors.Errorf("stream DERP Maps: %w",err)
11031102
}
1104-
defercloser.Close()
1105-
1106-
a.logger.Info(ctx,"connected to derp map endpoint")
1103+
deferfunc() {
1104+
cErr:=stream.Close()
1105+
ifcErr!=nil {
1106+
a.logger.Debug(ctx,"error closing DERPMap stream",slog.Error(err))
1107+
}
1108+
}()
1109+
a.logger.Info(ctx,"connected to derp map RPC")
11071110
for {
1108-
select {
1109-
case<-ctx.Done():
1110-
returnctx.Err()
1111-
caseupdate:=<-updates:
1112-
ifupdate.Err!=nil {
1113-
returnupdate.Err
1114-
}
1115-
ifupdate.DERPMap!=nil&&!tailnet.CompareDERPMaps(network.DERPMap(),update.DERPMap) {
1116-
a.logger.Info(ctx,"updating derp map due to detected changes")
1117-
network.SetDERPMap(update.DERPMap)
1118-
}
1111+
dmp,err:=stream.Recv()
1112+
iferr!=nil {
1113+
returnxerrors.Errorf("recv DERPMap error: %w",err)
11191114
}
1115+
dm:=tailnet.DERPMapFromProto(dmp)
1116+
network.SetDERPMap(dm)
11201117
}
11211118
}
11221119

‎agent/agent_test.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1349,6 +1349,7 @@ func TestAgent_Lifecycle(t *testing.T) {
13491349
make(chan*agentsdk.Stats,50),
13501350
tailnet.NewCoordinator(logger),
13511351
)
1352+
deferclient.Close()
13521353

13531354
fs:=afero.NewMemMapFs()
13541355
agent:=agent.New(agent.Options{
@@ -1683,13 +1684,18 @@ func TestAgent_UpdatedDERP(t *testing.T) {
16831684
statsCh,
16841685
coordinator,
16851686
)
1687+
t.Cleanup(func() {
1688+
t.Log("closing client")
1689+
client.Close()
1690+
})
16861691
uut:=agent.New(agent.Options{
16871692
Client:client,
16881693
Filesystem:fs,
16891694
Logger:logger.Named("agent"),
16901695
ReconnectingPTYTimeout:time.Minute,
16911696
})
16921697
t.Cleanup(func() {
1698+
t.Log("closing agent")
16931699
_=uut.Close()
16941700
})
16951701

@@ -1718,6 +1724,7 @@ func TestAgent_UpdatedDERP(t *testing.T) {
17181724
iferr!=nil {
17191725
t.Logf("error closing in-memory coordination: %s",err.Error())
17201726
}
1727+
t.Logf("closed coordination %s",name)
17211728
})
17221729
// Force DERP.
17231730
conn.SetBlockEndpoints(true)
@@ -1753,11 +1760,9 @@ func TestAgent_UpdatedDERP(t *testing.T) {
17531760
}
17541761

17551762
// Push a new DERP map to the agent.
1756-
err:=client.PushDERPMapUpdate(agentsdk.DERPMapUpdate{
1757-
DERPMap:newDerpMap,
1758-
})
1763+
err:=client.PushDERPMapUpdate(newDerpMap)
17591764
require.NoError(t,err)
1760-
t.Logf("client PushedDERPMap update")
1765+
t.Logf("pushedDERPMap update to agent")
17611766

17621767
require.Eventually(t,func()bool {
17631768
conn:=uut.TailnetConn()
@@ -1826,6 +1831,7 @@ func TestAgent_Reconnect(t *testing.T) {
18261831
statsCh,
18271832
coordinator,
18281833
)
1834+
deferclient.Close()
18291835
initialized:= atomic.Int32{}
18301836
closer:=agent.New(agent.Options{
18311837
ExchangeToken:func(ctx context.Context) (string,error) {
@@ -1862,6 +1868,7 @@ func TestAgent_WriteVSCodeConfigs(t *testing.T) {
18621868
make(chan*agentsdk.Stats,50),
18631869
coordinator,
18641870
)
1871+
deferclient.Close()
18651872
filesystem:=afero.NewMemMapFs()
18661873
closer:=agent.New(agent.Options{
18671874
ExchangeToken:func(ctx context.Context) (string,error) {
@@ -2039,6 +2046,7 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
20392046
statsCh:=make(chan*agentsdk.Stats,50)
20402047
fs:=afero.NewMemMapFs()
20412048
c:=agenttest.NewClient(t,logger.Named("agent"),metadata.AgentID,metadata,statsCh,coordinator)
2049+
t.Cleanup(c.Close)
20422050

20432051
options:= agent.Options{
20442052
Client:c,

‎agent/agenttest/client.go

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,12 @@ func NewClient(t testing.TB,
3939
coordPtr:= atomic.Pointer[tailnet.Coordinator]{}
4040
coordPtr.Store(&coordinator)
4141
mux:=drpcmux.New()
42+
derpMapUpdates:=make(chan*tailcfg.DERPMap)
4243
drpcService:=&tailnet.DRPCService{
43-
CoordPtr:&coordPtr,
44-
Logger:logger,
45-
// TODO: handle DERPMap too!
46-
DerpMapUpdateFrequency:time.Hour,
47-
DerpMapFn:func()*tailcfg.DERPMap {panic("not implemented") },
44+
CoordPtr:&coordPtr,
45+
Logger:logger,
46+
DerpMapUpdateFrequency:time.Microsecond,
47+
DerpMapFn:func()*tailcfg.DERPMap {return<-derpMapUpdates },
4848
}
4949
err:=proto.DRPCRegisterTailnet(mux,drpcService)
5050
require.NoError(t,err)
@@ -64,7 +64,7 @@ func NewClient(t testing.TB,
6464
statsChan:statsChan,
6565
coordinator:coordinator,
6666
server:server,
67-
derpMapUpdates:make(chan agentsdk.DERPMapUpdate),
67+
derpMapUpdates:derpMapUpdates,
6868
}
6969
}
7070

@@ -85,23 +85,26 @@ type Client struct {
8585
lifecycleStates []codersdk.WorkspaceAgentLifecycle
8686
startup agentsdk.PostStartupRequest
8787
logs []agentsdk.Log
88-
derpMapUpdateschan agentsdk.DERPMapUpdate
88+
derpMapUpdateschan*tailcfg.DERPMap
89+
derpMapOnce sync.Once
90+
}
91+
92+
func (c*Client)Close() {
93+
c.derpMapOnce.Do(func() {close(c.derpMapUpdates) })
8994
}
9095

9196
func (c*Client)Manifest(_ context.Context) (agentsdk.Manifest,error) {
9297
returnc.manifest,nil
9398
}
9499

95-
func (c*Client)Listen(_ context.Context) (drpc.Conn,error) {
100+
func (c*Client)Listen(ctx context.Context) (drpc.Conn,error) {
96101
conn,lis:=drpcsdk.MemTransportPipe()
97-
closed:=make(chanstruct{})
98102
c.LastWorkspaceAgent=func() {
99103
_=conn.Close()
100104
_=lis.Close()
101-
<-closed
102105
}
103106
c.t.Cleanup(c.LastWorkspaceAgent)
104-
serveCtx,cancel:=context.WithCancel(context.Background())
107+
serveCtx,cancel:=context.WithCancel(ctx)
105108
c.t.Cleanup(cancel)
106109
auth:= tailnet.AgentTunnelAuth{}
107110
streamID:= tailnet.StreamID{
@@ -112,7 +115,6 @@ func (c *Client) Listen(_ context.Context) (drpc.Conn, error) {
112115
serveCtx=tailnet.WithStreamID(serveCtx,streamID)
113116
gofunc() {
114117
_=c.server.Serve(serveCtx,lis)
115-
close(closed)
116118
}()
117119
returnconn,nil
118120
}
@@ -235,7 +237,7 @@ func (c *Client) GetServiceBanner(ctx context.Context) (codersdk.ServiceBannerCo
235237
return codersdk.ServiceBannerConfig{},nil
236238
}
237239

238-
func (c*Client)PushDERPMapUpdate(updateagentsdk.DERPMapUpdate)error {
240+
func (c*Client)PushDERPMapUpdate(update*tailcfg.DERPMap)error {
239241
timer:=time.NewTimer(testutil.WaitShort)
240242
defertimer.Stop()
241243
select {
@@ -247,14 +249,6 @@ func (c *Client) PushDERPMapUpdate(update agentsdk.DERPMapUpdate) error {
247249
returnnil
248250
}
249251

250-
func (c*Client)DERPMapUpdates(_ context.Context) (<-chan agentsdk.DERPMapUpdate, io.Closer,error) {
251-
closed:=make(chanstruct{})
252-
returnc.derpMapUpdates,closeFunc(func()error {
253-
close(closed)
254-
returnnil
255-
}),nil
256-
}
257-
258252
typecloseFuncfunc()error
259253

260254
func (ccloseFunc)Close()error {

‎coderd/tailnet_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ func setupAgent(t *testing.T, agentAddresses []netip.Prefix) (uuid.UUID, agent.A
178178
})
179179

180180
c:=agenttest.NewClient(t,logger,manifest.AgentID,manifest,make(chan*agentsdk.Stats,50),coord)
181+
t.Cleanup(c.Close)
181182

182183
options:= agent.Options{
183184
Client:c,

‎coderd/wsconncache/wsconncache_test.go

Lines changed: 23 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -171,13 +171,16 @@ func setupAgent(t *testing.T, manifest agentsdk.Manifest, ptyTimeout time.Durati
171171
_=coordinator.Close()
172172
})
173173
manifest.AgentID=uuid.New()
174+
aC:=&client{
175+
t:t,
176+
agentID:manifest.AgentID,
177+
manifest:manifest,
178+
coordinator:coordinator,
179+
derpMapUpdates:make(chan*tailcfg.DERPMap),
180+
}
181+
t.Cleanup(aC.close)
174182
closer:=agent.New(agent.Options{
175-
Client:&client{
176-
t:t,
177-
agentID:manifest.AgentID,
178-
manifest:manifest,
179-
coordinator:coordinator,
180-
},
183+
Client:aC,
181184
Logger:logger.Named("agent"),
182185
ReconnectingPTYTimeout:ptyTimeout,
183186
Addresses: []netip.Prefix{netip.PrefixFrom(codersdk.WorkspaceAgentIP,128)},
@@ -230,52 +233,37 @@ func setupAgent(t *testing.T, manifest agentsdk.Manifest, ptyTimeout time.Durati
230233
}
231234

232235
typeclientstruct {
233-
t*testing.T
234-
agentID uuid.UUID
235-
manifest agentsdk.Manifest
236-
coordinator tailnet.Coordinator
237-
}
238-
239-
func (c*client)Manifest(_ context.Context) (agentsdk.Manifest,error) {
240-
returnc.manifest,nil
236+
t*testing.T
237+
agentID uuid.UUID
238+
manifest agentsdk.Manifest
239+
coordinator tailnet.Coordinator
240+
closeOnce sync.Once
241+
derpMapUpdateschan*tailcfg.DERPMap
241242
}
242243

243-
typecloserstruct {
244-
closeFuncfunc()error
244+
func (c*client)close() {
245+
c.closeOnce.Do(func(){close(c.derpMapUpdates) })
245246
}
246247

247-
func (c*closer)Close()error {
248-
returnc.closeFunc()
249-
}
250-
251-
func (*client)DERPMapUpdates(_ context.Context) (<-chan agentsdk.DERPMapUpdate, io.Closer,error) {
252-
closed:=make(chanstruct{})
253-
returnmake(<-chan agentsdk.DERPMapUpdate),&closer{
254-
closeFunc:func()error {
255-
close(closed)
256-
returnnil
257-
},
258-
},nil
248+
func (c*client)Manifest(_ context.Context) (agentsdk.Manifest,error) {
249+
returnc.manifest,nil
259250
}
260251

261252
func (c*client)Listen(_ context.Context) (drpc.Conn,error) {
262253
logger:=slogtest.Make(c.t,nil).Leveled(slog.LevelDebug).Named("drpc")
263254
conn,lis:=drpcsdk.MemTransportPipe()
264-
closed:=make(chanstruct{})
265255
c.t.Cleanup(func() {
266256
_=conn.Close()
267257
_=lis.Close()
268-
<-closed
269258
})
270259
coordPtr:= atomic.Pointer[tailnet.Coordinator]{}
271260
coordPtr.Store(&c.coordinator)
272261
mux:=drpcmux.New()
273262
drpcService:=&tailnet.DRPCService{
274-
CoordPtr:&coordPtr,
275-
Logger:logger,
276-
// TODO: handle DERPMap too!
277-
DerpMapUpdateFrequency:time.Hour,
278-
DerpMapFn:func()*tailcfg.DERPMap {panic("not implemented") },
263+
CoordPtr:&coordPtr,
264+
Logger:logger,
265+
DerpMapUpdateFrequency:time.Microsecond,
266+
DerpMapFn:func()*tailcfg.DERPMap {return<-c.derpMapUpdates },
279267
}
280268
err:=proto.DRPCRegisterTailnet(mux,drpcService)
281269
iferr!=nil {
@@ -302,7 +290,6 @@ func (c *client) Listen(_ context.Context) (drpc.Conn, error) {
302290
serveCtx=tailnet.WithStreamID(serveCtx,streamID)
303291
gofunc() {
304292
server.Serve(serveCtx,lis)
305-
close(closed)
306293
}()
307294
returnconn,nil
308295
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp