Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitde1435d

Browse files
committed
redesign
1 parent13d9896 commitde1435d

File tree

5 files changed

+141
-184
lines changed

5 files changed

+141
-184
lines changed

‎coderd/workspaces.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2089,11 +2089,6 @@ func (api *API) publishWorkspaceUpdate(ctx context.Context, ownerID uuid.UUID, e
20892089
api.Logger.Warn(ctx,"failed to publish workspace update",
20902090
slog.F("workspace_id",event.WorkspaceID),slog.Error(err))
20912091
}
2092-
err=api.Pubsub.Publish(codersdk.AllWorkspacesNotifyChannel, []byte(workspaceID.String()))
2093-
iferr!=nil {
2094-
api.Logger.Warn(ctx,"failed to publish all workspaces update",
2095-
slog.F("workspace_id",workspaceID),slog.Error(err))
2096-
}
20972092
}
20982093

20992094
func (api*API)publishWorkspaceAgentLogsUpdate(ctx context.Context,workspaceAgentID uuid.UUID,m agentsdk.LogsNotifyMessage) {

‎coderd/workspaceupdates.go

Lines changed: 128 additions & 154 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,18 @@ package coderd
22

33
import (
44
"context"
5-
"database/sql"
65
"sync"
76

87
"github.com/google/uuid"
98
"golang.org/x/xerrors"
109

1110
"github.com/coder/coder/v2/coderd/database"
1211
"github.com/coder/coder/v2/coderd/database/pubsub"
13-
"github.com/coder/coder/v2/coderd/util/slice"
1412
"github.com/coder/coder/v2/codersdk"
1513
"github.com/coder/coder/v2/tailnet"
1614
"github.com/coder/coder/v2/tailnet/proto"
1715
)
1816

19-
typeworkspacesByOwnermap[uuid.UUID]workspacesByID
20-
2117
typeworkspacesByIDmap[uuid.UUID]ownedWorkspace
2218

2319
typeownedWorkspacestruct {
@@ -27,140 +23,147 @@ type ownedWorkspace struct {
2723
Agents []database.AgentIDNamePair
2824
}
2925

30-
// Equal does not compare agents
31-
func (wownedWorkspace)Equal(otherownedWorkspace)bool {
32-
returnw.WorkspaceName==other.WorkspaceName&&
33-
w.JobStatus==other.JobStatus&&
34-
w.Transition==other.Transition
35-
}
36-
37-
funcconvertRows(v []database.GetWorkspacesAndAgentsRow)workspacesByOwner {
38-
m:=make(map[uuid.UUID]workspacesByID)
39-
for_,ws:=rangev {
40-
owned:=ownedWorkspace{
41-
WorkspaceName:ws.Name,
42-
JobStatus:ws.JobStatus,
43-
Transition:ws.Transition,
44-
Agents:ws.Agents,
45-
}
46-
ifbyID,exists:=m[ws.OwnerID];!exists {
47-
m[ws.OwnerID]=map[uuid.UUID]ownedWorkspace{ws.ID:owned}
48-
}else {
49-
byID[ws.ID]=owned
50-
m[ws.OwnerID]=byID
51-
}
52-
}
53-
returnworkspacesByOwner(m)
54-
}
55-
5626
funcconvertStatus(status database.ProvisionerJobStatus,trans database.WorkspaceTransition) proto.Workspace_Status {
5727
wsStatus:=codersdk.ConvertWorkspaceStatus(codersdk.ProvisionerJobStatus(status),codersdk.WorkspaceTransition(trans))
5828
returntailnet.WorkspaceStatusToProto(wsStatus)
5929
}
6030

6131
typesubstruct {
62-
mu sync.Mutex
32+
mu sync.RWMutex
6333
userID uuid.UUID
6434
txchan<-*proto.WorkspaceUpdate
65-
prevworkspacesByID
35+
stateworkspacesByID
36+
37+
dbUpdateQuerier
38+
ps pubsub.Pubsub
39+
40+
cancelFnfunc()
41+
}
42+
43+
func (s*sub)ownsAgent(agentID uuid.UUID)bool {
44+
s.mu.RLock()
45+
defers.mu.RUnlock()
46+
47+
for_,ws:=ranges.state {
48+
for_,agent:=rangews.Agents {
49+
ifagent.ID==agentID {
50+
returntrue
51+
}
52+
}
53+
}
54+
returnfalse
55+
}
56+
57+
func (s*sub)handleEvent(_ context.Context,event codersdk.WorkspaceEvent) {
58+
s.mu.Lock()
59+
defers.mu.Unlock()
60+
61+
out:=&proto.WorkspaceUpdate{
62+
UpsertedWorkspaces: []*proto.Workspace{},
63+
UpsertedAgents: []*proto.Agent{},
64+
DeletedWorkspaces: []*proto.Workspace{},
65+
DeletedAgents: []*proto.Agent{},
66+
}
67+
68+
switchevent.Kind {
69+
casecodersdk.WorkspaceEventKindNewAgent:
70+
out.UpsertedAgents=append(out.UpsertedAgents,&proto.Agent{
71+
WorkspaceId:tailnet.UUIDToByteSlice(event.WorkspaceID),
72+
Id:tailnet.UUIDToByteSlice(*event.AgentID),
73+
Name:*event.AgentName,
74+
})
75+
ws,ok:=s.state[event.WorkspaceID]
76+
if!ok {
77+
break
78+
}
79+
ws.Agents=append(ws.Agents, database.AgentIDNamePair{
80+
ID:*event.AgentID,
81+
Name:*event.AgentName,
82+
})
83+
s.state[event.WorkspaceID]=ws
84+
casecodersdk.WorkspaceEventKindStateChange:
85+
// TODO: One event for both upsertions and deletions
86+
default:
87+
return
88+
}
89+
s.tx<-out
90+
}
91+
92+
// start subscribes to updates for all workspaces owned by the user
93+
func (s*sub)start() (errerror) {
94+
s.mu.Lock()
95+
defers.mu.Unlock()
96+
97+
rows,err:=s.db.GetWorkspacesAndAgentsByOwnerID(context.Background(),s.userID)
98+
iferr!=nil {
99+
returnxerrors.Errorf("get workspaces and agents by owner ID: %w",err)
100+
}
101+
102+
initUpdate:=produceInitialUpdate(rows)
103+
s.tx<-initUpdate
104+
initState:=convertRows(rows)
105+
s.state=initState
106+
107+
cancel,err:=s.ps.Subscribe(codersdk.WorkspaceEventChannel(s.userID),codersdk.HandleWorkspaceEvent(s.handleEvent))
108+
iferr!=nil {
109+
returnxerrors.Errorf("subscribe to workspace event channel: %w",err)
110+
}
111+
112+
s.cancelFn=cancel
113+
returnnil
66114
}
67115

68-
func (s*sub)send(allworkspacesByOwner) {
116+
func (s*sub)stop() {
69117
s.mu.Lock()
70118
defers.mu.Unlock()
71119

72-
// Filter to only the workspaces owned by the user
73-
latest:=all[s.userID]
74-
update:=produceUpdate(s.prev,latest)
75-
s.prev=latest
76-
s.tx<-update
120+
ifs.cancelFn!=nil {
121+
s.cancelFn()
122+
}
123+
124+
close(s.tx)
77125
}
78126

79127
typeUpdateQuerierinterface {
80-
GetWorkspacesAndAgents(ctx context.Context) ([]database.GetWorkspacesAndAgentsRow,error)
128+
GetWorkspacesAndAgentsByOwnerID(ctx context.Context,ownerID uuid.UUID) ([]database.GetWorkspacesAndAgentsByOwnerIDRow,error)
81129
}
82130

83131
typeupdatesProviderstruct {
84132
mu sync.RWMutex
85-
dbUpdateQuerier
86-
ps pubsub.Pubsub
87133
// Peer ID -> subscription
88134
subsmap[uuid.UUID]*sub
89-
// Owner ID -> workspace ID -> workspace
90-
latestworkspacesByOwner
91-
cancelFnfunc()
135+
136+
dbUpdateQuerier
137+
ps pubsub.Pubsub
92138
}
93139

94-
func (u*updatesProvider)IsOwner(userID uuid.UUID,agentID uuid.UUID)error {
140+
func (u*updatesProvider)IsOwner(userID uuid.UUID,agentID uuid.UUID)bool {
95141
u.mu.RLock()
96142
deferu.mu.RUnlock()
97143

98-
workspaces,exists:=u.latest[userID]
99-
if!exists {
100-
returnxerrors.Errorf("workspace agent not found or you do not have permission: %w",sql.ErrNoRows)
101-
}
102-
for_,workspace:=rangeworkspaces {
103-
for_,agent:=rangeworkspace.Agents {
104-
ifagent.ID==agentID {
105-
returnnil
106-
}
144+
for_,sub:=rangeu.subs {
145+
ifsub.userID==userID&&sub.ownsAgent(agentID) {
146+
returntrue
107147
}
108148
}
109-
returnxerrors.Errorf("workspace agent not found or you do not have permission: %w",sql.ErrNoRows)
149+
returnfalse
110150
}
111151

112152
var_ tailnet.WorkspaceUpdatesProvider= (*updatesProvider)(nil)
113153

114-
funcNewUpdatesProvider(ctx context.Context,dbUpdateQuerier,ps pubsub.Pubsub) (tailnet.WorkspaceUpdatesProvider,error) {
115-
rows,err:=db.GetWorkspacesAndAgents(ctx)
116-
iferr!=nil&&!xerrors.Is(err,sql.ErrNoRows) {
117-
returnnil,err
118-
}
154+
funcNewUpdatesProvider(_ context.Context,dbUpdateQuerier,ps pubsub.Pubsub) (tailnet.WorkspaceUpdatesProvider,error) {
119155
out:=&updatesProvider{
120-
db:db,
121-
ps:ps,
122-
subs:map[uuid.UUID]*sub{},
123-
latest:convertRows(rows),
124-
}
125-
cancel,err:=ps.Subscribe(codersdk.AllWorkspacesNotifyChannel,out.handleUpdate)
126-
iferr!=nil {
127-
returnnil,err
156+
db:db,
157+
ps:ps,
158+
subs:map[uuid.UUID]*sub{},
128159
}
129-
out.cancelFn=cancel
130160
returnout,nil
131161
}
132162

133163
func (u*updatesProvider)Stop() {
134164
for_,sub:=rangeu.subs {
135-
close(sub.tx)
165+
sub.stop()
136166
}
137-
u.cancelFn()
138-
}
139-
140-
func (u*updatesProvider)handleUpdate(ctx context.Context,_ []byte) {
141-
rows,err:=u.db.GetWorkspacesAndAgents(ctx)
142-
iferr!=nil&&!xerrors.Is(err,sql.ErrNoRows) {
143-
// TODO: Log
144-
return
145-
}
146-
147-
wg:=&sync.WaitGroup{}
148-
latest:=convertRows(rows)
149-
u.mu.RLock()
150-
for_,sub:=rangeu.subs {
151-
sub:=sub
152-
wg.Add(1)
153-
gofunc() {
154-
sub.send(latest)
155-
deferwg.Done()
156-
}()
157-
}
158-
u.mu.RUnlock()
159-
160-
u.mu.Lock()
161-
u.latest=latest
162-
u.mu.Unlock()
163-
wg.Wait()
164167
}
165168

166169
func (u*updatesProvider)Subscribe(peerID uuid.UUID,userID uuid.UUID) (<-chan*proto.WorkspaceUpdate,error) {
@@ -171,11 +174,17 @@ func (u *updatesProvider) Subscribe(peerID uuid.UUID, userID uuid.UUID) (<-chan
171174
sub:=&sub{
172175
userID:userID,
173176
tx:tx,
174-
prev:make(workspacesByID),
177+
db:u.db,
178+
ps:u.ps,
179+
state:workspacesByID{},
180+
}
181+
err:=sub.start()
182+
iferr!=nil {
183+
sub.stop()
184+
returnnil,err
175185
}
186+
176187
u.subs[peerID]=sub
177-
// Write initial state
178-
sub.send(u.latest)
179188
returntx,nil
180189
}
181190

@@ -191,75 +200,40 @@ func (u *updatesProvider) Unsubscribe(peerID uuid.UUID) {
191200
delete(u.subs,peerID)
192201
}
193202

194-
funcproduceUpdate(old,newworkspacesByID)*proto.WorkspaceUpdate {
203+
funcproduceInitialUpdate(rows []database.GetWorkspacesAndAgentsByOwnerIDRow)*proto.WorkspaceUpdate {
195204
out:=&proto.WorkspaceUpdate{
196205
UpsertedWorkspaces: []*proto.Workspace{},
197206
UpsertedAgents: []*proto.Agent{},
198207
DeletedWorkspaces: []*proto.Workspace{},
199208
DeletedAgents: []*proto.Agent{},
200209
}
201210

202-
forwsID,newWorkspace:=rangenew {
203-
oldWorkspace,exists:=old[wsID]
204-
// Upsert both workspace and agents if the workspace is new
205-
if!exists {
206-
out.UpsertedWorkspaces=append(out.UpsertedWorkspaces,&proto.Workspace{
207-
Id:tailnet.UUIDToByteSlice(wsID),
208-
Name:newWorkspace.WorkspaceName,
209-
Status:convertStatus(newWorkspace.JobStatus,newWorkspace.Transition),
210-
})
211-
for_,agent:=rangenewWorkspace.Agents {
212-
out.UpsertedAgents=append(out.UpsertedAgents,&proto.Agent{
213-
Id:tailnet.UUIDToByteSlice(agent.ID),
214-
Name:agent.Name,
215-
WorkspaceId:tailnet.UUIDToByteSlice(wsID),
216-
})
217-
}
218-
continue
219-
}
220-
// Upsert workspace if the workspace is updated
221-
if!newWorkspace.Equal(oldWorkspace) {
222-
out.UpsertedWorkspaces=append(out.UpsertedWorkspaces,&proto.Workspace{
223-
Id:tailnet.UUIDToByteSlice(wsID),
224-
Name:newWorkspace.WorkspaceName,
225-
Status:convertStatus(newWorkspace.JobStatus,newWorkspace.Transition),
226-
})
227-
}
228-
229-
add,remove:=slice.SymmetricDifference(oldWorkspace.Agents,newWorkspace.Agents)
230-
for_,agent:=rangeadd {
211+
for_,row:=rangerows {
212+
out.UpsertedWorkspaces=append(out.UpsertedWorkspaces,&proto.Workspace{
213+
Id:tailnet.UUIDToByteSlice(row.ID),
214+
Name:row.Name,
215+
Status:convertStatus(row.JobStatus,row.Transition),
216+
})
217+
for_,agent:=rangerow.Agents {
231218
out.UpsertedAgents=append(out.UpsertedAgents,&proto.Agent{
232219
Id:tailnet.UUIDToByteSlice(agent.ID),
233220
Name:agent.Name,
234-
WorkspaceId:tailnet.UUIDToByteSlice(wsID),
235-
})
236-
}
237-
for_,agent:=rangeremove {
238-
out.DeletedAgents=append(out.DeletedAgents,&proto.Agent{
239-
Id:tailnet.UUIDToByteSlice(agent.ID),
240-
Name:agent.Name,
241-
WorkspaceId:tailnet.UUIDToByteSlice(wsID),
221+
WorkspaceId:tailnet.UUIDToByteSlice(row.ID),
242222
})
243223
}
244224
}
225+
returnout
226+
}
245227

246-
// Delete workspace and agents if the workspace is deleted
247-
forwsID,oldWorkspace:=rangeold {
248-
if_,exists:=new[wsID];!exists {
249-
out.DeletedWorkspaces=append(out.DeletedWorkspaces,&proto.Workspace{
250-
Id:tailnet.UUIDToByteSlice(wsID),
251-
Name:oldWorkspace.WorkspaceName,
252-
Status:convertStatus(oldWorkspace.JobStatus,oldWorkspace.Transition),
253-
})
254-
for_,agent:=rangeoldWorkspace.Agents {
255-
out.DeletedAgents=append(out.DeletedAgents,&proto.Agent{
256-
Id:tailnet.UUIDToByteSlice(agent.ID),
257-
Name:agent.Name,
258-
WorkspaceId:tailnet.UUIDToByteSlice(wsID),
259-
})
260-
}
228+
funcconvertRows(rows []database.GetWorkspacesAndAgentsByOwnerIDRow)workspacesByID {
229+
out:=make(workspacesByID)
230+
for_,row:=rangerows {
231+
out[row.ID]=ownedWorkspace{
232+
WorkspaceName:row.Name,
233+
JobStatus:row.JobStatus,
234+
Transition:row.Transition,
235+
Agents:row.Agents,
261236
}
262237
}
263-
264238
returnout
265239
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp