@@ -24,7 +24,10 @@ const (
2424// dropping updates
2525ResponseBufferSize = 512
2626// RequestBufferSize is the max number of requests to buffer per connection
27- RequestBufferSize = 32
27+ RequestBufferSize = 32
28+ CloseErrOverwritten = "peer ID overwritten by new connection"
29+ CloseErrCoordinatorClose = "coordinator closed"
30+ ReadyForHandshakeError = "ready for handshake error"
2831)
2932
3033// Coordinator exchanges nodes with agents to establish connections.
@@ -97,6 +100,18 @@ var (
97100ErrAlreadyRemoved = xerrors .New ("already removed" )
98101)
99102
// AuthorizationError wraps an error produced while authorizing a request.
// Using a distinct type lets callers detect authorization failures with
// errors.As while still reaching the underlying cause via errors.Is or
// errors.Unwrap.
type AuthorizationError struct {
	// Wrapped is the underlying authorization failure.
	Wrapped error
}

// Error implements the error interface. The message is the wrapped error's
// message prefixed with "authorization: ". A nil Wrapped error is rendered as
// "<nil>" instead of panicking on a nil interface method call.
func (e AuthorizationError) Error() string {
	if e.Wrapped == nil {
		return "authorization: <nil>"
	}
	return fmt.Sprintf("authorization: %s", e.Wrapped.Error())
}

// Unwrap returns the wrapped error so that errors.Is and errors.As can
// inspect the cause. It returns nil when no error was wrapped.
func (e AuthorizationError) Unwrap() error {
	return e.Wrapped
}
114+
100115// NewCoordinator constructs a new in-memory connection coordinator. This
101116// coordinator is incompatible with multiple Coder replicas as all node data is
102117// in-memory.
@@ -161,8 +176,12 @@ func (c *coordinator) Coordinate(
161176c .wg .Add (1 )
162177go func () {
163178defer c .wg .Done ()
164- p .reqLoop (ctx ,logger ,c .core .handleRequest )
165- err := c .core .lostPeer (p )
179+ loopErr := p .reqLoop (ctx ,logger ,c .core .handleRequest )
180+ closeErrStr := ""
181+ if loopErr != nil {
182+ closeErrStr = loopErr .Error ()
183+ }
184+ err := c .core .lostPeer (p ,closeErrStr )
166185if xerrors .Is (err ,ErrClosed )|| xerrors .Is (err ,ErrAlreadyRemoved ) {
167186return
168187}
@@ -227,7 +246,7 @@ func (c *core) handleRequest(ctx context.Context, p *peer, req *proto.Coordinate
227246}
228247
229248if err := pr .auth .Authorize (ctx ,req );err != nil {
230- return xerrors . Errorf ( "authorize request: %w" , err )
249+ return AuthorizationError { Wrapped : err }
231250}
232251
233252if req .UpdateSelf != nil {
@@ -270,7 +289,7 @@ func (c *core) handleRequest(ctx context.Context, p *peer, req *proto.Coordinate
270289}
271290}
272291if req .Disconnect != nil {
273- c .removePeerLocked (p .id ,proto .CoordinateResponse_PeerUpdate_DISCONNECTED ,"graceful disconnect" )
292+ c .removePeerLocked (p .id ,proto .CoordinateResponse_PeerUpdate_DISCONNECTED ,"graceful disconnect" , "" )
274293}
275294if rfhs := req .ReadyForHandshake ;rfhs != nil {
276295err := c .handleReadyForHandshakeLocked (pr ,rfhs )
@@ -298,7 +317,7 @@ func (c *core) handleReadyForHandshakeLocked(src *peer, rfhs []*proto.Coordinate
298317// don't want to kill its connection.
299318select {
300319case src .resps <- & proto.CoordinateResponse {
301- Error :fmt .Sprintf ("you do not share a tunnel with %q" ,dstID .String ()),
320+ Error :fmt .Sprintf ("%s: you do not share a tunnel with %q" , ReadyForHandshakeError ,dstID .String ()),
302321}:
303322default :
304323return ErrWouldBlock
@@ -344,7 +363,7 @@ func (c *core) updateTunnelPeersLocked(id uuid.UUID, n *proto.Node, k proto.Coor
344363err := other .updateMappingLocked (id ,n ,k ,reason )
345364if err != nil {
346365other .logger .Error (context .Background (),"failed to update mapping" ,slog .Error (err ))
347- c .removePeerLocked (other .id ,proto .CoordinateResponse_PeerUpdate_DISCONNECTED ,"failed update" )
366+ c .removePeerLocked (other .id ,proto .CoordinateResponse_PeerUpdate_DISCONNECTED ,"failed update" , "failed to update tunnel peer mapping" )
348367}
349368}
350369}
@@ -360,7 +379,8 @@ func (c *core) addTunnelLocked(src *peer, dstID uuid.UUID) error {
360379err := src .updateMappingLocked (dstID ,dst .node ,proto .CoordinateResponse_PeerUpdate_NODE ,"add tunnel" )
361380if err != nil {
362381src .logger .Error (context .Background (),"failed update of tunnel src" ,slog .Error (err ))
363- c .removePeerLocked (src .id ,proto .CoordinateResponse_PeerUpdate_DISCONNECTED ,"failed update" )
382+ c .removePeerLocked (src .id ,proto .CoordinateResponse_PeerUpdate_DISCONNECTED ,"failed update" ,
383+ "failed to update tunnel dest mapping" )
364384// if the source fails, then the tunnel is also removed and there is no reason to continue
365385// processing.
366386return err
@@ -370,7 +390,8 @@ func (c *core) addTunnelLocked(src *peer, dstID uuid.UUID) error {
370390err := dst .updateMappingLocked (src .id ,src .node ,proto .CoordinateResponse_PeerUpdate_NODE ,"add tunnel" )
371391if err != nil {
372392dst .logger .Error (context .Background (),"failed update of tunnel dst" ,slog .Error (err ))
373- c .removePeerLocked (dst .id ,proto .CoordinateResponse_PeerUpdate_DISCONNECTED ,"failed update" )
393+ c .removePeerLocked (dst .id ,proto .CoordinateResponse_PeerUpdate_DISCONNECTED ,"failed update" ,
394+ "failed to update tunnel src mapping" )
374395}
375396}
376397}
@@ -381,7 +402,7 @@ func (c *core) removeTunnelLocked(src *peer, dstID uuid.UUID) error {
381402err := src .updateMappingLocked (dstID ,nil ,proto .CoordinateResponse_PeerUpdate_DISCONNECTED ,"remove tunnel" )
382403if err != nil {
383404src .logger .Error (context .Background (),"failed to update" ,slog .Error (err ))
384- c .removePeerLocked (src .id ,proto .CoordinateResponse_PeerUpdate_DISCONNECTED ,"failed update" )
405+ c .removePeerLocked (src .id ,proto .CoordinateResponse_PeerUpdate_DISCONNECTED ,"failed update" , "failed to remove tunnel dest mapping" )
385406// removing the peer also removes all other tunnels and notifies destinations, so it's safe to
386407// return here.
387408return err
@@ -391,7 +412,7 @@ func (c *core) removeTunnelLocked(src *peer, dstID uuid.UUID) error {
391412err = dst .updateMappingLocked (src .id ,nil ,proto .CoordinateResponse_PeerUpdate_DISCONNECTED ,"remove tunnel" )
392413if err != nil {
393414dst .logger .Error (context .Background (),"failed to update" ,slog .Error (err ))
394- c .removePeerLocked (dst .id ,proto .CoordinateResponse_PeerUpdate_DISCONNECTED ,"failed update" )
415+ c .removePeerLocked (dst .id ,proto .CoordinateResponse_PeerUpdate_DISCONNECTED ,"failed update" , "failed to remove tunnel src mapping" )
395416// don't return here because we still want to remove the tunnel, and an error at the
396417// destination doesn't count as an error removing the tunnel at the source.
397418}
@@ -413,6 +434,11 @@ func (c *core) initPeer(p *peer) error {
413434if old ,ok := c .peers [p .id ];ok {
414435// rare and interesting enough to log at Info, but it isn't an error per se
415436old .logger .Info (context .Background (),"overwritten by new connection" )
437+ select {
438+ case old .resps <- & proto.CoordinateResponse {Error :CloseErrOverwritten }:
439+ default :
440+ // pass
441+ }
416442close (old .resps )
417443p .overwrites = old .overwrites + 1
418444}
@@ -433,7 +459,7 @@ func (c *core) initPeer(p *peer) error {
433459
434460// lostPeer removes and cleans up a lost peer. It updates all peers it shares a tunnel with and deletes
435461// all tunnels from which the removed peer is the source.
436- func (c * core )lostPeer (p * peer )error {
462+ func (c * core )lostPeer (p * peer , closeErr string )error {
437463c .mutex .Lock ()
438464defer c .mutex .Unlock ()
439465c .logger .Debug (context .Background (),"lostPeer" ,slog .F ("peer_id" ,p .id ))
@@ -443,18 +469,25 @@ func (c *core) lostPeer(p *peer) error {
443469if existing ,ok := c .peers [p .id ];! ok || existing != p {
444470return ErrAlreadyRemoved
445471}
446- c .removePeerLocked (p .id ,proto .CoordinateResponse_PeerUpdate_LOST ,"lost" )
472+ c .removePeerLocked (p .id ,proto .CoordinateResponse_PeerUpdate_LOST ,"lost" , closeErr )
447473return nil
448474}
449475
450- func (c * core )removePeerLocked (id uuid.UUID ,kind proto.CoordinateResponse_PeerUpdate_Kind ,reason string ) {
476+ func (c * core )removePeerLocked (id uuid.UUID ,kind proto.CoordinateResponse_PeerUpdate_Kind ,reason , closeErr string ) {
451477p ,ok := c .peers [id ]
452478if ! ok {
453479c .logger .Critical (context .Background (),"removed non-existent peer %s" ,id )
454480return
455481}
456482c .updateTunnelPeersLocked (id ,nil ,kind ,reason )
457483c .tunnels .removeAll (id )
484+ if closeErr != "" {
485+ select {
486+ case p .resps <- & proto.CoordinateResponse {Error :closeErr }:
487+ default :
488+ // blocked, pass.
489+ }
490+ }
458491close (p .resps )
459492delete (c .peers ,id )
460493}
@@ -487,7 +520,8 @@ func (c *core) close() error {
487520for id := range c .peers {
488521// when closing, mark them as LOST so that we don't disrupt in-progress
489522// connections.
490- c .removePeerLocked (id ,proto .CoordinateResponse_PeerUpdate_LOST ,"coordinator close" )
523+ c .removePeerLocked (id ,proto .CoordinateResponse_PeerUpdate_LOST ,"coordinator close" ,
524+ CloseErrCoordinatorClose )
491525}
492526return nil
493527}