Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd57f3e6

Browse files
committed
move core impl to coderd
1 parent9faa940 commitd57f3e6

File tree

8 files changed

+358
-277
lines changed

8 files changed

+358
-277
lines changed

‎cli/server.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,14 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
719719
options.Database=dbmetrics.New(options.Database,options.PrometheusRegistry)
720720
}
721721

722+
wsUpdates,err:=coderd.NewUpdatesProvider(ctx,&coderd.WorkspaceUpdateStore{
723+
Store:options.Database,
724+
},options.Pubsub)
725+
iferr!=nil {
726+
returnxerrors.Errorf("create workspace updates provider: %w",err)
727+
}
728+
options.WorkspaceUpdatesProvider=wsUpdates
729+
722730
vardeploymentIDstring
723731
err=options.Database.InTx(func(tx database.Store)error {
724732
// This will block until the lock is acquired, and will be

‎coderd/coderd.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,8 @@ type Options struct {
228228

229229
WorkspaceAppsStatsCollectorOptions workspaceapps.StatsCollectorOptions
230230

231+
WorkspaceUpdatesProvider tailnet.WorkspaceUpdatesProvider
232+
231233
// This janky function is used in telemetry to parse fields out of the raw
232234
// JWT. It needs to be passed through like this because license parsing is
233235
// under the enterprise license, and can't be imported into AGPL.
@@ -591,12 +593,13 @@ func New(options *Options) *API {
591593
panic("CoordinatorResumeTokenProvider is nil")
592594
}
593595
api.TailnetClientService,err=tailnet.NewClientService(tailnet.ClientServiceOptions{
594-
Logger:api.Logger.Named("tailnetclient"),
595-
CoordPtr:&api.TailnetCoordinator,
596-
DERPMapUpdateFrequency:api.Options.DERPMapUpdateFrequency,
597-
DERPMapFn:api.DERPMap,
598-
NetworkTelemetryHandler:api.NetworkTelemetryBatcher.Handler,
599-
ResumeTokenProvider:api.Options.CoordinatorResumeTokenProvider,
596+
Logger:api.Logger.Named("tailnetclient"),
597+
CoordPtr:&api.TailnetCoordinator,
598+
DERPMapUpdateFrequency:api.Options.DERPMapUpdateFrequency,
599+
DERPMapFn:api.DERPMap,
600+
NetworkTelemetryHandler:api.NetworkTelemetryBatcher.Handler,
601+
ResumeTokenProvider:api.Options.CoordinatorResumeTokenProvider,
602+
WorkspaceUpdatesProvider:api.Options.WorkspaceUpdatesProvider,
600603
})
601604
iferr!=nil {
602605
api.Logger.Fatal(api.ctx,"failed to initialize tailnet client service",slog.Error(err))

‎coderd/workspaces.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2146,11 +2146,12 @@ func (api *API) tailnet(rw http.ResponseWriter, r *http.Request) {
21462146

21472147
gohttpapi.Heartbeat(ctx,conn)
21482148
err=api.TailnetClientService.ServeUserClient(ctx,version,wsNetConn, tailnet.ServeUserClientOptions{
2149-
PeerID:peerID,
2150-
UserID:owner.ID,
2151-
Subject:&ownerRoles,
2152-
Authz:api.Authorizer,
2153-
Database:api.Database,
2149+
PeerID:peerID,
2150+
UserID:owner.ID,
2151+
Subject:&ownerRoles,
2152+
Authz:api.Authorizer,
2153+
// TODO:
2154+
// Database: api.Database,
21542155
})
21552156
iferr!=nil&&!xerrors.Is(err,io.EOF)&&!xerrors.Is(err,context.Canceled) {
21562157
_=conn.Close(websocket.StatusInternalError,err.Error())

‎coderd/workspaceupdates.go

Lines changed: 282 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,282 @@
1+
package coderd
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"sync"
7+
8+
"github.com/google/uuid"
9+
"golang.org/x/xerrors"
10+
11+
"github.com/coder/coder/v2/coderd/database"
12+
"github.com/coder/coder/v2/coderd/database/db2sdk"
13+
"github.com/coder/coder/v2/coderd/database/pubsub"
14+
"github.com/coder/coder/v2/coderd/rbac"
15+
"github.com/coder/coder/v2/coderd/util/slice"
16+
"github.com/coder/coder/v2/codersdk"
17+
"github.com/coder/coder/v2/tailnet"
18+
"github.com/coder/coder/v2/tailnet/proto"
19+
)
20+
21+
typeworkspacesByOwnermap[uuid.UUID]workspacesByID
22+
23+
typeworkspacesByIDmap[uuid.UUID]ownedWorkspace
24+
25+
typeownedWorkspacestruct {
26+
WorkspaceNamestring
27+
JobStatus codersdk.ProvisionerJobStatus
28+
Transition codersdk.WorkspaceTransition
29+
Agents []tailnet.AgentIDNamePair
30+
}
31+
32+
typeWorkspaceUpdateStorestruct {
33+
database.Store
34+
}
35+
36+
var_ tailnet.UpdateQuerier= (*WorkspaceUpdateStore)(nil)
37+
38+
func (u*WorkspaceUpdateStore)GetWorkspaceRBACByAgentID(ctx context.Context,agentID uuid.UUID) (rbac.Objecter,error) {
39+
ws,err:=u.Store.GetWorkspaceByAgentID(ctx,agentID)
40+
iferr!=nil {
41+
returnnil,err
42+
}
43+
returnws,nil
44+
}
45+
46+
func (u*WorkspaceUpdateStore)GetWorkspacesAndAgents(ctx context.Context) ([]tailnet.WorkspacesAndAgents,error) {
47+
rows,err:=u.Store.GetWorkspacesAndAgents(ctx)
48+
iferr!=nil {
49+
returnnil,err
50+
}
51+
52+
out:=db2sdk.List(rows,func(v database.GetWorkspacesAndAgentsRow) tailnet.WorkspacesAndAgents {
53+
return tailnet.WorkspacesAndAgents{
54+
ID:v.ID,
55+
Name:v.Name,
56+
OwnerID:v.OwnerID,
57+
JobStatus:codersdk.ProvisionerJobStatus(v.JobStatus),
58+
Transition:codersdk.WorkspaceTransition(v.Transition),
59+
Agents:db2sdk.List(v.Agents,func(database.AgentIDNamePair) tailnet.AgentIDNamePair {
60+
return tailnet.AgentIDNamePair{
61+
ID:v.ID,
62+
Name:v.Name,
63+
}
64+
}),
65+
}
66+
})
67+
returnout,nil
68+
}
69+
70+
// Equal does not compare agents
71+
func (wownedWorkspace)Equal(otherownedWorkspace)bool {
72+
returnw.WorkspaceName==other.WorkspaceName&&
73+
w.JobStatus==other.JobStatus&&
74+
w.Transition==other.Transition
75+
}
76+
77+
funcconvertRows(v []tailnet.WorkspacesAndAgents)workspacesByOwner {
78+
m:=make(map[uuid.UUID]workspacesByID)
79+
for_,ws:=rangev {
80+
owned:=ownedWorkspace{
81+
WorkspaceName:ws.Name,
82+
JobStatus:ws.JobStatus,
83+
Transition:ws.Transition,
84+
Agents:ws.Agents,
85+
}
86+
ifbyID,exists:=m[ws.OwnerID];!exists {
87+
m[ws.OwnerID]=map[uuid.UUID]ownedWorkspace{ws.ID:owned}
88+
}else {
89+
byID[ws.ID]=owned
90+
m[ws.OwnerID]=byID
91+
}
92+
}
93+
returnworkspacesByOwner(m)
94+
}
95+
96+
funcconvertStatus(status codersdk.ProvisionerJobStatus,trans codersdk.WorkspaceTransition) proto.Workspace_Status {
97+
wsStatus:=codersdk.ConvertWorkspaceStatus(status,trans)
98+
returntailnet.WorkspaceStatusToProto(wsStatus)
99+
}
100+
101+
typesubstruct {
102+
mu sync.Mutex
103+
userID uuid.UUID
104+
txchan<-*proto.WorkspaceUpdate
105+
prevworkspacesByID
106+
}
107+
108+
func (s*sub)send(allworkspacesByOwner) {
109+
s.mu.Lock()
110+
defers.mu.Unlock()
111+
112+
// Filter to only the workspaces owned by the user
113+
latest:=all[s.userID]
114+
update:=produceUpdate(s.prev,latest)
115+
s.prev=latest
116+
s.tx<-update
117+
}
118+
119+
typeupdatesProviderstruct {
120+
mu sync.RWMutex
121+
db tailnet.UpdateQuerier
122+
ps pubsub.Pubsub
123+
// Peer ID -> subscription
124+
subsmap[uuid.UUID]*sub
125+
latestworkspacesByOwner
126+
cancelFnfunc()
127+
}
128+
129+
var_ tailnet.WorkspaceUpdatesProvider= (*updatesProvider)(nil)
130+
131+
funcNewUpdatesProvider(ctx context.Context,db tailnet.UpdateQuerier,ps pubsub.Pubsub) (tailnet.WorkspaceUpdatesProvider,error) {
132+
rows,err:=db.GetWorkspacesAndAgents(ctx)
133+
iferr!=nil&&!xerrors.Is(err,sql.ErrNoRows) {
134+
returnnil,err
135+
}
136+
out:=&updatesProvider{
137+
db:db,
138+
ps:ps,
139+
subs:map[uuid.UUID]*sub{},
140+
latest:convertRows(rows),
141+
}
142+
cancel,err:=ps.Subscribe(codersdk.AllWorkspacesNotifyChannel,out.handleUpdate)
143+
iferr!=nil {
144+
returnnil,err
145+
}
146+
out.cancelFn=cancel
147+
returnout,nil
148+
}
149+
150+
func (u*updatesProvider)Stop() {
151+
for_,sub:=rangeu.subs {
152+
close(sub.tx)
153+
}
154+
u.cancelFn()
155+
}
156+
157+
func (u*updatesProvider)handleUpdate(ctx context.Context,_ []byte) {
158+
rows,err:=u.db.GetWorkspacesAndAgents(ctx)
159+
iferr!=nil&&!xerrors.Is(err,sql.ErrNoRows) {
160+
// TODO: Log
161+
return
162+
}
163+
164+
wg:=&sync.WaitGroup{}
165+
latest:=convertRows(rows)
166+
u.mu.RLock()
167+
for_,sub:=rangeu.subs {
168+
sub:=sub
169+
wg.Add(1)
170+
gofunc() {
171+
sub.send(latest)
172+
deferwg.Done()
173+
}()
174+
}
175+
u.mu.RUnlock()
176+
177+
u.mu.Lock()
178+
u.latest=latest
179+
u.mu.Unlock()
180+
wg.Wait()
181+
}
182+
183+
func (u*updatesProvider)Subscribe(peerID uuid.UUID,userID uuid.UUID) (<-chan*proto.WorkspaceUpdate,error) {
184+
u.mu.Lock()
185+
deferu.mu.Unlock()
186+
187+
tx:=make(chan*proto.WorkspaceUpdate,1)
188+
sub:=&sub{
189+
userID:userID,
190+
tx:tx,
191+
prev:make(workspacesByID),
192+
}
193+
u.subs[peerID]=sub
194+
// Write initial state
195+
sub.send(u.latest)
196+
returntx,nil
197+
}
198+
199+
func (u*updatesProvider)Unsubscribe(peerID uuid.UUID) {
200+
u.mu.Lock()
201+
deferu.mu.Unlock()
202+
203+
sub,exists:=u.subs[peerID]
204+
if!exists {
205+
return
206+
}
207+
close(sub.tx)
208+
delete(u.subs,peerID)
209+
}
210+
211+
funcproduceUpdate(old,newworkspacesByID)*proto.WorkspaceUpdate {
212+
out:=&proto.WorkspaceUpdate{
213+
UpsertedWorkspaces: []*proto.Workspace{},
214+
UpsertedAgents: []*proto.Agent{},
215+
DeletedWorkspaces: []*proto.Workspace{},
216+
DeletedAgents: []*proto.Agent{},
217+
}
218+
219+
forwsID,newWorkspace:=rangenew {
220+
oldWorkspace,exists:=old[wsID]
221+
// Upsert both workspace and agents if the workspace is new
222+
if!exists {
223+
out.UpsertedWorkspaces=append(out.UpsertedWorkspaces,&proto.Workspace{
224+
Id:tailnet.UUIDToByteSlice(wsID),
225+
Name:newWorkspace.WorkspaceName,
226+
Status:convertStatus(newWorkspace.JobStatus,newWorkspace.Transition),
227+
})
228+
for_,agent:=rangenewWorkspace.Agents {
229+
out.UpsertedAgents=append(out.UpsertedAgents,&proto.Agent{
230+
Id:tailnet.UUIDToByteSlice(agent.ID),
231+
Name:agent.Name,
232+
WorkspaceId:tailnet.UUIDToByteSlice(wsID),
233+
})
234+
}
235+
continue
236+
}
237+
// Upsert workspace if the workspace is updated
238+
if!newWorkspace.Equal(oldWorkspace) {
239+
out.UpsertedWorkspaces=append(out.UpsertedWorkspaces,&proto.Workspace{
240+
Id:tailnet.UUIDToByteSlice(wsID),
241+
Name:newWorkspace.WorkspaceName,
242+
Status:convertStatus(newWorkspace.JobStatus,newWorkspace.Transition),
243+
})
244+
}
245+
246+
add,remove:=slice.SymmetricDifference(oldWorkspace.Agents,newWorkspace.Agents)
247+
for_,agent:=rangeadd {
248+
out.UpsertedAgents=append(out.UpsertedAgents,&proto.Agent{
249+
Id:tailnet.UUIDToByteSlice(agent.ID),
250+
Name:agent.Name,
251+
WorkspaceId:tailnet.UUIDToByteSlice(wsID),
252+
})
253+
}
254+
for_,agent:=rangeremove {
255+
out.DeletedAgents=append(out.DeletedAgents,&proto.Agent{
256+
Id:tailnet.UUIDToByteSlice(agent.ID),
257+
Name:agent.Name,
258+
WorkspaceId:tailnet.UUIDToByteSlice(wsID),
259+
})
260+
}
261+
}
262+
263+
// Delete workspace and agents if the workspace is deleted
264+
forwsID,oldWorkspace:=rangeold {
265+
if_,exists:=new[wsID];!exists {
266+
out.DeletedWorkspaces=append(out.DeletedWorkspaces,&proto.Workspace{
267+
Id:tailnet.UUIDToByteSlice(wsID),
268+
Name:oldWorkspace.WorkspaceName,
269+
Status:convertStatus(oldWorkspace.JobStatus,oldWorkspace.Transition),
270+
})
271+
for_,agent:=rangeoldWorkspace.Agents {
272+
out.DeletedAgents=append(out.DeletedAgents,&proto.Agent{
273+
Id:tailnet.UUIDToByteSlice(agent.ID),
274+
Name:agent.Name,
275+
WorkspaceId:tailnet.UUIDToByteSlice(wsID),
276+
})
277+
}
278+
}
279+
}
280+
281+
returnout
282+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp