Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf3b75c9

Browse files
committed
chore(vpn): upsert agents with their network status
1 parent469ff7a commitf3b75c9

File tree

5 files changed

+565
-157
lines changed

5 files changed

+565
-157
lines changed

‎vpn/client.go‎

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,15 @@ import (
55
"net/http"
66
"net/netip"
77
"net/url"
8+
"time"
89

910
"golang.org/x/xerrors"
1011
"nhooyr.io/websocket"
12+
"tailscale.com/ipn/ipnstate"
1113
"tailscale.com/net/dns"
1214
"tailscale.com/wgengine/router"
1315

16+
"github.com/google/uuid"
1417
"github.com/tailscale/wireguard-go/tun"
1518

1619
"cdr.dev/slog"
@@ -23,6 +26,8 @@ import (
2326

2427
typeConninterface {
2528
CurrentWorkspaceState() (tailnet.WorkspaceUpdate,error)
29+
GetPeerDiagnostics(peerID uuid.UUID) tailnet.PeerDiagnostics
30+
Ping(ctx context.Context,ip netip.Addr) (time.Duration,bool,*ipnstate.PingResult,error)
2631
Close()error
2732
}
2833

‎vpn/tunnel.go‎

Lines changed: 161 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,36 @@ import (
77
"fmt"
88
"io"
99
"net/http"
10+
"net/netip"
1011
"net/url"
1112
"reflect"
1213
"strconv"
1314
"sync"
15+
"time"
1416
"unicode"
1517

18+
"golang.org/x/exp/maps"
1619
"golang.org/x/xerrors"
20+
"google.golang.org/protobuf/types/known/durationpb"
21+
"google.golang.org/protobuf/types/known/timestamppb"
1722
"tailscale.com/net/dns"
23+
"tailscale.com/util/dnsname"
1824
"tailscale.com/wgengine/router"
1925

26+
"github.com/google/uuid"
27+
2028
"cdr.dev/slog"
2129
"github.com/coder/coder/v2/coderd/util/ptr"
2230
"github.com/coder/coder/v2/tailnet"
2331
)
2432

33+
// The interval at which the tunnel sends network status updates to the manager.
34+
constnetStatusInterval=30*time.Second
35+
2536
typeTunnelstruct {
2637
speaker[*TunnelMessage,*ManagerMessage,ManagerMessage]
2738
ctx context.Context
39+
netLoopDonechanstruct{}
2840
requestLoopDonechanstruct{}
2941

3042
logger slog.Logger
@@ -35,6 +47,10 @@ type Tunnel struct {
3547
clientClient
3648
connConn
3749

50+
mu sync.Mutex
51+
// agents contains the agents that are currently connected to the tunnel.
52+
agentsmap[uuid.UUID]*tailnet.Agent
53+
3854
// clientLogger is a separate logger than `logger` when the `UseAsLogger`
3955
// option is used, to avoid the tunnel using itself as a sink for it's own
4056
// logs, which could lead to deadlocks.
@@ -66,14 +82,17 @@ func NewTunnel(
6682
logger:logger,
6783
clientLogger:logger,
6884
requestLoopDone:make(chanstruct{}),
85+
netLoopDone:make(chanstruct{}),
6986
client:client,
87+
agents:make(map[uuid.UUID]*tailnet.Agent),
7088
}
7189

7290
for_,opt:=rangeopts {
7391
opt(t)
7492
}
7593
t.speaker.start()
7694
got.requestLoop()
95+
got.netStatusLoop()
7796
returnt,nil
7897
}
7998

@@ -102,6 +121,20 @@ func (t *Tunnel) requestLoop() {
102121
}
103122
}
104123

124+
func (t*Tunnel)netStatusLoop() {
125+
ticker:=time.NewTicker(netStatusInterval)
126+
deferticker.Stop()
127+
deferclose(t.netLoopDone)
128+
for {
129+
select {
130+
case<-t.ctx.Done():
131+
return
132+
case<-ticker.C:
133+
t.sendAgentUpdate()
134+
}
135+
}
136+
}
137+
105138
// handleRPC handles unary RPCs from the manager.
106139
func (t*Tunnel)handleRPC(req*ManagerMessage,msgIDuint64)*TunnelMessage {
107140
resp:=&TunnelMessage{}
@@ -112,8 +145,12 @@ func (t *Tunnel) handleRPC(req *ManagerMessage, msgID uint64) *TunnelMessage {
112145
iferr!=nil {
113146
t.logger.Critical(t.ctx,"failed to get current workspace state",slog.Error(err))
114147
}
148+
update,err:=t.createPeerUpdate(state)
149+
iferr!=nil {
150+
t.logger.Error(t.ctx,"failed to populate agent network info",slog.Error(err))
151+
}
115152
resp.Msg=&TunnelMessage_PeerUpdate{
116-
PeerUpdate:convertWorkspaceUpdate(state),
153+
PeerUpdate:update,
117154
}
118155
returnresp
119156
case*ManagerMessage_Start:
@@ -194,9 +231,13 @@ func (t *Tunnel) ApplyNetworkSettings(ctx context.Context, ns *NetworkSettingsRe
194231
}
195232

196233
func (t*Tunnel)Update(update tailnet.WorkspaceUpdate)error {
234+
peerUpdate,err:=t.createPeerUpdate(update)
235+
iferr!=nil {
236+
t.logger.Error(t.ctx,"failed to populate agent network info",slog.Error(err))
237+
}
197238
msg:=&TunnelMessage{
198239
Msg:&TunnelMessage_PeerUpdate{
199-
PeerUpdate:convertWorkspaceUpdate(update),
240+
PeerUpdate:peerUpdate,
200241
},
201242
}
202243
select {
@@ -293,35 +334,30 @@ func sinkEntryToPb(e slog.SinkEntry) *Log {
293334
returnl
294335
}
295336

296-
funcconvertWorkspaceUpdate(update tailnet.WorkspaceUpdate)*PeerUpdate {
337+
// createPeerUpdate creates a PeerUpdate message from a workspace update, populating
338+
// the network status of the agents.
339+
func (t*Tunnel)createPeerUpdate(update tailnet.WorkspaceUpdate) (*PeerUpdate,error) {
297340
out:=&PeerUpdate{
298341
UpsertedWorkspaces:make([]*Workspace,len(update.UpsertedWorkspaces)),
299342
UpsertedAgents:make([]*Agent,len(update.UpsertedAgents)),
300343
DeletedWorkspaces:make([]*Workspace,len(update.DeletedWorkspaces)),
301344
DeletedAgents:make([]*Agent,len(update.DeletedAgents)),
302345
}
346+
347+
t.saveUpdate(update)
348+
303349
fori,ws:=rangeupdate.UpsertedWorkspaces {
304350
out.UpsertedWorkspaces[i]=&Workspace{
305351
Id:tailnet.UUIDToByteSlice(ws.ID),
306352
Name:ws.Name,
307353
Status:Workspace_Status(ws.Status),
308354
}
309355
}
310-
fori,agent:=rangeupdate.UpsertedAgents {
311-
fqdn:=make([]string,0,len(agent.Hosts))
312-
forname:=rangeagent.Hosts {
313-
fqdn=append(fqdn,name.WithTrailingDot())
314-
}
315-
out.UpsertedAgents[i]=&Agent{
316-
Id:tailnet.UUIDToByteSlice(agent.ID),
317-
Name:agent.Name,
318-
WorkspaceId:tailnet.UUIDToByteSlice(agent.WorkspaceID),
319-
Fqdn:fqdn,
320-
IpAddrs: []string{tailnet.CoderServicePrefix.AddrFromUUID(agent.ID).String()},
321-
// TODO: Populate
322-
LastHandshake:nil,
323-
}
356+
upsertedAgents,err:=t.populateAgents(update.UpsertedAgents)
357+
iferr!=nil {
358+
returnnil,xerrors.Errorf("failed to populate agent network info: %w",err)
324359
}
360+
out.UpsertedAgents=upsertedAgents
325361
fori,ws:=rangeupdate.DeletedWorkspaces {
326362
out.DeletedWorkspaces[i]=&Workspace{
327363
Id:tailnet.UUIDToByteSlice(ws.ID),
@@ -335,16 +371,106 @@ func convertWorkspaceUpdate(update tailnet.WorkspaceUpdate) *PeerUpdate {
335371
fqdn=append(fqdn,name.WithTrailingDot())
336372
}
337373
out.DeletedAgents[i]=&Agent{
374+
Id:tailnet.UUIDToByteSlice(agent.ID),
375+
Name:agent.Name,
376+
WorkspaceId:tailnet.UUIDToByteSlice(agent.WorkspaceID),
377+
Fqdn:fqdn,
378+
IpAddrs:hostsToIPStrings(agent.Hosts),
379+
LastHandshake:nil,
380+
Latency:nil,
381+
}
382+
}
383+
returnout,nil
384+
}
385+
386+
// Given a list of agents, populate their network info, and return them as proto agents.
387+
func (t*Tunnel)populateAgents(agents []*tailnet.Agent) ([]*Agent,error) {
388+
ift.conn==nil {
389+
returnnil,xerrors.New("no active connection")
390+
}
391+
392+
out:=make([]*Agent,0,len(agents))
393+
varwg sync.WaitGroup
394+
pingCtx,cancelFunc:=context.WithTimeout(context.Background(),5*time.Second)
395+
defercancelFunc()
396+
397+
for_,agent:=rangeagents {
398+
fqdn:=make([]string,0,len(agent.Hosts))
399+
forname:=rangeagent.Hosts {
400+
fqdn=append(fqdn,name.WithTrailingDot())
401+
}
402+
protoAgent:=&Agent{
338403
Id:tailnet.UUIDToByteSlice(agent.ID),
339404
Name:agent.Name,
340405
WorkspaceId:tailnet.UUIDToByteSlice(agent.WorkspaceID),
341406
Fqdn:fqdn,
342-
IpAddrs: []string{tailnet.CoderServicePrefix.AddrFromUUID(agent.ID).String()},
343-
// TODO: Populate
344-
LastHandshake:nil,
407+
IpAddrs:hostsToIPStrings(agent.Hosts),
345408
}
409+
agentIP:=tailnet.CoderServicePrefix.AddrFromUUID(agent.ID)
410+
wg.Add(1)
411+
gofunc() {
412+
deferwg.Done()
413+
duration,_,_,err:=t.conn.Ping(pingCtx,agentIP)
414+
iferr!=nil {
415+
return
416+
}
417+
protoAgent.Latency=durationpb.New(duration)
418+
}()
419+
diags:=t.conn.GetPeerDiagnostics(agent.ID)
420+
//nolint:revive // outdated rule
421+
protoAgent.LastHandshake=timestamppb.New(diags.LastWireguardHandshake)
422+
out=append(out,protoAgent)
423+
}
424+
wg.Wait()
425+
426+
returnout,nil
427+
}
428+
429+
// saveUpdate saves the workspace update to the tunnel's state, such that it can
430+
// be used to populate automated peer updates.
431+
func (t*Tunnel)saveUpdate(update tailnet.WorkspaceUpdate) {
432+
t.mu.Lock()
433+
defert.mu.Unlock()
434+
435+
for_,agent:=rangeupdate.UpsertedAgents {
436+
t.agents[agent.ID]=agent
437+
}
438+
for_,agent:=rangeupdate.DeletedAgents {
439+
delete(t.agents,agent.ID)
440+
}
441+
}
442+
443+
// sendAgentUpdate sends a peer update message to the manager with the current
444+
// state of the agents, including the latest network status.
445+
func (t*Tunnel)sendAgentUpdate() {
446+
// The lock must be held until we send the message,
447+
// else we risk upserting a deleted agent.
448+
t.mu.Lock()
449+
defert.mu.Unlock()
450+
451+
upsertedAgents,err:=t.populateAgents(maps.Values(t.agents))
452+
iferr!=nil {
453+
t.logger.Error(t.ctx,"failed to produce agent network status update",slog.Error(err))
454+
return
455+
}
456+
457+
iflen(upsertedAgents)==0 {
458+
return
459+
}
460+
461+
msg:=&TunnelMessage{
462+
Msg:&TunnelMessage_PeerUpdate{
463+
PeerUpdate:&PeerUpdate{
464+
UpsertedAgents:upsertedAgents,
465+
},
466+
},
467+
}
468+
469+
select {
470+
case<-t.ctx.Done():
471+
return
472+
caset.sendCh<-msg:
346473
}
347-
returnout
348474
}
349475

350476
// the following are taken from sloghuman:
@@ -404,3 +530,17 @@ func quote(key string) string {
404530
}
405531
returnquoted
406532
}
533+
534+
funchostsToIPStrings(hostsmap[dnsname.FQDN][]netip.Addr) []string {
535+
seen:=make(map[netip.Addr]struct{})
536+
varresult []string
537+
for_,inner:=rangehosts {
538+
for_,elem:=rangeinner {
539+
if_,exists:=seen[elem];!exists {
540+
seen[elem]=struct{}{}
541+
result=append(result,elem.String())
542+
}
543+
}
544+
}
545+
returnresult
546+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp