@@ -2,7 +2,7 @@ package coderd
22
33import (
44"context"
5- "database/sql "
5+ "fmt "
66"sync"
77
88"github.com/google/uuid"
@@ -16,8 +16,6 @@ import (
1616"github.com/coder/coder/v2/tailnet/proto"
1717)
1818
19- type workspacesByOwner map [uuid.UUID ]workspacesByID
20-
2119type workspacesByID map [uuid.UUID ]ownedWorkspace
2220
2321type ownedWorkspace struct {
@@ -34,133 +32,184 @@ func (w ownedWorkspace) Equal(other ownedWorkspace) bool {
3432w .Transition == other .Transition
3533}
3634
37- func convertRows (v []database.GetWorkspacesAndAgentsRow )workspacesByOwner {
38- m := make (map [uuid.UUID ]workspacesByID )
39- for _ ,ws := range v {
40- owned := ownedWorkspace {
41- WorkspaceName :ws .Name ,
42- JobStatus :ws .JobStatus ,
43- Transition :ws .Transition ,
44- Agents :ws .Agents ,
45- }
46- if byID ,exists := m [ws .OwnerID ];! exists {
47- m [ws .OwnerID ]= map [uuid.UUID ]ownedWorkspace {ws .ID :owned }
48- }else {
49- byID [ws .ID ]= owned
50- m [ws .OwnerID ]= byID
51- }
52- }
53- return workspacesByOwner (m )
54- }
55-
5635func convertStatus (status database.ProvisionerJobStatus ,trans database.WorkspaceTransition ) proto.Workspace_Status {
5736wsStatus := codersdk .ConvertWorkspaceStatus (codersdk .ProvisionerJobStatus (status ),codersdk .WorkspaceTransition (trans ))
5837return tailnet .WorkspaceStatusToProto (wsStatus )
5938}
6039
6140type sub struct {
62- mu sync.Mutex
41+ mu sync.RWMutex
6342userID uuid.UUID
6443tx chan <- * proto.WorkspaceUpdate
6544prev workspacesByID
45+
46+ db UpdateQuerier
47+ ps pubsub.Pubsub
48+
49+ cancelFns []func ()
50+ agentSubCancelFn func ()
51+ wsSubCancelFn func ()
6652}
6753
68- func (s * sub )send (all workspacesByOwner ) {
54+ func (s * sub )ownsAgent (agentID uuid.UUID )bool {
55+ s .mu .RLock ()
56+ defer s .mu .RUnlock ()
57+
58+ for _ ,ws := range s .prev {
59+ for _ ,agent := range ws .Agents {
60+ if agent .ID == agentID {
61+ return true
62+ }
63+ }
64+ }
65+ return false
66+ }
67+
68+ func (s * sub )handleUpdateEvent (ctx context.Context ,_ []byte ) {
69+ s .mu .Lock ()
70+ defer s .mu .Unlock ()
71+
72+ rows ,err := s .db .GetWorkspacesAndAgentsByOwnerID (ctx ,s .userID )
73+ if err != nil {
74+ return
75+ }
76+ s .handleRowsNoLock (rows )
77+ }
78+
79+ func (s * sub )handleInsertWorkspaceEvent (ctx context.Context ,wsIDStr []byte ) {
6980s .mu .Lock ()
7081defer s .mu .Unlock ()
7182
72- // Filter to only the workspaces owned by the user
73- latest := all [s .userID ]
83+ wsID ,err := uuid .Parse (string (wsIDStr ))
84+ if err != nil {
85+ return
86+ }
87+
88+ cancel ,err := s .ps .Subscribe (codersdk .WorkspaceNotifyChannel (wsID ),s .handleUpdateEvent )
89+ if err != nil {
90+ return
91+ }
92+ s .cancelFns = append (s .cancelFns ,cancel )
93+
94+ rows ,err := s .db .GetWorkspacesAndAgentsByOwnerID (ctx ,s .userID )
95+ if err != nil {
96+ return
97+ }
98+ s .handleRowsNoLock (rows )
99+ }
100+
101+ func (s * sub )handleInsertAgentEvent (ctx context.Context ,_ []byte ) {
102+ s .mu .Lock ()
103+ defer s .mu .Unlock ()
104+
105+ rows ,err := s .db .GetWorkspacesAndAgentsByOwnerID (ctx ,s .userID )
106+ if err != nil {
107+ return
108+ }
109+ s .handleRowsNoLock (rows )
110+ }
111+
112+ func (s * sub )handleRowsNoLock (rows []database.GetWorkspacesAndAgentsByOwnerIDRow ) {
113+ latest := convertRows (rows )
74114update := produceUpdate (s .prev ,latest )
75115s .prev = latest
76116s .tx <- update
77117}
78118
119+ // start subscribes to updates for all workspaces owned by the user
120+ func (s * sub )start () (err error ) {
121+ s .mu .Lock ()
122+ defer s .mu .Unlock ()
123+
124+ rows ,err := s .db .GetWorkspacesAndAgentsByOwnerID (context .Background (),s .userID )
125+ if err != nil {
126+ return xerrors .Errorf ("get workspaces and agents by owner ID: %w" ,err )
127+ }
128+
129+ // Send initial state
130+ s .handleRowsNoLock (rows )
131+
132+ for _ ,row := range rows {
133+ cancel ,err := s .ps .Subscribe (codersdk .WorkspaceNotifyChannel (row .ID ),s .handleUpdateEvent )
134+ if err != nil {
135+ return xerrors .Errorf ("subscribe to workspace notify channel: %w" ,err )
136+ }
137+ s .cancelFns = append (s .cancelFns ,cancel )
138+ }
139+
140+ cancel ,err := s .ps .Subscribe (WorkspaceInsertChannel (s .userID ),s .handleInsertWorkspaceEvent )
141+ if err != nil {
142+ return xerrors .Errorf ("subscribe to new workspace channel: %w" ,err )
143+ }
144+ s .wsSubCancelFn = cancel
145+
146+ cancel ,err = s .ps .Subscribe (fmt .Sprintf ("new_agent:%s" ,s .userID .String ()),s .handleInsertAgentEvent )
147+ if err != nil {
148+ return xerrors .Errorf ("subscribe to new agent channel: %w" ,err )
149+ }
150+ s .agentSubCancelFn = cancel
151+ return nil
152+ }
153+
154+ func (s * sub )stop () {
155+ s .mu .Lock ()
156+ defer s .mu .Unlock ()
157+
158+ for _ ,cancel := range s .cancelFns {
159+ cancel ()
160+ }
161+
162+ if s .wsSubCancelFn != nil {
163+ s .wsSubCancelFn ()
164+ }
165+
166+ if s .agentSubCancelFn != nil {
167+ s .agentSubCancelFn ()
168+ }
169+
170+ close (s .tx )
171+ }
172+
79173type UpdateQuerier interface {
80- GetWorkspacesAndAgents (ctx context.Context ) ([]database.GetWorkspacesAndAgentsRow ,error )
174+ GetWorkspacesAndAgentsByOwnerID (ctx context.Context , ownerID uuid. UUID ) ([]database.GetWorkspacesAndAgentsByOwnerIDRow ,error )
81175}
82176
83177type updatesProvider struct {
84178mu sync.RWMutex
85- db UpdateQuerier
86- ps pubsub.Pubsub
87179// Peer ID -> subscription
88180subs map [uuid.UUID ]* sub
89- // Owner ID -> workspace ID -> workspace
90- latest workspacesByOwner
91- cancelFn func ()
181+
182+ db UpdateQuerier
183+ ps pubsub. Pubsub
92184}
93185
94- func (u * updatesProvider )IsOwner (userID uuid.UUID ,agentID uuid.UUID )error {
186+ func (u * updatesProvider )IsOwner (userID uuid.UUID ,agentID uuid.UUID )bool {
95187u .mu .RLock ()
96188defer u .mu .RUnlock ()
97189
98- workspaces ,exists := u .latest [userID ]
99- if ! exists {
100- return xerrors .Errorf ("workspace agent not found or you do not have permission: %w" ,sql .ErrNoRows )
101- }
102- for _ ,workspace := range workspaces {
103- for _ ,agent := range workspace .Agents {
104- if agent .ID == agentID {
105- return nil
106- }
190+ for _ ,sub := range u .subs {
191+ if sub .userID == userID && sub .ownsAgent (agentID ) {
192+ return true
107193}
108194}
109- return xerrors . Errorf ( "workspace agent not found or you do not have permission: %w" , sql . ErrNoRows )
195+ return false
110196}
111197
112198var _ tailnet.WorkspaceUpdatesProvider = (* updatesProvider )(nil )
113199
114- func NewUpdatesProvider (ctx context.Context ,db UpdateQuerier ,ps pubsub.Pubsub ) (tailnet.WorkspaceUpdatesProvider ,error ) {
115- rows ,err := db .GetWorkspacesAndAgents (ctx )
116- if err != nil && ! xerrors .Is (err ,sql .ErrNoRows ) {
117- return nil ,err
118- }
200+ func NewUpdatesProvider (_ context.Context ,db UpdateQuerier ,ps pubsub.Pubsub ) (tailnet.WorkspaceUpdatesProvider ,error ) {
119201out := & updatesProvider {
120- db :db ,
121- ps :ps ,
122- subs :map [uuid.UUID ]* sub {},
123- latest :convertRows (rows ),
124- }
125- cancel ,err := ps .Subscribe (codersdk .AllWorkspacesNotifyChannel ,out .handleUpdate )
126- if err != nil {
127- return nil ,err
202+ db :db ,
203+ ps :ps ,
204+ subs :map [uuid.UUID ]* sub {},
128205}
129- out .cancelFn = cancel
130206return out ,nil
131207}
132208
133209func (u * updatesProvider )Stop () {
134210for _ ,sub := range u .subs {
135- close (sub .tx )
136- }
137- u .cancelFn ()
138- }
139-
140- func (u * updatesProvider )handleUpdate (ctx context.Context ,_ []byte ) {
141- rows ,err := u .db .GetWorkspacesAndAgents (ctx )
142- if err != nil && ! xerrors .Is (err ,sql .ErrNoRows ) {
143- // TODO: Log
144- return
145- }
146-
147- wg := & sync.WaitGroup {}
148- latest := convertRows (rows )
149- u .mu .RLock ()
150- for _ ,sub := range u .subs {
151- sub := sub
152- wg .Add (1 )
153- go func () {
154- sub .send (latest )
155- defer wg .Done ()
156- }()
211+ sub .stop ()
157212}
158- u .mu .RUnlock ()
159-
160- u .mu .Lock ()
161- u .latest = latest
162- u .mu .Unlock ()
163- wg .Wait ()
164213}
165214
166215func (u * updatesProvider )Subscribe (peerID uuid.UUID ,userID uuid.UUID ) (<- chan * proto.WorkspaceUpdate ,error ) {
@@ -169,13 +218,20 @@ func (u *updatesProvider) Subscribe(peerID uuid.UUID, userID uuid.UUID) (<-chan
169218
170219tx := make (chan * proto.WorkspaceUpdate ,1 )
171220sub := & sub {
172- userID :userID ,
173- tx :tx ,
174- prev :make (workspacesByID ),
221+ userID :userID ,
222+ tx :tx ,
223+ db :u .db ,
224+ ps :u .ps ,
225+ prev :workspacesByID {},
226+ cancelFns : []func (){},
227+ }
228+ err := sub .start ()
229+ if err != nil {
230+ sub .stop ()
231+ return nil ,err
175232}
233+
176234u .subs [peerID ]= sub
177- // Write initial state
178- sub .send (u .latest )
179235return tx ,nil
180236}
181237
@@ -263,3 +319,24 @@ func produceUpdate(old, new workspacesByID) *proto.WorkspaceUpdate {
263319
264320return out
265321}
322+
323+ func convertRows (rows []database.GetWorkspacesAndAgentsByOwnerIDRow )workspacesByID {
324+ out := make (workspacesByID )
325+ for _ ,row := range rows {
326+ out [row .ID ]= ownedWorkspace {
327+ WorkspaceName :row .Name ,
328+ JobStatus :row .JobStatus ,
329+ Transition :row .Transition ,
330+ Agents :row .Agents ,
331+ }
332+ }
333+ return out
334+ }
335+
336+ func WorkspaceInsertChannel (userID uuid.UUID )string {
337+ return fmt .Sprintf ("new_workspace:%s" ,userID .String ())
338+ }
339+
340+ func AgentInsertChannel (userID uuid.UUID )string {
341+ return fmt .Sprintf ("new_agent:%s" ,userID .String ())
342+ }