@@ -2,22 +2,18 @@ package coderd
22
33import (
44"context"
5- "database/sql"
65"sync"
76
87"github.com/google/uuid"
98"golang.org/x/xerrors"
109
1110"github.com/coder/coder/v2/coderd/database"
1211"github.com/coder/coder/v2/coderd/database/pubsub"
13- "github.com/coder/coder/v2/coderd/util/slice"
1412"github.com/coder/coder/v2/codersdk"
1513"github.com/coder/coder/v2/tailnet"
1614"github.com/coder/coder/v2/tailnet/proto"
1715)
1816
19- type workspacesByOwner map [uuid.UUID ]workspacesByID
20-
2117type workspacesByID map [uuid.UUID ]ownedWorkspace
2218
2319type ownedWorkspace struct {
@@ -27,140 +23,147 @@ type ownedWorkspace struct {
2723Agents []database.AgentIDNamePair
2824}
2925
30- // Equal does not compare agents
31- func (w ownedWorkspace )Equal (other ownedWorkspace )bool {
32- return w .WorkspaceName == other .WorkspaceName &&
33- w .JobStatus == other .JobStatus &&
34- w .Transition == other .Transition
35- }
36-
37- func convertRows (v []database.GetWorkspacesAndAgentsRow )workspacesByOwner {
38- m := make (map [uuid.UUID ]workspacesByID )
39- for _ ,ws := range v {
40- owned := ownedWorkspace {
41- WorkspaceName :ws .Name ,
42- JobStatus :ws .JobStatus ,
43- Transition :ws .Transition ,
44- Agents :ws .Agents ,
45- }
46- if byID ,exists := m [ws .OwnerID ];! exists {
47- m [ws .OwnerID ]= map [uuid.UUID ]ownedWorkspace {ws .ID :owned }
48- }else {
49- byID [ws .ID ]= owned
50- m [ws .OwnerID ]= byID
51- }
52- }
53- return workspacesByOwner (m )
54- }
55-
5626func convertStatus (status database.ProvisionerJobStatus ,trans database.WorkspaceTransition ) proto.Workspace_Status {
5727wsStatus := codersdk .ConvertWorkspaceStatus (codersdk .ProvisionerJobStatus (status ),codersdk .WorkspaceTransition (trans ))
5828return tailnet .WorkspaceStatusToProto (wsStatus )
5929}
6030
6131type sub struct {
62- mu sync.Mutex
32+ mu sync.RWMutex
6333userID uuid.UUID
6434tx chan <- * proto.WorkspaceUpdate
65- prev workspacesByID
35+ state workspacesByID
36+
37+ db UpdateQuerier
38+ ps pubsub.Pubsub
39+
40+ cancelFn func ()
41+ }
42+
43+ func (s * sub )ownsAgent (agentID uuid.UUID )bool {
44+ s .mu .RLock ()
45+ defer s .mu .RUnlock ()
46+
47+ for _ ,ws := range s .state {
48+ for _ ,agent := range ws .Agents {
49+ if agent .ID == agentID {
50+ return true
51+ }
52+ }
53+ }
54+ return false
55+ }
56+
57+ func (s * sub )handleEvent (_ context.Context ,event codersdk.WorkspaceEvent ) {
58+ s .mu .Lock ()
59+ defer s .mu .Unlock ()
60+
61+ out := & proto.WorkspaceUpdate {
62+ UpsertedWorkspaces : []* proto.Workspace {},
63+ UpsertedAgents : []* proto.Agent {},
64+ DeletedWorkspaces : []* proto.Workspace {},
65+ DeletedAgents : []* proto.Agent {},
66+ }
67+
68+ switch event .Kind {
69+ case codersdk .WorkspaceEventKindNewAgent :
70+ out .UpsertedAgents = append (out .UpsertedAgents ,& proto.Agent {
71+ WorkspaceId :tailnet .UUIDToByteSlice (event .WorkspaceID ),
72+ Id :tailnet .UUIDToByteSlice (* event .AgentID ),
73+ Name :* event .AgentName ,
74+ })
75+ ws ,ok := s .state [event .WorkspaceID ]
76+ if ! ok {
77+ break
78+ }
79+ ws .Agents = append (ws .Agents , database.AgentIDNamePair {
80+ ID :* event .AgentID ,
81+ Name :* event .AgentName ,
82+ })
83+ s .state [event .WorkspaceID ]= ws
84+ case codersdk .WorkspaceEventKindStateChange :
85+ // TODO: One event for both upsertions and deletions
86+ default :
87+ return
88+ }
89+ s .tx <- out
90+ }
91+
92+ // start subscribes to updates for all workspaces owned by the user
93+ func (s * sub )start () (err error ) {
94+ s .mu .Lock ()
95+ defer s .mu .Unlock ()
96+
97+ rows ,err := s .db .GetWorkspacesAndAgentsByOwnerID (context .Background (),s .userID )
98+ if err != nil {
99+ return xerrors .Errorf ("get workspaces and agents by owner ID: %w" ,err )
100+ }
101+
102+ initUpdate := produceInitialUpdate (rows )
103+ s .tx <- initUpdate
104+ initState := convertRows (rows )
105+ s .state = initState
106+
107+ cancel ,err := s .ps .Subscribe (codersdk .WorkspaceEventChannel (s .userID ),codersdk .HandleWorkspaceEvent (s .handleEvent ))
108+ if err != nil {
109+ return xerrors .Errorf ("subscribe to workspace event channel: %w" ,err )
110+ }
111+
112+ s .cancelFn = cancel
113+ return nil
66114}
67115
68- func (s * sub )send ( all workspacesByOwner ) {
116+ func (s * sub )stop ( ) {
69117s .mu .Lock ()
70118defer s .mu .Unlock ()
71119
72- // Filter to only the workspaces owned by the user
73- latest := all [ s . userID ]
74- update := produceUpdate ( s . prev , latest )
75- s . prev = latest
76- s .tx <- update
120+ if s . cancelFn != nil {
121+ s . cancelFn ()
122+ }
123+
124+ close ( s .tx )
77125}
78126
79127type UpdateQuerier interface {
80- GetWorkspacesAndAgents (ctx context.Context ) ([]database.GetWorkspacesAndAgentsRow ,error )
128+ GetWorkspacesAndAgentsByOwnerID (ctx context.Context , ownerID uuid. UUID ) ([]database.GetWorkspacesAndAgentsByOwnerIDRow ,error )
81129}
82130
83131type updatesProvider struct {
84132mu sync.RWMutex
85- db UpdateQuerier
86- ps pubsub.Pubsub
87133// Peer ID -> subscription
88134subs map [uuid.UUID ]* sub
89- // Owner ID -> workspace ID -> workspace
90- latest workspacesByOwner
91- cancelFn func ()
135+
136+ db UpdateQuerier
137+ ps pubsub. Pubsub
92138}
93139
94- func (u * updatesProvider )IsOwner (userID uuid.UUID ,agentID uuid.UUID )error {
140+ func (u * updatesProvider )IsOwner (userID uuid.UUID ,agentID uuid.UUID )bool {
95141u .mu .RLock ()
96142defer u .mu .RUnlock ()
97143
98- workspaces ,exists := u .latest [userID ]
99- if ! exists {
100- return xerrors .Errorf ("workspace agent not found or you do not have permission: %w" ,sql .ErrNoRows )
101- }
102- for _ ,workspace := range workspaces {
103- for _ ,agent := range workspace .Agents {
104- if agent .ID == agentID {
105- return nil
106- }
144+ for _ ,sub := range u .subs {
145+ if sub .userID == userID && sub .ownsAgent (agentID ) {
146+ return true
107147}
108148}
109- return xerrors . Errorf ( "workspace agent not found or you do not have permission: %w" , sql . ErrNoRows )
149+ return false
110150}
111151
112152var _ tailnet.WorkspaceUpdatesProvider = (* updatesProvider )(nil )
113153
114- func NewUpdatesProvider (ctx context.Context ,db UpdateQuerier ,ps pubsub.Pubsub ) (tailnet.WorkspaceUpdatesProvider ,error ) {
115- rows ,err := db .GetWorkspacesAndAgents (ctx )
116- if err != nil && ! xerrors .Is (err ,sql .ErrNoRows ) {
117- return nil ,err
118- }
154+ func NewUpdatesProvider (_ context.Context ,db UpdateQuerier ,ps pubsub.Pubsub ) (tailnet.WorkspaceUpdatesProvider ,error ) {
119155out := & updatesProvider {
120- db :db ,
121- ps :ps ,
122- subs :map [uuid.UUID ]* sub {},
123- latest :convertRows (rows ),
124- }
125- cancel ,err := ps .Subscribe (codersdk .AllWorkspacesNotifyChannel ,out .handleUpdate )
126- if err != nil {
127- return nil ,err
156+ db :db ,
157+ ps :ps ,
158+ subs :map [uuid.UUID ]* sub {},
128159}
129- out .cancelFn = cancel
130160return out ,nil
131161}
132162
133163func (u * updatesProvider )Stop () {
134164for _ ,sub := range u .subs {
135- close ( sub .tx )
165+ sub .stop ( )
136166}
137- u .cancelFn ()
138- }
139-
140- func (u * updatesProvider )handleUpdate (ctx context.Context ,_ []byte ) {
141- rows ,err := u .db .GetWorkspacesAndAgents (ctx )
142- if err != nil && ! xerrors .Is (err ,sql .ErrNoRows ) {
143- // TODO: Log
144- return
145- }
146-
147- wg := & sync.WaitGroup {}
148- latest := convertRows (rows )
149- u .mu .RLock ()
150- for _ ,sub := range u .subs {
151- sub := sub
152- wg .Add (1 )
153- go func () {
154- sub .send (latest )
155- defer wg .Done ()
156- }()
157- }
158- u .mu .RUnlock ()
159-
160- u .mu .Lock ()
161- u .latest = latest
162- u .mu .Unlock ()
163- wg .Wait ()
164167}
165168
166169func (u * updatesProvider )Subscribe (peerID uuid.UUID ,userID uuid.UUID ) (<- chan * proto.WorkspaceUpdate ,error ) {
@@ -171,11 +174,17 @@ func (u *updatesProvider) Subscribe(peerID uuid.UUID, userID uuid.UUID) (<-chan
171174sub := & sub {
172175userID :userID ,
173176tx :tx ,
174- prev :make (workspacesByID ),
177+ db :u .db ,
178+ ps :u .ps ,
179+ state :workspacesByID {},
180+ }
181+ err := sub .start ()
182+ if err != nil {
183+ sub .stop ()
184+ return nil ,err
175185}
186+
176187u .subs [peerID ]= sub
177- // Write initial state
178- sub .send (u .latest )
179188return tx ,nil
180189}
181190
@@ -191,75 +200,40 @@ func (u *updatesProvider) Unsubscribe(peerID uuid.UUID) {
191200delete (u .subs ,peerID )
192201}
193202
194- func produceUpdate ( old , new workspacesByID )* proto.WorkspaceUpdate {
203+ func produceInitialUpdate ( rows []database. GetWorkspacesAndAgentsByOwnerIDRow )* proto.WorkspaceUpdate {
195204out := & proto.WorkspaceUpdate {
196205UpsertedWorkspaces : []* proto.Workspace {},
197206UpsertedAgents : []* proto.Agent {},
198207DeletedWorkspaces : []* proto.Workspace {},
199208DeletedAgents : []* proto.Agent {},
200209}
201210
202- for wsID ,newWorkspace := range new {
203- oldWorkspace ,exists := old [wsID ]
204- // Upsert both workspace and agents if the workspace is new
205- if ! exists {
206- out .UpsertedWorkspaces = append (out .UpsertedWorkspaces ,& proto.Workspace {
207- Id :tailnet .UUIDToByteSlice (wsID ),
208- Name :newWorkspace .WorkspaceName ,
209- Status :convertStatus (newWorkspace .JobStatus ,newWorkspace .Transition ),
210- })
211- for _ ,agent := range newWorkspace .Agents {
212- out .UpsertedAgents = append (out .UpsertedAgents ,& proto.Agent {
213- Id :tailnet .UUIDToByteSlice (agent .ID ),
214- Name :agent .Name ,
215- WorkspaceId :tailnet .UUIDToByteSlice (wsID ),
216- })
217- }
218- continue
219- }
220- // Upsert workspace if the workspace is updated
221- if ! newWorkspace .Equal (oldWorkspace ) {
222- out .UpsertedWorkspaces = append (out .UpsertedWorkspaces ,& proto.Workspace {
223- Id :tailnet .UUIDToByteSlice (wsID ),
224- Name :newWorkspace .WorkspaceName ,
225- Status :convertStatus (newWorkspace .JobStatus ,newWorkspace .Transition ),
226- })
227- }
228-
229- add ,remove := slice .SymmetricDifference (oldWorkspace .Agents ,newWorkspace .Agents )
230- for _ ,agent := range add {
211+ for _ ,row := range rows {
212+ out .UpsertedWorkspaces = append (out .UpsertedWorkspaces ,& proto.Workspace {
213+ Id :tailnet .UUIDToByteSlice (row .ID ),
214+ Name :row .Name ,
215+ Status :convertStatus (row .JobStatus ,row .Transition ),
216+ })
217+ for _ ,agent := range row .Agents {
231218out .UpsertedAgents = append (out .UpsertedAgents ,& proto.Agent {
232219Id :tailnet .UUIDToByteSlice (agent .ID ),
233220Name :agent .Name ,
234- WorkspaceId :tailnet .UUIDToByteSlice (wsID ),
235- })
236- }
237- for _ ,agent := range remove {
238- out .DeletedAgents = append (out .DeletedAgents ,& proto.Agent {
239- Id :tailnet .UUIDToByteSlice (agent .ID ),
240- Name :agent .Name ,
241- WorkspaceId :tailnet .UUIDToByteSlice (wsID ),
221+ WorkspaceId :tailnet .UUIDToByteSlice (row .ID ),
242222})
243223}
244224}
225+ return out
226+ }
245227
246- // Delete workspace and agents if the workspace is deleted
247- for wsID ,oldWorkspace := range old {
248- if _ ,exists := new [wsID ];! exists {
249- out .DeletedWorkspaces = append (out .DeletedWorkspaces ,& proto.Workspace {
250- Id :tailnet .UUIDToByteSlice (wsID ),
251- Name :oldWorkspace .WorkspaceName ,
252- Status :convertStatus (oldWorkspace .JobStatus ,oldWorkspace .Transition ),
253- })
254- for _ ,agent := range oldWorkspace .Agents {
255- out .DeletedAgents = append (out .DeletedAgents ,& proto.Agent {
256- Id :tailnet .UUIDToByteSlice (agent .ID ),
257- Name :agent .Name ,
258- WorkspaceId :tailnet .UUIDToByteSlice (wsID ),
259- })
260- }
228+ func convertRows (rows []database.GetWorkspacesAndAgentsByOwnerIDRow )workspacesByID {
229+ out := make (workspacesByID )
230+ for _ ,row := range rows {
231+ out [row .ID ]= ownedWorkspace {
232+ WorkspaceName :row .Name ,
233+ JobStatus :row .JobStatus ,
234+ Transition :row .Transition ,
235+ Agents :row .Agents ,
261236}
262237}
263-
264238return out
265239}