@@ -3,11 +3,15 @@ package tailnet
 import (
 	"context"
 	"errors"
+	"fmt"
 	"net/netip"
 	"sync"
+	"time"
 
+	"github.com/benbjohnson/clock"
 	"github.com/google/uuid"
 	"go4.org/netipx"
+	"tailscale.com/ipn/ipnstate"
 	"tailscale.com/net/dns"
 	"tailscale.com/tailcfg"
 	"tailscale.com/types/ipproto"
@@ -23,10 +27,13 @@ import (
 	"github.com/coder/coder/v2/tailnet/proto"
 )
 
+const lostTimeout = 15 * time.Minute
+
 // engineConfigurable is the subset of wgengine.Engine that we use for configuration.
 //
 // This allows us to test configuration code without faking the whole interface.
 type engineConfigurable interface {
+	UpdateStatus(*ipnstate.StatusBuilder)
 	SetNetworkMap(*netmap.NetworkMap)
 	Reconfig(*wgcfg.Config, *router.Config, *dns.Config, *tailcfg.Debug) error
 	SetDERPMap(*tailcfg.DERPMap)
@@ -49,12 +56,16 @@ type configMaps struct {
 	closing      bool
 	phase        phase
 
-	engine    engineConfigurable
-	static    netmap.NetworkMap
-	peers     map[uuid.UUID]*peerLifecycle
-	addresses []netip.Prefix
-	derpMap   *proto.DERPMap
-	logger    slog.Logger
+	engine         engineConfigurable
+	static         netmap.NetworkMap
+	peers          map[uuid.UUID]*peerLifecycle
+	addresses      []netip.Prefix
+	derpMap        *proto.DERPMap
+	logger         slog.Logger
+	blockEndpoints bool
+
+	// for testing
+	clock clock.Clock
 }
 
 func newConfigMaps(logger slog.Logger, engine engineConfigurable, nodeID tailcfg.NodeID, nodeKey key.NodePrivate, discoKey key.DiscoPublic, addresses []netip.Prefix) *configMaps {
@@ -101,6 +112,7 @@ func newConfigMaps(logger slog.Logger, engine engineConfigurable, nodeID tailcfg
 		},
 		peers:     make(map[uuid.UUID]*peerLifecycle),
 		addresses: addresses,
+		clock:     clock.New(),
 	}
 	go c.configLoop()
 	return c
@@ -164,6 +176,9 @@ func (c *configMaps) configLoop() {
 func (c *configMaps) close() {
 	c.L.Lock()
 	defer c.L.Unlock()
+	for _, lc := range c.peers {
+		lc.resetTimer()
+	}
 	c.closing = true
 	c.Broadcast()
 	for c.phase != closed {
@@ -247,11 +262,201 @@ func (c *configMaps) filterLocked() *filter.Filter {
 	)
 }
 
+func (c *configMaps) updatePeers(updates []*proto.CoordinateResponse_PeerUpdate) {
+	status := c.status()
+	c.L.Lock()
+	defer c.L.Unlock()
+
+	// Update all the lastHandshake values here. That way we don't have to
+	// worry about them being up-to-date when handling updates below, and it
+	// covers all peers, not just the ones we got updates about.
+	for _, lc := range c.peers {
+		if peerStatus, ok := status.Peer[lc.node.Key]; ok {
+			lc.lastHandshake = peerStatus.LastHandshake
+		}
+	}
+
+	for _, update := range updates {
+		if dirty := c.updatePeerLocked(update, status); dirty {
+			c.netmapDirty = true
+		}
+	}
+	if c.netmapDirty {
+		c.Broadcast()
+	}
+}
+
+func (c *configMaps) status() *ipnstate.Status {
+	sb := &ipnstate.StatusBuilder{WantPeers: true}
+	c.engine.UpdateStatus(sb)
+	return sb.Status()
+}
+
+func (c *configMaps) updatePeerLocked(update *proto.CoordinateResponse_PeerUpdate, status *ipnstate.Status) (dirty bool) {
+	id, err := uuid.FromBytes(update.Id)
+	if err != nil {
+		c.logger.Critical(context.Background(), "received update with bad id", slog.F("id", update.Id))
+		return false
+	}
+	logger := c.logger.With(slog.F("peer_id", id))
+	lc, ok := c.peers[id]
+	var node *tailcfg.Node
+	if update.Kind == proto.CoordinateResponse_PeerUpdate_NODE {
+		// If no preferred DERP is provided, we can't reach the node.
+		if update.Node.PreferredDerp == 0 {
+			logger.Warn(context.Background(), "no preferred DERP, peer update", slog.F("node_proto", update.Node))
+			return false
+		}
+		node, err = c.protoNodeToTailcfg(update.Node)
+		if err != nil {
+			logger.Critical(context.Background(), "failed to convert proto node to tailcfg", slog.F("node_proto", update.Node))
+			return false
+		}
+		logger = logger.With(slog.F("key_id", node.Key.ShortString()), slog.F("node", node))
+		peerStatus, ok := status.Peer[node.Key]
+		// Starting KeepAlive messages at the initialization of a connection
+		// causes a race condition. If we send the handshake before the peer has
+		// our node, we'll have to wait for 5 seconds before trying again.
+		// Ideally, the first handshake starts when the user first initiates a
+		// connection to the peer. After a successful connection we enable
+		// keep alives to persist the connection and keep it from becoming idle.
+		// SSH connections don't send packets while idle, so we use keep alives
+		// to avoid random hangs while we set up the connection again after
+		// inactivity.
+		node.KeepAlive = ok && peerStatus.Active
+		if c.blockEndpoints {
+			node.Endpoints = nil
+		}
+	}
+	switch {
+	case !ok && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE:
+		// new!
+		var lastHandshake time.Time
+		if ps, ok := status.Peer[node.Key]; ok {
+			lastHandshake = ps.LastHandshake
+		}
+		c.peers[id] = &peerLifecycle{
+			peerID:        id,
+			node:          node,
+			lastHandshake: lastHandshake,
+			lost:          false,
+		}
+		logger.Debug(context.Background(), "adding new peer")
+		return true
+	case ok && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE:
+		// update
+		node.Created = lc.node.Created
+		dirty = !lc.node.Equal(node)
+		lc.node = node
+		lc.lost = false
+		lc.resetTimer()
+		logger.Debug(context.Background(), "node update to existing peer", slog.F("dirty", dirty))
+		return dirty
+	case !ok:
+		// disconnected or lost, but we don't have the node. No op.
+		logger.Debug(context.Background(), "skipping update for peer we don't recognize")
+		return false
+	case update.Kind == proto.CoordinateResponse_PeerUpdate_DISCONNECTED:
+		lc.resetTimer()
+		delete(c.peers, id)
+		logger.Debug(context.Background(), "disconnected peer")
+		return true
+	case update.Kind == proto.CoordinateResponse_PeerUpdate_LOST:
+		lc.lost = true
+		lc.setLostTimer(c)
+		logger.Debug(context.Background(), "marked peer lost")
+		// marking a node lost doesn't change anything right now, so dirty=false
+		return false
+	default:
+		logger.Warn(context.Background(), "unknown peer update", slog.F("kind", update.Kind))
+		return false
+	}
+}
+
+func (c *configMaps) peerLostTimeout(id uuid.UUID) {
+	logger := c.logger.With(slog.F("peer_id", id))
+	logger.Debug(context.Background(),
+		"peer lost timeout")
+
+	// First do a status update to see if the peer did a handshake while we
+	// were waiting.
+	status := c.status()
+	c.L.Lock()
+	defer c.L.Unlock()
+
+	lc, ok := c.peers[id]
+	if !ok {
+		logger.Debug(context.Background(),
+			"timeout triggered for peer that is removed from the map")
+		return
+	}
+	if peerStatus, ok := status.Peer[lc.node.Key]; ok {
+		lc.lastHandshake = peerStatus.LastHandshake
+	}
+	logger = logger.With(slog.F("key_id", lc.node.Key.ShortString()))
+	if !lc.lost {
+		logger.Debug(context.Background(),
+			"timeout triggered for peer that is no longer lost")
+		return
+	}
+	since := c.clock.Since(lc.lastHandshake)
+	if since >= lostTimeout {
+		logger.Info(
+			context.Background(), "removing lost peer")
+		delete(c.peers, id)
+		c.netmapDirty = true
+		c.Broadcast()
+		return
+	}
+	logger.Debug(context.Background(),
+		"timeout triggered for peer but it had handshake in meantime")
+	lc.setLostTimer(c)
+}
+
+func (c *configMaps) protoNodeToTailcfg(p *proto.Node) (*tailcfg.Node, error) {
+	node, err := ProtoToNode(p)
+	if err != nil {
+		return nil, err
+	}
+	return &tailcfg.Node{
+		ID:         tailcfg.NodeID(p.GetId()),
+		Created:    c.clock.Now(),
+		Key:        node.Key,
+		DiscoKey:   node.DiscoKey,
+		Addresses:  node.Addresses,
+		AllowedIPs: node.AllowedIPs,
+		Endpoints:  node.Endpoints,
+		DERP:       fmt.Sprintf("%s:%d", tailcfg.DerpMagicIP, node.PreferredDERP),
+		Hostinfo:   (&tailcfg.Hostinfo{}).View(),
+	}, nil
+}
+
 type peerLifecycle struct {
-	node *tailcfg.Node
-	// TODO: implement timers to track lost peers
-	// lastHandshake time.Time
-	// timer time.Timer
+	peerID        uuid.UUID
+	node          *tailcfg.Node
+	lost          bool
+	lastHandshake time.Time
+	timer         *clock.Timer
+}
+
+func (l *peerLifecycle) resetTimer() {
+	if l.timer != nil {
+		l.timer.Stop()
+		l.timer = nil
+	}
+}
+
+func (l *peerLifecycle) setLostTimer(c *configMaps) {
+	if l.timer != nil {
+		l.timer.Stop()
+	}
+	ttl := lostTimeout - c.clock.Since(l.lastHandshake)
+	if ttl <= 0 {
+		ttl = time.Nanosecond
+	}
+	l.timer = c.clock.AfterFunc(ttl, func() {
+		c.peerLostTimeout(l.peerID)
+	})
 }
 
 // prefixesDifferent returns true if the two slices contain different prefixes