Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit2f0a999

Browse files
authored
chore: add derpserver to wsproxy, add proxies to derpmap (#7311)
1 parent70692c2 commit2f0a999

File tree

58 files changed

+3001
-386
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+3001
-386
lines changed

‎agent/agent.go

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/spf13/afero"
2828
"go.uber.org/atomic"
2929
"golang.org/x/exp/slices"
30+
"golang.org/x/sync/errgroup"
3031
"golang.org/x/xerrors"
3132
"tailscale.com/net/speedtest"
3233
"tailscale.com/tailcfg"
@@ -72,6 +73,7 @@ type Options struct {
7273
typeClientinterface {
7374
Manifest(ctx context.Context) (agentsdk.Manifest,error)
7475
Listen(ctx context.Context) (net.Conn,error)
76+
DERPMapUpdates(ctx context.Context) (<-chan agentsdk.DERPMapUpdate, io.Closer,error)
7577
ReportStats(ctx context.Context,log slog.Logger,statsChan<-chan*agentsdk.Stats,setIntervalfunc(time.Duration)) (io.Closer,error)
7678
PostLifecycle(ctx context.Context,state agentsdk.PostLifecycleRequest)error
7779
PostAppHealth(ctx context.Context,req agentsdk.PostAppHealthsRequest)error
@@ -699,12 +701,26 @@ func (a *agent) run(ctx context.Context) error {
699701
network.SetBlockEndpoints(manifest.DisableDirectConnections)
700702
}
701703

702-
a.logger.Debug(ctx,"running tailnet connection coordinator")
703-
err=a.runCoordinator(ctx,network)
704-
iferr!=nil {
705-
returnxerrors.Errorf("run coordinator: %w",err)
706-
}
707-
returnnil
704+
eg,egCtx:=errgroup.WithContext(ctx)
705+
eg.Go(func()error {
706+
a.logger.Debug(egCtx,"running tailnet connection coordinator")
707+
err:=a.runCoordinator(egCtx,network)
708+
iferr!=nil {
709+
returnxerrors.Errorf("run coordinator: %w",err)
710+
}
711+
returnnil
712+
})
713+
714+
eg.Go(func()error {
715+
a.logger.Debug(egCtx,"running derp map subscriber")
716+
err:=a.runDERPMapSubscriber(egCtx,network)
717+
iferr!=nil {
718+
returnxerrors.Errorf("run derp map subscriber: %w",err)
719+
}
720+
returnnil
721+
})
722+
723+
returneg.Wait()
708724
}
709725

710726
func (a*agent)wireguardAddresses(agentID uuid.UUID) []netip.Prefix {
@@ -927,6 +943,34 @@ func (a *agent) runCoordinator(ctx context.Context, network *tailnet.Conn) error
927943
}
928944
}
929945

946+
// runDERPMapSubscriber runs a coordinator and returns if a reconnect should occur.
947+
func (a*agent)runDERPMapSubscriber(ctx context.Context,network*tailnet.Conn)error {
948+
ctx,cancel:=context.WithCancel(ctx)
949+
defercancel()
950+
951+
updates,closer,err:=a.client.DERPMapUpdates(ctx)
952+
iferr!=nil {
953+
returnerr
954+
}
955+
defercloser.Close()
956+
957+
a.logger.Info(ctx,"connected to derp map endpoint")
958+
for {
959+
select {
960+
case<-ctx.Done():
961+
returnctx.Err()
962+
caseupdate:=<-updates:
963+
ifupdate.Err!=nil {
964+
returnupdate.Err
965+
}
966+
ifupdate.DERPMap!=nil&&!tailnet.CompareDERPMaps(network.DERPMap(),update.DERPMap) {
967+
a.logger.Info(ctx,"updating derp map due to detected changes")
968+
network.SetDERPMap(update.DERPMap)
969+
}
970+
}
971+
}
972+
}
973+
930974
func (a*agent)runStartupScript(ctx context.Context,scriptstring)error {
931975
returna.runScript(ctx,"startup",script)
932976
}

‎agent/agent_test.go

Lines changed: 116 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1717,6 +1717,120 @@ func TestAgent_Dial(t *testing.T) {
17171717
}
17181718
}
17191719

