Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit002e484

Browse files
committed
feat: add workspace updates controller
1 parente55e8ee commit002e484

File tree

2 files changed

+543
-11
lines changed

2 files changed

+543
-11
lines changed

‎tailnet/controllers.go

Lines changed: 230 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,14 @@ import (
2626
// A Controller connects to the tailnet control plane, and then uses the control protocols to
2727
// program a tailnet.Conn in production (in test it could be an interface simulating the Conn). It
2828
// delegates this task to sub-controllers responsible for the main areas of the tailnet control
29-
// protocol: coordination, DERP map updates, resume tokens, andtelemetry.
29+
// protocol: coordination, DERP map updates, resume tokens,telemetry,andworkspace updates.
3030
typeControllerstruct {
31-
DialerControlProtocolDialer
32-
CoordCtrlCoordinationController
33-
DERPCtrlDERPController
34-
ResumeTokenCtrlResumeTokenController
35-
TelemetryCtrlTelemetryController
31+
DialerControlProtocolDialer
32+
CoordCtrlCoordinationController
33+
DERPCtrlDERPController
34+
ResumeTokenCtrlResumeTokenController
35+
TelemetryCtrlTelemetryController
36+
WorkspaceUpdatesCtrlWorkspaceUpdatesController
3637

3738
ctx context.Context
3839
gracefulCtx context.Context
@@ -94,15 +95,25 @@ type TelemetryController interface {
9495
New(TelemetryClient)
9596
}
9697

98+
typeWorkspaceUpdatesClientinterface {
99+
Close()error
100+
Recv() (*proto.WorkspaceUpdate,error)
101+
}
102+
103+
typeWorkspaceUpdatesControllerinterface {
104+
New(WorkspaceUpdatesClient)CloserWaiter
105+
}
106+
97107
// ControlProtocolClients represents an abstract interface to the tailnet control plane via a set
98108
// of protocol clients. The Closer should close all the clients (e.g. by closing the underlying
99109
// connection).
100110
typeControlProtocolClientsstruct {
101-
Closer io.Closer
102-
CoordinatorCoordinatorClient
103-
DERPDERPClient
104-
ResumeTokenResumeTokenClient
105-
TelemetryTelemetryClient
111+
Closer io.Closer
112+
CoordinatorCoordinatorClient
113+
DERPDERPClient
114+
ResumeTokenResumeTokenClient
115+
TelemetryTelemetryClient
116+
WorkspaceUpdatesWorkspaceUpdatesClient
106117
}
107118

108119
typeControlProtocolDialerinterface {
@@ -419,6 +430,7 @@ func (c *TunnelSrcCoordController) SyncDestinations(destinations []uuid.UUID) {
419430
}
420431
}()
421432
fordest:=rangetoAdd {
433+
c.Coordinatee.SetTunnelDestination(dest)
422434
err=c.coordination.Client.Send(
423435
&proto.CoordinateRequest{
424436
AddTunnel:&proto.CoordinateRequest_Tunnel{Id:UUIDToByteSlice(dest)},
@@ -822,6 +834,213 @@ func (r *basicResumeTokenRefresher) refresh() {
822834
r.timer.Reset(dur,"basicResumeTokenRefresher","refresh")
823835
}
824836

837+
typetunnelAllWorkspaceUpdatesControllerstruct {
838+
coordCtrl*TunnelSrcCoordController
839+
logger slog.Logger
840+
}
841+
842+
typeworkspacestruct {
843+
id uuid.UUID
844+
namestring
845+
agentsmap[uuid.UUID]agent
846+
}
847+
848+
typeagentstruct {
849+
id uuid.UUID
850+
namestring
851+
}
852+
853+
func (t*tunnelAllWorkspaceUpdatesController)New(clientWorkspaceUpdatesClient)CloserWaiter {
854+
updater:=&tunnelUpdater{
855+
client:client,
856+
errChan:make(chanerror,1),
857+
logger:t.logger,
858+
coordCtrl:t.coordCtrl,
859+
recvLoopDone:make(chanstruct{}),
860+
workspaces:make(map[uuid.UUID]*workspace),
861+
}
862+
goupdater.recvLoop()
863+
returnupdater
864+
}
865+
866+
typetunnelUpdaterstruct {
867+
errChanchanerror
868+
logger slog.Logger
869+
clientWorkspaceUpdatesClient
870+
coordCtrl*TunnelSrcCoordController
871+
recvLoopDonechanstruct{}
872+
873+
// don't need the mutex since only manipulated by the recvLoop
874+
workspacesmap[uuid.UUID]*workspace
875+
876+
sync.Mutex
877+
closedbool
878+
}
879+
880+
func (t*tunnelUpdater)Close(ctx context.Context)error {
881+
t.Lock()
882+
defert.Unlock()
883+
ift.closed {
884+
select {
885+
case<-ctx.Done():
886+
returnctx.Err()
887+
case<-t.recvLoopDone:
888+
returnnil
889+
}
890+
}
891+
t.closed=true
892+
cErr:=t.client.Close()
893+
select {
894+
case<-ctx.Done():
895+
returnctx.Err()
896+
case<-t.recvLoopDone:
897+
returncErr
898+
}
899+
}
900+
901+
func (t*tunnelUpdater)Wait()<-chanerror {
902+
returnt.errChan
903+
}
904+
905+
func (t*tunnelUpdater)recvLoop() {
906+
t.logger.Debug(context.Background(),"tunnel updater recvLoop started")
907+
defert.logger.Debug(context.Background(),"tunnel updater recvLoop done")
908+
deferclose(t.recvLoopDone)
909+
for {
910+
update,err:=t.client.Recv()
911+
iferr!=nil {
912+
t.logger.Debug(context.Background(),"failed to receive workspace Update",slog.Error(err))
913+
select {
914+
caset.errChan<-err:
915+
default:
916+
}
917+
return
918+
}
919+
t.logger.Debug(context.Background(),"got workspace update",
920+
slog.F("workspace_update",update),
921+
)
922+
err=t.handleUpdate(update)
923+
iferr!=nil {
924+
t.logger.Critical(context.Background(),"failed to handle workspace Update",slog.Error(err))
925+
cErr:=t.client.Close()
926+
ifcErr!=nil {
927+
t.logger.Warn(context.Background(),"failed to close client",slog.Error(cErr))
928+
}
929+
select {
930+
caset.errChan<-err:
931+
default:
932+
}
933+
return
934+
}
935+
}
936+
}
937+
938+
func (t*tunnelUpdater)handleUpdate(update*proto.WorkspaceUpdate)error {
939+
for_,uw:=rangeupdate.UpsertedWorkspaces {
940+
workspaceID,err:=uuid.FromBytes(uw.Id)
941+
iferr!=nil {
942+
returnxerrors.Errorf("failed to parse workspace ID: %w",err)
943+
}
944+
w:=workspace{
945+
id:workspaceID,
946+
name:uw.Name,
947+
agents:make(map[uuid.UUID]agent),
948+
}
949+
t.upsertWorkspace(w)
950+
}
951+
952+
// delete agents before deleting workspaces, since the agents have workspace ID references
953+
for_,da:=rangeupdate.DeletedAgents {
954+
agentID,err:=uuid.FromBytes(da.Id)
955+
iferr!=nil {
956+
returnxerrors.Errorf("failed to parse agent ID: %w",err)
957+
}
958+
workspaceID,err:=uuid.FromBytes(da.WorkspaceId)
959+
iferr!=nil {
960+
returnxerrors.Errorf("failed to parse workspace ID: %w",err)
961+
}
962+
err=t.deleteAgent(workspaceID,agentID)
963+
iferr!=nil {
964+
returnxerrors.Errorf("failed to delete agent: %w",err)
965+
}
966+
}
967+
for_,dw:=rangeupdate.DeletedWorkspaces {
968+
workspaceID,err:=uuid.FromBytes(dw.Id)
969+
iferr!=nil {
970+
returnxerrors.Errorf("failed to parse workspace ID: %w",err)
971+
}
972+
t.deleteWorkspace(workspaceID)
973+
}
974+
975+
// upsert agents last, after all workspaces have been added and deleted, since agents reference
976+
// workspace ID.
977+
for_,ua:=rangeupdate.UpsertedAgents {
978+
agentID,err:=uuid.FromBytes(ua.Id)
979+
iferr!=nil {
980+
returnxerrors.Errorf("failed to parse agent ID: %w",err)
981+
}
982+
workspaceID,err:=uuid.FromBytes(ua.WorkspaceId)
983+
iferr!=nil {
984+
returnxerrors.Errorf("failed to parse workspace ID: %w",err)
985+
}
986+
a:=agent{name:ua.Name,id:agentID}
987+
err=t.upsertAgent(workspaceID,a)
988+
iferr!=nil {
989+
returnxerrors.Errorf("failed to upsert agent: %w",err)
990+
}
991+
}
992+
allAgents:=t.allAgentIDs()
993+
t.coordCtrl.SyncDestinations(allAgents)
994+
returnnil
995+
}
996+
997+
func (t*tunnelUpdater)upsertWorkspace(wworkspace) {
998+
old,ok:=t.workspaces[w.id]
999+
if!ok {
1000+
t.workspaces[w.id]=&w
1001+
return
1002+
}
1003+
old.name=w.name
1004+
}
1005+
1006+
func (t*tunnelUpdater)deleteWorkspace(id uuid.UUID) {
1007+
delete(t.workspaces,id)
1008+
}
1009+
1010+
func (t*tunnelUpdater)upsertAgent(workspaceID uuid.UUID,aagent)error {
1011+
w,ok:=t.workspaces[workspaceID]
1012+
if!ok {
1013+
returnxerrors.Errorf("workspace %s not found",workspaceID)
1014+
}
1015+
w.agents[a.id]=a
1016+
returnnil
1017+
}
1018+
1019+
func (t*tunnelUpdater)deleteAgent(workspaceID,id uuid.UUID)error {
1020+
w,ok:=t.workspaces[workspaceID]
1021+
if!ok {
1022+
returnxerrors.Errorf("workspace %s not found",workspaceID)
1023+
}
1024+
delete(w.agents,id)
1025+
returnnil
1026+
}
1027+
1028+
func (t*tunnelUpdater)allAgentIDs() []uuid.UUID {
1029+
out:=make([]uuid.UUID,0,len(t.workspaces))
1030+
for_,w:=ranget.workspaces {
1031+
forid:=rangew.agents {
1032+
out=append(out,id)
1033+
}
1034+
}
1035+
returnout
1036+
}
1037+
1038+
funcNewTunnelAllWorkspaceUpdatesController(
1039+
logger slog.Logger,c*TunnelSrcCoordController,
1040+
)WorkspaceUpdatesController {
1041+
return&tunnelAllWorkspaceUpdatesController{logger:logger,coordCtrl:c}
1042+
}
1043+
8251044
// NewController creates a new Controller without running it
8261045
funcNewController(logger slog.Logger,dialerControlProtocolDialer,opts...ControllerOpt)*Controller {
8271046
c:=&Controller{

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp