Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitb1298a3

Browse files
feat: add WorkspaceUpdates tailnet RPC (#14847)
Closes#14716Closes#14717Adds a new user-scoped tailnet API endpoint (`api/v2/tailnet`) with a new RPC stream for receiving updates on workspaces owned by a specific user, as defined in#14716. When a stream is started, the `WorkspaceUpdatesProvider` will begin listening on the user-scoped pubsub events implemented in#14964. When a relevant event type is seen (such as a workspace state transition), the provider will query the DB for all the workspaces (and agents) owned by the user. This gets compared against the result of the previous query to produce a set of workspace updates. Workspace updates can be requested for any user ID, however only workspaces the authorised user is permitted to `ActionRead` will have their updates streamed.Opening a tunnel to an agent requires that the user can perform `ActionSSH` against the workspace containing it.
1 parentf941e78 commitb1298a3

25 files changed

+2220
-271
lines changed

‎coderd/apidoc/docs.go

Lines changed: 19 additions & 0 deletions
Some generated files are not rendered by default. Learn more aboutcustomizing how changed files appear on GitHub.

‎coderd/apidoc/swagger.json

Lines changed: 17 additions & 0 deletions
Some generated files are not rendered by default. Learn more aboutcustomizing how changed files appear on GitHub.

‎coderd/coderd.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,8 @@ func New(options *Options) *API {
493493
}
494494
}
495495

496+
updatesProvider:=NewUpdatesProvider(options.Logger.Named("workspace_updates"),options.Pubsub,options.Database,options.Authorizer)
497+
496498
// Start a background process that rotates keys. We intentionally start this after the caches
497499
// are created to force initial requests for a key to populate the caches. This helps catch
498500
// bugs that may only occur when a key isn't precached in tests and the latency cost is minimal.
@@ -523,6 +525,7 @@ func New(options *Options) *API {
523525
metricsCache:metricsCache,
524526
Auditor: atomic.Pointer[audit.Auditor]{},
525527
TailnetCoordinator: atomic.Pointer[tailnet.Coordinator]{},
528+
UpdatesProvider:updatesProvider,
526529
TemplateScheduleStore:options.TemplateScheduleStore,
527530
UserQuietHoursScheduleStore:options.UserQuietHoursScheduleStore,
528531
AccessControlStore:options.AccessControlStore,
@@ -652,12 +655,13 @@ func New(options *Options) *API {
652655
panic("CoordinatorResumeTokenProvider is nil")
653656
}
654657
api.TailnetClientService,err=tailnet.NewClientService(tailnet.ClientServiceOptions{
655-
Logger:api.Logger.Named("tailnetclient"),
656-
CoordPtr:&api.TailnetCoordinator,
657-
DERPMapUpdateFrequency:api.Options.DERPMapUpdateFrequency,
658-
DERPMapFn:api.DERPMap,
659-
NetworkTelemetryHandler:api.NetworkTelemetryBatcher.Handler,
660-
ResumeTokenProvider:api.Options.CoordinatorResumeTokenProvider,
658+
Logger:api.Logger.Named("tailnetclient"),
659+
CoordPtr:&api.TailnetCoordinator,
660+
DERPMapUpdateFrequency:api.Options.DERPMapUpdateFrequency,
661+
DERPMapFn:api.DERPMap,
662+
NetworkTelemetryHandler:api.NetworkTelemetryBatcher.Handler,
663+
ResumeTokenProvider:api.Options.CoordinatorResumeTokenProvider,
664+
WorkspaceUpdatesProvider:api.UpdatesProvider,
661665
})
662666
iferr!=nil {
663667
api.Logger.Fatal(context.Background(),"failed to initialize tailnet client service",slog.Error(err))
@@ -1327,6 +1331,10 @@ func New(options *Options) *API {
13271331
})
13281332
r.Get("/dispatch-methods",api.notificationDispatchMethods)
13291333
})
1334+
r.Route("/tailnet",func(r chi.Router) {
1335+
r.Use(apiKeyMiddleware)
1336+
r.Get("/",api.tailnetRPCConn)
1337+
})
13301338
})
13311339

