@@ -24,7 +24,9 @@ const (
 	// dropping updates
 	ResponseBufferSize = 512
 	// RequestBufferSize is the max number of requests to buffer per connection
-	RequestBufferSize = 32
+	RequestBufferSize        = 32
+	CloseErrOverwritten      = "peer ID overwritten by new connection"
+	CloseErrCoordinatorClose = "coordinator closed"
 )
 
 // Coordinator exchanges nodes with agents to establish connections.
@@ -97,6 +99,18 @@
 	ErrAlreadyRemoved = xerrors.New("already removed")
 )
 
+type AuthorizationError struct {
+	Wrapped error
+}
+
+func (e AuthorizationError) Error() string {
+	return fmt.Sprintf("authorization: %s", e.Wrapped.Error())
+}
+
+func (e AuthorizationError) Unwrap() error {
+	return e.Wrapped
+}
+
 // NewCoordinator constructs a new in-memory connection coordinator. This
 // coordinator is incompatible with multiple Coder replicas as all node data is
 // in-memory.
@@ -161,8 +175,12 @@ func (c *coordinator) Coordinate(
 	c.wg.Add(1)
 	go func() {
 		defer c.wg.Done()
-		p.reqLoop(ctx, logger, c.core.handleRequest)
-		err := c.core.lostPeer(p)
+		loopErr := p.reqLoop(ctx, logger, c.core.handleRequest)
+		closeErrStr := ""
+		if loopErr != nil {
+			closeErrStr = loopErr.Error()
+		}
+		err := c.core.lostPeer(p, closeErrStr)
 		if xerrors.Is(err, ErrClosed) || xerrors.Is(err, ErrAlreadyRemoved) {
 			return
 		}
@@ -227,7 +245,7 @@ func (c *core) handleRequest(ctx context.Context, p *peer, req *proto.Coordinate
 	}
 
 	if err := pr.auth.Authorize(ctx, req); err != nil {
-		return xerrors.Errorf("authorize request: %w", err)
+		return AuthorizationError{Wrapped: err}
 	}
 
 	if req.UpdateSelf != nil {
@@ -270,7 +288,7 @@ func (c *core) handleRequest(ctx context.Context, p *peer, req *proto.Coordinate
 		}
 	}
 	if req.Disconnect != nil {
-		c.removePeerLocked(p.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "graceful disconnect")
+		c.removePeerLocked(p.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "graceful disconnect", "")
 	}
 	if rfhs := req.ReadyForHandshake; rfhs != nil {
 		err := c.handleReadyForHandshakeLocked(pr, rfhs)
@@ -344,7 +362,7 @@ func (c *core) updateTunnelPeersLocked(id uuid.UUID, n *proto.Node, k proto.Coor
 		err := other.updateMappingLocked(id, n, k, reason)
 		if err != nil {
 			other.logger.Error(context.Background(), "failed to update mapping", slog.Error(err))
-			c.removePeerLocked(other.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
+			c.removePeerLocked(other.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update", "failed to update tunnel peer mapping")
 		}
 	}
 }
@@ -360,7 +378,8 @@ func (c *core) addTunnelLocked(src *peer, dstID uuid.UUID) error {
 		err := src.updateMappingLocked(dstID, dst.node, proto.CoordinateResponse_PeerUpdate_NODE, "add tunnel")
 		if err != nil {
 			src.logger.Error(context.Background(), "failed update of tunnel src", slog.Error(err))
-			c.removePeerLocked(src.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
+			c.removePeerLocked(src.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update",
+				"failed to update tunnel dest mapping")
 			// if the source fails, then the tunnel is also removed and there is no reason to continue
 			// processing.
 			return err
@@ -370,7 +389,8 @@ func (c *core) addTunnelLocked(src *peer, dstID uuid.UUID) error {
 		err := dst.updateMappingLocked(src.id, src.node, proto.CoordinateResponse_PeerUpdate_NODE, "add tunnel")
 		if err != nil {
 			dst.logger.Error(context.Background(), "failed update of tunnel dst", slog.Error(err))
-			c.removePeerLocked(dst.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
+			c.removePeerLocked(dst.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update",
+				"failed to update tunnel src mapping")
 		}
 	}
 }
@@ -381,7 +401,7 @@ func (c *core) removeTunnelLocked(src *peer, dstID uuid.UUID) error {
 	err := src.updateMappingLocked(dstID, nil, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "remove tunnel")
 	if err != nil {
 		src.logger.Error(context.Background(), "failed to update", slog.Error(err))
-		c.removePeerLocked(src.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
+		c.removePeerLocked(src.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update", "failed to remove tunnel dest mapping")
 		// removing the peer also removes all other tunnels and notifies destinations, so it's safe to
 		// return here.
 		return err
@@ -391,7 +411,7 @@ func (c *core) removeTunnelLocked(src *peer, dstID uuid.UUID) error {
 		err = dst.updateMappingLocked(src.id, nil, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "remove tunnel")
 		if err != nil {
 			dst.logger.Error(context.Background(), "failed to update", slog.Error(err))
-			c.removePeerLocked(dst.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
+			c.removePeerLocked(dst.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update", "failed to remove tunnel src mapping")
 			// don't return here because we still want to remove the tunnel, and an error at the
 			// destination doesn't count as an error removing the tunnel at the source.
 		}
@@ -413,6 +433,11 @@ func (c *core) initPeer(p *peer) error {
 	if old, ok := c.peers[p.id]; ok {
 		// rare and interesting enough to log at Info, but it isn't an error per se
 		old.logger.Info(context.Background(), "overwritten by new connection")
+		select {
+		case old.resps <- &proto.CoordinateResponse{Error: CloseErrOverwritten}:
+		default:
+			// pass
+		}
 		close(old.resps)
 		p.overwrites = old.overwrites + 1
 	}
@@ -433,7 +458,7 @@ func (c *core) initPeer(p *peer) error {
 
 // removePeer removes and cleans up a lost peer. It updates all peers it shares a tunnel with, deletes
 // all tunnels from which the removed peer is the source.
-func (c *core) lostPeer(p *peer) error {
+func (c *core) lostPeer(p *peer, closeErr string) error {
 	c.mutex.Lock()
 	defer c.mutex.Unlock()
 	c.logger.Debug(context.Background(), "lostPeer", slog.F("peer_id", p.id))
@@ -443,18 +468,25 @@ func (c *core) lostPeer(p *peer) error {
 	if existing, ok := c.peers[p.id]; !ok || existing != p {
 		return ErrAlreadyRemoved
 	}
-	c.removePeerLocked(p.id, proto.CoordinateResponse_PeerUpdate_LOST, "lost")
+	c.removePeerLocked(p.id, proto.CoordinateResponse_PeerUpdate_LOST, "lost", closeErr)
 	return nil
 }
 
-func (c *core) removePeerLocked(id uuid.UUID, kind proto.CoordinateResponse_PeerUpdate_Kind, reason string) {
+func (c *core) removePeerLocked(id uuid.UUID, kind proto.CoordinateResponse_PeerUpdate_Kind, reason, closeErr string) {
 	p, ok := c.peers[id]
 	if !ok {
 		c.logger.Critical(context.Background(), "removed non-existent peer %s", id)
 		return
 	}
 	c.updateTunnelPeersLocked(id, nil, kind, reason)
 	c.tunnels.removeAll(id)
+	if closeErr != "" {
+		select {
+		case p.resps <- &proto.CoordinateResponse{Error: closeErr}:
+		default:
+			// blocked, pass.
+		}
+	}
 	close(p.resps)
 	delete(c.peers, id)
 }
@@ -487,7 +519,8 @@ func (c *core) close() error {
 	for id := range c.peers {
 		// when closing, mark them as LOST so that we don't disrupt in-progress
 		// connections.
-		c.removePeerLocked(id, proto.CoordinateResponse_PeerUpdate_LOST, "coordinator close")
+		c.removePeerLocked(id, proto.CoordinateResponse_PeerUpdate_LOST, "coordinator close",
+			CloseErrCoordinatorClose)
 	}
 	return nil
 }
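
For reference, a minimal sketch of how a caller might react to the typed AuthorizationError introduced above. It assumes the package is imported as tailnet (import path and helper name are illustrative, not part of this change) and that the error returned from request handling is surfaced to the caller:

package main

import (
	"errors"
	"fmt"

	// assumed import path for the package that declares AuthorizationError
	"github.com/coder/coder/v2/tailnet"
)

// classifyCoordinateErr is a hypothetical helper: it uses errors.As to
// separate authorization failures from other coordinate failures, relying on
// the Unwrap method above to keep the original cause reachable.
func classifyCoordinateErr(err error) string {
	var authErr tailnet.AuthorizationError
	if errors.As(err, &authErr) {
		return "authorization failure: " + authErr.Error()
	}
	return "other coordinate failure: " + err.Error()
}

func main() {
	err := tailnet.AuthorizationError{Wrapped: errors.New("tunnel not permitted")}
	fmt.Println(classifyCoordinateErr(err))
}

On the response side, clients can inspect the new Error field on CoordinateResponse (for example CloseErrOverwritten or CloseErrCoordinatorClose) delivered via the non-blocking send just before the response channel is closed.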