@@ -7,24 +7,36 @@ import (
7
7
"fmt"
8
8
"io"
9
9
"net/http"
10
+ "net/netip"
10
11
"net/url"
11
12
"reflect"
12
13
"strconv"
13
14
"sync"
15
+ "time"
14
16
"unicode"
15
17
18
+ "golang.org/x/exp/maps"
16
19
"golang.org/x/xerrors"
20
+ "google.golang.org/protobuf/types/known/durationpb"
21
+ "google.golang.org/protobuf/types/known/timestamppb"
17
22
"tailscale.com/net/dns"
23
+ "tailscale.com/util/dnsname"
18
24
"tailscale.com/wgengine/router"
19
25
26
+ "github.com/google/uuid"
27
+
20
28
"cdr.dev/slog"
21
29
"github.com/coder/coder/v2/coderd/util/ptr"
22
30
"github.com/coder/coder/v2/tailnet"
23
31
)
24
32
33
+ // The interval at which the tunnel sends network status updates to the manager.
34
+ const netStatusInterval = 30 * time .Second
35
+
25
36
type Tunnel struct {
26
37
speaker [* TunnelMessage ,* ManagerMessage ,ManagerMessage ]
27
38
ctx context.Context
39
+ netLoopDone chan struct {}
28
40
requestLoopDone chan struct {}
29
41
30
42
logger slog.Logger
@@ -35,6 +47,10 @@ type Tunnel struct {
35
47
client Client
36
48
conn Conn
37
49
50
+ mu sync.Mutex
51
+ // agents contains the agents that are currently connected to the tunnel.
52
+ agents map [uuid.UUID ]* tailnet.Agent
53
+
38
54
// router and dnsConfigurator may be nil
39
55
router router.Router
40
56
dnsConfigurator dns.OSConfigurator
@@ -61,14 +77,17 @@ func NewTunnel(
61
77
ctx :ctx ,
62
78
logger :logger ,
63
79
requestLoopDone :make (chan struct {}),
80
+ netLoopDone :make (chan struct {}),
64
81
client :client ,
82
+ agents :make (map [uuid.UUID ]* tailnet.Agent ),
65
83
}
66
84
67
85
for _ ,opt := range opts {
68
86
opt (t )
69
87
}
70
88
t .speaker .start ()
71
89
go t .requestLoop ()
90
+ go t .netStatusLoop ()
72
91
return t ,nil
73
92
}
74
93
@@ -97,6 +116,20 @@ func (t *Tunnel) requestLoop() {
97
116
}
98
117
}
99
118
119
+ func (t * Tunnel )netStatusLoop () {
120
+ ticker := time .NewTicker (netStatusInterval )
121
+ defer ticker .Stop ()
122
+ defer close (t .netLoopDone )
123
+ for {
124
+ select {
125
+ case <- t .ctx .Done ():
126
+ return
127
+ case <- ticker .C :
128
+ t .sendAgentUpdate ()
129
+ }
130
+ }
131
+ }
132
+
100
133
// handleRPC handles unary RPCs from the manager.
101
134
func (t * Tunnel )handleRPC (req * ManagerMessage ,msgID uint64 )* TunnelMessage {
102
135
resp := & TunnelMessage {}
@@ -107,8 +140,12 @@ func (t *Tunnel) handleRPC(req *ManagerMessage, msgID uint64) *TunnelMessage {
107
140
if err != nil {
108
141
t .logger .Critical (t .ctx ,"failed to get current workspace state" ,slog .Error (err ))
109
142
}
143
+ update ,err := t .createPeerUpdate (state )
144
+ if err != nil {
145
+ t .logger .Error (t .ctx ,"failed to populate agent network info" ,slog .Error (err ))
146
+ }
110
147
resp .Msg = & TunnelMessage_PeerUpdate {
111
- PeerUpdate :convertWorkspaceUpdate ( state ) ,
148
+ PeerUpdate :update ,
112
149
}
113
150
return resp
114
151
case * ManagerMessage_Start :
@@ -189,9 +226,13 @@ func (t *Tunnel) ApplyNetworkSettings(ctx context.Context, ns *NetworkSettingsRe
189
226
}
190
227
191
228
func (t * Tunnel )Update (update tailnet.WorkspaceUpdate )error {
229
+ peerUpdate ,err := t .createPeerUpdate (update )
230
+ if err != nil {
231
+ t .logger .Error (t .ctx ,"failed to populate agent network info" ,slog .Error (err ))
232
+ }
192
233
msg := & TunnelMessage {
193
234
Msg :& TunnelMessage_PeerUpdate {
194
- PeerUpdate :convertWorkspaceUpdate ( update ) ,
235
+ PeerUpdate :peerUpdate ,
195
236
},
196
237
}
197
238
select {
@@ -288,35 +329,30 @@ func sinkEntryToPb(e slog.SinkEntry) *Log {
288
329
return l
289
330
}
290
331
291
- func convertWorkspaceUpdate (update tailnet.WorkspaceUpdate )* PeerUpdate {
332
+ // createPeerUpdate creates a PeerUpdate message from a workspace update, populating
333
+ // the network status of the agents.
334
+ func (t * Tunnel )createPeerUpdate (update tailnet.WorkspaceUpdate ) (* PeerUpdate ,error ) {
292
335
out := & PeerUpdate {
293
336
UpsertedWorkspaces :make ([]* Workspace ,len (update .UpsertedWorkspaces )),
294
337
UpsertedAgents :make ([]* Agent ,len (update .UpsertedAgents )),
295
338
DeletedWorkspaces :make ([]* Workspace ,len (update .DeletedWorkspaces )),
296
339
DeletedAgents :make ([]* Agent ,len (update .DeletedAgents )),
297
340
}
341
+
342
+ t .saveUpdate (update )
343
+
298
344
for i ,ws := range update .UpsertedWorkspaces {
299
345
out .UpsertedWorkspaces [i ]= & Workspace {
300
346
Id :tailnet .UUIDToByteSlice (ws .ID ),
301
347
Name :ws .Name ,
302
348
Status :Workspace_Status (ws .Status ),
303
349
}
304
350
}
305
- for i ,agent := range update .UpsertedAgents {
306
- fqdn := make ([]string ,0 ,len (agent .Hosts ))
307
- for name := range agent .Hosts {
308
- fqdn = append (fqdn ,name .WithoutTrailingDot ())
309
- }
310
- out .UpsertedAgents [i ]= & Agent {
311
- Id :tailnet .UUIDToByteSlice (agent .ID ),
312
- Name :agent .Name ,
313
- WorkspaceId :tailnet .UUIDToByteSlice (agent .WorkspaceID ),
314
- Fqdn :fqdn ,
315
- IpAddrs : []string {tailnet .CoderServicePrefix .AddrFromUUID (agent .ID ).String ()},
316
- // TODO: Populate
317
- LastHandshake :nil ,
318
- }
351
+ upsertedAgents ,err := t .populateAgents (update .UpsertedAgents )
352
+ if err != nil {
353
+ return nil ,xerrors .Errorf ("failed to populate agent network info: %w" ,err )
319
354
}
355
+ out .UpsertedAgents = upsertedAgents
320
356
for i ,ws := range update .DeletedWorkspaces {
321
357
out .DeletedWorkspaces [i ]= & Workspace {
322
358
Id :tailnet .UUIDToByteSlice (ws .ID ),
@@ -327,19 +363,109 @@ func convertWorkspaceUpdate(update tailnet.WorkspaceUpdate) *PeerUpdate {
327
363
for i ,agent := range update .DeletedAgents {
328
364
fqdn := make ([]string ,0 ,len (agent .Hosts ))
329
365
for name := range agent .Hosts {
330
- fqdn = append (fqdn ,name .WithoutTrailingDot ())
366
+ fqdn = append (fqdn ,name .WithTrailingDot ())
331
367
}
332
368
out .DeletedAgents [i ]= & Agent {
369
+ Id :tailnet .UUIDToByteSlice (agent .ID ),
370
+ Name :agent .Name ,
371
+ WorkspaceId :tailnet .UUIDToByteSlice (agent .WorkspaceID ),
372
+ Fqdn :fqdn ,
373
+ IpAddrs :hostsToIPStrings (agent .Hosts ),
374
+ LastHandshake :nil ,
375
+ Latency :nil ,
376
+ }
377
+ }
378
+ return out ,nil
379
+ }
380
+
381
+ // Given a list of agents, populate their network info, and return them as proto agents.
382
+ func (t * Tunnel )populateAgents (agents []* tailnet.Agent ) ([]* Agent ,error ) {
383
+ if t .conn == nil {
384
+ return nil ,xerrors .New ("no active connection" )
385
+ }
386
+
387
+ out := make ([]* Agent ,0 ,len (agents ))
388
+ var wg sync.WaitGroup
389
+ pingCtx ,cancelFunc := context .WithTimeout (context .Background (),5 * time .Second )
390
+ defer cancelFunc ()
391
+
392
+ for _ ,agent := range agents {
393
+ fqdn := make ([]string ,0 ,len (agent .Hosts ))
394
+ for name := range agent .Hosts {
395
+ fqdn = append (fqdn ,name .WithTrailingDot ())
396
+ }
397
+ protoAgent := & Agent {
333
398
Id :tailnet .UUIDToByteSlice (agent .ID ),
334
399
Name :agent .Name ,
335
400
WorkspaceId :tailnet .UUIDToByteSlice (agent .WorkspaceID ),
336
401
Fqdn :fqdn ,
337
- IpAddrs : []string {tailnet .CoderServicePrefix .AddrFromUUID (agent .ID ).String ()},
338
- // TODO: Populate
339
- LastHandshake :nil ,
402
+ IpAddrs :hostsToIPStrings (agent .Hosts ),
340
403
}
404
+ agentIP := tailnet .CoderServicePrefix .AddrFromUUID (agent .ID )
405
+ wg .Add (1 )
406
+ go func () {
407
+ defer wg .Done ()
408
+ duration ,_ ,_ ,err := t .conn .Ping (pingCtx ,agentIP )
409
+ if err != nil {
410
+ return
411
+ }
412
+ protoAgent .Latency = durationpb .New (duration )
413
+ }()
414
+ diags := t .conn .GetPeerDiagnostics (agent .ID )
415
+ //nolint:revive // outdated rule
416
+ protoAgent .LastHandshake = timestamppb .New (diags .LastWireguardHandshake )
417
+ out = append (out ,protoAgent )
418
+ }
419
+ wg .Wait ()
420
+
421
+ return out ,nil
422
+ }
423
+
424
+ // saveUpdate saves the workspace update to the tunnel's state, such that it can
425
+ // be used to populate automated peer updates.
426
+ func (t * Tunnel )saveUpdate (update tailnet.WorkspaceUpdate ) {
427
+ t .mu .Lock ()
428
+ defer t .mu .Unlock ()
429
+
430
+ for _ ,agent := range update .UpsertedAgents {
431
+ t .agents [agent .ID ]= agent
432
+ }
433
+ for _ ,agent := range update .DeletedAgents {
434
+ delete (t .agents ,agent .ID )
435
+ }
436
+ }
437
+
438
+ // sendAgentUpdate sends a peer update message to the manager with the current
439
+ // state of the agents, including the latest network status.
440
+ func (t * Tunnel )sendAgentUpdate () {
441
+ // The lock must be held until we send the message,
442
+ // else we risk upserting a deleted agent.
443
+ t .mu .Lock ()
444
+ defer t .mu .Unlock ()
445
+
446
+ upsertedAgents ,err := t .populateAgents (maps .Values (t .agents ))
447
+ if err != nil {
448
+ t .logger .Error (t .ctx ,"failed to produce agent network status update" ,slog .Error (err ))
449
+ return
450
+ }
451
+
452
+ if len (upsertedAgents )== 0 {
453
+ return
454
+ }
455
+
456
+ msg := & TunnelMessage {
457
+ Msg :& TunnelMessage_PeerUpdate {
458
+ PeerUpdate :& PeerUpdate {
459
+ UpsertedAgents :upsertedAgents ,
460
+ },
461
+ },
462
+ }
463
+
464
+ select {
465
+ case <- t .ctx .Done ():
466
+ return
467
+ case t .sendCh <- msg :
341
468
}
342
- return out
343
469
}
344
470
345
471
// the following are taken from sloghuman:
@@ -399,3 +525,17 @@ func quote(key string) string {
399
525
}
400
526
return quoted
401
527
}
528
+
529
+ func hostsToIPStrings (hosts map [dnsname.FQDN ][]netip.Addr ) []string {
530
+ seen := make (map [netip.Addr ]struct {})
531
+ var result []string
532
+ for _ ,inner := range hosts {
533
+ for _ ,elem := range inner {
534
+ if _ ,exists := seen [elem ];! exists {
535
+ seen [elem ]= struct {}{}
536
+ result = append (result ,elem .String ())
537
+ }
538
+ }
539
+ }
540
+ return result
541
+ }