Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

feat: add workspace updates controller#15506

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
spikecurtis merged 1 commit intomainfromspike/14730-workspace-updates-controller
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
241 changes: 230 additions & 11 deletionstailnet/controllers.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -26,13 +26,14 @@ import (
// A Controller connects to the tailnet control plane, and then uses the control protocols to
// program a tailnet.Conn in production (in test it could be an interface simulating the Conn). It
// delegates this task to sub-controllers responsible for the main areas of the tailnet control
// protocol: coordination, DERP map updates, resume tokens, andtelemetry.
// protocol: coordination, DERP map updates, resume tokens,telemetry,andworkspace updates.
type Controller struct {
Dialer ControlProtocolDialer
CoordCtrl CoordinationController
DERPCtrl DERPController
ResumeTokenCtrl ResumeTokenController
TelemetryCtrl TelemetryController
Dialer ControlProtocolDialer
CoordCtrl CoordinationController
DERPCtrl DERPController
ResumeTokenCtrl ResumeTokenController
TelemetryCtrl TelemetryController
WorkspaceUpdatesCtrl WorkspaceUpdatesController

ctx context.Context
gracefulCtx context.Context
Expand DownExpand Up@@ -94,15 +95,25 @@ type TelemetryController interface {
New(TelemetryClient)
}

type WorkspaceUpdatesClient interface {
Close() error
Recv() (*proto.WorkspaceUpdate, error)
}

type WorkspaceUpdatesController interface {
New(WorkspaceUpdatesClient) CloserWaiter
}

// ControlProtocolClients represents an abstract interface to the tailnet control plane via a set
// of protocol clients. The Closer should close all the clients (e.g. by closing the underlying
// connection).
type ControlProtocolClients struct {
Closer io.Closer
Coordinator CoordinatorClient
DERP DERPClient
ResumeToken ResumeTokenClient
Telemetry TelemetryClient
Closer io.Closer
Coordinator CoordinatorClient
DERP DERPClient
ResumeToken ResumeTokenClient
Telemetry TelemetryClient
WorkspaceUpdates WorkspaceUpdatesClient
}

type ControlProtocolDialer interface {
Expand DownExpand Up@@ -419,6 +430,7 @@ func (c *TunnelSrcCoordController) SyncDestinations(destinations []uuid.UUID) {
}
}()
for dest := range toAdd {
c.Coordinatee.SetTunnelDestination(dest)
err = c.coordination.Client.Send(
&proto.CoordinateRequest{
AddTunnel: &proto.CoordinateRequest_Tunnel{Id: UUIDToByteSlice(dest)},
Expand DownExpand Up@@ -822,6 +834,213 @@ func (r *basicResumeTokenRefresher) refresh() {
r.timer.Reset(dur, "basicResumeTokenRefresher", "refresh")
}

type tunnelAllWorkspaceUpdatesController struct {
coordCtrl *TunnelSrcCoordController
logger slog.Logger
}

type workspace struct {
id uuid.UUID
name string
agents map[uuid.UUID]agent
}

type agent struct {
id uuid.UUID
name string
}

func (t *tunnelAllWorkspaceUpdatesController) New(client WorkspaceUpdatesClient) CloserWaiter {
updater := &tunnelUpdater{
client: client,
errChan: make(chan error, 1),
logger: t.logger,
coordCtrl: t.coordCtrl,
recvLoopDone: make(chan struct{}),
workspaces: make(map[uuid.UUID]*workspace),
}
go updater.recvLoop()
return updater
}

type tunnelUpdater struct {
errChan chan error
logger slog.Logger
client WorkspaceUpdatesClient
coordCtrl *TunnelSrcCoordController
recvLoopDone chan struct{}

// don't need the mutex since only manipulated by the recvLoop
workspaces map[uuid.UUID]*workspace

sync.Mutex
closed bool
}

func (t *tunnelUpdater) Close(ctx context.Context) error {
t.Lock()
defer t.Unlock()
if t.closed {
select {
case <-ctx.Done():
return ctx.Err()
case <-t.recvLoopDone:
return nil
}
}
t.closed = true
cErr := t.client.Close()
select {
case <-ctx.Done():
return ctx.Err()
case <-t.recvLoopDone:
return cErr
}
}

func (t *tunnelUpdater) Wait() <-chan error {
return t.errChan
}

func (t *tunnelUpdater) recvLoop() {
t.logger.Debug(context.Background(), "tunnel updater recvLoop started")
defer t.logger.Debug(context.Background(), "tunnel updater recvLoop done")
defer close(t.recvLoopDone)
for {
update, err := t.client.Recv()
if err != nil {
t.logger.Debug(context.Background(), "failed to receive workspace Update", slog.Error(err))
select {
case t.errChan <- err:
default:
}
return
}
t.logger.Debug(context.Background(), "got workspace update",
slog.F("workspace_update", update),
)
err = t.handleUpdate(update)
if err != nil {
t.logger.Critical(context.Background(), "failed to handle workspace Update", slog.Error(err))
cErr := t.client.Close()
if cErr != nil {
t.logger.Warn(context.Background(), "failed to close client", slog.Error(cErr))
}
select {
case t.errChan <- err:
default:
}
return
}
}
}

func (t *tunnelUpdater) handleUpdate(update *proto.WorkspaceUpdate) error {
for _, uw := range update.UpsertedWorkspaces {
workspaceID, err := uuid.FromBytes(uw.Id)
if err != nil {
return xerrors.Errorf("failed to parse workspace ID: %w", err)
}
w := workspace{
id: workspaceID,
name: uw.Name,
agents: make(map[uuid.UUID]agent),
}
t.upsertWorkspace(w)
}

// delete agents before deleting workspaces, since the agents have workspace ID references
for _, da := range update.DeletedAgents {
agentID, err := uuid.FromBytes(da.Id)
if err != nil {
return xerrors.Errorf("failed to parse agent ID: %w", err)
}
workspaceID, err := uuid.FromBytes(da.WorkspaceId)
if err != nil {
return xerrors.Errorf("failed to parse workspace ID: %w", err)
}
err = t.deleteAgent(workspaceID, agentID)
if err != nil {
return xerrors.Errorf("failed to delete agent: %w", err)
}
}
for _, dw := range update.DeletedWorkspaces {
workspaceID, err := uuid.FromBytes(dw.Id)
if err != nil {
return xerrors.Errorf("failed to parse workspace ID: %w", err)
}
t.deleteWorkspace(workspaceID)
}

// upsert agents last, after all workspaces have been added and deleted, since agents reference
// workspace ID.
for _, ua := range update.UpsertedAgents {
agentID, err := uuid.FromBytes(ua.Id)
if err != nil {
return xerrors.Errorf("failed to parse agent ID: %w", err)
}
workspaceID, err := uuid.FromBytes(ua.WorkspaceId)
if err != nil {
return xerrors.Errorf("failed to parse workspace ID: %w", err)
}
a := agent{name: ua.Name, id: agentID}
err = t.upsertAgent(workspaceID, a)
if err != nil {
return xerrors.Errorf("failed to upsert agent: %w", err)
}
}
allAgents := t.allAgentIDs()
t.coordCtrl.SyncDestinations(allAgents)
return nil
}

func (t *tunnelUpdater) upsertWorkspace(w workspace) {
old, ok := t.workspaces[w.id]
if !ok {
t.workspaces[w.id] = &w
return
}
old.name = w.name
}

func (t *tunnelUpdater) deleteWorkspace(id uuid.UUID) {
delete(t.workspaces, id)
}

func (t *tunnelUpdater) upsertAgent(workspaceID uuid.UUID, a agent) error {
w, ok := t.workspaces[workspaceID]
if !ok {
return xerrors.Errorf("workspace %s not found", workspaceID)
}
w.agents[a.id] = a
return nil
}

func (t *tunnelUpdater) deleteAgent(workspaceID, id uuid.UUID) error {
w, ok := t.workspaces[workspaceID]
if !ok {
return xerrors.Errorf("workspace %s not found", workspaceID)
}
delete(w.agents, id)
return nil
}

func (t *tunnelUpdater) allAgentIDs() []uuid.UUID {
out := make([]uuid.UUID, 0, len(t.workspaces))
for _, w := range t.workspaces {
for id := range w.agents {
out = append(out, id)
}
}
return out
}

func NewTunnelAllWorkspaceUpdatesController(
logger slog.Logger, c *TunnelSrcCoordController,
) WorkspaceUpdatesController {
return &tunnelAllWorkspaceUpdatesController{logger: logger, coordCtrl: c}
}

// NewController creates a new Controller without running it
func NewController(logger slog.Logger, dialer ControlProtocolDialer, opts ...ControllerOpt) *Controller {
c := &Controller{
Expand Down
Loading
Loading

[8]ページ先頭

©2009-2025 Movatter.jp