Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

feat: Make workspace watching realtime instead of polling#4922

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
kylecarbs merged 2 commits intomainfromrealtimews
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletioncoderd/activitybump.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -54,7 +54,7 @@ func activityBumpWorkspace(log slog.Logger, db database.Store, workspace databas

newDeadline := database.Now().Add(bumpAmount)

if err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
if_,err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
ID: build.ID,
UpdatedAt: database.Now(),
ProvisionerState: build.ProvisionerState,
Expand Down
6 changes: 3 additions & 3 deletionscoderd/database/databasefake/databasefake.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -2823,7 +2823,7 @@ func (q *fakeQuerier) UpdateWorkspaceLastUsedAt(_ context.Context, arg database.
return sql.ErrNoRows
}

func (q *fakeQuerier) UpdateWorkspaceBuildByID(_ context.Context, arg database.UpdateWorkspaceBuildByIDParams) error {
func (q *fakeQuerier) UpdateWorkspaceBuildByID(_ context.Context, arg database.UpdateWorkspaceBuildByIDParams)(database.WorkspaceBuild,error) {
q.mutex.Lock()
defer q.mutex.Unlock()

Expand All@@ -2835,9 +2835,9 @@ func (q *fakeQuerier) UpdateWorkspaceBuildByID(_ context.Context, arg database.U
workspaceBuild.ProvisionerState = arg.ProvisionerState
workspaceBuild.Deadline = arg.Deadline
q.workspaceBuilds[index] = workspaceBuild
return nil
returnworkspaceBuild,nil
}
return sql.ErrNoRows
returndatabase.WorkspaceBuild{},sql.ErrNoRows
}

func (q *fakeQuerier) UpdateWorkspaceDeletedByID(_ context.Context, arg database.UpdateWorkspaceDeletedByIDParams) error {
Expand Down
2 changes: 1 addition & 1 deletioncoderd/database/querier.go
View file
Open in desktop

Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.

25 changes: 20 additions & 5 deletionscoderd/database/queries.sql.go
View file
Open in desktop

Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.

4 changes: 2 additions & 2 deletionscoderd/database/queries/workspacebuilds.sql
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -124,12 +124,12 @@ INSERT INTO
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) RETURNING *;

-- name: UpdateWorkspaceBuildByID :exec
-- name: UpdateWorkspaceBuildByID :one
UPDATE
workspace_builds
SET
updated_at = $2,
provisioner_state = $3,
deadline = $4
WHERE
id = $1;
id = $1 RETURNING *;
14 changes: 10 additions & 4 deletionscoderd/httpapi/httpapi.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -228,14 +228,20 @@ func ServerSentEventSender(rw http.ResponseWriter, r *http.Request) (sendEvent f
buf := &bytes.Buffer{}
enc := json.NewEncoder(buf)

_, err := buf.WriteString(fmt.Sprintf("event: %s\ndata:", sse.Type))
_, err := buf.WriteString(fmt.Sprintf("event: %s\n", sse.Type))
if err != nil {
return err
}

err = enc.Encode(sse.Data)
if err != nil {
return err
if sse.Data != nil {
_, err = buf.WriteString("data: ")
if err != nil {
return err
}
err = enc.Encode(sse.Data)
if err != nil {
return err
}
}

err = buf.WriteByte('\n')
Expand Down
17 changes: 15 additions & 2 deletionscoderd/provisionerdaemons.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -223,6 +223,10 @@ func (server *provisionerdServer) AcquireJob(ctx context.Context, _ *proto.Empty
if err != nil {
return nil, failJob(fmt.Sprintf("get owner: %s", err))
}
err = server.Pubsub.Publish(watchWorkspaceChannel(workspace.ID), []byte{})
if err != nil {
return nil, failJob(fmt.Sprintf("publish workspace update: %s", err))
}

// Compute parameters for the workspace to consume.
parameters, err := parameter.Compute(ctx, server.Database, parameter.ComputeScope{
Expand DownExpand Up@@ -543,7 +547,7 @@ func (server *provisionerdServer) FailJob(ctx context.Context, failJob *proto.Fa
if err != nil {
return nil, xerrors.Errorf("unmarshal workspace provision input: %w", err)
}
err = server.Database.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
build,err:= server.Database.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
ID: input.WorkspaceBuildID,
UpdatedAt: database.Now(),
ProvisionerState: jobType.WorkspaceBuild.State,
Expand All@@ -552,6 +556,10 @@ func (server *provisionerdServer) FailJob(ctx context.Context, failJob *proto.Fa
if err != nil {
return nil, xerrors.Errorf("update workspace build state: %w", err)
}
err = server.Pubsub.Publish(watchWorkspaceChannel(build.WorkspaceID), []byte{})
if err != nil {
return nil, xerrors.Errorf("update workspace: %w", err)
}
case *proto.FailedJob_TemplateImport_:
}

Expand DownExpand Up@@ -657,7 +665,7 @@ func (server *provisionerdServer) CompleteJob(ctx context.Context, completed *pr
if err != nil {
return xerrors.Errorf("update provisioner job: %w", err)
}
err = db.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
_,err = db.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
ID: workspaceBuild.ID,
Deadline: workspaceDeadline,
ProvisionerState: jobType.WorkspaceBuild.State,
Expand DownExpand Up@@ -692,6 +700,11 @@ func (server *provisionerdServer) CompleteJob(ctx context.Context, completed *pr
if err != nil {
return nil, xerrors.Errorf("complete job: %w", err)
}

err = server.Pubsub.Publish(watchWorkspaceChannel(workspaceBuild.WorkspaceID), []byte{})
if err != nil {
return nil, xerrors.Errorf("update workspace: %w", err)
}
case *proto.CompletedJob_TemplateDryRun_:
for _, resource := range jobType.TemplateDryRun.Resources {
server.Logger.Info(ctx, "inserting template dry-run job resource",
Expand Down
28 changes: 28 additions & 0 deletionscoderd/workspaceagents.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -539,13 +539,15 @@ func (api *API) workspaceAgentCoordinate(rw http.ResponseWriter, r *http.Request
Valid: true,
}
_ = updateConnectionTimes()
_ = api.Pubsub.Publish(watchWorkspaceChannel(build.WorkspaceID), []byte{})
}()

err = updateConnectionTimes()
if err != nil {
_ = conn.Close(websocket.StatusGoingAway, err.Error())
return
}
api.publishWorkspaceUpdate(ctx, build.WorkspaceID)

// End span so we don't get long lived trace data.
tracing.EndHTTPSpan(r, http.StatusOK, trace.SpanFromContext(ctx))
Expand DownExpand Up@@ -972,6 +974,32 @@ func (api *API) postWorkspaceAppHealth(rw http.ResponseWriter, r *http.Request)
}
}

resource, err := api.Database.GetWorkspaceResourceByID(r.Context(), workspaceAgent.ResourceID)
if err != nil {
httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching workspace resource.",
Detail: err.Error(),
})
return
}
job, err := api.Database.GetWorkspaceBuildByJobID(r.Context(), resource.JobID)
if err != nil {
httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching workspace build.",
Detail: err.Error(),
})
return
}
workspace, err := api.Database.GetWorkspaceByID(r.Context(), job.WorkspaceID)
if err != nil {
httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching workspace.",
Detail: err.Error(),
})
return
}
api.publishWorkspaceUpdate(r.Context(), workspace.ID)

httpapi.Write(r.Context(), rw, http.StatusOK, nil)
}

Expand Down
5 changes: 5 additions & 0 deletionscoderd/workspacebuilds.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -574,6 +574,8 @@ func (api *API) postWorkspaceBuilds(rw http.ResponseWriter, r *http.Request) {
return
}

api.publishWorkspaceUpdate(ctx, workspace.ID)

httpapi.Write(ctx, rw, http.StatusCreated, apiBuild)
}

Expand DownExpand Up@@ -632,6 +634,9 @@ func (api *API) patchCancelWorkspaceBuild(rw http.ResponseWriter, r *http.Reques
})
return
}

