@@ -40,6 +40,7 @@ func convertRows(v []database.GetWorkspacesAndAgentsRow) workspacesByOwner {
40
40
WorkspaceName :ws .Name ,
41
41
JobStatus :ws .JobStatus ,
42
42
Transition :ws .Transition ,
43
+ Agents :ws .Agents ,
43
44
}
44
45
if byID ,exists := m [ws .OwnerID ];! exists {
45
46
m [ws .OwnerID ]= map [uuid.UUID ]ownedWorkspace {ws .ID :owned }
@@ -68,14 +69,16 @@ func (s *sub) send(all workspacesByOwner) {
68
69
defer s .mu .Unlock ()
69
70
70
71
// Filter to only the workspaces owned by the user
71
- own := all [s .userID ]
72
- update := produceUpdate (s .prev ,own )
73
- s .prev = own
72
+ latest := all [s .userID ]
73
+ update := produceUpdate (s .prev ,latest )
74
+ s .prev = latest
74
75
s .tx <- update
75
76
}
76
77
77
78
type WorkspaceUpdatesProvider interface {
78
- Subscribe (userID uuid.UUID ) (<- chan * proto.WorkspaceUpdate ,error )
79
+ Subscribe (peerID uuid.UUID ,userID uuid.UUID ) (<- chan * proto.WorkspaceUpdate ,error )
80
+ Unsubscribe (peerID uuid.UUID )
81
+ Stop ()
79
82
}
80
83
81
84
type WorkspaceStore interface {
@@ -84,25 +87,40 @@ type WorkspaceStore interface {
84
87
}
85
88
86
89
type updatesProvider struct {
87
- mu sync.RWMutex
88
- db WorkspaceStore
89
- ps pubsub.Pubsub
90
- subs []* sub
90
+ mu sync.RWMutex
91
+ db WorkspaceStore
92
+ ps pubsub.Pubsub
93
+ // Peer ID -> subscription
94
+ subs map [uuid.UUID ]* sub
95
+ latest workspacesByOwner
91
96
cancelFn func ()
92
97
}
93
98
94
99
var _ WorkspaceUpdatesProvider = (* updatesProvider )(nil )
95
100
96
- func (u * updatesProvider )Start ()error {
97
- cancel ,err := u .ps .Subscribe (codersdk .AllWorkspacesNotifyChannel ,u .handleUpdate )
101
+ func NewUpdatesProvider (ctx context.Context ,db WorkspaceStore ,ps pubsub.Pubsub ) (WorkspaceUpdatesProvider ,error ) {
102
+ rows ,err := db .GetWorkspacesAndAgents (ctx )
103
+ if err != nil && ! xerrors .Is (err ,sql .ErrNoRows ) {
104
+ return nil ,err
105
+ }
106
+ out := & updatesProvider {
107
+ db :db ,
108
+ ps :ps ,
109
+ subs :map [uuid.UUID ]* sub {},
110
+ latest :convertRows (rows ),
111
+ }
112
+ cancel ,err := ps .Subscribe (codersdk .AllWorkspacesNotifyChannel ,out .handleUpdate )
98
113
if err != nil {
99
- return err
114
+ return nil , err
100
115
}
101
- u .cancelFn = cancel
102
- return nil
116
+ out .cancelFn = cancel
117
+ return out , nil
103
118
}
104
119
105
120
func (u * updatesProvider )Stop () {
121
+ for _ ,sub := range u .subs {
122
+ close (sub .tx )
123
+ }
106
124
u .cancelFn ()
107
125
}
108
126
@@ -116,7 +134,6 @@ func (u *updatesProvider) handleUpdate(ctx context.Context, _ []byte) {
116
134
wg := & sync.WaitGroup {}
117
135
latest := convertRows (rows )
118
136
u .mu .RLock ()
119
- defer u .mu .RUnlock ()
120
137
for _ ,sub := range u .subs {
121
138
sub := sub
122
139
wg .Add (1 )
@@ -125,18 +142,15 @@ func (u *updatesProvider) handleUpdate(ctx context.Context, _ []byte) {
125
142
defer wg .Done ()
126
143
}()
127
144
}
128
- wg .Wait ()
129
- }
145
+ u .mu .RUnlock ()
130
146
131
- func NewUpdatesProvider (db WorkspaceStore ,ps pubsub.Pubsub )WorkspaceUpdatesProvider {
132
- return & updatesProvider {
133
- db :db ,
134
- ps :ps ,
135
- subs :make ([]* sub ,0 ),
136
- }
147
+ u .mu .Lock ()
148
+ u .latest = latest
149
+ u .mu .Unlock ()
150
+ wg .Wait ()
137
151
}
138
152
139
- func (u * updatesProvider )Subscribe (userID uuid.UUID ) (<- chan * proto.WorkspaceUpdate ,error ) {
153
+ func (u * updatesProvider )Subscribe (peerID uuid. UUID , userID uuid.UUID ) (<- chan * proto.WorkspaceUpdate ,error ) {
140
154
u .mu .Lock ()
141
155
defer u .mu .Unlock ()
142
156
@@ -146,10 +160,25 @@ func (u *updatesProvider) Subscribe(userID uuid.UUID) (<-chan *proto.WorkspaceUp
146
160
tx :tx ,
147
161
prev :make (workspacesByID ),
148
162
}
149
- u .subs = append (u .subs ,sub )
163
+ u .subs [peerID ]= sub
164
+ // Write initial state
165
+ sub .send (u .latest )
150
166
return tx ,nil
151
167
}
152
168
169
+ func (u * updatesProvider )Unsubscribe (peerID uuid.UUID ) {
170
+ u .mu .Lock ()
171
+ defer u .mu .Unlock ()
172
+
173
+ sub ,exists := u .subs [peerID ]
174
+ if ! exists {
175
+ return
176
+ }
177
+ close (sub .tx )
178
+ delete (u .subs ,peerID )
179
+ return
180
+ }
181
+
153
182
func produceUpdate (old ,new workspacesByID )* proto.WorkspaceUpdate {
154
183
out := & proto.WorkspaceUpdate {
155
184
UpsertedWorkspaces : []* proto.Workspace {},