Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit73b63ca

Browse files
committed
use peer ids as subscription key + tests
1 parentd6a1b68 commit73b63ca

File tree

4 files changed

+358
-215
lines changed

4 files changed

+358
-215
lines changed

‎tailnet/service.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,10 +259,12 @@ func (s *DRPCService) WorkspaceUpdates(_ *proto.WorkspaceUpdatesRequest, stream
259259
)
260260
switchauth:=streamID.Auth.(type) {
261261
caseClientUserCoordinateeAuth:
262-
updatesCh,err=s.WorkspaceUpdatesProvider.Subscribe(auth.UserID)
262+
// Stream ID is the peer ID
263+
updatesCh,err=s.WorkspaceUpdatesProvider.Subscribe(streamID.ID,auth.UserID)
263264
iferr!=nil {
264265
err=xerrors.Errorf("subscribe to workspace updates: %w",err)
265266
}
267+
defers.WorkspaceUpdatesProvider.Unsubscribe(streamID.ID)
266268
default:
267269
err=xerrors.Errorf("workspace updates not supported by auth name %T",auth)
268270
}
@@ -273,6 +275,9 @@ func (s *DRPCService) WorkspaceUpdates(_ *proto.WorkspaceUpdatesRequest, stream
273275
for {
274276
select {
275277
caseupdates:=<-updatesCh:
278+
ifupdates==nil {
279+
returnnil
280+
}
276281
err:=stream.Send(updates)
277282
iferr!=nil {
278283
returnxerrors.Errorf("send workspace update: %w",err)

‎tailnet/workspaceupdates.go

Lines changed: 53 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ func convertRows(v []database.GetWorkspacesAndAgentsRow) workspacesByOwner {
4040
WorkspaceName:ws.Name,
4141
JobStatus:ws.JobStatus,
4242
Transition:ws.Transition,
43+
Agents:ws.Agents,
4344
}
4445
ifbyID,exists:=m[ws.OwnerID];!exists {
4546
m[ws.OwnerID]=map[uuid.UUID]ownedWorkspace{ws.ID:owned}
@@ -68,14 +69,16 @@ func (s *sub) send(all workspacesByOwner) {
6869
defers.mu.Unlock()
6970

7071
// Filter to only the workspaces owned by the user
71-
own:=all[s.userID]
72-
update:=produceUpdate(s.prev,own)
73-
s.prev=own
72+
latest:=all[s.userID]
73+
update:=produceUpdate(s.prev,latest)
74+
s.prev=latest
7475
s.tx<-update
7576
}
7677

7778
typeWorkspaceUpdatesProviderinterface {
78-
Subscribe(userID uuid.UUID) (<-chan*proto.WorkspaceUpdate,error)
79+
Subscribe(peerID uuid.UUID,userID uuid.UUID) (<-chan*proto.WorkspaceUpdate,error)
80+
Unsubscribe(peerID uuid.UUID)
81+
Stop()
7982
}
8083

8184
typeWorkspaceStoreinterface {
@@ -84,25 +87,40 @@ type WorkspaceStore interface {
8487
}
8588

8689
typeupdatesProviderstruct {
87-
mu sync.RWMutex
88-
dbWorkspaceStore
89-
ps pubsub.Pubsub
90-
subs []*sub
90+
mu sync.RWMutex
91+
dbWorkspaceStore
92+
ps pubsub.Pubsub
93+
// Peer ID -> subscription
94+
subsmap[uuid.UUID]*sub
95+
latestworkspacesByOwner
9196
cancelFnfunc()
9297
}
9398

9499
var_WorkspaceUpdatesProvider= (*updatesProvider)(nil)
95100

96-
func (u*updatesProvider)Start()error {
97-
cancel,err:=u.ps.Subscribe(codersdk.AllWorkspacesNotifyChannel,u.handleUpdate)
101+
funcNewUpdatesProvider(ctx context.Context,dbWorkspaceStore,ps pubsub.Pubsub) (WorkspaceUpdatesProvider,error) {
102+
rows,err:=db.GetWorkspacesAndAgents(ctx)
103+
iferr!=nil&&!xerrors.Is(err,sql.ErrNoRows) {
104+
returnnil,err
105+
}
106+
out:=&updatesProvider{
107+
db:db,
108+
ps:ps,
109+
subs:map[uuid.UUID]*sub{},
110+
latest:convertRows(rows),
111+
}
112+
cancel,err:=ps.Subscribe(codersdk.AllWorkspacesNotifyChannel,out.handleUpdate)
98113
iferr!=nil {
99-
returnerr
114+
returnnil,err
100115
}
101-
u.cancelFn=cancel
102-
returnnil
116+
out.cancelFn=cancel
117+
returnout,nil
103118
}
104119

105120
func (u*updatesProvider)Stop() {
121+
for_,sub:=rangeu.subs {
122+
close(sub.tx)
123+
}
106124
u.cancelFn()
107125
}
108126

@@ -116,7 +134,6 @@ func (u *updatesProvider) handleUpdate(ctx context.Context, _ []byte) {
116134
wg:=&sync.WaitGroup{}
117135
latest:=convertRows(rows)
118136
u.mu.RLock()
119-
deferu.mu.RUnlock()
120137
for_,sub:=rangeu.subs {
121138
sub:=sub
122139
wg.Add(1)
@@ -125,18 +142,15 @@ func (u *updatesProvider) handleUpdate(ctx context.Context, _ []byte) {
125142
deferwg.Done()
126143
}()
127144
}
128-
wg.Wait()
129-
}
145+
u.mu.RUnlock()
130146

131-
funcNewUpdatesProvider(dbWorkspaceStore,ps pubsub.Pubsub)WorkspaceUpdatesProvider {
132-
return&updatesProvider{
133-
db:db,
134-
ps:ps,
135-
subs:make([]*sub,0),
136-
}
147+
u.mu.Lock()
148+
u.latest=latest
149+
u.mu.Unlock()
150+
wg.Wait()
137151
}
138152

139-
func (u*updatesProvider)Subscribe(userID uuid.UUID) (<-chan*proto.WorkspaceUpdate,error) {
153+
func (u*updatesProvider)Subscribe(peerID uuid.UUID,userID uuid.UUID) (<-chan*proto.WorkspaceUpdate,error) {
140154
u.mu.Lock()
141155
deferu.mu.Unlock()
142156

@@ -146,10 +160,25 @@ func (u *updatesProvider) Subscribe(userID uuid.UUID) (<-chan *proto.WorkspaceUp
146160
tx:tx,
147161
prev:make(workspacesByID),
148162
}
149-
u.subs=append(u.subs,sub)
163+
u.subs[peerID]=sub
164+
// Write initial state
165+
sub.send(u.latest)
150166
returntx,nil
151167
}
152168

169+
func (u*updatesProvider)Unsubscribe(peerID uuid.UUID) {
170+
u.mu.Lock()
171+
deferu.mu.Unlock()
172+
173+
sub,exists:=u.subs[peerID]
174+
if!exists {
175+
return
176+
}
177+
close(sub.tx)
178+
delete(u.subs,peerID)
179+
return
180+
}
181+
153182
funcproduceUpdate(old,newworkspacesByID)*proto.WorkspaceUpdate {
154183
out:=&proto.WorkspaceUpdate{
155184
UpsertedWorkspaces: []*proto.Workspace{},

‎tailnet/workspaceupdates_internal_test.go

Lines changed: 0 additions & 190 deletions
This file was deleted.

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp