Expand Up @@ -26,13 +26,14 @@ import ( // A Controller connects to the tailnet control plane, and then uses the control protocols to // program a tailnet.Conn in production (in test it could be an interface simulating the Conn). It // delegates this task to sub-controllers responsible for the main areas of the tailnet control // protocol: coordination, DERP map updates, resume tokens, andtelemetry . // protocol: coordination, DERP map updates, resume tokens,telemetry, andworkspace updates . type Controller struct { Dialer ControlProtocolDialer CoordCtrl CoordinationController DERPCtrl DERPController ResumeTokenCtrl ResumeTokenController TelemetryCtrl TelemetryController Dialer ControlProtocolDialer CoordCtrl CoordinationController DERPCtrl DERPController ResumeTokenCtrl ResumeTokenController TelemetryCtrl TelemetryController WorkspaceUpdatesCtrl WorkspaceUpdatesController ctx context.Context gracefulCtx context.Context Expand Down Expand Up @@ -94,15 +95,25 @@ type TelemetryController interface { New(TelemetryClient) } type WorkspaceUpdatesClient interface { Close() error Recv() (*proto.WorkspaceUpdate, error) } type WorkspaceUpdatesController interface { New(WorkspaceUpdatesClient) CloserWaiter } // ControlProtocolClients represents an abstract interface to the tailnet control plane via a set // of protocol clients. The Closer should close all the clients (e.g. by closing the underlying // connection). type ControlProtocolClients struct { Closer io.Closer Coordinator CoordinatorClient DERP DERPClient ResumeToken ResumeTokenClient Telemetry TelemetryClient Closer io.Closer Coordinator CoordinatorClient DERP DERPClient ResumeToken ResumeTokenClient Telemetry TelemetryClient WorkspaceUpdates WorkspaceUpdatesClient } type ControlProtocolDialer interface { Expand Down Expand Up @@ -419,6 +430,7 @@ func (c *TunnelSrcCoordController) SyncDestinations(destinations []uuid.UUID) { } }() for dest := range toAdd { c.Coordinatee.SetTunnelDestination(dest) err = c.coordination.Client.Send( &proto.CoordinateRequest{ AddTunnel: &proto.CoordinateRequest_Tunnel{Id: UUIDToByteSlice(dest)}, Expand Down Expand Up @@ -822,6 +834,213 @@ func (r *basicResumeTokenRefresher) refresh() { r.timer.Reset(dur, "basicResumeTokenRefresher", "refresh") } type tunnelAllWorkspaceUpdatesController struct { coordCtrl *TunnelSrcCoordController logger slog.Logger } type workspace struct { id uuid.UUID name string agents map[uuid.UUID]agent } type agent struct { id uuid.UUID name string } func (t *tunnelAllWorkspaceUpdatesController) New(client WorkspaceUpdatesClient) CloserWaiter { updater := &tunnelUpdater{ client: client, errChan: make(chan error, 1), logger: t.logger, coordCtrl: t.coordCtrl, recvLoopDone: make(chan struct{}), workspaces: make(map[uuid.UUID]*workspace), } go updater.recvLoop() return updater } type tunnelUpdater struct { errChan chan error logger slog.Logger client WorkspaceUpdatesClient coordCtrl *TunnelSrcCoordController recvLoopDone chan struct{} // don't need the mutex since only manipulated by the recvLoop workspaces map[uuid.UUID]*workspace sync.Mutex closed bool } func (t *tunnelUpdater) Close(ctx context.Context) error { t.Lock() defer t.Unlock() if t.closed { select { case <-ctx.Done(): return ctx.Err() case <-t.recvLoopDone: return nil } } t.closed = true cErr := t.client.Close() select { case <-ctx.Done(): return ctx.Err() case <-t.recvLoopDone: return cErr } } func (t *tunnelUpdater) Wait() <-chan error { return t.errChan } func (t *tunnelUpdater) recvLoop() { t.logger.Debug(context.Background(), "tunnel updater recvLoop started") defer t.logger.Debug(context.Background(), "tunnel updater recvLoop done") defer close(t.recvLoopDone) for { update, err := t.client.Recv() if err != nil { t.logger.Debug(context.Background(), "failed to receive workspace Update", slog.Error(err)) select { case t.errChan <- err: default: } return } t.logger.Debug(context.Background(), "got workspace update", slog.F("workspace_update", update), ) err = t.handleUpdate(update) if err != nil { t.logger.Critical(context.Background(), "failed to handle workspace Update", slog.Error(err)) cErr := t.client.Close() if cErr != nil { t.logger.Warn(context.Background(), "failed to close client", slog.Error(cErr)) } select { case t.errChan <- err: default: } return } } } func (t *tunnelUpdater) handleUpdate(update *proto.WorkspaceUpdate) error { for _, uw := range update.UpsertedWorkspaces { workspaceID, err := uuid.FromBytes(uw.Id) if err != nil { return xerrors.Errorf("failed to parse workspace ID: %w", err) } w := workspace{ id: workspaceID, name: uw.Name, agents: make(map[uuid.UUID]agent), } t.upsertWorkspace(w) } // delete agents before deleting workspaces, since the agents have workspace ID references for _, da := range update.DeletedAgents { agentID, err := uuid.FromBytes(da.Id) if err != nil { return xerrors.Errorf("failed to parse agent ID: %w", err) } workspaceID, err := uuid.FromBytes(da.WorkspaceId) if err != nil { return xerrors.Errorf("failed to parse workspace ID: %w", err) } err = t.deleteAgent(workspaceID, agentID) if err != nil { return xerrors.Errorf("failed to delete agent: %w", err) } } for _, dw := range update.DeletedWorkspaces { workspaceID, err := uuid.FromBytes(dw.Id) if err != nil { return xerrors.Errorf("failed to parse workspace ID: %w", err) } t.deleteWorkspace(workspaceID) } // upsert agents last, after all workspaces have been added and deleted, since agents reference // workspace ID. for _, ua := range update.UpsertedAgents { agentID, err := uuid.FromBytes(ua.Id) if err != nil { return xerrors.Errorf("failed to parse agent ID: %w", err) } workspaceID, err := uuid.FromBytes(ua.WorkspaceId) if err != nil { return xerrors.Errorf("failed to parse workspace ID: %w", err) } a := agent{name: ua.Name, id: agentID} err = t.upsertAgent(workspaceID, a) if err != nil { return xerrors.Errorf("failed to upsert agent: %w", err) } } allAgents := t.allAgentIDs() t.coordCtrl.SyncDestinations(allAgents) return nil } func (t *tunnelUpdater) upsertWorkspace(w workspace) { old, ok := t.workspaces[w.id] if !ok { t.workspaces[w.id] = &w return } old.name = w.name } func (t *tunnelUpdater) deleteWorkspace(id uuid.UUID) { delete(t.workspaces, id) } func (t *tunnelUpdater) upsertAgent(workspaceID uuid.UUID, a agent) error { w, ok := t.workspaces[workspaceID] if !ok { return xerrors.Errorf("workspace %s not found", workspaceID) } w.agents[a.id] = a return nil } func (t *tunnelUpdater) deleteAgent(workspaceID, id uuid.UUID) error { w, ok := t.workspaces[workspaceID] if !ok { return xerrors.Errorf("workspace %s not found", workspaceID) } delete(w.agents, id) return nil } func (t *tunnelUpdater) allAgentIDs() []uuid.UUID { out := make([]uuid.UUID, 0, len(t.workspaces)) for _, w := range t.workspaces { for id := range w.agents { out = append(out, id) } } return out } func NewTunnelAllWorkspaceUpdatesController( logger slog.Logger, c *TunnelSrcCoordController, ) WorkspaceUpdatesController { return &tunnelAllWorkspaceUpdatesController{logger: logger, coordCtrl: c} } // NewController creates a new Controller without running it func NewController(logger slog.Logger, dialer ControlProtocolDialer, opts ...ControllerOpt) *Controller { c := &Controller{ Expand Down