api.publishWorkspaceUpdate(ctx, workspace.ID)

httpapi.Write(ctx, rw, http.StatusOK, codersdk.Response{
Message: "Job has been marked as canceled...",
})
Expand Down
105 changes: 68 additions & 37 deletionscoderd/workspaces.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -634,6 +634,8 @@ func (api *API) patchWorkspace(rw http.ResponseWriter, r *http.Request) {
return
}

api.publishWorkspaceUpdate(ctx, workspace.ID)

aReq.New = newWorkspace
rw.WriteHeader(http.StatusNoContent)
}
Expand DownExpand Up@@ -839,7 +841,7 @@ func (api *API) putExtendWorkspace(rw http.ResponseWriter, r *http.Request) {
return err
}

if err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
if_,err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
ID: build.ID,
UpdatedAt: build.UpdatedAt,
ProvisionerState: build.ProvisionerState,
Expand DownExpand Up@@ -883,48 +885,65 @@ func (api *API) watchWorkspace(rw http.ResponseWriter, r *http.Request) {
// Ignore all trace spans after this, they're not too useful.
ctx = trace.ContextWithSpan(ctx, tracing.NoopSpan)

t := time.NewTicker(time.Second * 1)
defer t.Stop()
cancelSubscribe, err := api.Pubsub.Subscribe(watchWorkspaceChannel(workspace.ID), func(_ context.Context, _ []byte) {
workspace, err := api.Database.GetWorkspaceByID(ctx, workspace.ID)
if err != nil {
_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeError,
Data: codersdk.Response{
Message: "Internal error fetching workspace.",
Detail: err.Error(),
},
})
return
}

data, err := api.workspaceData(ctx, []database.Workspace{workspace})
if err != nil {
_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeError,
Data: codersdk.Response{
Message: "Internal error fetching workspace data.",
Detail: err.Error(),
},
})
return
}

