@@ -7,24 +7,36 @@ import (
77"fmt"
88"io"
99"net/http"
10+ "net/netip"
1011"net/url"
1112"reflect"
1213"strconv"
1314"sync"
15+ "time"
1416"unicode"
1517
18+ "golang.org/x/exp/maps"
1619"golang.org/x/xerrors"
20+ "google.golang.org/protobuf/types/known/durationpb"
21+ "google.golang.org/protobuf/types/known/timestamppb"
1722"tailscale.com/net/dns"
23+ "tailscale.com/util/dnsname"
1824"tailscale.com/wgengine/router"
1925
26+ "github.com/google/uuid"
27+
2028"cdr.dev/slog"
2129"github.com/coder/coder/v2/coderd/util/ptr"
2230"github.com/coder/coder/v2/tailnet"
2331)
2432
+// netStatusInterval is the interval at which the tunnel sends network status
+// updates to the manager.
+const netStatusInterval = 30 * time.Second
+
 type Tunnel struct {
 	speaker[*TunnelMessage, *ManagerMessage, ManagerMessage]
 	ctx             context.Context
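+	// netLoopDone is closed when netStatusLoop exits.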
+	netLoopDone     chan struct{}
 	requestLoopDone chan struct{}
 
 	logger slog.Logger
@@ -35,6 +47,10 @@ type Tunnel struct {
 	client Client
 	conn   Conn
 
+	mu sync.Mutex
+	// agents contains the agents that are currently connected to the tunnel.
+	agents map[uuid.UUID]*tailnet.Agent
+
 	// clientLogger is a separate logger from `logger` when the `UseAsLogger`
 	// option is used, to avoid the tunnel using itself as a sink for its own
 	// logs, which could lead to deadlocks.
@@ -66,14 +82,17 @@ func NewTunnel(
 		logger:          logger,
 		clientLogger:    logger,
 		requestLoopDone: make(chan struct{}),
+		netLoopDone:     make(chan struct{}),
 		client:          client,
+		agents:          make(map[uuid.UUID]*tailnet.Agent),
 	}
 
 	for _, opt := range opts {
 		opt(t)
 	}
 	t.speaker.start()
 	go t.requestLoop()
+	go t.netStatusLoop()
 	return t, nil
 }
 
@@ -102,6 +121,20 @@ func (t *Tunnel) requestLoop() {
 	}
 }
 
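+// netStatusLoop sends an agent network status update to the manager on a
+// fixed interval until the tunnel's context is canceled.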
+func (t *Tunnel) netStatusLoop() {
+	ticker := time.NewTicker(netStatusInterval)
+	defer ticker.Stop()
+	defer close(t.netLoopDone)
+	for {
+		select {
+		case <-t.ctx.Done():
+			return
+		case <-ticker.C:
+			t.sendAgentUpdate()
+		}
+	}
+}
+
 // handleRPC handles unary RPCs from the manager.
 func (t *Tunnel) handleRPC(req *ManagerMessage, msgID uint64) *TunnelMessage {
 	resp := &TunnelMessage{}
@@ -112,8 +145,12 @@ func (t *Tunnel) handleRPC(req *ManagerMessage, msgID uint64) *TunnelMessage {
 		if err != nil {
 			t.logger.Critical(t.ctx, "failed to get current workspace state", slog.Error(err))
 		}
+		update, err := t.createPeerUpdate(state)
+		if err != nil {
+			t.logger.Error(t.ctx, "failed to populate agent network info", slog.Error(err))
+		}
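+		// The error is logged but not fatal: the (possibly nil) update is
+		// still sent to the manager below.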
 		resp.Msg = &TunnelMessage_PeerUpdate{
-			PeerUpdate: convertWorkspaceUpdate(state),
+			PeerUpdate: update,
 		}
 		return resp
 	case *ManagerMessage_Start:
@@ -194,9 +231,13 @@ func (t *Tunnel) ApplyNetworkSettings(ctx context.Context, ns *NetworkSettingsRe
 }
 
 func (t *Tunnel) Update(update tailnet.WorkspaceUpdate) error {
+	peerUpdate, err := t.createPeerUpdate(update)
+	if err != nil {
+		t.logger.Error(t.ctx, "failed to populate agent network info", slog.Error(err))
+	}
 	msg := &TunnelMessage{
 		Msg: &TunnelMessage_PeerUpdate{
-			PeerUpdate: convertWorkspaceUpdate(update),
+			PeerUpdate: peerUpdate,
 		},
 	}
 	select {
@@ -293,35 +334,30 @@ func sinkEntryToPb(e slog.SinkEntry) *Log {
 	return l
 }
 
-func convertWorkspaceUpdate(update tailnet.WorkspaceUpdate) *PeerUpdate {
+// createPeerUpdate creates a PeerUpdate message from a workspace update,
+// populating the network status of the agents.
+func (t *Tunnel) createPeerUpdate(update tailnet.WorkspaceUpdate) (*PeerUpdate, error) {
 	out := &PeerUpdate{
 		UpsertedWorkspaces: make([]*Workspace, len(update.UpsertedWorkspaces)),
 		UpsertedAgents:     make([]*Agent, len(update.UpsertedAgents)),
 		DeletedWorkspaces:  make([]*Workspace, len(update.DeletedWorkspaces)),
 		DeletedAgents:      make([]*Agent, len(update.DeletedAgents)),
 	}
+
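+	// Record the upserted and deleted agents so that netStatusLoop can send
+	// periodic status updates for the current set of agents.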
+	t.saveUpdate(update)
+
 	for i, ws := range update.UpsertedWorkspaces {
 		out.UpsertedWorkspaces[i] = &Workspace{
 			Id:     tailnet.UUIDToByteSlice(ws.ID),
 			Name:   ws.Name,
 			Status: Workspace_Status(ws.Status),
 		}
 	}
-	for i, agent := range update.UpsertedAgents {
-		fqdn := make([]string, 0, len(agent.Hosts))
-		for name := range agent.Hosts {
-			fqdn = append(fqdn, name.WithTrailingDot())
-		}
-		out.UpsertedAgents[i] = &Agent{
-			Id:          tailnet.UUIDToByteSlice(agent.ID),
-			Name:        agent.Name,
-			WorkspaceId: tailnet.UUIDToByteSlice(agent.WorkspaceID),
-			Fqdn:        fqdn,
-			IpAddrs:     []string{tailnet.CoderServicePrefix.AddrFromUUID(agent.ID).String()},
-			// TODO: Populate
-			LastHandshake: nil,
-		}
+	upsertedAgents, err := t.populateAgents(update.UpsertedAgents)
+	if err != nil {
+		return nil, xerrors.Errorf("failed to populate agent network info: %w", err)
 	}
+	out.UpsertedAgents = upsertedAgents
 	for i, ws := range update.DeletedWorkspaces {
 		out.DeletedWorkspaces[i] = &Workspace{
 			Id: tailnet.UUIDToByteSlice(ws.ID),
@@ -335,16 +371,106 @@ func convertWorkspaceUpdate(update tailnet.WorkspaceUpdate) *PeerUpdate {
 			fqdn = append(fqdn, name.WithTrailingDot())
 		}
 		out.DeletedAgents[i] = &Agent{
+			Id:            tailnet.UUIDToByteSlice(agent.ID),
+			Name:          agent.Name,
+			WorkspaceId:   tailnet.UUIDToByteSlice(agent.WorkspaceID),
+			Fqdn:          fqdn,
+			IpAddrs:       hostsToIPStrings(agent.Hosts),
+			LastHandshake: nil,
+			Latency:       nil,
+		}
+	}
+	return out, nil
+}
+
+// populateAgents converts the given agents into proto agents, populating
+// their network info (latency and last handshake) along the way.
+func (t *Tunnel) populateAgents(agents []*tailnet.Agent) ([]*Agent, error) {
+	if t.conn == nil {
+		return nil, xerrors.New("no active connection")
+	}
+
+	out := make([]*Agent, 0, len(agents))
+	var wg sync.WaitGroup
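+	// All pings share one context, so the whole batch is bounded by a single
+	// five-second budget rather than five seconds per agent.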
+	pingCtx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancelFunc()
+
+	for _, agent := range agents {
+		fqdn := make([]string, 0, len(agent.Hosts))
+		for name := range agent.Hosts {
+			fqdn = append(fqdn, name.WithTrailingDot())
+		}
+		protoAgent := &Agent{
 			Id:          tailnet.UUIDToByteSlice(agent.ID),
 			Name:        agent.Name,
 			WorkspaceId: tailnet.UUIDToByteSlice(agent.WorkspaceID),
 			Fqdn:        fqdn,
-			IpAddrs:     []string{tailnet.CoderServicePrefix.AddrFromUUID(agent.ID).String()},
-			// TODO: Populate
-			LastHandshake: nil,
+			IpAddrs:     hostsToIPStrings(agent.Hosts),
 		}
+		agentIP := tailnet.CoderServicePrefix.AddrFromUUID(agent.ID)
+		wg.Add(1)
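+		// Ping the agent concurrently; a failed or timed-out ping simply
+		// leaves Latency unset.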
+		go func() {
+			defer wg.Done()
+			duration, _, _, err := t.conn.Ping(pingCtx, agentIP)
+			if err != nil {
+				return
+			}
+			protoAgent.Latency = durationpb.New(duration)
+		}()
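+		// The last handshake time is read synchronously from the connection's
+		// peer diagnostics while the pings run in the background.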
+		diags := t.conn.GetPeerDiagnostics(agent.ID)
+		//nolint:revive // outdated rule
+		protoAgent.LastHandshake = timestamppb.New(diags.LastWireguardHandshake)
+		out = append(out, protoAgent)
+	}
+	wg.Wait()
+
+	return out, nil
+}
+
+// saveUpdate saves the workspace update to the tunnel's state, such that it
+// can be used to populate automated peer updates.
+func (t *Tunnel) saveUpdate(update tailnet.WorkspaceUpdate) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+
+	for _, agent := range update.UpsertedAgents {
+		t.agents[agent.ID] = agent
+	}
+	for _, agent := range update.DeletedAgents {
+		delete(t.agents, agent.ID)
+	}
+}
+
+// sendAgentUpdate sends a peer update message to the manager with the current
+// state of the agents, including the latest network status.
+func (t *Tunnel) sendAgentUpdate() {
+	// The lock must be held until we send the message,
+	// else we risk upserting a deleted agent.
+	t.mu.Lock()
+	defer t.mu.Unlock()
+
+	upsertedAgents, err := t.populateAgents(maps.Values(t.agents))
+	if err != nil {
+		t.logger.Error(t.ctx, "failed to produce agent network status update", slog.Error(err))
+		return
+	}
+
+	if len(upsertedAgents) == 0 {
+		return
+	}
+
+	msg := &TunnelMessage{
+		Msg: &TunnelMessage_PeerUpdate{
+			PeerUpdate: &PeerUpdate{
+				UpsertedAgents: upsertedAgents,
+			},
+		},
+	}
+
+	select {
+	case <-t.ctx.Done():
+		return
+	case t.sendCh <- msg:
 	}
-	return out
 }
 
 // the following are taken from sloghuman:
@@ -404,3 +530,17 @@ func quote(key string) string {
 	}
 	return quoted
 }
+
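+// hostsToIPStrings flattens the FQDN-to-address map into a deduplicated list
+// of IP address strings. Ordering is unspecified, as it follows map iteration.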
+func hostsToIPStrings(hosts map[dnsname.FQDN][]netip.Addr) []string {
+	seen := make(map[netip.Addr]struct{})
+	var result []string
+	for _, inner := range hosts {
+		for _, elem := range inner {
+			if _, exists := seen[elem]; !exists {
+				seen[elem] = struct{}{}
+				result = append(result, elem.String())
+			}
+		}
+	}
+	return result
+}