Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitab7f678

Browse files
committed
redesign
1 parentb01d1cb commitab7f678

File tree

6 files changed

+177
-113
lines changed

6 files changed

+177
-113
lines changed

‎coderd/workspaces.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2068,11 +2068,6 @@ func (api *API) publishWorkspaceUpdate(ctx context.Context, workspaceID uuid.UUI
20682068
api.Logger.Warn(ctx,"failed to publish workspace update",
20692069
slog.F("workspace_id",workspaceID),slog.Error(err))
20702070
}
2071-
err=api.Pubsub.Publish(codersdk.AllWorkspacesNotifyChannel, []byte(workspaceID.String()))
2072-
iferr!=nil {
2073-
api.Logger.Warn(ctx,"failed to publish all workspaces update",
2074-
slog.F("workspace_id",workspaceID),slog.Error(err))
2075-
}
20762071
}
20772072

20782073
func (api*API)publishWorkspaceAgentLogsUpdate(ctx context.Context,workspaceAgentID uuid.UUID,m agentsdk.LogsNotifyMessage) {

‎coderd/workspaceupdates.go

Lines changed: 167 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package coderd
22

33
import (
44
"context"
5-
"database/sql"
5+
"fmt"
66
"sync"
77

88
"github.com/google/uuid"
@@ -16,8 +16,6 @@ import (
1616
"github.com/coder/coder/v2/tailnet/proto"
1717
)
1818

19-
typeworkspacesByOwnermap[uuid.UUID]workspacesByID
20-
2119
typeworkspacesByIDmap[uuid.UUID]ownedWorkspace
2220

2321
typeownedWorkspacestruct {
@@ -34,133 +32,184 @@ func (w ownedWorkspace) Equal(other ownedWorkspace) bool {
3432
w.Transition==other.Transition
3533
}
3634

37-
funcconvertRows(v []database.GetWorkspacesAndAgentsRow)workspacesByOwner {
38-
m:=make(map[uuid.UUID]workspacesByID)
39-
for_,ws:=rangev {
40-
owned:=ownedWorkspace{
41-
WorkspaceName:ws.Name,
42-
JobStatus:ws.JobStatus,
43-
Transition:ws.Transition,
44-
Agents:ws.Agents,
45-
}
46-
ifbyID,exists:=m[ws.OwnerID];!exists {
47-
m[ws.OwnerID]=map[uuid.UUID]ownedWorkspace{ws.ID:owned}
48-
}else {
49-
byID[ws.ID]=owned
50-
m[ws.OwnerID]=byID
51-
}
52-
}
53-
returnworkspacesByOwner(m)
54-
}
55-
5635
funcconvertStatus(status database.ProvisionerJobStatus,trans database.WorkspaceTransition) proto.Workspace_Status {
5736
wsStatus:=codersdk.ConvertWorkspaceStatus(codersdk.ProvisionerJobStatus(status),codersdk.WorkspaceTransition(trans))
5837
returntailnet.WorkspaceStatusToProto(wsStatus)
5938
}
6039

6140
typesubstruct {
62-
mu sync.Mutex
41+
mu sync.RWMutex
6342
userID uuid.UUID
6443
txchan<-*proto.WorkspaceUpdate
6544
prevworkspacesByID
45+
46+
dbUpdateQuerier
47+
ps pubsub.Pubsub
48+
49+
cancelFns []func()
50+
agentSubCancelFnfunc()
51+
wsSubCancelFnfunc()
6652
}
6753

68-
func (s*sub)send(allworkspacesByOwner) {
54+
func (s*sub)ownsAgent(agentID uuid.UUID)bool {
55+
s.mu.RLock()
56+
defers.mu.RUnlock()
57+
58+
for_,ws:=ranges.prev {
59+
for_,agent:=rangews.Agents {
60+
ifagent.ID==agentID {
61+
returntrue
62+
}
63+
}
64+
}
65+
returnfalse
66+
}
67+
68+
func (s*sub)handleUpdateEvent(ctx context.Context,_ []byte) {
69+
s.mu.Lock()
70+
defers.mu.Unlock()
71+
72+
rows,err:=s.db.GetWorkspacesAndAgentsByOwnerID(ctx,s.userID)
73+
iferr!=nil {
74+
return
75+
}
76+
s.handleRowsNoLock(rows)
77+
}
78+
79+
func (s*sub)handleInsertWorkspaceEvent(ctx context.Context,wsIDStr []byte) {
6980
s.mu.Lock()
7081
defers.mu.Unlock()
7182

72-
// Filter to only the workspaces owned by the user
73-
latest:=all[s.userID]
83+
wsID,err:=uuid.Parse(string(wsIDStr))
84+
iferr!=nil {
85+
return
86+
}
87+
88+
cancel,err:=s.ps.Subscribe(codersdk.WorkspaceNotifyChannel(wsID),s.handleUpdateEvent)
89+
iferr!=nil {
90+
return
91+
}
92+
s.cancelFns=append(s.cancelFns,cancel)
93+
94+
rows,err:=s.db.GetWorkspacesAndAgentsByOwnerID(ctx,s.userID)
95+
iferr!=nil {
96+
return
97+
}
98+
s.handleRowsNoLock(rows)
99+
}
100+
101+
func (s*sub)handleInsertAgentEvent(ctx context.Context,_ []byte) {
102+
s.mu.Lock()
103+
defers.mu.Unlock()
104+
105+
rows,err:=s.db.GetWorkspacesAndAgentsByOwnerID(ctx,s.userID)
106+
iferr!=nil {
107+
return
108+
}
109+
s.handleRowsNoLock(rows)
110+
}
111+
112+
func (s*sub)handleRowsNoLock(rows []database.GetWorkspacesAndAgentsByOwnerIDRow) {
113+
latest:=convertRows(rows)
74114
update:=produceUpdate(s.prev,latest)
75115
s.prev=latest
76116
s.tx<-update
77117
}
78118

119+
// start subscribes to updates for all workspaces owned by the user
120+
func (s*sub)start() (errerror) {
121+
s.mu.Lock()
122+
defers.mu.Unlock()
123+
124+
rows,err:=s.db.GetWorkspacesAndAgentsByOwnerID(context.Background(),s.userID)
125+
iferr!=nil {
126+
returnxerrors.Errorf("get workspaces and agents by owner ID: %w",err)
127+
}
128+
129+
// Send initial state
130+
s.handleRowsNoLock(rows)
131+
132+
for_,row:=rangerows {
133+
cancel,err:=s.ps.Subscribe(codersdk.WorkspaceNotifyChannel(row.ID),s.handleUpdateEvent)
134+
iferr!=nil {
135+
returnxerrors.Errorf("subscribe to workspace notify channel: %w",err)
136+
}
137+
s.cancelFns=append(s.cancelFns,cancel)
138+
}
139+
140+
cancel,err:=s.ps.Subscribe(WorkspaceInsertChannel(s.userID),s.handleInsertWorkspaceEvent)
141+
iferr!=nil {
142+
returnxerrors.Errorf("subscribe to new workspace channel: %w",err)
143+
}
144+
s.wsSubCancelFn=cancel
145+
146+
cancel,err=s.ps.Subscribe(fmt.Sprintf("new_agent:%s",s.userID.String()),s.handleInsertAgentEvent)
147+
iferr!=nil {
148+
returnxerrors.Errorf("subscribe to new agent channel: %w",err)
149+
}
150+
s.agentSubCancelFn=cancel
151+
returnnil
152+
}
153+
154+
func (s*sub)stop() {
155+
s.mu.Lock()
156+
defers.mu.Unlock()
157+
158+
for_,cancel:=ranges.cancelFns {
159+
cancel()
160+
}
161+
162+
ifs.wsSubCancelFn!=nil {
163+
s.wsSubCancelFn()
164+
}
165+
166+
ifs.agentSubCancelFn!=nil {
167+
s.agentSubCancelFn()
168+
}
169+
170+
close(s.tx)
171+
}
172+
79173
typeUpdateQuerierinterface {
80-
GetWorkspacesAndAgents(ctx context.Context) ([]database.GetWorkspacesAndAgentsRow,error)
174+
GetWorkspacesAndAgentsByOwnerID(ctx context.Context,ownerID uuid.UUID) ([]database.GetWorkspacesAndAgentsByOwnerIDRow,error)
81175
}
82176

83177
typeupdatesProviderstruct {
84178
mu sync.RWMutex
85-
dbUpdateQuerier
86-
ps pubsub.Pubsub
87179
// Peer ID -> subscription
88180
subsmap[uuid.UUID]*sub
89-
// Owner ID -> workspace ID -> workspace
90-
latestworkspacesByOwner
91-
cancelFnfunc()
181+
182+
dbUpdateQuerier
183+
ps pubsub.Pubsub
92184
}
93185

94-
func (u*updatesProvider)IsOwner(userID uuid.UUID,agentID uuid.UUID)error {
186+
func (u*updatesProvider)IsOwner(userID uuid.UUID,agentID uuid.UUID)bool {
95187
u.mu.RLock()
96188
deferu.mu.RUnlock()
97189

98-
workspaces,exists:=u.latest[userID]
99-
if!exists {
100-
returnxerrors.Errorf("workspace agent not found or you do not have permission: %w",sql.ErrNoRows)
101-
}
102-
for_,workspace:=rangeworkspaces {
103-
for_,agent:=rangeworkspace.Agents {
104-
ifagent.ID==agentID {
105-
returnnil
106-
}
190+
for_,sub:=rangeu.subs {
191+
ifsub.userID==userID&&sub.ownsAgent(agentID) {
192+
returntrue
107193
}
108194
}
109-
returnxerrors.Errorf("workspace agent not found or you do not have permission: %w",sql.ErrNoRows)
195+
returnfalse
110196
}
111197

112198
var_ tailnet.WorkspaceUpdatesProvider= (*updatesProvider)(nil)
113199

114-
funcNewUpdatesProvider(ctx context.Context,dbUpdateQuerier,ps pubsub.Pubsub) (tailnet.WorkspaceUpdatesProvider,error) {
115-
rows,err:=db.GetWorkspacesAndAgents(ctx)
116-
iferr!=nil&&!xerrors.Is(err,sql.ErrNoRows) {
117-
returnnil,err
118-
}
200+
funcNewUpdatesProvider(_ context.Context,dbUpdateQuerier,ps pubsub.Pubsub) (tailnet.WorkspaceUpdatesProvider,error) {
119201
out:=&updatesProvider{
120-
db:db,
121-
ps:ps,
122-
subs:map[uuid.UUID]*sub{},
123-
latest:convertRows(rows),
124-
}
125-
cancel,err:=ps.Subscribe(codersdk.AllWorkspacesNotifyChannel,out.handleUpdate)
126-
iferr!=nil {
127-
returnnil,err
202+
db:db,
203+
ps:ps,
204+
subs:map[uuid.UUID]*sub{},
128205
}
129-
out.cancelFn=cancel
130206
returnout,nil
131207
}
132208

133209
func (u*updatesProvider)Stop() {
134210
for_,sub:=rangeu.subs {
135-
close(sub.tx)
136-
}
137-
u.cancelFn()
138-
}
139-
140-
func (u*updatesProvider)handleUpdate(ctx context.Context,_ []byte) {
141-
rows,err:=u.db.GetWorkspacesAndAgents(ctx)
142-
iferr!=nil&&!xerrors.Is(err,sql.ErrNoRows) {
143-
// TODO: Log
144-
return
145-
}
146-
147-
wg:=&sync.WaitGroup{}
148-
latest:=convertRows(rows)
149-
u.mu.RLock()
150-
for_,sub:=rangeu.subs {
151-
sub:=sub
152-
wg.Add(1)
153-
gofunc() {
154-
sub.send(latest)
155-
deferwg.Done()
156-
}()
211+
sub.stop()
157212
}
158-
u.mu.RUnlock()
159-
160-
u.mu.Lock()
161-
u.latest=latest
162-
u.mu.Unlock()
163-
wg.Wait()
164213
}
165214

166215
func (u*updatesProvider)Subscribe(peerID uuid.UUID,userID uuid.UUID) (<-chan*proto.WorkspaceUpdate,error) {
@@ -169,13 +218,20 @@ func (u *updatesProvider) Subscribe(peerID uuid.UUID, userID uuid.UUID) (<-chan
169218

170219
tx:=make(chan*proto.WorkspaceUpdate,1)
171220
sub:=&sub{
172-
userID:userID,
173-
tx:tx,
174-
prev:make(workspacesByID),
221+
userID:userID,
222+
tx:tx,
223+
db:u.db,
224+
ps:u.ps,
225+
prev:workspacesByID{},
226+
cancelFns: []func(){},
227+
}
228+
err:=sub.start()
229+
iferr!=nil {
230+
sub.stop()
231+
returnnil,err
175232
}
233+
176234
u.subs[peerID]=sub
177-
// Write initial state
178-
sub.send(u.latest)
179235
returntx,nil
180236
}
181237

@@ -263,3 +319,24 @@ func produceUpdate(old, new workspacesByID) *proto.WorkspaceUpdate {
263319

264320
returnout
265321
}
322+
323+
funcconvertRows(rows []database.GetWorkspacesAndAgentsByOwnerIDRow)workspacesByID {
324+
out:=make(workspacesByID)
325+
for_,row:=rangerows {
326+
out[row.ID]=ownedWorkspace{
327+
WorkspaceName:row.Name,
328+
JobStatus:row.JobStatus,
329+
Transition:row.Transition,
330+
Agents:row.Agents,
331+
}
332+
}
333+
returnout
334+
}
335+
336+
funcWorkspaceInsertChannel(userID uuid.UUID)string {
337+
returnfmt.Sprintf("new_workspace:%s",userID.String())
338+
}
339+
340+
funcAgentInsertChannel(userID uuid.UUID)string {
341+
returnfmt.Sprintf("new_agent:%s",userID.String())
342+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp