Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit594fbc6

Browse files
committed
chore(vpn): upsert agents with their network status
1 parent26561dd commit594fbc6

File tree

5 files changed

+566
-158
lines changed

5 files changed

+566
-158
lines changed

‎vpn/client.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,15 @@ import (
55
"net/http"
66
"net/netip"
77
"net/url"
8+
"time"
89

910
"golang.org/x/xerrors"
1011
"nhooyr.io/websocket"
12+
"tailscale.com/ipn/ipnstate"
1113
"tailscale.com/net/dns"
1214
"tailscale.com/wgengine/router"
1315

16+
"github.com/google/uuid"
1417
"github.com/tailscale/wireguard-go/tun"
1518

1619
"cdr.dev/slog"
@@ -23,6 +26,8 @@ import (
2326

2427
typeConninterface {
2528
CurrentWorkspaceState() (tailnet.WorkspaceUpdate,error)
29+
GetPeerDiagnostics(peerID uuid.UUID) tailnet.PeerDiagnostics
30+
Ping(ctx context.Context,ip netip.Addr) (time.Duration,bool,*ipnstate.PingResult,error)
2631
Close()error
2732
}
2833

‎vpn/tunnel.go

Lines changed: 162 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,36 @@ import (
77
"fmt"
88
"io"
99
"net/http"
10+
"net/netip"
1011
"net/url"
1112
"reflect"
1213
"strconv"
1314
"sync"
15+
"time"
1416
"unicode"
1517

18+
"golang.org/x/exp/maps"
1619
"golang.org/x/xerrors"
20+
"google.golang.org/protobuf/types/known/durationpb"
21+
"google.golang.org/protobuf/types/known/timestamppb"
1722
"tailscale.com/net/dns"
23+
"tailscale.com/util/dnsname"
1824
"tailscale.com/wgengine/router"
1925

26+
"github.com/google/uuid"
27+
2028
"cdr.dev/slog"
2129
"github.com/coder/coder/v2/coderd/util/ptr"
2230
"github.com/coder/coder/v2/tailnet"
2331
)
2432

33+
// The interval at which the tunnel sends network status updates to the manager.
34+
constnetStatusInterval=30*time.Second
35+
2536
typeTunnelstruct {
2637
speaker[*TunnelMessage,*ManagerMessage,ManagerMessage]
2738
ctx context.Context
39+
netLoopDonechanstruct{}
2840
requestLoopDonechanstruct{}
2941

3042
logger slog.Logger
@@ -35,6 +47,10 @@ type Tunnel struct {
3547
clientClient
3648
connConn
3749

50+
mu sync.Mutex
51+
// agents contains the agents that are currently connected to the tunnel.
52+
agentsmap[uuid.UUID]*tailnet.Agent
53+
3854
// router and dnsConfigurator may be nil
3955
router router.Router
4056
dnsConfigurator dns.OSConfigurator
@@ -61,14 +77,17 @@ func NewTunnel(
6177
ctx:ctx,
6278
logger:logger,
6379
requestLoopDone:make(chanstruct{}),
80+
netLoopDone:make(chanstruct{}),
6481
client:client,
82+
agents:make(map[uuid.UUID]*tailnet.Agent),
6583
}
6684

6785
for_,opt:=rangeopts {
6886
opt(t)
6987
}
7088
t.speaker.start()
7189
got.requestLoop()
90+
got.netStatusLoop()
7291
returnt,nil
7392
}
7493

@@ -97,6 +116,20 @@ func (t *Tunnel) requestLoop() {
97116
}
98117
}
99118

119+
func (t*Tunnel)netStatusLoop() {
120+
ticker:=time.NewTicker(netStatusInterval)
121+
deferticker.Stop()
122+
deferclose(t.netLoopDone)
123+
for {
124+
select {
125+
case<-t.ctx.Done():
126+
return
127+
case<-ticker.C:
128+
t.sendAgentUpdate()
129+
}
130+
}
131+
}
132+
100133
// handleRPC handles unary RPCs from the manager.
101134
func (t*Tunnel)handleRPC(req*ManagerMessage,msgIDuint64)*TunnelMessage {
102135
resp:=&TunnelMessage{}
@@ -107,8 +140,12 @@ func (t *Tunnel) handleRPC(req *ManagerMessage, msgID uint64) *TunnelMessage {
107140
iferr!=nil {
108141
t.logger.Critical(t.ctx,"failed to get current workspace state",slog.Error(err))
109142
}
143+
update,err:=t.createPeerUpdate(state)
144+
iferr!=nil {
145+
t.logger.Error(t.ctx,"failed to populate agent network info",slog.Error(err))
146+
}
110147
resp.Msg=&TunnelMessage_PeerUpdate{
111-
PeerUpdate:convertWorkspaceUpdate(state),
148+
PeerUpdate:update,
112149
}
113150
returnresp
114151
case*ManagerMessage_Start:
@@ -189,9 +226,13 @@ func (t *Tunnel) ApplyNetworkSettings(ctx context.Context, ns *NetworkSettingsRe
189226
}
190227

191228
func (t*Tunnel)Update(update tailnet.WorkspaceUpdate)error {
229+
peerUpdate,err:=t.createPeerUpdate(update)
230+
iferr!=nil {
231+
t.logger.Error(t.ctx,"failed to populate agent network info",slog.Error(err))
232+
}
192233
msg:=&TunnelMessage{
193234
Msg:&TunnelMessage_PeerUpdate{
194-
PeerUpdate:convertWorkspaceUpdate(update),
235+
PeerUpdate:peerUpdate,
195236
},
196237
}
197238
select {
@@ -288,35 +329,30 @@ func sinkEntryToPb(e slog.SinkEntry) *Log {
288329
returnl
289330
}
290331

291-
funcconvertWorkspaceUpdate(update tailnet.WorkspaceUpdate)*PeerUpdate {
332+
// createPeerUpdate creates a PeerUpdate message from a workspace update, populating
333+
// the network status of the agents.
334+
func (t*Tunnel)createPeerUpdate(update tailnet.WorkspaceUpdate) (*PeerUpdate,error) {
292335
out:=&PeerUpdate{
293336
UpsertedWorkspaces:make([]*Workspace,len(update.UpsertedWorkspaces)),
294337
UpsertedAgents:make([]*Agent,len(update.UpsertedAgents)),
295338
DeletedWorkspaces:make([]*Workspace,len(update.DeletedWorkspaces)),
296339
DeletedAgents:make([]*Agent,len(update.DeletedAgents)),
297340
}
341+
342+
t.saveUpdate(update)
343+
298344
fori,ws:=rangeupdate.UpsertedWorkspaces {
299345
out.UpsertedWorkspaces[i]=&Workspace{
300346
Id:tailnet.UUIDToByteSlice(ws.ID),
301347
Name:ws.Name,
302348
Status:Workspace_Status(ws.Status),
303349
}
304350
}
305-
fori,agent:=rangeupdate.UpsertedAgents {
306-
fqdn:=make([]string,0,len(agent.Hosts))
307-
forname:=rangeagent.Hosts {
308-
fqdn=append(fqdn,name.WithoutTrailingDot())
309-
}
310-
out.UpsertedAgents[i]=&Agent{
311-
Id:tailnet.UUIDToByteSlice(agent.ID),
312-
Name:agent.Name,
313-
WorkspaceId:tailnet.UUIDToByteSlice(agent.WorkspaceID),
314-
Fqdn:fqdn,
315-
IpAddrs: []string{tailnet.CoderServicePrefix.AddrFromUUID(agent.ID).String()},
316-
// TODO: Populate
317-
LastHandshake:nil,
318-
}
351+
upsertedAgents,err:=t.populateAgents(update.UpsertedAgents)
352+
iferr!=nil {
353+
returnnil,xerrors.Errorf("failed to populate agent network info: %w",err)
319354
}
355+
out.UpsertedAgents=upsertedAgents
320356
fori,ws:=rangeupdate.DeletedWorkspaces {
321357
out.DeletedWorkspaces[i]=&Workspace{
322358
Id:tailnet.UUIDToByteSlice(ws.ID),
@@ -327,19 +363,109 @@ func convertWorkspaceUpdate(update tailnet.WorkspaceUpdate) *PeerUpdate {
327363
fori,agent:=rangeupdate.DeletedAgents {
328364
fqdn:=make([]string,0,len(agent.Hosts))
329365
forname:=rangeagent.Hosts {
330-
fqdn=append(fqdn,name.WithoutTrailingDot())
366+
fqdn=append(fqdn,name.WithTrailingDot())
331367
}
332368
out.DeletedAgents[i]=&Agent{
369+
Id:tailnet.UUIDToByteSlice(agent.ID),
370+
Name:agent.Name,
371+
WorkspaceId:tailnet.UUIDToByteSlice(agent.WorkspaceID),
372+
Fqdn:fqdn,
373+
IpAddrs:hostsToIPStrings(agent.Hosts),
374+
LastHandshake:nil,
375+
Latency:nil,
376+
}
377+
}
378+
returnout,nil
379+
}
380+
381+
// Given a list of agents, populate their network info, and return them as proto agents.
382+
func (t*Tunnel)populateAgents(agents []*tailnet.Agent) ([]*Agent,error) {
383+
ift.conn==nil {
384+
returnnil,xerrors.New("no active connection")
385+
}
386+
387+
out:=make([]*Agent,0,len(agents))
388+
varwg sync.WaitGroup
389+
pingCtx,cancelFunc:=context.WithTimeout(context.Background(),5*time.Second)
390+
defercancelFunc()
391+
392+
for_,agent:=rangeagents {
393+
fqdn:=make([]string,0,len(agent.Hosts))
394+
forname:=rangeagent.Hosts {
395+
fqdn=append(fqdn,name.WithTrailingDot())
396+
}
397+
protoAgent:=&Agent{
333398
Id:tailnet.UUIDToByteSlice(agent.ID),
334399
Name:agent.Name,
335400
WorkspaceId:tailnet.UUIDToByteSlice(agent.WorkspaceID),
336401
Fqdn:fqdn,
337-
IpAddrs: []string{tailnet.CoderServicePrefix.AddrFromUUID(agent.ID).String()},
338-
// TODO: Populate
339-
LastHandshake:nil,
402+
IpAddrs:hostsToIPStrings(agent.Hosts),
340403
}
404+
agentIP:=tailnet.CoderServicePrefix.AddrFromUUID(agent.ID)
405+
wg.Add(1)
406+
gofunc() {
407+
deferwg.Done()
408+
duration,_,_,err:=t.conn.Ping(pingCtx,agentIP)
409+
iferr!=nil {
410+
return
411+
}
412+
protoAgent.Latency=durationpb.New(duration)
413+
}()
414+
diags:=t.conn.GetPeerDiagnostics(agent.ID)
415+
//nolint:revive // outdated rule
416+
protoAgent.LastHandshake=timestamppb.New(diags.LastWireguardHandshake)
417+
out=append(out,protoAgent)
418+
}
419+
wg.Wait()
420+
421+
returnout,nil
422+
}
423+
424+
// saveUpdate saves the workspace update to the tunnel's state, such that it can
425+
// be used to populate automated peer updates.
426+
func (t*Tunnel)saveUpdate(update tailnet.WorkspaceUpdate) {
427+
t.mu.Lock()
428+
defert.mu.Unlock()
429+
430+
for_,agent:=rangeupdate.UpsertedAgents {
431+
t.agents[agent.ID]=agent
432+
}
433+
for_,agent:=rangeupdate.DeletedAgents {
434+
delete(t.agents,agent.ID)
435+
}
436+
}
437+
438+
// sendAgentUpdate sends a peer update message to the manager with the current
439+
// state of the agents, including the latest network status.
440+
func (t*Tunnel)sendAgentUpdate() {
441+
// The lock must be held until we send the message,
442+
// else we risk upserting a deleted agent.
443+
t.mu.Lock()
444+
defert.mu.Unlock()
445+
446+
upsertedAgents,err:=t.populateAgents(maps.Values(t.agents))
447+
iferr!=nil {
448+
t.logger.Error(t.ctx,"failed to produce agent network status update",slog.Error(err))
449+
return
450+
}
451+
452+
iflen(upsertedAgents)==0 {
453+
return
454+
}
455+
456+
msg:=&TunnelMessage{
457+
Msg:&TunnelMessage_PeerUpdate{
458+
PeerUpdate:&PeerUpdate{
459+
UpsertedAgents:upsertedAgents,
460+
},
461+
},
462+
}
463+
464+
select {
465+
case<-t.ctx.Done():
466+
return
467+
caset.sendCh<-msg:
341468
}
342-
returnout
343469
}
344470

345471
// the following are taken from sloghuman:
@@ -399,3 +525,17 @@ func quote(key string) string {
399525
}
400526
returnquoted
401527
}
528+
529+
funchostsToIPStrings(hostsmap[dnsname.FQDN][]netip.Addr) []string {
530+
seen:=make(map[netip.Addr]struct{})
531+
varresult []string
532+
for_,inner:=rangehosts {
533+
for_,elem:=rangeinner {
534+
if_,exists:=seen[elem];!exists {
535+
seen[elem]=struct{}{}
536+
result=append(result,elem.String())
537+
}
538+
}
539+
}
540+
returnresult
541+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp