@@ -7,24 +7,36 @@ import (
7
7
"fmt"
8
8
"io"
9
9
"net/http"
10
+ "net/netip"
10
11
"net/url"
11
12
"reflect"
12
13
"strconv"
13
14
"sync"
15
+ "time"
14
16
"unicode"
15
17
18
+ "golang.org/x/exp/maps"
16
19
"golang.org/x/xerrors"
20
+ "google.golang.org/protobuf/types/known/durationpb"
21
+ "google.golang.org/protobuf/types/known/timestamppb"
17
22
"tailscale.com/net/dns"
23
+ "tailscale.com/util/dnsname"
18
24
"tailscale.com/wgengine/router"
19
25
26
+ "github.com/google/uuid"
27
+
20
28
"cdr.dev/slog"
21
29
"github.com/coder/coder/v2/coderd/util/ptr"
22
30
"github.com/coder/coder/v2/tailnet"
23
31
)
24
32
33
// netStatusInterval is the interval at which the tunnel sends network
// status updates (agent latency / last handshake) to the manager.
const netStatusInterval = 30 * time.Second
35
+
25
36
type Tunnel struct {
26
37
speaker [* TunnelMessage ,* ManagerMessage ,ManagerMessage ]
27
38
ctx context.Context
39
+ netLoopDone chan struct {}
28
40
requestLoopDone chan struct {}
29
41
30
42
logger slog.Logger
@@ -35,6 +47,10 @@ type Tunnel struct {
35
47
client Client
36
48
conn Conn
37
49
50
+ mu sync.Mutex
51
+ // agents contains the agents that are currently connected to the tunnel.
52
+ agents map [uuid.UUID ]* tailnet.Agent
53
+
38
54
// clientLogger is deliberately separate, to avoid the tunnel using itself
39
55
// as a sink for it's own logs, which could lead to deadlocks
40
56
clientLogger slog.Logger
@@ -65,14 +81,17 @@ func NewTunnel(
65
81
logger :logger ,
66
82
clientLogger :slog .Make (),
67
83
requestLoopDone :make (chan struct {}),
84
+ netLoopDone :make (chan struct {}),
68
85
client :client ,
86
+ agents :make (map [uuid.UUID ]* tailnet.Agent ),
69
87
}
70
88
71
89
for _ ,opt := range opts {
72
90
opt (t )
73
91
}
74
92
t .speaker .start ()
75
93
go t .requestLoop ()
94
+ go t .netStatusLoop ()
76
95
return t ,nil
77
96
}
78
97
@@ -101,6 +120,20 @@ func (t *Tunnel) requestLoop() {
101
120
}
102
121
}
103
122
123
+ func (t * Tunnel )netStatusLoop () {
124
+ ticker := time .NewTicker (netStatusInterval )
125
+ defer ticker .Stop ()
126
+ defer close (t .netLoopDone )
127
+ for {
128
+ select {
129
+ case <- t .ctx .Done ():
130
+ return
131
+ case <- ticker .C :
132
+ t .sendAgentUpdate ()
133
+ }
134
+ }
135
+ }
136
+
104
137
// handleRPC handles unary RPCs from the manager.
105
138
func (t * Tunnel )handleRPC (req * ManagerMessage ,msgID uint64 )* TunnelMessage {
106
139
resp := & TunnelMessage {}
@@ -111,8 +144,12 @@ func (t *Tunnel) handleRPC(req *ManagerMessage, msgID uint64) *TunnelMessage {
111
144
if err != nil {
112
145
t .logger .Critical (t .ctx ,"failed to get current workspace state" ,slog .Error (err ))
113
146
}
147
+ update ,err := t .createPeerUpdate (state )
148
+ if err != nil {
149
+ t .logger .Error (t .ctx ,"failed to populate agent network info" ,slog .Error (err ))
150
+ }
114
151
resp .Msg = & TunnelMessage_PeerUpdate {
115
- PeerUpdate :convertWorkspaceUpdate ( state ) ,
152
+ PeerUpdate :update ,
116
153
}
117
154
return resp
118
155
case * ManagerMessage_Start :
@@ -193,9 +230,13 @@ func (t *Tunnel) ApplyNetworkSettings(ctx context.Context, ns *NetworkSettingsRe
193
230
}
194
231
195
232
func (t * Tunnel )Update (update tailnet.WorkspaceUpdate )error {
233
+ peerUpdate ,err := t .createPeerUpdate (update )
234
+ if err != nil {
235
+ t .logger .Error (t .ctx ,"failed to populate agent network info" ,slog .Error (err ))
236
+ }
196
237
msg := & TunnelMessage {
197
238
Msg :& TunnelMessage_PeerUpdate {
198
- PeerUpdate :convertWorkspaceUpdate ( update ) ,
239
+ PeerUpdate :peerUpdate ,
199
240
},
200
241
}
201
242
select {
@@ -292,35 +333,30 @@ func sinkEntryToPb(e slog.SinkEntry) *Log {
292
333
return l
293
334
}
294
335
295
- func convertWorkspaceUpdate (update tailnet.WorkspaceUpdate )* PeerUpdate {
336
+ // createPeerUpdate creates a PeerUpdate message from a workspace update, populating
337
+ // the network status of the agents.
338
+ func (t * Tunnel )createPeerUpdate (update tailnet.WorkspaceUpdate ) (* PeerUpdate ,error ) {
296
339
out := & PeerUpdate {
297
340
UpsertedWorkspaces :make ([]* Workspace ,len (update .UpsertedWorkspaces )),
298
341
UpsertedAgents :make ([]* Agent ,len (update .UpsertedAgents )),
299
342
DeletedWorkspaces :make ([]* Workspace ,len (update .DeletedWorkspaces )),
300
343
DeletedAgents :make ([]* Agent ,len (update .DeletedAgents )),
301
344
}
345
+
346
+ t .saveUpdate (update )
347
+
302
348
for i ,ws := range update .UpsertedWorkspaces {
303
349
out .UpsertedWorkspaces [i ]= & Workspace {
304
350
Id :tailnet .UUIDToByteSlice (ws .ID ),
305
351
Name :ws .Name ,
306
352
Status :Workspace_Status (ws .Status ),
307
353
}
308
354
}
309
- for i ,agent := range update .UpsertedAgents {
310
- fqdn := make ([]string ,0 ,len (agent .Hosts ))
311
- for name := range agent .Hosts {
312
- fqdn = append (fqdn ,name .WithTrailingDot ())
313
- }
314
- out .UpsertedAgents [i ]= & Agent {
315
- Id :tailnet .UUIDToByteSlice (agent .ID ),
316
- Name :agent .Name ,
317
- WorkspaceId :tailnet .UUIDToByteSlice (agent .WorkspaceID ),
318
- Fqdn :fqdn ,
319
- IpAddrs : []string {tailnet .CoderServicePrefix .AddrFromUUID (agent .ID ).String ()},
320
- // TODO: Populate
321
- LastHandshake :nil ,
322
- }
355
+ upsertedAgents ,err := t .populateAgents (update .UpsertedAgents )
356
+ if err != nil {
357
+ return nil ,xerrors .Errorf ("failed to populate agent network info: %w" ,err )
323
358
}
359
+ out .UpsertedAgents = upsertedAgents
324
360
for i ,ws := range update .DeletedWorkspaces {
325
361
out .DeletedWorkspaces [i ]= & Workspace {
326
362
Id :tailnet .UUIDToByteSlice (ws .ID ),
@@ -334,16 +370,106 @@ func convertWorkspaceUpdate(update tailnet.WorkspaceUpdate) *PeerUpdate {
334
370
fqdn = append (fqdn ,name .WithTrailingDot ())
335
371
}
336
372
out .DeletedAgents [i ]= & Agent {
373
+ Id :tailnet .UUIDToByteSlice (agent .ID ),
374
+ Name :agent .Name ,
375
+ WorkspaceId :tailnet .UUIDToByteSlice (agent .WorkspaceID ),
376
+ Fqdn :fqdn ,
377
+ IpAddrs :hostsToIPStrings (agent .Hosts ),
378
+ LastHandshake :nil ,
379
+ Latency :nil ,
380
+ }
381
+ }
382
+ return out ,nil
383
+ }
384
+
385
+ // Given a list of agents, populate their network info, and return them as proto agents.
386
+ func (t * Tunnel )populateAgents (agents []* tailnet.Agent ) ([]* Agent ,error ) {
387
+ if t .conn == nil {
388
+ return nil ,xerrors .New ("no active connection" )
389
+ }
390
+
391
+ out := make ([]* Agent ,0 ,len (agents ))
392
+ var wg sync.WaitGroup
393
+ pingCtx ,cancelFunc := context .WithTimeout (context .Background (),5 * time .Second )
394
+ defer cancelFunc ()
395
+
396
+ for _ ,agent := range agents {
397
+ fqdn := make ([]string ,0 ,len (agent .Hosts ))
398
+ for name := range agent .Hosts {
399
+ fqdn = append (fqdn ,name .WithTrailingDot ())
400
+ }
401
+ protoAgent := & Agent {
337
402
Id :tailnet .UUIDToByteSlice (agent .ID ),
338
403
Name :agent .Name ,
339
404
WorkspaceId :tailnet .UUIDToByteSlice (agent .WorkspaceID ),
340
405
Fqdn :fqdn ,
341
- IpAddrs : []string {tailnet .CoderServicePrefix .AddrFromUUID (agent .ID ).String ()},
342
- // TODO: Populate
343
- LastHandshake :nil ,
406
+ IpAddrs :hostsToIPStrings (agent .Hosts ),
344
407
}
408
+ agentIP := tailnet .CoderServicePrefix .AddrFromUUID (agent .ID )
409
+ wg .Add (1 )
410
+ go func () {
411
+ defer wg .Done ()
412
+ duration ,_ ,_ ,err := t .conn .Ping (pingCtx ,agentIP )
413
+ if err != nil {
414
+ return
415
+ }
416
+ protoAgent .Latency = durationpb .New (duration )
417
+ }()
418
+ diags := t .conn .GetPeerDiagnostics (agent .ID )
419
+ //nolint:revive // outdated rule
420
+ protoAgent .LastHandshake = timestamppb .New (diags .LastWireguardHandshake )
421
+ out = append (out ,protoAgent )
422
+ }
423
+ wg .Wait ()
424
+
425
+ return out ,nil
426
+ }
427
+
428
+ // saveUpdate saves the workspace update to the tunnel's state, such that it can
429
+ // be used to populate automated peer updates.
430
+ func (t * Tunnel )saveUpdate (update tailnet.WorkspaceUpdate ) {
431
+ t .mu .Lock ()
432
+ defer t .mu .Unlock ()
433
+
434
+ for _ ,agent := range update .UpsertedAgents {
435
+ t .agents [agent .ID ]= agent
436
+ }
437
+ for _ ,agent := range update .DeletedAgents {
438
+ delete (t .agents ,agent .ID )
439
+ }
440
+ }
441
+
442
+ // sendAgentUpdate sends a peer update message to the manager with the current
443
+ // state of the agents, including the latest network status.
444
+ func (t * Tunnel )sendAgentUpdate () {
445
+ // The lock must be held until we send the message,
446
+ // else we risk upserting a deleted agent.
447
+ t .mu .Lock ()
448
+ defer t .mu .Unlock ()
449
+
450
+ upsertedAgents ,err := t .populateAgents (maps .Values (t .agents ))
451
+ if err != nil {
452
+ t .logger .Error (t .ctx ,"failed to produce agent network status update" ,slog .Error (err ))
453
+ return
454
+ }
455
+
456
+ if len (upsertedAgents )== 0 {
457
+ return
458
+ }
459
+
460
+ msg := & TunnelMessage {
461
+ Msg :& TunnelMessage_PeerUpdate {
462
+ PeerUpdate :& PeerUpdate {
463
+ UpsertedAgents :upsertedAgents ,
464
+ },
465
+ },
466
+ }
467
+
468
+ select {
469
+ case <- t .ctx .Done ():
470
+ return
471
+ case t .sendCh <- msg :
345
472
}
346
- return out
347
473
}
348
474
349
475
// the following are taken from sloghuman:
@@ -403,3 +529,17 @@ func quote(key string) string {
403
529
}
404
530
return quoted
405
531
}
532
+
533
+ func hostsToIPStrings (hosts map [dnsname.FQDN ][]netip.Addr ) []string {
534
+ seen := make (map [netip.Addr ]struct {})
535
+ var result []string
536
+ for _ ,inner := range hosts {
537
+ for _ ,elem := range inner {
538
+ if _ ,exists := seen [elem ];! exists {
539
+ seen [elem ]= struct {}{}
540
+ result = append (result ,elem .String ())
541
+ }
542
+ }
543
+ }
544
+ return result
545
+ }