@@ -24,7 +24,9 @@ const (
2424// dropping updates
2525ResponseBufferSize = 512
2626// RequestBufferSize is the max number of requests to buffer per connection
27- RequestBufferSize = 32
27+ RequestBufferSize = 32
28+ CloseErrOverwritten = "peer ID overwritten by new connection"
29+ CloseErrCoordinatorClose = "coordinator closed"
2830)
2931
3032// Coordinator exchanges nodes with agents to establish connections.
9799ErrAlreadyRemoved = xerrors .New ("already removed" )
98100)
99101
// AuthorizationError wraps an underlying error. Its Error method reports the
// wrapped message with an "authorization: " prefix, and its Unwrap method
// exposes the wrapped error itself.
102+ type AuthorizationError struct {
// Wrapped is the underlying error; both Error and Unwrap read it.
103+ Wrapped error
104+ }
105+
// Error implements the error interface. It returns the wrapped error's
// message prefixed with "authorization: ".
// NOTE(review): Wrapped is dereferenced unconditionally, so a nil Wrapped
// would panic here — confirm that every construction site sets it.
106+ func (e AuthorizationError )Error ()string {
107+ return fmt .Sprintf ("authorization: %s" ,e .Wrapped .Error ())
108+ }
109+
// Unwrap returns the wrapped error so that error-chain helpers (for example
// the Is/As style functions) can inspect the underlying cause.
110+ func (e AuthorizationError )Unwrap ()error {
111+ return e .Wrapped
112+ }
113+
100114// NewCoordinator constructs a new in-memory connection coordinator. This
101115// coordinator is incompatible with multiple Coder replicas as all node data is
102116// in-memory.
@@ -161,8 +175,12 @@ func (c *coordinator) Coordinate(
161175c .wg .Add (1 )
162176go func () {
163177defer c .wg .Done ()
164- p .reqLoop (ctx ,logger ,c .core .handleRequest )
165- err := c .core .lostPeer (p )
178+ loopErr := p .reqLoop (ctx ,logger ,c .core .handleRequest )
179+ closeErrStr := ""
180+ if loopErr != nil {
181+ closeErrStr = loopErr .Error ()
182+ }
183+ err := c .core .lostPeer (p ,closeErrStr )
166184if xerrors .Is (err ,ErrClosed )|| xerrors .Is (err ,ErrAlreadyRemoved ) {
167185return
168186}
@@ -227,7 +245,7 @@ func (c *core) handleRequest(ctx context.Context, p *peer, req *proto.Coordinate
227245}
228246
229247if err := pr .auth .Authorize (ctx ,req );err != nil {
230- return xerrors . Errorf ( "authorize request: %w" , err )
248+ return AuthorizationError { Wrapped : err }
231249}
232250
233251if req .UpdateSelf != nil {
@@ -270,7 +288,7 @@ func (c *core) handleRequest(ctx context.Context, p *peer, req *proto.Coordinate
270288}
271289}
272290if req .Disconnect != nil {
273- c .removePeerLocked (p .id ,proto .CoordinateResponse_PeerUpdate_DISCONNECTED ,"graceful disconnect" )
291+ c .removePeerLocked (p .id ,proto .CoordinateResponse_PeerUpdate_DISCONNECTED ,"graceful disconnect" , "" )
274292}
275293if rfhs := req .ReadyForHandshake ;rfhs != nil {
276294err := c .handleReadyForHandshakeLocked (pr ,rfhs )
@@ -344,7 +362,7 @@ func (c *core) updateTunnelPeersLocked(id uuid.UUID, n *proto.Node, k proto.Coor
344362err := other .updateMappingLocked (id ,n ,k ,reason )
345363if err != nil {
346364other .logger .Error (context .Background (),"failed to update mapping" ,slog .Error (err ))
347- c .removePeerLocked (other .id ,proto .CoordinateResponse_PeerUpdate_DISCONNECTED ,"failed update" )
365+ c .removePeerLocked (other .id ,proto .CoordinateResponse_PeerUpdate_DISCONNECTED ,"failed update" , "failed to update tunnel peer mapping" )
348366}
349367}
350368}
@@ -360,7 +378,8 @@ func (c *core) addTunnelLocked(src *peer, dstID uuid.UUID) error {
360378err := src .updateMappingLocked (dstID ,dst .node ,proto .CoordinateResponse_PeerUpdate_NODE ,"add tunnel" )
361379if err != nil {
362380src .logger .Error (context .Background (),"failed update of tunnel src" ,slog .Error (err ))
363- c .removePeerLocked (src .id ,proto .CoordinateResponse_PeerUpdate_DISCONNECTED ,"failed update" )
381+ c .removePeerLocked (src .id ,proto .CoordinateResponse_PeerUpdate_DISCONNECTED ,"failed update" ,
382+ "failed to update tunnel dest mapping" )
364383// if the source fails, then the tunnel is also removed and there is no reason to continue
365384// processing.
366385return err
@@ -370,7 +389,8 @@ func (c *core) addTunnelLocked(src *peer, dstID uuid.UUID) error {
370389err := dst .updateMappingLocked (src .id ,src .node ,proto .CoordinateResponse_PeerUpdate_NODE ,"add tunnel" )
371390if err != nil {
372391dst .logger .Error (context .Background (),"failed update of tunnel dst" ,slog .Error (err ))
373- c .removePeerLocked (dst .id ,proto .CoordinateResponse_PeerUpdate_DISCONNECTED ,"failed update" )
392+ c .removePeerLocked (dst .id ,proto .CoordinateResponse_PeerUpdate_DISCONNECTED ,"failed update" ,
393+ "failed to update tunnel src mapping" )
374394}
375395}
376396}
@@ -381,7 +401,7 @@ func (c *core) removeTunnelLocked(src *peer, dstID uuid.UUID) error {
381401err := src .updateMappingLocked (dstID ,nil ,proto .CoordinateResponse_PeerUpdate_DISCONNECTED ,"remove tunnel" )
382402if err != nil {
383403src .logger .Error (context .Background (),"failed to update" ,slog .Error (err ))
384- c .removePeerLocked (src .id ,proto .CoordinateResponse_PeerUpdate_DISCONNECTED ,"failed update" )
404+ c .removePeerLocked (src .id ,proto .CoordinateResponse_PeerUpdate_DISCONNECTED ,"failed update" , "failed to remove tunnel dest mapping" )
385405// removing the peer also removes all other tunnels and notifies destinations, so it's safe to
386406// return here.
387407return err
@@ -391,7 +411,7 @@ func (c *core) removeTunnelLocked(src *peer, dstID uuid.UUID) error {
391411err = dst .updateMappingLocked (src .id ,nil ,proto .CoordinateResponse_PeerUpdate_DISCONNECTED ,"remove tunnel" )
392412if err != nil {
393413dst .logger .Error (context .Background (),"failed to update" ,slog .Error (err ))
394- c .removePeerLocked (dst .id ,proto .CoordinateResponse_PeerUpdate_DISCONNECTED ,"failed update" )
414+ c .removePeerLocked (dst .id ,proto .CoordinateResponse_PeerUpdate_DISCONNECTED ,"failed update" , "failed to remove tunnel src mapping" )
395415// don't return here because we still want to remove the tunnel, and an error at the
396416// destination doesn't count as an error removing the tunnel at the source.
397417}
@@ -413,6 +433,11 @@ func (c *core) initPeer(p *peer) error {
413433if old ,ok := c .peers [p .id ];ok {
414434// rare and interesting enough to log at Info, but it isn't an error per se
415435old .logger .Info (context .Background (),"overwritten by new connection" )
436+ select {
437+ case old .resps <- & proto.CoordinateResponse {Error :CloseErrOverwritten }:
438+ default :
439+ // pass
440+ }
416441close (old .resps )
417442p .overwrites = old .overwrites + 1
418443}
@@ -433,7 +458,7 @@ func (c *core) initPeer(p *peer) error {
433458
434459// lostPeer removes and cleans up a lost peer. It updates all peers it shares a tunnel with, deletes
435460// all tunnels from which the removed peer is the source.
436- func (c * core )lostPeer (p * peer )error {
461+ func (c * core )lostPeer (p * peer , closeErr string )error {
437462c .mutex .Lock ()
438463defer c .mutex .Unlock ()
439464c .logger .Debug (context .Background (),"lostPeer" ,slog .F ("peer_id" ,p .id ))
@@ -443,18 +468,25 @@ func (c *core) lostPeer(p *peer) error {
443468if existing ,ok := c .peers [p .id ];! ok || existing != p {
444469return ErrAlreadyRemoved
445470}
446- c .removePeerLocked (p .id ,proto .CoordinateResponse_PeerUpdate_LOST ,"lost" )
471+ c .removePeerLocked (p .id ,proto .CoordinateResponse_PeerUpdate_LOST ,"lost" , closeErr )
447472return nil
448473}
449474
// removePeerLocked removes the peer registered under id from c.peers.
//
// If no peer is registered under id, it logs at Critical level and returns
// without touching any state. Otherwise it calls updateTunnelPeersLocked with
// a nil node plus the given kind and reason, removes all of the peer's
// tunnels, optionally reports closeErr to the peer, closes the peer's resps
// channel, and deletes the peer from the map.
//
// closeErr, when non-empty, is sent to the peer as the Error field of a
// CoordinateResponse. The send is best-effort: it is skipped if it would
// block. An empty closeErr sends nothing.
//
// Presumably this must be called with c.mutex held, as lostPeer does —
// confirm for the other call sites.
450- func (c * core )removePeerLocked (id uuid.UUID ,kind proto.CoordinateResponse_PeerUpdate_Kind ,reason string ) {
475+ func (c * core )removePeerLocked (id uuid.UUID ,kind proto.CoordinateResponse_PeerUpdate_Kind ,reason , closeErr string ) {
451476p ,ok := c .peers [id ]
452477if ! ok {
// NOTE(review): the message contains a %s verb and id is passed as a trailing
// argument; if Critical is not printf-style the id will not be interpolated
// into the message — confirm against the logger's API.
453478c .logger .Critical (context .Background (),"removed non-existent peer %s" ,id )
454479return
455480}
456481c .updateTunnelPeersLocked (id ,nil ,kind ,reason )
457482c .tunnels .removeAll (id )
483+ if closeErr != "" {
// Non-blocking send: the default case is taken when the send on resps
// cannot proceed immediately, so this never stalls the removal.
484+ select {
485+ case p .resps <- & proto.CoordinateResponse {Error :closeErr }:
486+ default :
487+ // send would block; drop the close error rather than wait.
488+ }
489+ }
// The close happens after the optional error send above, so a delivered
// error is the last message placed on the channel.
458490close (p .resps )
459491delete (c .peers ,id )
460492}
@@ -487,7 +519,8 @@ func (c *core) close() error {
487519for id := range c .peers {
488520// when closing, mark them as LOST so that we don't disrupt in-progress
489521// connections.
490- c .removePeerLocked (id ,proto .CoordinateResponse_PeerUpdate_LOST ,"coordinator close" )
522+ c .removePeerLocked (id ,proto .CoordinateResponse_PeerUpdate_LOST ,"coordinator close" ,
523+ CloseErrCoordinatorClose )
491524}
492525return nil
493526}