1720+
// TestAgent_UpdatedDERP checks that agents can handle their DERP map being
1721+
// updated, and that clients can also handle it.
1722+
funcTestAgent_UpdatedDERP(t*testing.T) {
1723+
t.Parallel()
1724+
1725+
logger:=slogtest.Make(t,nil).Leveled(slog.LevelDebug)
1726+
1727+
originalDerpMap,_:=tailnettest.RunDERPAndSTUN(t)
1728+
require.NotNil(t,originalDerpMap)
1729+
1730+
coordinator:=tailnet.NewCoordinator(logger)
1731+
deferfunc() {
1732+
_=coordinator.Close()
1733+
}()
1734+
agentID:=uuid.New()
1735+
statsCh:=make(chan*agentsdk.Stats,50)
1736+
fs:=afero.NewMemMapFs()
1737+
client:=agenttest.NewClient(t,
1738+
logger.Named("agent"),
1739+
agentID,
1740+
agentsdk.Manifest{
1741+
DERPMap:originalDerpMap,
1742+
// Force DERP.
1743+
DisableDirectConnections:true,
1744+
},
1745+
statsCh,
1746+
coordinator,
1747+
)
1748+
closer:=agent.New(agent.Options{
1749+
Client:client,
1750+
Filesystem:fs,
1751+
Logger:logger.Named("agent"),
1752+
ReconnectingPTYTimeout:time.Minute,
1753+
})
1754+
deferfunc() {
1755+
_=closer.Close()
1756+
}()
1757+
1758+
// Setup a client connection.
1759+
newClientConn:=func(derpMap*tailcfg.DERPMap)*codersdk.WorkspaceAgentConn {
1760+
conn,err:=tailnet.NewConn(&tailnet.Options{
1761+
Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IP(),128)},
1762+
DERPMap:derpMap,
1763+
Logger:logger.Named("client"),
1764+
})
1765+
require.NoError(t,err)
1766+
clientConn,serverConn:=net.Pipe()
1767+
serveClientDone:=make(chanstruct{})
1768+
t.Cleanup(func() {
1769+
_=clientConn.Close()
1770+
_=serverConn.Close()
1771+
_=conn.Close()
1772+
<-serveClientDone
1773+
})
1774+
gofunc() {
1775+
deferclose(serveClientDone)
1776+
err:=coordinator.ServeClient(serverConn,uuid.New(),agentID)
1777+
assert.NoError(t,err)
1778+
}()
1779+
sendNode,_:=tailnet.ServeCoordinator(clientConn,func(nodes []*tailnet.Node)error {
1780+
returnconn.UpdateNodes(nodes,false)
1781+
})
1782+
conn.SetNodeCallback(sendNode)
1783+
// Force DERP.
1784+
conn.SetBlockEndpoints(true)
1785+
1786+
sdkConn:=codersdk.NewWorkspaceAgentConn(conn, codersdk.WorkspaceAgentConnOptions{
1787+
AgentID:agentID,
1788+
CloseFunc:func()error {returncodersdk.ErrSkipClose },
1789+
})
1790+
t.Cleanup(func() {
1791+
_=sdkConn.Close()
1792+
})
1793+
ctx,cancel:=context.WithTimeout(context.Background(),testutil.WaitLong)
1794+
defercancel()
1795+
if!sdkConn.AwaitReachable(ctx) {
1796+
t.Fatal("agent not reachable")
1797+
}
1798+
1799+
returnsdkConn
1800+
}
1801+
conn1:=newClientConn(originalDerpMap)
1802+
1803+
// Change the DERP map.
1804+
newDerpMap,_:=tailnettest.RunDERPAndSTUN(t)
1805+
require.NotNil(t,newDerpMap)
1806+
1807+
// Change the region ID.
1808+
newDerpMap.Regions[2]=newDerpMap.Regions[1]
1809+
delete(newDerpMap.Regions,1)
1810+
newDerpMap.Regions[2].RegionID=2
1811+
for_,node:=rangenewDerpMap.Regions[2].Nodes {
1812+
node.RegionID=2
1813+
}
1814+
1815+
// Push a new DERP map to the agent.
1816+
err:=client.PushDERPMapUpdate(agentsdk.DERPMapUpdate{
1817+
DERPMap:newDerpMap,
1818+
})
1819+
require.NoError(t,err)
1820+
1821+
// Connect from a second client and make sure it uses the new DERP map.
1822+
conn2:=newClientConn(newDerpMap)
1823+
require.Equal(t, []int{2},conn2.DERPMap().RegionIDs())
1824+
1825+
// If the first client gets a DERP map update, it should be able to
1826+
// reconnect just fine.
1827+
conn1.SetDERPMap(newDerpMap)
1828+
require.Equal(t, []int{2},conn1.DERPMap().RegionIDs())
1829+
ctx,cancel:=context.WithTimeout(context.Background(),testutil.WaitLong)
1830+
defercancel()
1831+
require.True(t,conn1.AwaitReachable(ctx))
1832+
}
1833+
17201834
funcTestAgent_Speedtest(t*testing.T) {
17211835
t.Parallel()
17221836
t.Skip("This test is relatively flakey because of Tailscale's speedtest code...")
@@ -1940,8 +2054,8 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
19402054
deferclose(serveClientDone)
19412055
coordinator.ServeClient(serverConn,uuid.New(),metadata.AgentID)
19422056
}()
1943-
sendNode,_:=tailnet.ServeCoordinator(clientConn,func(node []*tailnet.Node)error {
1944-
returnconn.UpdateNodes(node,false)
2057+
sendNode,_:=tailnet.ServeCoordinator(clientConn,func(nodes []*tailnet.Node)error {
2058+
returnconn.UpdateNodes(nodes,false)
19452059
})
19462060
conn.SetNodeCallback(sendNode)
19472061
agentConn:=codersdk.NewWorkspaceAgentConn(conn, codersdk.WorkspaceAgentConnOptions{

‎agent/agenttest/client.go

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@ import (
1010

1111
"github.com/google/uuid"
1212
"golang.org/x/exp/maps"
13+
"golang.org/x/xerrors"
1314

1415
"cdr.dev/slog"
1516
"github.com/coder/coder/codersdk"
1617
"github.com/coder/coder/codersdk/agentsdk"
1718
"github.com/coder/coder/tailnet"
19+
"github.com/coder/coder/testutil"
1820
)
1921

2022
funcNewClient(t testing.TB,
@@ -28,12 +30,13 @@ func NewClient(t testing.TB,
2830
manifest.AgentID=agentID
2931
}
3032
return&Client{
31-
t:t,
32-
logger:logger.Named("client"),
33-
agentID:agentID,
34-
manifest:manifest,
35-
statsChan:statsChan,
36-
coordinator:coordinator,
33+
t:t,
34+
logger:logger.Named("client"),
35+
agentID:agentID,
36+
manifest:manifest,
37+
statsChan:statsChan,
38+
coordinator:coordinator,
39+
derpMapUpdates:make(chan agentsdk.DERPMapUpdate),
3740
}
3841
}
3942

@@ -53,6 +56,7 @@ type Client struct {
5356
lifecycleStates []codersdk.WorkspaceAgentLifecycle
5457
startup agentsdk.PostStartupRequest
5558
logs []agentsdk.StartupLog
59+
derpMapUpdateschan agentsdk.DERPMapUpdate
5660
}
5761

5862
func (c*Client)Manifest(_ context.Context) (agentsdk.Manifest,error) {
@@ -191,6 +195,26 @@ func (c *Client) GetServiceBanner(ctx context.Context) (codersdk.ServiceBannerCo
191195
return codersdk.ServiceBannerConfig{},nil
192196
}
193197

198+
func (c*Client)PushDERPMapUpdate(update agentsdk.DERPMapUpdate)error {
199+
timer:=time.NewTimer(testutil.WaitShort)
200+
defertimer.Stop()
201+
select {
202+
casec.derpMapUpdates<-update:
203+
case<-timer.C:
204+
returnxerrors.New("timeout waiting to push derp map update")
205+
}
206+
207+
returnnil
208+
}
209+
210+
func (c*Client)DERPMapUpdates(_ context.Context) (<-chan agentsdk.DERPMapUpdate, io.Closer,error) {
211+
closed:=make(chanstruct{})
212+
returnc.derpMapUpdates,closeFunc(func()error {
213+
close(closed)
214+
returnnil
215+
}),nil
216+
}
217+
194218
typecloseFuncfunc()error
195219

196220
func (ccloseFunc)Close()error {

‎cli/netcheck.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func (r *RootCmd) netcheck() *clibase.Cmd {
2626
ctx,cancel:=context.WithTimeout(inv.Context(),30*time.Second)
2727
defercancel()
2828

29-
connInfo,err:=client.WorkspaceAgentConnectionInfo(ctx)
29+
connInfo,err:=client.WorkspaceAgentConnectionInfoGeneric(ctx)
3030
iferr!=nil {
3131
returnerr
3232
}

‎cli/server.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
477477
AppHostnameRegex:appHostnameRegex,
478478
Logger:logger.Named("coderd"),
479479
Database:dbfake.New(),
480-
DERPMap:derpMap,
480+
BaseDERPMap:derpMap,
481481
Pubsub:pubsub.NewInMemory(),
482482
CacheDir:cacheDir,
483483
GoogleTokenValidator:googleTokenValidator,
@@ -822,7 +822,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
822822

823823
ifcfg.Prometheus.Enable {
824824
// Agent metrics require reference to the tailnet coordinator, so must be initiated after Coder API.
825-
closeAgentsFunc,err:=prometheusmetrics.Agents(ctx,logger,options.PrometheusRegistry,coderAPI.Database,&coderAPI.TailnetCoordinator,options.DERPMap,coderAPI.Options.AgentInactiveDisconnectTimeout,0)
825+
closeAgentsFunc,err:=prometheusmetrics.Agents(ctx,logger,options.PrometheusRegistry,coderAPI.Database,&coderAPI.TailnetCoordinator,coderAPI.DERPMap,coderAPI.Options.AgentInactiveDisconnectTimeout,0)
826826
iferr!=nil {
827827
returnxerrors.Errorf("register agents prometheus metric: %w",err)
828828
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp