Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit80e59bd

Browse files
committed
feat: use tailnet v2 API for coordination
1 parent4177202 commit80e59bd

31 files changed

+1189
-1109
lines changed

‎Makefile‎

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,8 @@ gen: \
475475
site/.eslintignore\
476476
site/e2e/provisionerGenerated.ts\
477477
site/src/theme/icons.json\
478-
examples/examples.gen.json
478+
examples/examples.gen.json\
479+
tailnet/tailnettest/coordinatormock.go
479480
.PHONY: gen
480481

481482
# Mark all generated files as fresh so make thinks they're up-to-date. This is
@@ -502,6 +503,7 @@ gen/mark-fresh:
502503
site/e2e/provisionerGenerated.ts\
503504
site/src/theme/icons.json\
504505
examples/examples.gen.json\
506+
tailnet/tailnettest/coordinatormock.go\
505507
"
506508
forfilein$$files;do
507509
echo"$$file"
@@ -529,6 +531,9 @@ coderd/database/querier.go: coderd/database/sqlc.yaml coderd/database/dump.sql $
529531
coderd/database/dbmock/dbmock.go: coderd/database/db.go coderd/database/querier.go
530532
go generate ./coderd/database/dbmock/
531533

534+
tailnet/tailnettest/coordinatormock.go: tailnet/coordinator.go
535+
go generate ./tailnet/tailnettest/
536+
532537
tailnet/proto/tailnet.pb.go: tailnet/proto/tailnet.proto
533538
protoc\
534539
--go_out=.\

‎agent/agent.go‎

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"sync"
2323
"time"
2424

