Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commita3f95e8

Browse files
committed
chore(vpn): upsert agents with their network status
1 parent1ac2bae commita3f95e8

File tree

5 files changed

+565
-157
lines changed

5 files changed

+565
-157
lines changed

‎vpn/client.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,15 @@ import (
55
"net/http"
66
"net/netip"
77
"net/url"
8+
"time"
89

910
"golang.org/x/xerrors"
1011
"nhooyr.io/websocket"
12+
"tailscale.com/ipn/ipnstate"
1113
"tailscale.com/net/dns"
1214
"tailscale.com/wgengine/router"
1315

16+
"github.com/google/uuid"
1417
"github.com/tailscale/wireguard-go/tun"
1518

1619
"cdr.dev/slog"
@@ -23,6 +26,8 @@ import (
2326

2427
typeConninterface {
2528
CurrentWorkspaceState() (tailnet.WorkspaceUpdate,error)
29+
GetPeerDiagnostics(peerID uuid.UUID) tailnet.PeerDiagnostics
30+
Ping(ctx context.Context,ip netip.Addr) (time.Duration,bool,*ipnstate.PingResult,error)
2631
Close()error
2732
}
2833

‎vpn/tunnel.go

Lines changed: 161 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,36 @@ import (
77
"fmt"
88
"io"
99
"net/http"
10+
"net/netip"
1011
"net/url"
1112
"reflect"
1213
"strconv"
1314
"sync"
15+
"time"
1416
"unicode"
1517

18+
"golang.org/x/exp/maps"
1619
"golang.org/x/xerrors"
20+
"google.golang.org/protobuf/types/known/durationpb"
21+
"google.golang.org/protobuf/types/known/timestamppb"
1722
"tailscale.com/net/dns"
23+
"tailscale.com/util/dnsname"
1824
"tailscale.com/wgengine/router"
1925

26+
"github.com/google/uuid"
27+
2028
"cdr.dev/slog"
2129
"github.com/coder/coder/v2/coderd/util/ptr"
2230
"github.com/coder/coder/v2/tailnet"
2331
)
2432

33+
// The interval at which the tunnel sends network status updates to the manager.
34+
constnetStatusInterval=30*time.Second
35+
2536
typeTunnelstruct {
2637
speaker[*TunnelMessage,*ManagerMessage,ManagerMessage]
2738
ctx context.Context
39+
netLoopDonechanstruct{}
2840
requestLoopDonechanstruct{}
2941

3042
logger slog.Logger
@@ -35,6 +47,10 @@ type Tunnel struct {
3547
clientClient
3648
connConn
3749

50+
mu sync.Mutex
51+
// agents contains the agents that are currently connected to the tunnel.
52+
agentsmap[uuid.UUID]*tailnet.Agent
53+
3854
// clientLogger is deliberately separate, to avoid the tunnel using itself
3955
// as a sink for it's own logs, which could lead to deadlocks
4056
clientLogger slog.Logger
@@ -65,14 +81,17 @@ func NewTunnel(
6581
logger:logger,
6682
clientLogger:slog.Make(),
6783
requestLoopDone:make(chanstruct{}),
84+
netLoopDone:make(chanstruct{}),
6885
client:client,
86+
agents:make(map[uuid.UUID]*tailnet.Agent),
6987
}
7088

7189
for_,opt:=rangeopts {
7290
opt(t)
7391
}
7492
t.speaker.start()
7593
got.requestLoop()
94+
got.netStatusLoop()
7695
returnt,nil
7796
}
7897

@@ -101,6 +120,20 @@ func (t *Tunnel) requestLoop() {
101120
}
102121
}
103122

123+
func (t*Tunnel)netStatusLoop() {
124+
ticker:=time.NewTicker(netStatusInterval)
125+
deferticker.Stop()
126+
deferclose(t.netLoopDone)
127+
for {
128+
select {
129+
case<-t.ctx.Done():
130+
return
131+
case<-ticker.C:
132+
t.sendAgentUpdate()
133+
}
134+
}
135+
}
136+
104137
// handleRPC handles unary RPCs from the manager.
105138
func (t*Tunnel)handleRPC(req*ManagerMessage,msgIDuint64)*TunnelMessage {
106139
resp:=&TunnelMessage{}
@@ -111,8 +144,12 @@ func (t *Tunnel) handleRPC(req *ManagerMessage, msgID uint64) *TunnelMessage {
111144
iferr!=nil {
112145
t.logger.Critical(t.ctx,"failed to get current workspace state",slog.Error(err))
113146
}
147+
update,err:=t.createPeerUpdate(state)
148+
iferr!=nil {
149+
t.logger.Error(t.ctx,"failed to populate agent network info",slog.Error(err))
150+
}
114151
resp.Msg=&TunnelMessage_PeerUpdate{
115-
PeerUpdate:convertWorkspaceUpdate(state),
152+
PeerUpdate:update,
116153
}
117154
returnresp
118155
case*ManagerMessage_Start:
@@ -193,9 +230,13 @@ func (t *Tunnel) ApplyNetworkSettings(ctx context.Context, ns *NetworkSettingsRe
193230
}
194231

195232
func (t*Tunnel)Update(update tailnet.WorkspaceUpdate)error {
233+
peerUpdate,err:=t.createPeerUpdate(update)
234+
iferr!=nil {
235+
t.logger.Error(t.ctx,"failed to populate agent network info",slog.Error(err))
236+
}
196237
msg:=&TunnelMessage{
197238
Msg:&TunnelMessage_PeerUpdate{
198-
PeerUpdate:convertWorkspaceUpdate(update),
239+
PeerUpdate:peerUpdate,
199240
},
200241
}
201242
select {
@@ -292,35 +333,30 @@ func sinkEntryToPb(e slog.SinkEntry) *Log {
292333
returnl
293334
}
294335

295-
funcconvertWorkspaceUpdate(update tailnet.WorkspaceUpdate)*PeerUpdate {
336+
// createPeerUpdate creates a PeerUpdate message from a workspace update, populating
337+
// the network status of the agents.
338+
func (t*Tunnel)createPeerUpdate(update tailnet.WorkspaceUpdate) (*PeerUpdate,error) {
296339
out:=&PeerUpdate{
297340
UpsertedWorkspaces:make([]*Workspace,len(update.UpsertedWorkspaces)),
298341
UpsertedAgents:make([]*Agent,len(update.UpsertedAgents)),
299342
DeletedWorkspaces:make([]*Workspace,len(update.DeletedWorkspaces)),
300343
DeletedAgents:make([]*Agent,len(update.DeletedAgents)),
301344
}
345+
346+
t.saveUpdate(update)
347+
302348
fori,ws:=rangeupdate.UpsertedWorkspaces {
303349
out.UpsertedWorkspaces[i]=&Workspace{
304350
Id:tailnet.UUIDToByteSlice(ws.ID),
305351
Name:ws.Name,
306352
Status:Workspace_Status(ws.Status),
307353
}
308354
}
309-
fori,agent:=rangeupdate.UpsertedAgents {
310-
fqdn:=make([]string,0,len(agent.Hosts))
311-
forname:=rangeagent.Hosts {
312-
fqdn=append(fqdn,name.WithTrailingDot())
313-
}
314-
out.UpsertedAgents[i]=&Agent{
315-
Id:tailnet.UUIDToByteSlice(agent.ID),
316-
Name:agent.Name,
317-
WorkspaceId:tailnet.UUIDToByteSlice(agent.WorkspaceID),
318-
Fqdn:fqdn,
319-
IpAddrs: []string{tailnet.CoderServicePrefix.AddrFromUUID(agent.ID).String()},
320-
// TODO: Populate
321-
LastHandshake:nil,
322-
}
355+
upsertedAgents,err:=t.populateAgents(update.UpsertedAgents)
356+
iferr!=nil {
357+
returnnil,xerrors.Errorf("failed to populate agent network info: %w",err)
323358
}
359+
out.UpsertedAgents=upsertedAgents
324360
fori,ws:=rangeupdate.DeletedWorkspaces {
325361
out.DeletedWorkspaces[i]=&Workspace{
326362
Id:tailnet.UUIDToByteSlice(ws.ID),
@@ -334,16 +370,106 @@ func convertWorkspaceUpdate(update tailnet.WorkspaceUpdate) *PeerUpdate {
334370
fqdn=append(fqdn,name.WithTrailingDot())
335371
}
336372
out.DeletedAgents[i]=&Agent{
373+
Id:tailnet.UUIDToByteSlice(agent.ID),
374+
Name:agent.Name,
375+
WorkspaceId:tailnet.UUIDToByteSlice(agent.WorkspaceID),
376+
Fqdn:fqdn,
377+
IpAddrs:hostsToIPStrings(agent.Hosts),
378+
LastHandshake:nil,
379+
Latency:nil,
380+
}
381+
}
382+
returnout,nil
383+
}
384+
385+
// Given a list of agents, populate their network info, and return them as proto agents.
386+
func (t*Tunnel)populateAgents(agents []*tailnet.Agent) ([]*Agent,error) {
387+
ift.conn==nil {
388+
returnnil,xerrors.New("no active connection")
389+
}
390+
391+
out:=make([]*Agent,0,len(agents))
392+
varwg sync.WaitGroup
393+
pingCtx,cancelFunc:=context.WithTimeout(context.Background(),5*time.Second)
394+
defercancelFunc()
395+
396+
for_,agent:=rangeagents {
397+
fqdn:=make([]string,0,len(agent.Hosts))
398+
forname:=rangeagent.Hosts {
399+
fqdn=append(fqdn,name.WithTrailingDot())
400+
}
401+
protoAgent:=&Agent{
337402
Id:tailnet.UUIDToByteSlice(agent.ID),
338403
Name:agent.Name,
339404
WorkspaceId:tailnet.UUIDToByteSlice(agent.WorkspaceID),
340405
Fqdn:fqdn,
341-
IpAddrs: []string{tailnet.CoderServicePrefix.AddrFromUUID(agent.ID).String()},
342-
// TODO: Populate
343-
LastHandshake:nil,
406+
IpAddrs:hostsToIPStrings(agent.Hosts),
344407
}
408+
agentIP:=tailnet.CoderServicePrefix.AddrFromUUID(agent.ID)
409+
wg.Add(1)
410+
gofunc() {
411+
deferwg.Done()
412+
duration,_,_,err:=t.conn.Ping(pingCtx,agentIP)
413+
iferr!=nil {
414+
return
415+
}
416+
protoAgent.Latency=durationpb.New(duration)
417+
}()
418+
diags:=t.conn.GetPeerDiagnostics(agent.ID)
419+
//nolint:revive // outdated rule
420+
protoAgent.LastHandshake=timestamppb.New(diags.LastWireguardHandshake)
421+
out=append(out,protoAgent)
422+
}
423+
wg.Wait()
424+
425+
returnout,nil
426+
}
427+
428+
// saveUpdate saves the workspace update to the tunnel's state, such that it can
429+
// be used to populate automated peer updates.
430+
func (t*Tunnel)saveUpdate(update tailnet.WorkspaceUpdate) {
431+
t.mu.Lock()
432+
defert.mu.Unlock()
433+
434+
for_,agent:=rangeupdate.UpsertedAgents {
435+
t.agents[agent.ID]=agent
436+
}
437+
for_,agent:=rangeupdate.DeletedAgents {
438+
delete(t.agents,agent.ID)
439+
}
440+
}
441+
442+
// sendAgentUpdate sends a peer update message to the manager with the current
443+
// state of the agents, including the latest network status.
444+
func (t*Tunnel)sendAgentUpdate() {
445+
// The lock must be held until we send the message,
446+
// else we risk upserting a deleted agent.
447+
t.mu.Lock()
448+
defert.mu.Unlock()
449+
450+
upsertedAgents,err:=t.populateAgents(maps.Values(t.agents))
451+
iferr!=nil {
452+
t.logger.Error(t.ctx,"failed to produce agent network status update",slog.Error(err))
453+
return
454+
}
455+
456+
iflen(upsertedAgents)==0 {
457+
return
458+
}
459+
460+
msg:=&TunnelMessage{
461+
Msg:&TunnelMessage_PeerUpdate{
462+
PeerUpdate:&PeerUpdate{
463+
UpsertedAgents:upsertedAgents,
464+
},
465+
},
466+
}
467+
468+
select {
469+
case<-t.ctx.Done():
470+
return
471+
caset.sendCh<-msg:
345472
}
346-
returnout
347473
}
348474

349475
// the following are taken from sloghuman:
@@ -403,3 +529,17 @@ func quote(key string) string {
403529
}
404530
returnquoted
405531
}
532+
533+
funchostsToIPStrings(hostsmap[dnsname.FQDN][]netip.Addr) []string {
534+
seen:=make(map[netip.Addr]struct{})
535+
varresult []string
536+
for_,inner:=rangehosts {
537+
for_,elem:=rangeinner {
538+
if_,exists:=seen[elem];!exists {
539+
seen[elem]=struct{}{}
540+
result=append(result,elem.String())
541+
}
542+
}
543+
}
544+
returnresult
545+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp