Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit2f11961

Browse files
committed
feat: use tailnet v2 API for coordination
1 parent64caaac commit2f11961

30 files changed

+1170
-1107
lines changed

‎Makefile‎

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,8 @@ gen: \
475475
site/.eslintignore\
476476
site/e2e/provisionerGenerated.ts\
477477
site/src/theme/icons.json\
478-
examples/examples.gen.json
478+
examples/examples.gen.json\
479+
tailnet/tailnettest/coordinatormock.go
479480
.PHONY: gen
480481

481482
# Mark all generated files as fresh so make thinks they're up-to-date. This is
@@ -502,6 +503,7 @@ gen/mark-fresh:
502503
site/e2e/provisionerGenerated.ts\
503504
site/src/theme/icons.json\
504505
examples/examples.gen.json\
506+
tailnet/tailnettest/coordinatormock.go\
505507
"
506508
forfilein$$files;do
507509
echo"$$file"
@@ -529,6 +531,9 @@ coderd/database/querier.go: coderd/database/sqlc.yaml coderd/database/dump.sql $
529531
coderd/database/dbmock/dbmock.go: coderd/database/db.go coderd/database/querier.go
530532
go generate ./coderd/database/dbmock/
531533

534+
tailnet/tailnettest/coordinatormock.go: tailnet/coordinator.go
535+
go generate ./tailnet/tailnettest/
536+
532537
tailnet/proto/tailnet.pb.go: tailnet/proto/tailnet.proto
533538
protoc\
534539
--go_out=.\

‎agent/agent.go‎

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"golang.org/x/exp/slices"
3131
"golang.org/x/sync/errgroup"
3232
"golang.org/x/xerrors"
33+
"storj.io/drpc"
3334
"tailscale.com/net/speedtest"
3435
"tailscale.com/tailcfg"
3536
"tailscale.com/types/netlogtype"
@@ -47,6 +48,7 @@ import (
4748
"github.com/coder/coder/v2/codersdk"
4849
"github.com/coder/coder/v2/codersdk/agentsdk"
4950
"github.com/coder/coder/v2/tailnet"
51+
tailnetproto"github.com/coder/coder/v2/tailnet/proto"
5052
)
5153