25+
tailnetproto"github.com/coder/coder/v2/tailnet/proto"
26+
2527
"github.com/go-chi/chi/v5"
2628
"github.com/google/uuid"
2729
"github.com/prometheus/client_golang/prometheus"
@@ -30,6 +32,7 @@ import (
3032
"golang.org/x/exp/slices"
3133
"golang.org/x/sync/errgroup"
3234
"golang.org/x/xerrors"
35+
"storj.io/drpc"
3336
"tailscale.com/net/speedtest"
3437
"tailscale.com/tailcfg"
3538
"tailscale.com/types/netlogtype"
@@ -86,7 +89,7 @@ type Options struct {
8689

8790
typeClientinterface {
8891
Manifest(ctx context.Context) (agentsdk.Manifest,error)
89-
Listen(ctx context.Context) (net.Conn,error)
92+
Listen(ctx context.Context) (drpc.Conn,error)
9093
DERPMapUpdates(ctx context.Context) (<-chan agentsdk.DERPMapUpdate, io.Closer,error)
9194
ReportStats(ctx context.Context,log slog.Logger,statsChan<-chan*agentsdk.Stats,setIntervalfunc(time.Duration)) (io.Closer,error)
9295
PostLifecycle(ctx context.Context,state agentsdk.PostLifecycleRequest)error
@@ -1058,20 +1061,34 @@ func (a *agent) runCoordinator(ctx context.Context, network *tailnet.Conn) error
10581061
ctx,cancel:=context.WithCancel(ctx)
10591062
defercancel()
10601063

1061-
coordinator,err:=a.client.Listen(ctx)
1064+
conn,err:=a.client.Listen(ctx)
10621065
iferr!=nil {
10631066
returnerr
10641067
}
1065-
defercoordinator.Close()
1068+
deferfunc() {
1069+
cErr:=conn.Close()
1070+
ifcErr!=nil {
1071+
a.logger.Debug(ctx,"error closing drpc connection",slog.Error(err))
1072+
}
1073+
}()
1074+
1075+
tClient:=tailnetproto.NewDRPCTailnetClient(conn)
1076+
coordinate,err:=tClient.Coordinate(ctx)
1077+
iferr!=nil {
1078+
returnxerrors.Errorf("failed to connect to the coordinate endpoint: %w",err)
1079+
}
1080+
deferfunc() {
1081+
cErr:=coordinate.Close()
1082+
ifcErr!=nil {
1083+
a.logger.Debug(ctx,"error closing Coordinate client",slog.Error(err))
1084+
}
1085+
}()
10661086
a.logger.Info(ctx,"connected to coordination endpoint")
1067-
sendNodes,errChan:=tailnet.ServeCoordinator(coordinator,func(nodes []*tailnet.Node)error {
1068-
returnnetwork.UpdateNodes(nodes,false)
1069-
})
1070-
network.SetNodeCallback(sendNodes)
1087+
coordination:=tailnet.NewRemoteCoordination(a.logger,coordinate,network,uuid.Nil)
10711088
select {
10721089
case<-ctx.Done():
10731090
returnctx.Err()
1074-
caseerr:=<-errChan:
1091+
caseerr:=<-coordination.Error():
10751092
returnerr
10761093
}
10771094
}

‎agent/agent_test.go‎

Lines changed: 48 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1664,9 +1664,11 @@ func TestAgent_UpdatedDERP(t *testing.T) {
16641664
require.NotNil(t,originalDerpMap)
16651665

16661666
coordinator:=tailnet.NewCoordinator(logger)
1667-
deferfunc() {
1667+
// use t.Cleanup so the coordinator closing doesn't deadlock with in-memory
1668+
// coordination
1669+
t.Cleanup(func() {
16681670
_=coordinator.Close()
1669-
}()
1671+
})
16701672
agentID:=uuid.New()
16711673
statsCh:=make(chan*agentsdk.Stats,50)
16721674
fs:=afero.NewMemMapFs()
@@ -1681,41 +1683,42 @@ func TestAgent_UpdatedDERP(t *testing.T) {
16811683
statsCh,
16821684
coordinator,
16831685
)
1684-
closer:=agent.New(agent.Options{
1686+
uut:=agent.New(agent.Options{
16851687
Client:client,
16861688
Filesystem:fs,
16871689
Logger:logger.Named("agent"),
16881690
ReconnectingPTYTimeout:time.Minute,
16891691
})
1690-
deferfunc() {
1691-
_=closer.Close()
1692-
}()
1692+
t.Cleanup(func() {
1693+
_=uut.Close()
1694+
})
16931695

16941696
// Setup a client connection.
1695-
newClientConn:=func(derpMap*tailcfg.DERPMap)*codersdk.WorkspaceAgentConn {
1697+
newClientConn:=func(derpMap*tailcfg.DERPMap,namestring)*codersdk.WorkspaceAgentConn {
16961698
conn,err:=tailnet.NewConn(&tailnet.Options{
16971699
Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IP(),128)},
16981700
DERPMap:derpMap,
1699-
Logger:logger.Named("client"),
1701+
Logger:logger.Named(name),
17001702
})
17011703
require.NoError(t,err)
1702-
clientConn,serverConn:=net.Pipe()
1703-
serveClientDone:=make(chanstruct{})
17041704
t.Cleanup(func() {
1705-
_=clientConn.Close()
1706-
_=serverConn.Close()
1705+
t.Logf("closing conn %s",name)
17071706
_=conn.Close()
1708-
<-serveClientDone
17091707
})
1710-
gofunc() {
1711-
deferclose(serveClientDone)
1712-
err:=coordinator.ServeClient(serverConn,uuid.New(),agentID)
1713-
assert.NoError(t,err)
1714-
}()
1715-
sendNode,_:=tailnet.ServeCoordinator(clientConn,func(nodes []*tailnet.Node)error {
1716-
returnconn.UpdateNodes(nodes,false)
1708+
testCtx,testCtxCancel:=context.WithCancel(context.Background())
1709+
t.Cleanup(testCtxCancel)
1710+
clientID:=uuid.New()
1711+
coordination:=tailnet.NewInMemoryCoordination(
1712+
testCtx,logger,
1713+
clientID,agentID,
1714+
coordinator,conn)
1715+
t.Cleanup(func() {
1716+
t.Logf("closing coordination %s",name)
1717+
err:=coordination.Close()
1718+
iferr!=nil {
1719+
t.Logf("error closing in-memory coordination: %s",err.Error())
1720+
}
17171721
})
1718-
conn.SetNodeCallback(sendNode)
17191722
// Force DERP.
17201723
conn.SetBlockEndpoints(true)
17211724

@@ -1724,6 +1727,7 @@ func TestAgent_UpdatedDERP(t *testing.T) {
17241727
CloseFunc:func()error {returncodersdk.ErrSkipClose },
17251728
})
17261729
t.Cleanup(func() {
1730+
t.Logf("closing sdkConn %s",name)
17271731
_=sdkConn.Close()
17281732
})
17291733
ctx,cancel:=context.WithTimeout(context.Background(),testutil.WaitLong)
@@ -1734,7 +1738,7 @@ func TestAgent_UpdatedDERP(t *testing.T) {
17341738

17351739
returnsdkConn
17361740
}
1737-
conn1:=newClientConn(originalDerpMap)
1741+
conn1:=newClientConn(originalDerpMap,"client1")
17381742

17391743
// Change the DERP map.
17401744
newDerpMap,_:=tailnettest.RunDERPAndSTUN(t)
@@ -1753,27 +1757,34 @@ func TestAgent_UpdatedDERP(t *testing.T) {
17531757
DERPMap:newDerpMap,
17541758
})
17551759
require.NoError(t,err)
1760+
t.Logf("client Pushed DERPMap update")
17561761

17571762
require.Eventually(t,func()bool {
1758-
conn:=closer.TailnetConn()
1763+
conn:=uut.TailnetConn()
17591764
ifconn==nil {
17601765
returnfalse
17611766
}
17621767
regionIDs:=conn.DERPMap().RegionIDs()
1763-
returnlen(regionIDs)==1&&regionIDs[0]==2&&conn.Node().PreferredDERP==2
1768+
preferredDERP:=conn.Node().PreferredDERP
1769+
t.Logf("agent Conn DERPMap with regionIDs %v, PreferredDERP %d",regionIDs,preferredDERP)
1770+
returnlen(regionIDs)==1&&regionIDs[0]==2&&preferredDERP==2
17641771
},testutil.WaitLong,testutil.IntervalFast)
1772+
t.Logf("agent got the new DERPMap")
17651773

17661774
// Connect from a second client and make sure it uses the new DERP map.
1767-
conn2:=newClientConn(newDerpMap)
1775+
conn2:=newClientConn(newDerpMap,"client2")
17681776
require.Equal(t, []int{2},conn2.DERPMap().RegionIDs())
1777+
t.Log("conn2 got the new DERPMap")
17691778

17701779
// If the first client gets a DERP map update, it should be able to
17711780
// reconnect just fine.
17721781
conn1.SetDERPMap(newDerpMap)
17731782
require.Equal(t, []int{2},conn1.DERPMap().RegionIDs())
1783+
t.Log("set the new DERPMap on conn1")
17741784
ctx,cancel:=context.WithTimeout(context.Background(),testutil.WaitLong)
17751785
defercancel()
17761786
require.True(t,conn1.AwaitReachable(ctx))
1787+
t.Log("conn1 reached agent with new DERP")
17771788
}
17781789

17791790
funcTestAgent_Speedtest(t*testing.T) {
@@ -2050,22 +2061,22 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
20502061
Logger:logger.Named("client"),
20512062
})
20522063
require.NoError(t,err)
2053-
clientConn,serverConn:=net.Pipe()
2054-
serveClientDone:=make(chanstruct{})
20552064
t.Cleanup(func() {
2056-
_=clientConn.Close()
2057-
_=serverConn.Close()
20582065
_=conn.Close()
2059-
<-serveClientDone
20602066
})
2061-
gofunc() {
2062-
deferclose(serveClientDone)
2063-
coordinator.ServeClient(serverConn,uuid.New(),metadata.AgentID)
2064-
}()
2065-
sendNode,_:=tailnet.ServeCoordinator(clientConn,func(nodes []*tailnet.Node)error {
2066-
returnconn.UpdateNodes(nodes,false)
2067+
testCtx,testCtxCancel:=context.WithCancel(context.Background())
2068+
t.Cleanup(testCtxCancel)
2069+
clientID:=uuid.New()
2070+
coordination:=tailnet.NewInMemoryCoordination(
2071+
testCtx,logger,
2072+
clientID,metadata.AgentID,
2073+
coordinator,conn)
2074+
t.Cleanup(func() {
2075+
err:=coordination.Close()
2076+
iferr!=nil {
2077+
t.Logf("error closing in-mem coordination: %s",err.Error())
2078+
}
20672079
})
2068-
conn.SetNodeCallback(sendNode)
20692080
agentConn:=codersdk.NewWorkspaceAgentConn(conn, codersdk.WorkspaceAgentConnOptions{
20702081
AgentID:metadata.AgentID,
20712082
})

‎agent/agenttest/client.go‎

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,28 @@ package agenttest
33
import (
44
"context"
55
"io"
6-
"net"
76
"sync"
7+
"sync/atomic"
88
"testing"
99
"time"
1010

11+
"github.com/stretchr/testify/require"
12+
13+
"storj.io/drpc"
14+
"storj.io/drpc/drpcmux"
15+
"storj.io/drpc/drpcserver"
16+
"tailscale.com/tailcfg"
17+
18+
"github.com/coder/coder/v2/tailnet/proto"
19+
1120
"github.com/google/uuid"
1221
"golang.org/x/exp/maps"
1322
"golang.org/x/xerrors"
1423

1524
"cdr.dev/slog"
1625
"github.com/coder/coder/v2/codersdk"
1726
"github.com/coder/coder/v2/codersdk/agentsdk"
27+
drpcsdk"github.com/coder/coder/v2/codersdk/drpc"
1828
"github.com/coder/coder/v2/tailnet"
1929
"github.com/coder/coder/v2/testutil"
2030
)
@@ -24,18 +34,39 @@ func NewClient(t testing.TB,
2434
agentID uuid.UUID,
2535
manifest agentsdk.Manifest,
2636
statsChanchan*agentsdk.Stats,
27-
coordinator tailnet.CoordinatorV1,
37+
coordinator tailnet.Coordinator,
2838
)*Client {
2939
ifmanifest.AgentID==uuid.Nil {
3040
manifest.AgentID=agentID
3141
}
42+
coordPtr:= atomic.Pointer[tailnet.Coordinator]{}
43+
coordPtr.Store(&coordinator)
44+
mux:=drpcmux.New()
45+
drpcService:=&tailnet.DRPCService{
46+
CoordPtr:&coordPtr,
47+
Logger:logger,
48+
// TODO: handle DERPMap too!
49+
DerpMapUpdateFrequency:time.Hour,
50+
DerpMapFn:func()*tailcfg.DERPMap {panic("not implemented") },
51+
}
52+
err:=proto.DRPCRegisterTailnet(mux,drpcService)
53+
require.NoError(t,err)
54+
server:=drpcserver.NewWithOptions(mux, drpcserver.Options{
55+
Log:func(errerror) {
56+
ifxerrors.Is(err,io.EOF) {
57+
return
58+
}
59+
logger.Debug(context.Background(),"drpc server error",slog.Error(err))
60+
},
61+
})
3262
return&Client{
3363
t:t,
3464
logger:logger.Named("client"),
3565
agentID:agentID,
3666
manifest:manifest,
3767
statsChan:statsChan,
3868
coordinator:coordinator,
69+
server:server,
3970
derpMapUpdates:make(chan agentsdk.DERPMapUpdate),
4071
}
4172
}
@@ -47,7 +78,8 @@ type Client struct {
4778
manifest agentsdk.Manifest
4879
metadatamap[string]agentsdk.Metadata
4980
statsChanchan*agentsdk.Stats
50-
coordinator tailnet.CoordinatorV1
81+
coordinator tailnet.Coordinator
82+
server*drpcserver.Server
5183
LastWorkspaceAgentfunc()
5284
PatchWorkspaceLogsfunc()error
5385
GetServiceBannerFuncfunc() (codersdk.ServiceBannerConfig,error)
@@ -63,20 +95,29 @@ func (c *Client) Manifest(_ context.Context) (agentsdk.Manifest, error) {
6395
returnc.manifest,nil
6496
}
6597

66-
func (c*Client)Listen(_ context.Context) (net.Conn,error) {
67-
clientConn,serverConn:=net.Pipe()
98+
func (c*Client)Listen(_ context.Context) (drpc.Conn,error) {
99+
conn,lis:=drpcsdk.MemTransportPipe()
68100
closed:=make(chanstruct{})
69101
c.LastWorkspaceAgent=func() {
70-
_=serverConn.Close()
71-
_=clientConn.Close()
102+
_=conn.Close()
103+
_=lis.Close()
72104
<-closed
73105
}
74106
c.t.Cleanup(c.LastWorkspaceAgent)
107+
serveCtx,cancel:=context.WithCancel(context.Background())
108+
c.t.Cleanup(cancel)
109+
auth:= tailnet.AgentTunnelAuth{}
110+
streamID:= tailnet.StreamID{
111+
Name:"agenttest",
112+
ID:c.agentID,
113+
Auth:auth,
114+
}
115+
serveCtx=tailnet.WithStreamID(serveCtx,streamID)
75116
gofunc() {
76-
_=c.coordinator.ServeAgent(serverConn,c.agentID,"")
117+
_=c.server.Serve(serveCtx,lis)
77118
close(closed)
78119
}()
79-
returnclientConn,nil
120+
returnconn,nil
80121
}
81122

82123
func (c*Client)ReportStats(ctx context.Context,_ slog.Logger,statsChan<-chan*agentsdk.Stats,setIntervalfunc(time.Duration)) (io.Closer,error) {

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp