@@ -26,13 +26,14 @@ import (
2626// A Controller connects to the tailnet control plane, and then uses the control protocols to
2727// program a tailnet.Conn in production (in test it could be an interface simulating the Conn). It
2828// delegates this task to sub-controllers responsible for the main areas of the tailnet control
29- // protocol: coordination, DERP map updates, resume tokens, andtelemetry .
29+ // protocol: coordination, DERP map updates, resume tokens,telemetry, andworkspace updates .
3030type Controller struct {
31- Dialer ControlProtocolDialer
32- CoordCtrl CoordinationController
33- DERPCtrl DERPController
34- ResumeTokenCtrl ResumeTokenController
35- TelemetryCtrl TelemetryController
31+ Dialer ControlProtocolDialer
32+ CoordCtrl CoordinationController
33+ DERPCtrl DERPController
34+ ResumeTokenCtrl ResumeTokenController
35+ TelemetryCtrl TelemetryController
36+ WorkspaceUpdatesCtrl WorkspaceUpdatesController
3637
3738ctx context.Context
3839gracefulCtx context.Context
@@ -94,15 +95,25 @@ type TelemetryController interface {
9495New (TelemetryClient )
9596}
9697
98+ type WorkspaceUpdatesClient interface {
99+ Close ()error
100+ Recv () (* proto.WorkspaceUpdate ,error )
101+ }
102+
103+ type WorkspaceUpdatesController interface {
104+ New (WorkspaceUpdatesClient )CloserWaiter
105+ }
106+
97107// ControlProtocolClients represents an abstract interface to the tailnet control plane via a set
98108// of protocol clients. The Closer should close all the clients (e.g. by closing the underlying
99109// connection).
100110type ControlProtocolClients struct {
101- Closer io.Closer
102- Coordinator CoordinatorClient
103- DERP DERPClient
104- ResumeToken ResumeTokenClient
105- Telemetry TelemetryClient
111+ Closer io.Closer
112+ Coordinator CoordinatorClient
113+ DERP DERPClient
114+ ResumeToken ResumeTokenClient
115+ Telemetry TelemetryClient
116+ WorkspaceUpdates WorkspaceUpdatesClient
106117}
107118
108119type ControlProtocolDialer interface {
@@ -419,6 +430,7 @@ func (c *TunnelSrcCoordController) SyncDestinations(destinations []uuid.UUID) {
419430}
420431}()
421432for dest := range toAdd {
433+ c .Coordinatee .SetTunnelDestination (dest )
422434err = c .coordination .Client .Send (
423435& proto.CoordinateRequest {
424436AddTunnel :& proto.CoordinateRequest_Tunnel {Id :UUIDToByteSlice (dest )},
@@ -822,6 +834,213 @@ func (r *basicResumeTokenRefresher) refresh() {
822834r .timer .Reset (dur ,"basicResumeTokenRefresher" ,"refresh" )
823835}
824836
837+ type tunnelAllWorkspaceUpdatesController struct {
838+ coordCtrl * TunnelSrcCoordController
839+ logger slog.Logger
840+ }
841+
842+ type workspace struct {
843+ id uuid.UUID
844+ name string
845+ agents map [uuid.UUID ]agent
846+ }
847+
848+ type agent struct {
849+ id uuid.UUID
850+ name string
851+ }
852+
853+ func (t * tunnelAllWorkspaceUpdatesController )New (client WorkspaceUpdatesClient )CloserWaiter {
854+ updater := & tunnelUpdater {
855+ client :client ,
856+ errChan :make (chan error ,1 ),
857+ logger :t .logger ,
858+ coordCtrl :t .coordCtrl ,
859+ recvLoopDone :make (chan struct {}),
860+ workspaces :make (map [uuid.UUID ]* workspace ),
861+ }
862+ go updater .recvLoop ()
863+ return updater
864+ }
865+
866+ type tunnelUpdater struct {
867+ errChan chan error
868+ logger slog.Logger
869+ client WorkspaceUpdatesClient
870+ coordCtrl * TunnelSrcCoordController
871+ recvLoopDone chan struct {}
872+
873+ // don't need the mutex since only manipulated by the recvLoop
874+ workspaces map [uuid.UUID ]* workspace
875+
876+ sync.Mutex
877+ closed bool
878+ }
879+
880+ func (t * tunnelUpdater )Close (ctx context.Context )error {
881+ t .Lock ()
882+ defer t .Unlock ()
883+ if t .closed {
884+ select {
885+ case <- ctx .Done ():
886+ return ctx .Err ()
887+ case <- t .recvLoopDone :
888+ return nil
889+ }
890+ }
891+ t .closed = true
892+ cErr := t .client .Close ()
893+ select {
894+ case <- ctx .Done ():
895+ return ctx .Err ()
896+ case <- t .recvLoopDone :
897+ return cErr
898+ }
899+ }
900+
901+ func (t * tunnelUpdater )Wait ()<- chan error {
902+ return t .errChan
903+ }
904+
905+ func (t * tunnelUpdater )recvLoop () {
906+ t .logger .Debug (context .Background (),"tunnel updater recvLoop started" )
907+ defer t .logger .Debug (context .Background (),"tunnel updater recvLoop done" )
908+ defer close (t .recvLoopDone )
909+ for {
910+ update ,err := t .client .Recv ()
911+ if err != nil {
912+ t .logger .Debug (context .Background (),"failed to receive workspace Update" ,slog .Error (err ))
913+ select {
914+ case t .errChan <- err :
915+ default :
916+ }
917+ return
918+ }
919+ t .logger .Debug (context .Background (),"got workspace update" ,
920+ slog .F ("workspace_update" ,update ),
921+ )
922+ err = t .handleUpdate (update )
923+ if err != nil {
924+ t .logger .Critical (context .Background (),"failed to handle workspace Update" ,slog .Error (err ))
925+ cErr := t .client .Close ()
926+ if cErr != nil {
927+ t .logger .Warn (context .Background (),"failed to close client" ,slog .Error (cErr ))
928+ }
929+ select {
930+ case t .errChan <- err :
931+ default :
932+ }
933+ return
934+ }
935+ }
936+ }
937+
938+ func (t * tunnelUpdater )handleUpdate (update * proto.WorkspaceUpdate )error {
939+ for _ ,uw := range update .UpsertedWorkspaces {
940+ workspaceID ,err := uuid .FromBytes (uw .Id )
941+ if err != nil {
942+ return xerrors .Errorf ("failed to parse workspace ID: %w" ,err )
943+ }
944+ w := workspace {
945+ id :workspaceID ,
946+ name :uw .Name ,
947+ agents :make (map [uuid.UUID ]agent ),
948+ }
949+ t .upsertWorkspace (w )
950+ }
951+
952+ // delete agents before deleting workspaces, since the agents have workspace ID references
953+ for _ ,da := range update .DeletedAgents {
954+ agentID ,err := uuid .FromBytes (da .Id )
955+ if err != nil {
956+ return xerrors .Errorf ("failed to parse agent ID: %w" ,err )
957+ }
958+ workspaceID ,err := uuid .FromBytes (da .WorkspaceId )
959+ if err != nil {
960+ return xerrors .Errorf ("failed to parse workspace ID: %w" ,err )
961+ }
962+ err = t .deleteAgent (workspaceID ,agentID )
963+ if err != nil {
964+ return xerrors .Errorf ("failed to delete agent: %w" ,err )
965+ }
966+ }
967+ for _ ,dw := range update .DeletedWorkspaces {
968+ workspaceID ,err := uuid .FromBytes (dw .Id )
969+ if err != nil {
970+ return xerrors .Errorf ("failed to parse workspace ID: %w" ,err )
971+ }
972+ t .deleteWorkspace (workspaceID )
973+ }
974+
975+ // upsert agents last, after all workspaces have been added and deleted, since agents reference
976+ // workspace ID.
977+ for _ ,ua := range update .UpsertedAgents {
978+ agentID ,err := uuid .FromBytes (ua .Id )
979+ if err != nil {
980+ return xerrors .Errorf ("failed to parse agent ID: %w" ,err )
981+ }
982+ workspaceID ,err := uuid .FromBytes (ua .WorkspaceId )
983+ if err != nil {
984+ return xerrors .Errorf ("failed to parse workspace ID: %w" ,err )
985+ }
986+ a := agent {name :ua .Name ,id :agentID }
987+ err = t .upsertAgent (workspaceID ,a )
988+ if err != nil {
989+ return xerrors .Errorf ("failed to upsert agent: %w" ,err )
990+ }
991+ }
992+ allAgents := t .allAgentIDs ()
993+ t .coordCtrl .SyncDestinations (allAgents )
994+ return nil
995+ }
996+
997+ func (t * tunnelUpdater )upsertWorkspace (w workspace ) {
998+ old ,ok := t .workspaces [w .id ]
999+ if ! ok {
1000+ t .workspaces [w .id ]= & w
1001+ return
1002+ }
1003+ old .name = w .name
1004+ }
1005+
1006+ func (t * tunnelUpdater )deleteWorkspace (id uuid.UUID ) {
1007+ delete (t .workspaces ,id )
1008+ }
1009+
1010+ func (t * tunnelUpdater )upsertAgent (workspaceID uuid.UUID ,a agent )error {
1011+ w ,ok := t .workspaces [workspaceID ]
1012+ if ! ok {
1013+ return xerrors .Errorf ("workspace %s not found" ,workspaceID )
1014+ }
1015+ w .agents [a .id ]= a
1016+ return nil
1017+ }
1018+
1019+ func (t * tunnelUpdater )deleteAgent (workspaceID ,id uuid.UUID )error {
1020+ w ,ok := t .workspaces [workspaceID ]
1021+ if ! ok {
1022+ return xerrors .Errorf ("workspace %s not found" ,workspaceID )
1023+ }
1024+ delete (w .agents ,id )
1025+ return nil
1026+ }
1027+
1028+ func (t * tunnelUpdater )allAgentIDs () []uuid.UUID {
1029+ out := make ([]uuid.UUID ,0 ,len (t .workspaces ))
1030+ for _ ,w := range t .workspaces {
1031+ for id := range w .agents {
1032+ out = append (out ,id )
1033+ }
1034+ }
1035+ return out
1036+ }
1037+
1038+ func NewTunnelAllWorkspaceUpdatesController (
1039+ logger slog.Logger ,c * TunnelSrcCoordController ,
1040+ )WorkspaceUpdatesController {
1041+ return & tunnelAllWorkspaceUpdatesController {logger :logger ,coordCtrl :c }
1042+ }
1043+
8251044// NewController creates a new Controller without running it
8261045func NewController (logger slog.Logger ,dialer ControlProtocolDialer ,opts ... ControllerOpt )* Controller {
8271046c := & Controller {