@@ -2,7 +2,7 @@ package coderd
2
2
3
3
import (
4
4
"context"
5
- "database/sql "
5
+ "fmt "
6
6
"sync"
7
7
8
8
"github.com/google/uuid"
@@ -16,8 +16,6 @@ import (
16
16
"github.com/coder/coder/v2/tailnet/proto"
17
17
)
18
18
19
- type workspacesByOwner map [uuid.UUID ]workspacesByID
20
-
21
19
type workspacesByID map [uuid.UUID ]ownedWorkspace
22
20
23
21
type ownedWorkspace struct {
@@ -34,133 +32,184 @@ func (w ownedWorkspace) Equal(other ownedWorkspace) bool {
34
32
w .Transition == other .Transition
35
33
}
36
34
37
- func convertRows (v []database.GetWorkspacesAndAgentsRow )workspacesByOwner {
38
- m := make (map [uuid.UUID ]workspacesByID )
39
- for _ ,ws := range v {
40
- owned := ownedWorkspace {
41
- WorkspaceName :ws .Name ,
42
- JobStatus :ws .JobStatus ,
43
- Transition :ws .Transition ,
44
- Agents :ws .Agents ,
45
- }
46
- if byID ,exists := m [ws .OwnerID ];! exists {
47
- m [ws .OwnerID ]= map [uuid.UUID ]ownedWorkspace {ws .ID :owned }
48
- }else {
49
- byID [ws .ID ]= owned
50
- m [ws .OwnerID ]= byID
51
- }
52
- }
53
- return workspacesByOwner (m )
54
- }
55
-
56
35
func convertStatus (status database.ProvisionerJobStatus ,trans database.WorkspaceTransition ) proto.Workspace_Status {
57
36
wsStatus := codersdk .ConvertWorkspaceStatus (codersdk .ProvisionerJobStatus (status ),codersdk .WorkspaceTransition (trans ))
58
37
return tailnet .WorkspaceStatusToProto (wsStatus )
59
38
}
60
39
61
40
type sub struct {
62
- mu sync.Mutex
41
+ mu sync.RWMutex
63
42
userID uuid.UUID
64
43
tx chan <- * proto.WorkspaceUpdate
65
44
prev workspacesByID
45
+
46
+ db UpdateQuerier
47
+ ps pubsub.Pubsub
48
+
49
+ cancelFns []func ()
50
+ agentSubCancelFn func ()
51
+ wsSubCancelFn func ()
66
52
}
67
53
68
- func (s * sub )send (all workspacesByOwner ) {
54
+ func (s * sub )ownsAgent (agentID uuid.UUID )bool {
55
+ s .mu .RLock ()
56
+ defer s .mu .RUnlock ()
57
+
58
+ for _ ,ws := range s .prev {
59
+ for _ ,agent := range ws .Agents {
60
+ if agent .ID == agentID {
61
+ return true
62
+ }
63
+ }
64
+ }
65
+ return false
66
+ }
67
+
68
+ func (s * sub )handleUpdateEvent (ctx context.Context ,_ []byte ) {
69
+ s .mu .Lock ()
70
+ defer s .mu .Unlock ()
71
+
72
+ rows ,err := s .db .GetWorkspacesAndAgentsByOwnerID (ctx ,s .userID )
73
+ if err != nil {
74
+ return
75
+ }
76
+ s .handleRowsNoLock (rows )
77
+ }
78
+
79
+ func (s * sub )handleInsertWorkspaceEvent (ctx context.Context ,wsIDStr []byte ) {
69
80
s .mu .Lock ()
70
81
defer s .mu .Unlock ()
71
82
72
- // Filter to only the workspaces owned by the user
73
- latest := all [s .userID ]
83
+ wsID ,err := uuid .Parse (string (wsIDStr ))
84
+ if err != nil {
85
+ return
86
+ }
87
+
88
+ cancel ,err := s .ps .Subscribe (codersdk .WorkspaceNotifyChannel (wsID ),s .handleUpdateEvent )
89
+ if err != nil {
90
+ return
91
+ }
92
+ s .cancelFns = append (s .cancelFns ,cancel )
93
+
94
+ rows ,err := s .db .GetWorkspacesAndAgentsByOwnerID (ctx ,s .userID )
95
+ if err != nil {
96
+ return
97
+ }
98
+ s .handleRowsNoLock (rows )
99
+ }
100
+
101
+ func (s * sub )handleInsertAgentEvent (ctx context.Context ,_ []byte ) {
102
+ s .mu .Lock ()
103
+ defer s .mu .Unlock ()
104
+
105
+ rows ,err := s .db .GetWorkspacesAndAgentsByOwnerID (ctx ,s .userID )
106
+ if err != nil {
107
+ return
108
+ }
109
+ s .handleRowsNoLock (rows )
110
+ }
111
+
112
+ func (s * sub )handleRowsNoLock (rows []database.GetWorkspacesAndAgentsByOwnerIDRow ) {
113
+ latest := convertRows (rows )
74
114
update := produceUpdate (s .prev ,latest )
75
115
s .prev = latest
76
116
s .tx <- update
77
117
}
78
118
119
+ // start subscribes to updates for all workspaces owned by the user
120
+ func (s * sub )start () (err error ) {
121
+ s .mu .Lock ()
122
+ defer s .mu .Unlock ()
123
+
124
+ rows ,err := s .db .GetWorkspacesAndAgentsByOwnerID (context .Background (),s .userID )
125
+ if err != nil {
126
+ return xerrors .Errorf ("get workspaces and agents by owner ID: %w" ,err )
127
+ }
128
+
129
+ // Send initial state
130
+ s .handleRowsNoLock (rows )
131
+
132
+ for _ ,row := range rows {
133
+ cancel ,err := s .ps .Subscribe (codersdk .WorkspaceNotifyChannel (row .ID ),s .handleUpdateEvent )
134
+ if err != nil {
135
+ return xerrors .Errorf ("subscribe to workspace notify channel: %w" ,err )
136
+ }
137
+ s .cancelFns = append (s .cancelFns ,cancel )
138
+ }
139
+
140
+ cancel ,err := s .ps .Subscribe (WorkspaceInsertChannel (s .userID ),s .handleInsertWorkspaceEvent )
141
+ if err != nil {
142
+ return xerrors .Errorf ("subscribe to new workspace channel: %w" ,err )
143
+ }
144
+ s .wsSubCancelFn = cancel
145
+
146
+ cancel ,err = s .ps .Subscribe (fmt .Sprintf ("new_agent:%s" ,s .userID .String ()),s .handleInsertAgentEvent )
147
+ if err != nil {
148
+ return xerrors .Errorf ("subscribe to new agent channel: %w" ,err )
149
+ }
150
+ s .agentSubCancelFn = cancel
151
+ return nil
152
+ }
153
+
154
+ func (s * sub )stop () {
155
+ s .mu .Lock ()
156
+ defer s .mu .Unlock ()
157
+
158
+ for _ ,cancel := range s .cancelFns {
159
+ cancel ()
160
+ }
161
+
162
+ if s .wsSubCancelFn != nil {
163
+ s .wsSubCancelFn ()
164
+ }
165
+
166
+ if s .agentSubCancelFn != nil {
167
+ s .agentSubCancelFn ()
168
+ }
169
+
170
+ close (s .tx )
171
+ }
172
+
79
173
type UpdateQuerier interface {
80
- GetWorkspacesAndAgents (ctx context.Context ) ([]database.GetWorkspacesAndAgentsRow ,error )
174
+ GetWorkspacesAndAgentsByOwnerID (ctx context.Context , ownerID uuid. UUID ) ([]database.GetWorkspacesAndAgentsByOwnerIDRow ,error )
81
175
}
82
176
83
177
type updatesProvider struct {
84
178
mu sync.RWMutex
85
- db UpdateQuerier
86
- ps pubsub.Pubsub
87
179
// Peer ID -> subscription
88
180
subs map [uuid.UUID ]* sub
89
- // Owner ID -> workspace ID -> workspace
90
- latest workspacesByOwner
91
- cancelFn func ()
181
+
182
+ db UpdateQuerier
183
+ ps pubsub. Pubsub
92
184
}
93
185
94
- func (u * updatesProvider )IsOwner (userID uuid.UUID ,agentID uuid.UUID )error {
186
+ func (u * updatesProvider )IsOwner (userID uuid.UUID ,agentID uuid.UUID )bool {
95
187
u .mu .RLock ()
96
188
defer u .mu .RUnlock ()
97
189
98
- workspaces ,exists := u .latest [userID ]
99
- if ! exists {
100
- return xerrors .Errorf ("workspace agent not found or you do not have permission: %w" ,sql .ErrNoRows )
101
- }
102
- for _ ,workspace := range workspaces {
103
- for _ ,agent := range workspace .Agents {
104
- if agent .ID == agentID {
105
- return nil
106
- }
190
+ for _ ,sub := range u .subs {
191
+ if sub .userID == userID && sub .ownsAgent (agentID ) {
192
+ return true
107
193
}
108
194
}
109
- return xerrors . Errorf ( "workspace agent not found or you do not have permission: %w" , sql . ErrNoRows )
195
+ return false
110
196
}
111
197
112
198
var _ tailnet.WorkspaceUpdatesProvider = (* updatesProvider )(nil )
113
199
114
- func NewUpdatesProvider (ctx context.Context ,db UpdateQuerier ,ps pubsub.Pubsub ) (tailnet.WorkspaceUpdatesProvider ,error ) {
115
- rows ,err := db .GetWorkspacesAndAgents (ctx )
116
- if err != nil && ! xerrors .Is (err ,sql .ErrNoRows ) {
117
- return nil ,err
118
- }
200
+ func NewUpdatesProvider (_ context.Context ,db UpdateQuerier ,ps pubsub.Pubsub ) (tailnet.WorkspaceUpdatesProvider ,error ) {
119
201
out := & updatesProvider {
120
- db :db ,
121
- ps :ps ,
122
- subs :map [uuid.UUID ]* sub {},
123
- latest :convertRows (rows ),
124
- }
125
- cancel ,err := ps .Subscribe (codersdk .AllWorkspacesNotifyChannel ,out .handleUpdate )
126
- if err != nil {
127
- return nil ,err
202
+ db :db ,
203
+ ps :ps ,
204
+ subs :map [uuid.UUID ]* sub {},
128
205
}
129
- out .cancelFn = cancel
130
206
return out ,nil
131
207
}
132
208
133
209
func (u * updatesProvider )Stop () {
134
210
for _ ,sub := range u .subs {
135
- close (sub .tx )
136
- }
137
- u .cancelFn ()
138
- }
139
-
140
- func (u * updatesProvider )handleUpdate (ctx context.Context ,_ []byte ) {
141
- rows ,err := u .db .GetWorkspacesAndAgents (ctx )
142
- if err != nil && ! xerrors .Is (err ,sql .ErrNoRows ) {
143
- // TODO: Log
144
- return
145
- }
146
-
147
- wg := & sync.WaitGroup {}
148
- latest := convertRows (rows )
149
- u .mu .RLock ()
150
- for _ ,sub := range u .subs {
151
- sub := sub
152
- wg .Add (1 )
153
- go func () {
154
- sub .send (latest )
155
- defer wg .Done ()
156
- }()
211
+ sub .stop ()
157
212
}
158
- u .mu .RUnlock ()
159
-
160
- u .mu .Lock ()
161
- u .latest = latest
162
- u .mu .Unlock ()
163
- wg .Wait ()
164
213
}
165
214
166
215
func (u * updatesProvider )Subscribe (peerID uuid.UUID ,userID uuid.UUID ) (<- chan * proto.WorkspaceUpdate ,error ) {
@@ -169,13 +218,20 @@ func (u *updatesProvider) Subscribe(peerID uuid.UUID, userID uuid.UUID) (<-chan
169
218
170
219
tx := make (chan * proto.WorkspaceUpdate ,1 )
171
220
sub := & sub {
172
- userID :userID ,
173
- tx :tx ,
174
- prev :make (workspacesByID ),
221
+ userID :userID ,
222
+ tx :tx ,
223
+ db :u .db ,
224
+ ps :u .ps ,
225
+ prev :workspacesByID {},
226
+ cancelFns : []func (){},
227
+ }
228
+ err := sub .start ()
229
+ if err != nil {
230
+ sub .stop ()
231
+ return nil ,err
175
232
}
233
+
176
234
u .subs [peerID ]= sub
177
- // Write initial state
178
- sub .send (u .latest )
179
235
return tx ,nil
180
236
}
181
237
@@ -263,3 +319,24 @@ func produceUpdate(old, new workspacesByID) *proto.WorkspaceUpdate {
263
319
264
320
return out
265
321
}
322
+
323
+ func convertRows (rows []database.GetWorkspacesAndAgentsByOwnerIDRow )workspacesByID {
324
+ out := make (workspacesByID )
325
+ for _ ,row := range rows {
326
+ out [row .ID ]= ownedWorkspace {
327
+ WorkspaceName :row .Name ,
328
+ JobStatus :row .JobStatus ,
329
+ Transition :row .Transition ,
330
+ Agents :row .Agents ,
331
+ }
332
+ }
333
+ return out
334
+ }
335
+
336
+ func WorkspaceInsertChannel (userID uuid.UUID )string {
337
+ return fmt .Sprintf ("new_workspace:%s" ,userID .String ())
338
+ }
339
+
340
+ func AgentInsertChannel (userID uuid.UUID )string {
341
+ return fmt .Sprintf ("new_agent:%s" ,userID .String ())
342
+ }