13321340
ifoptions.SwaggerEndpoint {
@@ -1408,6 +1416,8 @@ type API struct {
14081416
AccessControlStore*atomic.Pointer[dbauthz.AccessControlStore]
14091417
PortSharer atomic.Pointer[portsharing.PortSharer]
14101418

1419+
UpdatesProvider tailnet.WorkspaceUpdatesProvider
1420+
14111421
HTTPAuth*HTTPAuthorizer
14121422

14131423
// APIHandler serves "/api/v2"
@@ -1489,6 +1499,7 @@ func (api *API) Close() error {
14891499
_=api.OIDCConvertKeyCache.Close()
14901500
_=api.AppSigningKeyCache.Close()
14911501
_=api.AppEncryptionKeyCache.Close()
1502+
_=api.UpdatesProvider.Close()
14921503
returnnil
14931504
}
14941505

‎coderd/database/dbfake/dbfake.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,14 @@ func (b WorkspaceBuildBuilder) Do() WorkspaceResponse {
224224
}
225225
_=dbgen.WorkspaceBuildParameters(b.t,b.db,b.params)
226226

227+
ifb.ws.Deleted {
228+
err=b.db.UpdateWorkspaceDeletedByID(ownerCtx, database.UpdateWorkspaceDeletedByIDParams{
229+
ID:b.ws.ID,
230+
Deleted:true,
231+
})
232+
require.NoError(b.t,err)
233+
}
234+
227235
ifb.ps!=nil {
228236
msg,err:=json.Marshal(wspubsub.WorkspaceEvent{
229237
Kind:wspubsub.WorkspaceEventKindStateChange,

‎coderd/workspaceagents.go

Lines changed: 114 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/coder/coder/v2/coderd/httpapi"
3434
"github.com/coder/coder/v2/coderd/httpmw"
3535
"github.com/coder/coder/v2/coderd/jwtutils"
36+
"github.com/coder/coder/v2/coderd/rbac"
3637
"github.com/coder/coder/v2/coderd/rbac/policy"
3738
"github.com/coder/coder/v2/coderd/wspubsub"
3839
"github.com/coder/coder/v2/codersdk"
@@ -844,31 +845,10 @@ func (api *API) workspaceAgentClientCoordinate(rw http.ResponseWriter, r *http.R
844845
return
845846
}
846847

847-
// Accept a resume_token query parameter to use the same peer ID.
848-
var (
849-
peerID=uuid.New()
850-
resumeToken=r.URL.Query().Get("resume_token")
851-
)
852-
ifresumeToken!="" {
853-
varerrerror
854-
peerID,err=api.Options.CoordinatorResumeTokenProvider.VerifyResumeToken(ctx,resumeToken)
855-
// If the token is missing the key ID, it's probably an old token in which
856-
// case we just want to generate a new peer ID.
857-
ifxerrors.Is(err,jwtutils.ErrMissingKeyID) {
858-
peerID=uuid.New()
859-
}elseiferr!=nil {
860-
httpapi.Write(ctx,rw,http.StatusUnauthorized, codersdk.Response{
861-
Message:workspacesdk.CoordinateAPIInvalidResumeToken,
862-
Detail:err.Error(),
863-
Validations: []codersdk.ValidationError{
864-
{Field:"resume_token",Detail:workspacesdk.CoordinateAPIInvalidResumeToken},
865-
},
866-
})
867-
return
868-
}else {
869-
api.Logger.Debug(ctx,"accepted coordinate resume token for peer",
870-
slog.F("peer_id",peerID.String()))
871-
}
848+
peerID,err:=api.handleResumeToken(ctx,rw,r)
849+
iferr!=nil {
850+
// handleResumeToken has already written the response.
851+
return
872852
}
873853

874854
api.WebsocketWaitMutex.Lock()
@@ -891,13 +871,47 @@ func (api *API) workspaceAgentClientCoordinate(rw http.ResponseWriter, r *http.R
891871
gohttpapi.Heartbeat(ctx,conn)
892872

893873
deferconn.Close(websocket.StatusNormalClosure,"")
894-
err=api.TailnetClientService.ServeClient(ctx,version,wsNetConn,peerID,workspaceAgent.ID)
874+
err=api.TailnetClientService.ServeClient(ctx,version,wsNetConn, tailnet.StreamID{
875+
Name:"client",
876+
ID:peerID,
877+
Auth: tailnet.ClientCoordinateeAuth{
878+
AgentID:workspaceAgent.ID,
879+
},
880+
})
895881
iferr!=nil&&!xerrors.Is(err,io.EOF)&&!xerrors.Is(err,context.Canceled) {
896882
_=conn.Close(websocket.StatusInternalError,err.Error())
897883
return
898884
}
899885
}
900886

887+
// handleResumeToken accepts a resume_token query parameter to use the same peer ID
888+
func (api*API)handleResumeToken(ctx context.Context,rw http.ResponseWriter,r*http.Request) (peerID uuid.UUID,errerror) {
889+
peerID=uuid.New()
890+
resumeToken:=r.URL.Query().Get("resume_token")
891+
ifresumeToken!="" {
892+
peerID,err=api.Options.CoordinatorResumeTokenProvider.VerifyResumeToken(ctx,resumeToken)
893+
// If the token is missing the key ID, it's probably an old token in which
894+
// case we just want to generate a new peer ID.
895+
ifxerrors.Is(err,jwtutils.ErrMissingKeyID) {
896+
peerID=uuid.New()
897+
err=nil
898+
}elseiferr!=nil {
899+
httpapi.Write(ctx,rw,http.StatusUnauthorized, codersdk.Response{
900+
Message:workspacesdk.CoordinateAPIInvalidResumeToken,
901+
Detail:err.Error(),
902+
Validations: []codersdk.ValidationError{
903+
{Field:"resume_token",Detail:workspacesdk.CoordinateAPIInvalidResumeToken},
904+
},
905+
})
906+
returnpeerID,err
907+
}else {
908+
api.Logger.Debug(ctx,"accepted coordinate resume token for peer",
909+
slog.F("peer_id",peerID.String()))
910+
}
911+
}
912+
returnpeerID,err
913+
}
914+
901915
// @Summary Post workspace agent log source
902916
// @ID post-workspace-agent-log-source
903917
// @Security CoderSessionToken
@@ -1469,6 +1483,80 @@ func (api *API) workspaceAgentsExternalAuthListen(ctx context.Context, rw http.R
14691483
}
14701484
}
14711485

1486+
// @Summary User-scoped tailnet RPC connection
1487+
// @ID user-scoped-tailnet-rpc-connection
1488+
// @Security CoderSessionToken
1489+
// @Tags Agents
1490+
// @Success 101
1491+
// @Router /tailnet [get]
1492+
func (api*API)tailnetRPCConn(rw http.ResponseWriter,r*http.Request) {
1493+
ctx:=r.Context()
1494+
1495+
version:="2.0"
1496+
qv:=r.URL.Query().Get("version")
1497+
ifqv!="" {
1498+
version=qv
1499+
}
1500+
iferr:=proto.CurrentVersion.Validate(version);err!=nil {
1501+
httpapi.Write(ctx,rw,http.StatusBadRequest, codersdk.Response{
1502+
Message:"Unknown or unsupported API version",
1503+
Validations: []codersdk.ValidationError{
1504+
{Field:"version",Detail:err.Error()},
1505+
},
1506+
})
1507+
return
1508+
}
1509+
1510+
peerID,err:=api.handleResumeToken(ctx,rw,r)
1511+
iferr!=nil {
1512+
// handleResumeToken has already written the response.
1513+
return
1514+
}
1515+
1516+
// Used to authorize tunnel request
1517+
sshPrep,err:=api.HTTPAuth.AuthorizeSQLFilter(r,policy.ActionSSH,rbac.ResourceWorkspace.Type)
1518+
iferr!=nil {
1519+
httpapi.Write(ctx,rw,http.StatusInternalServerError, codersdk.Response{
1520+
Message:"Internal error preparing sql filter.",
1521+
Detail:err.Error(),
1522+
})
1523+
return
1524+
}
1525+
1526+
api.WebsocketWaitMutex.Lock()
1527+
api.WebsocketWaitGroup.Add(1)
1528+
api.WebsocketWaitMutex.Unlock()
1529+
deferapi.WebsocketWaitGroup.Done()
1530+
1531+
conn,err:=websocket.Accept(rw,r,nil)
1532+
iferr!=nil {
1533+
httpapi.Write(ctx,rw,http.StatusBadRequest, codersdk.Response{
1534+
Message:"Failed to accept websocket.",
1535+
Detail:err.Error(),
1536+
})
1537+
return
1538+
}
1539+
ctx,wsNetConn:=codersdk.WebsocketNetConn(ctx,conn,websocket.MessageBinary)
1540+
deferwsNetConn.Close()
1541+
deferconn.Close(websocket.StatusNormalClosure,"")
1542+
1543+
gohttpapi.Heartbeat(ctx,conn)
1544+
err=api.TailnetClientService.ServeClient(ctx,version,wsNetConn, tailnet.StreamID{
1545+
Name:"client",
1546+
ID:peerID,
1547+
Auth: tailnet.ClientUserCoordinateeAuth{
1548+
Auth:&rbacAuthorizer{
1549+
sshPrep:sshPrep,
1550+
db:api.Database,
1551+
},
1552+
},
1553+
})
1554+
iferr!=nil&&!xerrors.Is(err,io.EOF)&&!xerrors.Is(err,context.Canceled) {
1555+
_=conn.Close(websocket.StatusInternalError,err.Error())
1556+
return
1557+
}
1558+
}
1559+
14721560
// createExternalAuthResponse creates an ExternalAuthResponse based on the
14731561
// provider type. This is to support legacy `/workspaceagents/me/gitauth`
14741562
// which uses `Username` and `Password`.

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp