@@ -2,22 +2,18 @@ package coderd
2
2
3
3
import (
4
4
"context"
5
- "database/sql"
6
5
"sync"
7
6
8
7
"github.com/google/uuid"
9
8
"golang.org/x/xerrors"
10
9
11
10
"github.com/coder/coder/v2/coderd/database"
12
11
"github.com/coder/coder/v2/coderd/database/pubsub"
13
- "github.com/coder/coder/v2/coderd/util/slice"
14
12
"github.com/coder/coder/v2/codersdk"
15
13
"github.com/coder/coder/v2/tailnet"
16
14
"github.com/coder/coder/v2/tailnet/proto"
17
15
)
18
16
19
- type workspacesByOwner map [uuid.UUID ]workspacesByID
20
-
21
17
type workspacesByID map [uuid.UUID ]ownedWorkspace
22
18
23
19
type ownedWorkspace struct {
@@ -27,140 +23,147 @@ type ownedWorkspace struct {
27
23
Agents []database.AgentIDNamePair
28
24
}
29
25
30
- // Equal does not compare agents
31
- func (w ownedWorkspace )Equal (other ownedWorkspace )bool {
32
- return w .WorkspaceName == other .WorkspaceName &&
33
- w .JobStatus == other .JobStatus &&
34
- w .Transition == other .Transition
35
- }
36
-
37
- func convertRows (v []database.GetWorkspacesAndAgentsRow )workspacesByOwner {
38
- m := make (map [uuid.UUID ]workspacesByID )
39
- for _ ,ws := range v {
40
- owned := ownedWorkspace {
41
- WorkspaceName :ws .Name ,
42
- JobStatus :ws .JobStatus ,
43
- Transition :ws .Transition ,
44
- Agents :ws .Agents ,
45
- }
46
- if byID ,exists := m [ws .OwnerID ];! exists {
47
- m [ws .OwnerID ]= map [uuid.UUID ]ownedWorkspace {ws .ID :owned }
48
- }else {
49
- byID [ws .ID ]= owned
50
- m [ws .OwnerID ]= byID
51
- }
52
- }
53
- return workspacesByOwner (m )
54
- }
55
-
56
26
func convertStatus (status database.ProvisionerJobStatus ,trans database.WorkspaceTransition ) proto.Workspace_Status {
57
27
wsStatus := codersdk .ConvertWorkspaceStatus (codersdk .ProvisionerJobStatus (status ),codersdk .WorkspaceTransition (trans ))
58
28
return tailnet .WorkspaceStatusToProto (wsStatus )
59
29
}
60
30
61
31
type sub struct {
62
- mu sync.Mutex
32
+ mu sync.RWMutex
63
33
userID uuid.UUID
64
34
tx chan <- * proto.WorkspaceUpdate
65
- prev workspacesByID
35
+ state workspacesByID
36
+
37
+ db UpdateQuerier
38
+ ps pubsub.Pubsub
39
+
40
+ cancelFn func ()
41
+ }
42
+
43
+ func (s * sub )ownsAgent (agentID uuid.UUID )bool {
44
+ s .mu .RLock ()
45
+ defer s .mu .RUnlock ()
46
+
47
+ for _ ,ws := range s .state {
48
+ for _ ,agent := range ws .Agents {
49
+ if agent .ID == agentID {
50
+ return true
51
+ }
52
+ }
53
+ }
54
+ return false
55
+ }
56
+
57
+ func (s * sub )handleEvent (_ context.Context ,event codersdk.WorkspaceEvent ) {
58
+ s .mu .Lock ()
59
+ defer s .mu .Unlock ()
60
+
61
+ out := & proto.WorkspaceUpdate {
62
+ UpsertedWorkspaces : []* proto.Workspace {},
63
+ UpsertedAgents : []* proto.Agent {},
64
+ DeletedWorkspaces : []* proto.Workspace {},
65
+ DeletedAgents : []* proto.Agent {},
66
+ }
67
+
68
+ switch event .Kind {
69
+ case codersdk .WorkspaceEventKindNewAgent :
70
+ out .UpsertedAgents = append (out .UpsertedAgents ,& proto.Agent {
71
+ WorkspaceId :tailnet .UUIDToByteSlice (event .WorkspaceID ),
72
+ Id :tailnet .UUIDToByteSlice (* event .AgentID ),
73
+ Name :* event .AgentName ,
74
+ })
75
+ ws ,ok := s .state [event .WorkspaceID ]
76
+ if ! ok {
77
+ break
78
+ }
79
+ ws .Agents = append (ws .Agents , database.AgentIDNamePair {
80
+ ID :* event .AgentID ,
81
+ Name :* event .AgentName ,
82
+ })
83
+ s .state [event .WorkspaceID ]= ws
84
+ case codersdk .WorkspaceEventKindStateChange :
85
+ // TODO: One event for both upsertions and deletions
86
+ default :
87
+ return
88
+ }
89
+ s .tx <- out
90
+ }
91
+
92
+ // start subscribes to updates for all workspaces owned by the user
93
+ func (s * sub )start () (err error ) {
94
+ s .mu .Lock ()
95
+ defer s .mu .Unlock ()
96
+
97
+ rows ,err := s .db .GetWorkspacesAndAgentsByOwnerID (context .Background (),s .userID )
98
+ if err != nil {
99
+ return xerrors .Errorf ("get workspaces and agents by owner ID: %w" ,err )
100
+ }
101
+
102
+ initUpdate := produceInitialUpdate (rows )
103
+ s .tx <- initUpdate
104
+ initState := convertRows (rows )
105
+ s .state = initState
106
+
107
+ cancel ,err := s .ps .Subscribe (codersdk .WorkspaceEventChannel (s .userID ),codersdk .HandleWorkspaceEvent (s .handleEvent ))
108
+ if err != nil {
109
+ return xerrors .Errorf ("subscribe to workspace event channel: %w" ,err )
110
+ }
111
+
112
+ s .cancelFn = cancel
113
+ return nil
66
114
}
67
115
68
- func (s * sub )send ( all workspacesByOwner ) {
116
+ func (s * sub )stop ( ) {
69
117
s .mu .Lock ()
70
118
defer s .mu .Unlock ()
71
119
72
- // Filter to only the workspaces owned by the user
73
- latest := all [ s . userID ]
74
- update := produceUpdate ( s . prev , latest )
75
- s . prev = latest
76
- s .tx <- update
120
+ if s . cancelFn != nil {
121
+ s . cancelFn ()
122
+ }
123
+
124
+ close ( s .tx )
77
125
}
78
126
79
127
type UpdateQuerier interface {
80
- GetWorkspacesAndAgents (ctx context.Context ) ([]database.GetWorkspacesAndAgentsRow ,error )
128
+ GetWorkspacesAndAgentsByOwnerID (ctx context.Context , ownerID uuid. UUID ) ([]database.GetWorkspacesAndAgentsByOwnerIDRow ,error )
81
129
}
82
130
83
131
type updatesProvider struct {
84
132
mu sync.RWMutex
85
- db UpdateQuerier
86
- ps pubsub.Pubsub
87
133
// Peer ID -> subscription
88
134
subs map [uuid.UUID ]* sub
89
- // Owner ID -> workspace ID -> workspace
90
- latest workspacesByOwner
91
- cancelFn func ()
135
+
136
+ db UpdateQuerier
137
+ ps pubsub. Pubsub
92
138
}
93
139
94
- func (u * updatesProvider )IsOwner (userID uuid.UUID ,agentID uuid.UUID )error {
140
+ func (u * updatesProvider )IsOwner (userID uuid.UUID ,agentID uuid.UUID )bool {
95
141
u .mu .RLock ()
96
142
defer u .mu .RUnlock ()
97
143
98
- workspaces ,exists := u .latest [userID ]
99
- if ! exists {
100
- return xerrors .Errorf ("workspace agent not found or you do not have permission: %w" ,sql .ErrNoRows )
101
- }
102
- for _ ,workspace := range workspaces {
103
- for _ ,agent := range workspace .Agents {
104
- if agent .ID == agentID {
105
- return nil
106
- }
144
+ for _ ,sub := range u .subs {
145
+ if sub .userID == userID && sub .ownsAgent (agentID ) {
146
+ return true
107
147
}
108
148
}
109
- return xerrors . Errorf ( "workspace agent not found or you do not have permission: %w" , sql . ErrNoRows )
149
+ return false
110
150
}
111
151
112
152
var _ tailnet.WorkspaceUpdatesProvider = (* updatesProvider )(nil )
113
153
114
- func NewUpdatesProvider (ctx context.Context ,db UpdateQuerier ,ps pubsub.Pubsub ) (tailnet.WorkspaceUpdatesProvider ,error ) {
115
- rows ,err := db .GetWorkspacesAndAgents (ctx )
116
- if err != nil && ! xerrors .Is (err ,sql .ErrNoRows ) {
117
- return nil ,err
118
- }
154
+ func NewUpdatesProvider (_ context.Context ,db UpdateQuerier ,ps pubsub.Pubsub ) (tailnet.WorkspaceUpdatesProvider ,error ) {
119
155
out := & updatesProvider {
120
- db :db ,
121
- ps :ps ,
122
- subs :map [uuid.UUID ]* sub {},
123
- latest :convertRows (rows ),
124
- }
125
- cancel ,err := ps .Subscribe (codersdk .AllWorkspacesNotifyChannel ,out .handleUpdate )
126
- if err != nil {
127
- return nil ,err
156
+ db :db ,
157
+ ps :ps ,
158
+ subs :map [uuid.UUID ]* sub {},
128
159
}
129
- out .cancelFn = cancel
130
160
return out ,nil
131
161
}
132
162
133
163
func (u * updatesProvider )Stop () {
134
164
for _ ,sub := range u .subs {
135
- close ( sub .tx )
165
+ sub .stop ( )
136
166
}
137
- u .cancelFn ()
138
- }
139
-
140
- func (u * updatesProvider )handleUpdate (ctx context.Context ,_ []byte ) {
141
- rows ,err := u .db .GetWorkspacesAndAgents (ctx )
142
- if err != nil && ! xerrors .Is (err ,sql .ErrNoRows ) {
143
- // TODO: Log
144
- return
145
- }
146
-
147
- wg := & sync.WaitGroup {}
148
- latest := convertRows (rows )
149
- u .mu .RLock ()
150
- for _ ,sub := range u .subs {
151
- sub := sub
152
- wg .Add (1 )
153
- go func () {
154
- sub .send (latest )
155
- defer wg .Done ()
156
- }()
157
- }
158
- u .mu .RUnlock ()
159
-
160
- u .mu .Lock ()
161
- u .latest = latest
162
- u .mu .Unlock ()
163
- wg .Wait ()
164
167
}
165
168
166
169
func (u * updatesProvider )Subscribe (peerID uuid.UUID ,userID uuid.UUID ) (<- chan * proto.WorkspaceUpdate ,error ) {
@@ -171,11 +174,17 @@ func (u *updatesProvider) Subscribe(peerID uuid.UUID, userID uuid.UUID) (<-chan
171
174
sub := & sub {
172
175
userID :userID ,
173
176
tx :tx ,
174
- prev :make (workspacesByID ),
177
+ db :u .db ,
178
+ ps :u .ps ,
179
+ state :workspacesByID {},
180
+ }
181
+ err := sub .start ()
182
+ if err != nil {
183
+ sub .stop ()
184
+ return nil ,err
175
185
}
186
+
176
187
u .subs [peerID ]= sub
177
- // Write initial state
178
- sub .send (u .latest )
179
188
return tx ,nil
180
189
}
181
190
@@ -191,75 +200,40 @@ func (u *updatesProvider) Unsubscribe(peerID uuid.UUID) {
191
200
delete (u .subs ,peerID )
192
201
}
193
202
194
- func produceUpdate ( old , new workspacesByID )* proto.WorkspaceUpdate {
203
+ func produceInitialUpdate ( rows []database. GetWorkspacesAndAgentsByOwnerIDRow )* proto.WorkspaceUpdate {
195
204
out := & proto.WorkspaceUpdate {
196
205
UpsertedWorkspaces : []* proto.Workspace {},
197
206
UpsertedAgents : []* proto.Agent {},
198
207
DeletedWorkspaces : []* proto.Workspace {},
199
208
DeletedAgents : []* proto.Agent {},
200
209
}
201
210
202
- for wsID ,newWorkspace := range new {
203
- oldWorkspace ,exists := old [wsID ]
204
- // Upsert both workspace and agents if the workspace is new
205
- if ! exists {
206
- out .UpsertedWorkspaces = append (out .UpsertedWorkspaces ,& proto.Workspace {
207
- Id :tailnet .UUIDToByteSlice (wsID ),
208
- Name :newWorkspace .WorkspaceName ,
209
- Status :convertStatus (newWorkspace .JobStatus ,newWorkspace .Transition ),
210
- })
211
- for _ ,agent := range newWorkspace .Agents {
212
- out .UpsertedAgents = append (out .UpsertedAgents ,& proto.Agent {
213
- Id :tailnet .UUIDToByteSlice (agent .ID ),
214
- Name :agent .Name ,
215
- WorkspaceId :tailnet .UUIDToByteSlice (wsID ),
216
- })
217
- }
218
- continue
219
- }
220
- // Upsert workspace if the workspace is updated
221
- if ! newWorkspace .Equal (oldWorkspace ) {
222
- out .UpsertedWorkspaces = append (out .UpsertedWorkspaces ,& proto.Workspace {
223
- Id :tailnet .UUIDToByteSlice (wsID ),
224
- Name :newWorkspace .WorkspaceName ,
225
- Status :convertStatus (newWorkspace .JobStatus ,newWorkspace .Transition ),
226
- })
227
- }
228
-
229
- add ,remove := slice .SymmetricDifference (oldWorkspace .Agents ,newWorkspace .Agents )
230
- for _ ,agent := range add {
211
+ for _ ,row := range rows {
212
+ out .UpsertedWorkspaces = append (out .UpsertedWorkspaces ,& proto.Workspace {
213
+ Id :tailnet .UUIDToByteSlice (row .ID ),
214
+ Name :row .Name ,
215
+ Status :convertStatus (row .JobStatus ,row .Transition ),
216
+ })
217
+ for _ ,agent := range row .Agents {
231
218
out .UpsertedAgents = append (out .UpsertedAgents ,& proto.Agent {
232
219
Id :tailnet .UUIDToByteSlice (agent .ID ),
233
220
Name :agent .Name ,
234
- WorkspaceId :tailnet .UUIDToByteSlice (wsID ),
235
- })
236
- }
237
- for _ ,agent := range remove {
238
- out .DeletedAgents = append (out .DeletedAgents ,& proto.Agent {
239
- Id :tailnet .UUIDToByteSlice (agent .ID ),
240
- Name :agent .Name ,
241
- WorkspaceId :tailnet .UUIDToByteSlice (wsID ),
221
+ WorkspaceId :tailnet .UUIDToByteSlice (row .ID ),
242
222
})
243
223
}
244
224
}
225
+ return out
226
+ }
245
227
246
- // Delete workspace and agents if the workspace is deleted
247
- for wsID ,oldWorkspace := range old {
248
- if _ ,exists := new [wsID ];! exists {
249
- out .DeletedWorkspaces = append (out .DeletedWorkspaces ,& proto.Workspace {
250
- Id :tailnet .UUIDToByteSlice (wsID ),
251
- Name :oldWorkspace .WorkspaceName ,
252
- Status :convertStatus (oldWorkspace .JobStatus ,oldWorkspace .Transition ),
253
- })
254
- for _ ,agent := range oldWorkspace .Agents {
255
- out .DeletedAgents = append (out .DeletedAgents ,& proto.Agent {
256
- Id :tailnet .UUIDToByteSlice (agent .ID ),
257
- Name :agent .Name ,
258
- WorkspaceId :tailnet .UUIDToByteSlice (wsID ),
259
- })
260
- }
228
+ func convertRows (rows []database.GetWorkspacesAndAgentsByOwnerIDRow )workspacesByID {
229
+ out := make (workspacesByID )
230
+ for _ ,row := range rows {
231
+ out [row .ID ]= ownedWorkspace {
232
+ WorkspaceName :row .Name ,
233
+ JobStatus :row .JobStatus ,
234
+ Transition :row .Transition ,
235
+ Agents :row .Agents ,
261
236
}
262
237
}
263
-
264
238
return out
265
239
}