5254
const (
@@ -86,7 +88,7 @@ type Options struct {
8688

8789
typeClientinterface {
8890
Manifest(ctx context.Context) (agentsdk.Manifest,error)
89-
Listen(ctx context.Context) (net.Conn,error)
91+
Listen(ctx context.Context) (drpc.Conn,error)
9092
DERPMapUpdates(ctx context.Context) (<-chan agentsdk.DERPMapUpdate, io.Closer,error)
9193
ReportStats(ctx context.Context,log slog.Logger,statsChan<-chan*agentsdk.Stats,setIntervalfunc(time.Duration)) (io.Closer,error)
9294
PostLifecycle(ctx context.Context,state agentsdk.PostLifecycleRequest)error
@@ -1058,20 +1060,34 @@ func (a *agent) runCoordinator(ctx context.Context, network *tailnet.Conn) error
10581060
ctx,cancel:=context.WithCancel(ctx)
10591061
defercancel()
10601062

1061-
coordinator,err:=a.client.Listen(ctx)
1063+
conn,err:=a.client.Listen(ctx)
10621064
iferr!=nil {
10631065
returnerr
10641066
}
1065-
defercoordinator.Close()
1067+
deferfunc() {
1068+
cErr:=conn.Close()
1069+
ifcErr!=nil {
1070+
a.logger.Debug(ctx,"error closing drpc connection",slog.Error(err))
1071+
}
1072+
}()
1073+
1074+
tClient:=tailnetproto.NewDRPCTailnetClient(conn)
1075+
coordinate,err:=tClient.Coordinate(ctx)
1076+
iferr!=nil {
1077+
returnxerrors.Errorf("failed to connect to the coordinate endpoint: %w",err)
1078+
}
1079+
deferfunc() {
1080+
cErr:=coordinate.Close()
1081+
ifcErr!=nil {
1082+
a.logger.Debug(ctx,"error closing Coordinate client",slog.Error(err))
1083+
}
1084+
}()
10661085
a.logger.Info(ctx,"connected to coordination endpoint")
1067-
sendNodes,errChan:=tailnet.ServeCoordinator(coordinator,func(nodes []*tailnet.Node)error {
1068-
returnnetwork.UpdateNodes(nodes,false)
1069-
})
1070-
network.SetNodeCallback(sendNodes)
1086+
coordination:=tailnet.NewRemoteCoordination(a.logger,coordinate,network,uuid.Nil)
10711087
select {
10721088
case<-ctx.Done():
10731089
returnctx.Err()
1074-
caseerr:=<-errChan:
1090+
caseerr:=<-coordination.Error():
10751091
returnerr
10761092
}
10771093
}

‎agent/agent_test.go‎

Lines changed: 48 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1677,9 +1677,11 @@ func TestAgent_UpdatedDERP(t *testing.T) {
16771677
require.NotNil(t,originalDerpMap)
16781678

16791679
coordinator:=tailnet.NewCoordinator(logger)
1680-
deferfunc() {
1680+
// use t.Cleanup so the coordinator closing doesn't deadlock with in-memory
1681+
// coordination
1682+
t.Cleanup(func() {
16811683
_=coordinator.Close()
1682-
}()
1684+
})
16831685
agentID:=uuid.New()
16841686
statsCh:=make(chan*agentsdk.Stats,50)
16851687
fs:=afero.NewMemMapFs()
@@ -1694,41 +1696,42 @@ func TestAgent_UpdatedDERP(t *testing.T) {
16941696
statsCh,
16951697
coordinator,
16961698
)
1697-
closer:=agent.New(agent.Options{
1699+
uut:=agent.New(agent.Options{
16981700
Client:client,
16991701
Filesystem:fs,
17001702
Logger:logger.Named("agent"),
17011703
ReconnectingPTYTimeout:time.Minute,
17021704
})
1703-
deferfunc() {
1704-
_=closer.Close()
1705-
}()
1705+
t.Cleanup(func() {
1706+
_=uut.Close()
1707+
})
17061708

17071709
// Setup a client connection.
1708-
newClientConn:=func(derpMap*tailcfg.DERPMap)*codersdk.WorkspaceAgentConn {
1710+
newClientConn:=func(derpMap*tailcfg.DERPMap,namestring)*codersdk.WorkspaceAgentConn {
17091711
conn,err:=tailnet.NewConn(&tailnet.Options{
17101712
Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IP(),128)},
17111713
DERPMap:derpMap,
1712-
Logger:logger.Named("client"),
1714+
Logger:logger.Named(name),
17131715
})
17141716
require.NoError(t,err)
1715-
clientConn,serverConn:=net.Pipe()
1716-
serveClientDone:=make(chanstruct{})
17171717
t.Cleanup(func() {
1718-
_=clientConn.Close()
1719-
_=serverConn.Close()
1718+
t.Logf("closing conn %s",name)
17201719
_=conn.Close()
1721-
<-serveClientDone
17221720
})
1723-
gofunc() {
1724-
deferclose(serveClientDone)
1725-
err:=coordinator.ServeClient(serverConn,uuid.New(),agentID)
1726-
assert.NoError(t,err)
1727-
}()
1728-
sendNode,_:=tailnet.ServeCoordinator(clientConn,func(nodes []*tailnet.Node)error {
1729-
returnconn.UpdateNodes(nodes,false)
1721+
testCtx,testCtxCancel:=context.WithCancel(context.Background())
1722+
t.Cleanup(testCtxCancel)
1723+
clientID:=uuid.New()
1724+
coordination:=tailnet.NewInMemoryCoordination(
1725+
testCtx,logger,
1726+
clientID,agentID,
1727+
coordinator,conn)
1728+
t.Cleanup(func() {
1729+
t.Logf("closing coordination %s",name)
1730+
err:=coordination.Close()
1731+
iferr!=nil {
1732+
t.Logf("error closing in-memory coordination: %s",err.Error())
1733+
}
17301734
})
1731-
conn.SetNodeCallback(sendNode)
17321735
// Force DERP.
17331736
conn.SetBlockEndpoints(true)
17341737

@@ -1737,6 +1740,7 @@ func TestAgent_UpdatedDERP(t *testing.T) {
17371740
CloseFunc:func()error {returncodersdk.ErrSkipClose },
17381741
})
17391742
t.Cleanup(func() {
1743+
t.Logf("closing sdkConn %s",name)
17401744
_=sdkConn.Close()
17411745
})
17421746
ctx,cancel:=context.WithTimeout(context.Background(),testutil.WaitLong)
@@ -1747,7 +1751,7 @@ func TestAgent_UpdatedDERP(t *testing.T) {
17471751

17481752
returnsdkConn
17491753
}
1750-
conn1:=newClientConn(originalDerpMap)
1754+
conn1:=newClientConn(originalDerpMap,"client1")
17511755

17521756
// Change the DERP map.
17531757
newDerpMap,_:=tailnettest.RunDERPAndSTUN(t)
@@ -1766,27 +1770,34 @@ func TestAgent_UpdatedDERP(t *testing.T) {
17661770
DERPMap:newDerpMap,
17671771
})
17681772
require.NoError(t,err)
1773+
t.Logf("client Pushed DERPMap update")
17691774

17701775
require.Eventually(t,func()bool {
1771-
conn:=closer.TailnetConn()
1776+
conn:=uut.TailnetConn()
17721777
ifconn==nil {
17731778
returnfalse
17741779
}
17751780
regionIDs:=conn.DERPMap().RegionIDs()
1776-
returnlen(regionIDs)==1&&regionIDs[0]==2&&conn.Node().PreferredDERP==2
1781+
preferredDERP:=conn.Node().PreferredDERP
1782+
t.Logf("agent Conn DERPMap with regionIDs %v, PreferredDERP %d",regionIDs,preferredDERP)
1783+
returnlen(regionIDs)==1&&regionIDs[0]==2&&preferredDERP==2
17771784
},testutil.WaitLong,testutil.IntervalFast)
1785+
t.Logf("agent got the new DERPMap")
17781786

17791787
// Connect from a second client and make sure it uses the new DERP map.
1780-
conn2:=newClientConn(newDerpMap)
1788+
conn2:=newClientConn(newDerpMap,"client2")
17811789
require.Equal(t, []int{2},conn2.DERPMap().RegionIDs())
1790+
t.Log("conn2 got the new DERPMap")
17821791

17831792
// If the first client gets a DERP map update, it should be able to
17841793
// reconnect just fine.
17851794
conn1.SetDERPMap(newDerpMap)
17861795
require.Equal(t, []int{2},conn1.DERPMap().RegionIDs())
1796+
t.Log("set the new DERPMap on conn1")
17871797
ctx,cancel:=context.WithTimeout(context.Background(),testutil.WaitLong)
17881798
defercancel()
17891799
require.True(t,conn1.AwaitReachable(ctx))
1800+
t.Log("conn1 reached agent with new DERP")
17901801
}
17911802

17921803
funcTestAgent_Speedtest(t*testing.T) {
@@ -2063,22 +2074,22 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
20632074
Logger:logger.Named("client"),
20642075
})
20652076
require.NoError(t,err)
2066-
clientConn,serverConn:=net.Pipe()
2067-
serveClientDone:=make(chanstruct{})
20682077
t.Cleanup(func() {
2069-
_=clientConn.Close()
2070-
_=serverConn.Close()
20712078
_=conn.Close()
2072-
<-serveClientDone
20732079
})
2074-
gofunc() {
2075-
deferclose(serveClientDone)
2076-
coordinator.ServeClient(serverConn,uuid.New(),metadata.AgentID)
2077-
}()
2078-
sendNode,_:=tailnet.ServeCoordinator(clientConn,func(nodes []*tailnet.Node)error {
2079-
returnconn.UpdateNodes(nodes,false)
2080+
testCtx,testCtxCancel:=context.WithCancel(context.Background())
2081+
t.Cleanup(testCtxCancel)
2082+
clientID:=uuid.New()
2083+
coordination:=tailnet.NewInMemoryCoordination(
2084+
testCtx,logger,
2085+
clientID,metadata.AgentID,
2086+
coordinator,conn)
2087+
t.Cleanup(func() {
2088+
err:=coordination.Close()
2089+
iferr!=nil {
2090+
t.Logf("error closing in-mem coordination: %s",err.Error())
2091+
}
20802092
})
2081-
conn.SetNodeCallback(sendNode)
20822093
agentConn:=codersdk.NewWorkspaceAgentConn(conn, codersdk.WorkspaceAgentConnOptions{
20832094
AgentID:metadata.AgentID,
20842095
})

‎agent/agenttest/client.go‎

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,26 @@ package agenttest
33
import (
44
"context"
55
"io"
6-
"net"
76
"sync"
7+
"sync/atomic"
88
"testing"
99
"time"
1010

1111
"github.com/google/uuid"
12+
"github.com/stretchr/testify/require"
1213
"golang.org/x/exp/maps"
1314
"golang.org/x/xerrors"
15+
"storj.io/drpc"
16+
"storj.io/drpc/drpcmux"
17+
"storj.io/drpc/drpcserver"
18+
"tailscale.com/tailcfg"
1419

1520
"cdr.dev/slog"
1621
"github.com/coder/coder/v2/codersdk"
1722
"github.com/coder/coder/v2/codersdk/agentsdk"
23+
drpcsdk"github.com/coder/coder/v2/codersdk/drpc"
1824
"github.com/coder/coder/v2/tailnet"
25+
"github.com/coder/coder/v2/tailnet/proto"
1926
"github.com/coder/coder/v2/testutil"
2027
)
2128

@@ -24,18 +31,39 @@ func NewClient(t testing.TB,
2431
agentID uuid.UUID,
2532
manifest agentsdk.Manifest,
2633
statsChanchan*agentsdk.Stats,
27-
coordinator tailnet.CoordinatorV1,
34+
coordinator tailnet.Coordinator,
2835
)*Client {
2936
ifmanifest.AgentID==uuid.Nil {
3037
manifest.AgentID=agentID
3138
}
39+
coordPtr:= atomic.Pointer[tailnet.Coordinator]{}
40+
coordPtr.Store(&coordinator)
41+
mux:=drpcmux.New()
42+
drpcService:=&tailnet.DRPCService{
43+
CoordPtr:&coordPtr,
44+
Logger:logger,
45+
// TODO: handle DERPMap too!
46+
DerpMapUpdateFrequency:time.Hour,
47+
DerpMapFn:func()*tailcfg.DERPMap {panic("not implemented") },
48+
}
49+
err:=proto.DRPCRegisterTailnet(mux,drpcService)
50+
require.NoError(t,err)
51+
server:=drpcserver.NewWithOptions(mux, drpcserver.Options{
52+
Log:func(errerror) {
53+
ifxerrors.Is(err,io.EOF) {
54+
return
55+
}
56+
logger.Debug(context.Background(),"drpc server error",slog.Error(err))
57+
},
58+
})
3259
return&Client{
3360
t:t,
3461
logger:logger.Named("client"),
3562
agentID:agentID,
3663
manifest:manifest,
3764
statsChan:statsChan,
3865
coordinator:coordinator,
66+
server:server,
3967
derpMapUpdates:make(chan agentsdk.DERPMapUpdate),
4068
}
4169
}
@@ -47,7 +75,8 @@ type Client struct {
4775
manifest agentsdk.Manifest
4876
metadatamap[string]agentsdk.Metadata
4977
statsChanchan*agentsdk.Stats
50-
coordinator tailnet.CoordinatorV1
78+
coordinator tailnet.Coordinator
79+
server*drpcserver.Server
5180
LastWorkspaceAgentfunc()
5281
PatchWorkspaceLogsfunc()error
5382
GetServiceBannerFuncfunc() (codersdk.ServiceBannerConfig,error)
@@ -63,20 +92,29 @@ func (c *Client) Manifest(_ context.Context) (agentsdk.Manifest, error) {
6392
returnc.manifest,nil
6493
}
6594

66-
func (c*Client)Listen(_ context.Context) (net.Conn,error) {
67-
clientConn,serverConn:=net.Pipe()
95+
func (c*Client)Listen(_ context.Context) (drpc.Conn,error) {
96+
conn,lis:=drpcsdk.MemTransportPipe()
6897
closed:=make(chanstruct{})
6998
c.LastWorkspaceAgent=func() {
70-
_=serverConn.Close()
71-
_=clientConn.Close()
99+
_=conn.Close()
100+
_=lis.Close()
72101
<-closed
73102
}
74103
c.t.Cleanup(c.LastWorkspaceAgent)
104+
serveCtx,cancel:=context.WithCancel(context.Background())
105+
c.t.Cleanup(cancel)
106+
auth:= tailnet.AgentTunnelAuth{}
107+
streamID:= tailnet.StreamID{
108+
Name:"agenttest",
109+
ID:c.agentID,
110+
Auth:auth,
111+
}
112+
serveCtx=tailnet.WithStreamID(serveCtx,streamID)
75113
gofunc() {
76-
_=c.coordinator.ServeAgent(serverConn,c.agentID,"")
114+
_=c.server.Serve(serveCtx,lis)
77115
close(closed)
78116
}()
79-
returnclientConn,nil
117+
returnconn,nil
80118
}
81119

82120
func (c*Client)ReportStats(ctx context.Context,_ slog.Logger,statsChan<-chan*agentsdk.Stats,setIntervalfunc(time.Duration)) (io.Closer,error) {

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp