44"context"
55"fmt"
66"io"
7+ "maps"
78"math"
89"strings"
910"sync"
@@ -239,15 +240,17 @@ func (c *BasicCoordination) respLoop() {
239240defer func () {
240241cErr := c .Client .Close ()
241242if cErr != nil {
242- c .logger .Debug (context .Background (),"failed to close coordinate client after respLoop exit" ,slog .Error (cErr ))
243+ c .logger .Debug (context .Background (),
244+ "failed to close coordinate client after respLoop exit" ,slog .Error (cErr ))
243245}
244246c .coordinatee .SetAllPeersLost ()
245247close (c .respLoopDone )
246248}()
247249for {
248250resp ,err := c .Client .Recv ()
249251if err != nil {
250- c .logger .Debug (context .Background (),"failed to read from protocol" ,slog .Error (err ))
252+ c .logger .Debug (context .Background (),
253+ "failed to read from protocol" ,slog .Error (err ))
251254c .SendErr (xerrors .Errorf ("read: %w" ,err ))
252255return
253256}
@@ -278,7 +281,8 @@ func (c *BasicCoordination) respLoop() {
278281ReadyForHandshake :rfh ,
279282})
280283if err != nil {
281- c .logger .Debug (context .Background (),"failed to send ready for handshake" ,slog .Error (err ))
284+ c .logger .Debug (context .Background (),
285+ "failed to send ready for handshake" ,slog .Error (err ))
282286c .SendErr (xerrors .Errorf ("send: %w" ,err ))
283287return
284288}
@@ -287,37 +291,158 @@ func (c *BasicCoordination) respLoop() {
287291}
288292}
289293
290- type singleDestController struct {
294+ type TunnelSrcCoordController struct {
291295* BasicCoordinationController
292- dest uuid.UUID
296+
297+ mu sync.Mutex
298+ dests map [uuid.UUID ]struct {}
299+ coordination * BasicCoordination
293300}
294301
295- // NewSingleDestController creates a CoordinationController for Coder clients that connect to a
296- // single tunnel destination, e.g. `coder ssh`, which connects to a single workspace Agent.
297- func NewSingleDestController (logger slog.Logger ,coordinatee Coordinatee ,dest uuid.UUID )CoordinationController {
298- coordinatee .SetTunnelDestination (dest )
299- return & singleDestController {
302+ // NewTunnelSrcCoordController creates a CoordinationController for peers that are exclusively
303+ // tunnel sources (that is, they create tunnel --- Coder clients not workspaces).
304+ func NewTunnelSrcCoordController (
305+ logger slog.Logger ,coordinatee Coordinatee ,
306+ )* TunnelSrcCoordController {
307+ return & TunnelSrcCoordController {
300308BasicCoordinationController :& BasicCoordinationController {
301309Logger :logger ,
302310Coordinatee :coordinatee ,
303311SendAcks :false ,
304312},
305- dest : dest ,
313+ dests : make ( map [uuid. UUID ] struct {}) ,
306314}
307315}
308316
309- func (c * singleDestController )New (client CoordinatorClient )CloserWaiter {
317+ func (c * TunnelSrcCoordController )New (client CoordinatorClient )CloserWaiter {
318+ c .mu .Lock ()
319+ defer c .mu .Unlock ()
310320b := c .BasicCoordinationController .NewCoordination (client )
311- err := client .Send (& proto.CoordinateRequest {AddTunnel :& proto.CoordinateRequest_Tunnel {Id :c .dest [:]}})
312- if err != nil {
313- b .SendErr (err )
321+ c .coordination = b
322+ // resync destinations on reconnect
323+ for dest := range c .dests {
324+ err := client .Send (& proto.CoordinateRequest {
325+ AddTunnel :& proto.CoordinateRequest_Tunnel {Id :UUIDToByteSlice (dest )},
326+ })
327+ if err != nil {
328+ b .SendErr (err )
329+ c .coordination = nil
330+ cErr := client .Close ()
331+ if cErr != nil {
332+ c .Logger .Debug (
333+ context .Background (),
334+ "failed to close coordinator client after add tunnel failure" ,
335+ slog .Error (cErr ),
336+ )
337+ }
338+ break
339+ }
314340}
315341return b
316342}
317343
344+ func (c * TunnelSrcCoordController )AddDestination (dest uuid.UUID ) {
345+ c .mu .Lock ()
346+ defer c .mu .Unlock ()
347+ c .Coordinatee .SetTunnelDestination (dest )// this prepares us for an ack
348+ c .dests [dest ]= struct {}{}
349+ if c .coordination == nil {
350+ return
351+ }
352+ err := c .coordination .Client .Send (
353+ & proto.CoordinateRequest {
354+ AddTunnel :& proto.CoordinateRequest_Tunnel {Id :UUIDToByteSlice (dest )},
355+ })
356+ if err != nil {
357+ c .coordination .SendErr (err )
358+ cErr := c .coordination .Client .Close ()// close the client so we don't gracefully disconnect
359+ if cErr != nil {
360+ c .Logger .Debug (context .Background (),
361+ "failed to close coordinator client after add tunnel failure" ,
362+ slog .Error (cErr ))
363+ }
364+ c .coordination = nil
365+ }
366+ }
367+
368+ func (c * TunnelSrcCoordController )RemoveDestination (dest uuid.UUID ) {
369+ c .mu .Lock ()
370+ defer c .mu .Unlock ()
371+ delete (c .dests ,dest )
372+ if c .coordination == nil {
373+ return
374+ }
375+ err := c .coordination .Client .Send (
376+ & proto.CoordinateRequest {
377+ RemoveTunnel :& proto.CoordinateRequest_Tunnel {Id :UUIDToByteSlice (dest )},
378+ })
379+ if err != nil {
380+ c .coordination .SendErr (err )
381+ cErr := c .coordination .Client .Close ()// close the client so we don't gracefully disconnect
382+ if cErr != nil {
383+ c .Logger .Debug (context .Background (),
384+ "failed to close coordinator client after remove tunnel failure" ,
385+ slog .Error (cErr ))
386+ }
387+ c .coordination = nil
388+ }
389+ }
390+
391+ func (c * TunnelSrcCoordController )SyncDestinations (destinations []uuid.UUID ) {
392+ c .mu .Lock ()
393+ defer c .mu .Unlock ()
394+ toAdd := make (map [uuid.UUID ]struct {})
395+ toRemove := maps .Clone (c .dests )
396+ all := make (map [uuid.UUID ]struct {})
397+ for _ ,dest := range destinations {
398+ all [dest ]= struct {}{}
399+ delete (toRemove ,dest )
400+ if _ ,ok := c .dests [dest ];! ok {
401+ toAdd [dest ]= struct {}{}
402+ }
403+ }
404+ c .dests = all
405+ if c .coordination == nil {
406+ return
407+ }
408+ var err error
409+ defer func () {
410+ if err != nil {
411+ c .coordination .SendErr (err )
412+ cErr := c .coordination .Client .Close ()// don't gracefully disconnect
413+ if cErr != nil {
414+ c .Logger .Debug (context .Background (),
415+ "failed to close coordinator client during sync destinations" ,
416+ slog .Error (cErr ))
417+ }
418+ c .coordination = nil
419+ }
420+ }()
421+ for dest := range toAdd {
422+ err = c .coordination .Client .Send (
423+ & proto.CoordinateRequest {
424+ AddTunnel :& proto.CoordinateRequest_Tunnel {Id :UUIDToByteSlice (dest )},
425+ })
426+ if err != nil {
427+ return
428+ }
429+ }
430+ for dest := range toRemove {
431+ err = c .coordination .Client .Send (
432+ & proto.CoordinateRequest {
433+ RemoveTunnel :& proto.CoordinateRequest_Tunnel {Id :UUIDToByteSlice (dest )},
434+ })
435+ if err != nil {
436+ return
437+ }
438+ }
439+ }
440+
318441// NewAgentCoordinationController creates a CoordinationController for Coder Agents, which never
319442// create tunnels and always send ReadyToHandshake acknowledgements.
320- func NewAgentCoordinationController (logger slog.Logger ,coordinatee Coordinatee )CoordinationController {
443+ func NewAgentCoordinationController (
444+ logger slog.Logger ,coordinatee Coordinatee ,
445+ )CoordinationController {
321446return & BasicCoordinationController {
322447Logger :logger ,
323448Coordinatee :coordinatee ,