_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeData,
Data: convertWorkspace(
workspace,
data.builds[0],
data.templates[0],
findUser(workspace.OwnerID, data.users),
),
})
})
if err != nil {
_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeError,
Data: codersdk.Response{
Message: "Internal error subscribing to workspace events.",
Detail: err.Error(),
},
})
return
}
defer cancelSubscribe()

// An initial ping signals to the request that the server is now ready
// and the client can begin servicing a channel with data.
_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypePing,
})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Is this needed becauseServerSentEventSender() doesn't send the first ping until the timeout has been hit once? Should we "prime" it in there instead?

Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

I think we want the caller to indicate readiness by sending theping themselves.

If we don't a race can occur where the client's request has been completed butsubscribe hasn't been triggered yet. We could fix this by moving the initialization below the subscribe and making it send a ping, which seems more reasonable.

Thoughts?

Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Seems like this is a bit trickier than I thought... because we use thesendEvent callback in theSubscribe function, we can't initialize after safely. Seems like we'll have to keep it the way it is, but I'll add a comment.

Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

I added a comment and will leave it as-is for now because nothing comes to mind as being cleaner... if you have ideas leave em here and I'll implement!

Copy link
Member

@mafredrimafredriNov 7, 2022
edited
Loading

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Hmm, I don't fully understand the race scenario, but I just realized that thehttpapi.Write above (if err) could be problematic since we've already primedrw for server side events inhttpapi.ServerSentEventSender (and started a timer for pings). So it might be a good idea to change things around a bit 👍🏻. If the subscription fails, I believe this handler wouldn't return until the first SSE auto-ping has been sent (because of the defer <-chan above).

kylecarbs reacted with thumbs up emoji
Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Fixed that error case! Goooood catch


for {
select {
case <-ctx.Done():
return
case <-senderClosed:
return
case <-t.C:
workspace, err := api.Database.GetWorkspaceByID(ctx, workspace.ID)
if err != nil {
_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeError,
Data: codersdk.Response{
Message: "Internal error fetching workspace.",
Detail: err.Error(),
},
})
return
}

data, err := api.workspaceData(ctx, []database.Workspace{workspace})
if err != nil {
_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeError,
Data: codersdk.Response{
Message: "Internal error fetching workspace data.",
Detail: err.Error(),
},
})
return
}

_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeData,
Data: convertWorkspace(
workspace,
data.builds[0],
data.templates[0],
findUser(workspace.OwnerID, data.users),
),
})
}
}
}
Expand DownExpand Up@@ -1213,3 +1232,15 @@ func splitQueryParameterByDelimiter(query string, delimiter rune, maintainQuotes

return parts
}

func watchWorkspaceChannel(id uuid.UUID) string {
return fmt.Sprintf("workspace:%s", id)
}

func (api *API) publishWorkspaceUpdate(ctx context.Context, workspaceID uuid.UUID) {
err := api.Pubsub.Publish(watchWorkspaceChannel(workspaceID), []byte{})
if err != nil {
api.Logger.Warn(ctx, "failed to publish workspace update",
slog.F("workspace_id", workspaceID), slog.Error(err))
}
}
Loading

[8]ページ先頭

©2009-2025 Movatter.jp