@@ -40,6 +40,7 @@ func convertRows(v []database.GetWorkspacesAndAgentsRow) workspacesByOwner {
4040WorkspaceName :ws .Name ,
4141JobStatus :ws .JobStatus ,
4242Transition :ws .Transition ,
43+ Agents :ws .Agents ,
4344}
4445if byID ,exists := m [ws .OwnerID ];! exists {
4546m [ws .OwnerID ]= map [uuid.UUID ]ownedWorkspace {ws .ID :owned }
@@ -68,14 +69,16 @@ func (s *sub) send(all workspacesByOwner) {
6869defer s .mu .Unlock ()
6970
7071// Filter to only the workspaces owned by the user
71- own := all [s .userID ]
72- update := produceUpdate (s .prev ,own )
73- s .prev = own
72+ latest := all [s .userID ]
73+ update := produceUpdate (s .prev ,latest )
74+ s .prev = latest
7475s .tx <- update
7576}
7677
7778type WorkspaceUpdatesProvider interface {
78- Subscribe (userID uuid.UUID ) (<- chan * proto.WorkspaceUpdate ,error )
79+ Subscribe (peerID uuid.UUID ,userID uuid.UUID ) (<- chan * proto.WorkspaceUpdate ,error )
80+ Unsubscribe (peerID uuid.UUID )
81+ Stop ()
7982}
8083
8184type WorkspaceStore interface {
@@ -84,25 +87,40 @@ type WorkspaceStore interface {
8487}
8588
8689type updatesProvider struct {
87- mu sync.RWMutex
88- db WorkspaceStore
89- ps pubsub.Pubsub
90- subs []* sub
90+ mu sync.RWMutex
91+ db WorkspaceStore
92+ ps pubsub.Pubsub
93+ // Peer ID -> subscription
94+ subs map [uuid.UUID ]* sub
95+ latest workspacesByOwner
9196cancelFn func ()
9297}
9398
9499var _ WorkspaceUpdatesProvider = (* updatesProvider )(nil )
95100
96- func (u * updatesProvider )Start ()error {
97- cancel ,err := u .ps .Subscribe (codersdk .AllWorkspacesNotifyChannel ,u .handleUpdate )
101+ func NewUpdatesProvider (ctx context.Context ,db WorkspaceStore ,ps pubsub.Pubsub ) (WorkspaceUpdatesProvider ,error ) {
102+ rows ,err := db .GetWorkspacesAndAgents (ctx )
103+ if err != nil && ! xerrors .Is (err ,sql .ErrNoRows ) {
104+ return nil ,err
105+ }
106+ out := & updatesProvider {
107+ db :db ,
108+ ps :ps ,
109+ subs :map [uuid.UUID ]* sub {},
110+ latest :convertRows (rows ),
111+ }
112+ cancel ,err := ps .Subscribe (codersdk .AllWorkspacesNotifyChannel ,out .handleUpdate )
98113if err != nil {
99- return err
114+ return nil , err
100115}
101- u .cancelFn = cancel
102- return nil
116+ out .cancelFn = cancel
117+ return out , nil
103118}
104119
105120func (u * updatesProvider )Stop () {
121+ for _ ,sub := range u .subs {
122+ close (sub .tx )
123+ }
106124u .cancelFn ()
107125}
108126
@@ -116,7 +134,6 @@ func (u *updatesProvider) handleUpdate(ctx context.Context, _ []byte) {
116134wg := & sync.WaitGroup {}
117135latest := convertRows (rows )
118136u .mu .RLock ()
119- defer u .mu .RUnlock ()
120137for _ ,sub := range u .subs {
121138sub := sub
122139wg .Add (1 )
@@ -125,18 +142,15 @@ func (u *updatesProvider) handleUpdate(ctx context.Context, _ []byte) {
125142defer wg .Done ()
126143}()
127144}
128- wg .Wait ()
129- }
145+ u .mu .RUnlock ()
130146
131- func NewUpdatesProvider (db WorkspaceStore ,ps pubsub.Pubsub )WorkspaceUpdatesProvider {
132- return & updatesProvider {
133- db :db ,
134- ps :ps ,
135- subs :make ([]* sub ,0 ),
136- }
147+ u .mu .Lock ()
148+ u .latest = latest
149+ u .mu .Unlock ()
150+ wg .Wait ()
137151}
138152
139- func (u * updatesProvider )Subscribe (userID uuid.UUID ) (<- chan * proto.WorkspaceUpdate ,error ) {
153+ func (u * updatesProvider )Subscribe (peerID uuid. UUID , userID uuid.UUID ) (<- chan * proto.WorkspaceUpdate ,error ) {
140154u .mu .Lock ()
141155defer u .mu .Unlock ()
142156
@@ -146,10 +160,25 @@ func (u *updatesProvider) Subscribe(userID uuid.UUID) (<-chan *proto.WorkspaceUp
146160tx :tx ,
147161prev :make (workspacesByID ),
148162}
149- u .subs = append (u .subs ,sub )
163+ u .subs [peerID ]= sub
164+ // Write initial state
165+ sub .send (u .latest )
150166return tx ,nil
151167}
152168
169+ func (u * updatesProvider )Unsubscribe (peerID uuid.UUID ) {
170+ u .mu .Lock ()
171+ defer u .mu .Unlock ()
172+
173+ sub ,exists := u .subs [peerID ]
174+ if ! exists {
175+ return
176+ }
177+ close (sub .tx )
178+ delete (u .subs ,peerID )
179+ return
180+ }
181+
153182func produceUpdate (old ,new workspacesByID )* proto.WorkspaceUpdate {
154183out := & proto.WorkspaceUpdate {
155184UpsertedWorkspaces : []* proto.Workspace {},