Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

fix: optimize queue position sql query#17974

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
evgeniy-scherbina merged 4 commits intomainfromyevhenii/optimize-sql-query
May 28, 2025
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletioncoderd/database/dbauthz/dbauthz.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -2341,7 +2341,7 @@ func (q *querier) GetProvisionerJobsByIDs(ctx context.Context, ids []uuid.UUID)
return provisionerJobs, nil
}

func (q *querier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Context, ids[]uuid.UUID) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
func (q *querier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Context, idsdatabase.GetProvisionerJobsByIDsWithQueuePositionParams) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
// TODO: Remove this once we have a proper rbac check for provisioner jobs.
// Details in https://github.com/coder/coder/issues/16160
return q.db.GetProvisionerJobsByIDsWithQueuePosition(ctx, ids)
Expand Down
2 changes: 1 addition & 1 deletioncoderd/database/dbauthz/dbauthz_test.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -4345,7 +4345,7 @@ func (s *MethodTestSuite) TestSystemFunctions() {
check.Args([]uuid.UUID{uuid.New()}).Asserts(rbac.ResourceSystem, policy.ActionRead)
}))
s.Run("GetProvisionerJobsByIDsWithQueuePosition", s.Subtest(func(db database.Store, check *expects) {
check.Args([]uuid.UUID{}).Asserts()
check.Args(database.GetProvisionerJobsByIDsWithQueuePositionParams{}).Asserts()
}))
s.Run("GetReplicaByID", s.Subtest(func(db database.Store, check *expects) {
check.Args(uuid.New()).Asserts(rbac.ResourceSystem, policy.ActionRead).Errors(sql.ErrNoRows)
Expand Down
8 changes: 4 additions & 4 deletionscoderd/database/dbmem/dbmem.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -4684,14 +4684,14 @@ func (q *FakeQuerier) GetProvisionerJobsByIDs(_ context.Context, ids []uuid.UUID
return jobs, nil
}

func (q *FakeQuerier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Context,ids []uuid.UUID) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
func (q *FakeQuerier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Context,arg database.GetProvisionerJobsByIDsWithQueuePositionParams) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
q.mutex.RLock()
defer q.mutex.RUnlock()

ifids == nil {
ids = []uuid.UUID{}
ifarg.IDs == nil {
arg.IDs = []uuid.UUID{}
}
return q.getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue(ctx,ids)
return q.getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue(ctx,arg.IDs)
}

func (q *FakeQuerier) GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner(ctx context.Context, arg database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerParams) ([]database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerRow, error) {
Expand Down
2 changes: 1 addition & 1 deletioncoderd/database/dbmetrics/querymetrics.go
View file
Open in desktop

Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.

8 changes: 4 additions & 4 deletionscoderd/database/dbmock/dbmock.go
View file
Open in desktop

Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.

2 changes: 1 addition & 1 deletioncoderd/database/querier.go
View file
Open in desktop

Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.

27 changes: 21 additions & 6 deletionscoderd/database/querier_test.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -15,7 +15,6 @@ import (
"github.com/stretchr/testify/require"

"cdr.dev/slog/sloggers/slogtest"

"github.com/coder/coder/v2/coderd/coderdtest"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/db2sdk"
Expand All@@ -27,6 +26,7 @@ import (
"github.com/coder/coder/v2/coderd/database/migrations"
"github.com/coder/coder/v2/coderd/httpmw"
"github.com/coder/coder/v2/coderd/prebuilds"
"github.com/coder/coder/v2/coderd/provisionerdserver"
"github.com/coder/coder/v2/coderd/rbac"
"github.com/coder/coder/v2/coderd/rbac/policy"
"github.com/coder/coder/v2/provisionersdk"
Expand DownExpand Up@@ -1268,7 +1268,10 @@ func TestQueuePosition(t *testing.T) {
Tags: database.StringMap{},
})

queued, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs)
queued, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: jobIDs,
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
require.NoError(t, err)
require.Len(t, queued, jobCount)
sort.Slice(queued, func(i, j int) bool {
Expand DownExpand Up@@ -1296,7 +1299,10 @@ func TestQueuePosition(t *testing.T) {
require.NoError(t, err)
require.Equal(t, jobs[0].ID, job.ID)

queued, err = db.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs)
queued, err = db.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: jobIDs,
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
require.NoError(t, err)
require.Len(t, queued, jobCount)
sort.Slice(queued, func(i, j int) bool {
Expand DownExpand Up@@ -2550,7 +2556,10 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) {
}

// When: we fetch the jobs by their IDs
actualJobs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, filteredJobIDs)
actualJobs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: filteredJobIDs,
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
require.NoError(t, err)
require.Len(t, actualJobs, len(filteredJobs), "should return all unskipped jobs")

Expand DownExpand Up@@ -2693,7 +2702,10 @@ func TestGetProvisionerJobsByIDsWithQueuePosition_MixedStatuses(t *testing.T) {
}

// When: we fetch the jobs by their IDs
actualJobs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs)
actualJobs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: jobIDs,
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
require.NoError(t, err)
require.Len(t, actualJobs, len(allJobs), "should return all jobs")

Expand DownExpand Up@@ -2788,7 +2800,10 @@ func TestGetProvisionerJobsByIDsWithQueuePosition_OrderValidation(t *testing.T)
}

// When: we fetch the jobs by their IDs
actualJobs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs)
actualJobs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: jobIDs,
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
require.NoError(t, err)
require.Len(t, actualJobs, len(allJobs), "should return all jobs")

Expand Down
21 changes: 15 additions & 6 deletionscoderd/database/queries.sql.go
View file
Open in desktop

Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.

12 changes: 8 additions & 4 deletionscoderd/database/queries/provisionerjobs.sql
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -80,17 +80,21 @@ pending_jobs AS (
WHERE
job_status = 'pending'
),
online_provisioner_daemons AS (
SELECT id, tags FROM provisioner_daemons pd
WHERE pd.last_seen_at IS NOT NULL AND pd.last_seen_at >= (NOW() - (@stale_interval_ms::bigint || ' ms')::interval)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

This should probably be a more descriptive param likeprovisioner_daemon_stale_interval_ms

Copy link
ContributorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

We use@stale_interval_ms everywhere - so I kept it for consistency.

),
ranked_jobs AS (
-- Step 3: Rank only pending jobs based on provisioner availability
SELECT
pj.id,
pj.created_at,
ROW_NUMBER() OVER (PARTITION BYpd.id ORDER BY pj.created_at ASC) AS queue_position,
COUNT(*) OVER (PARTITION BYpd.id) AS queue_size
ROW_NUMBER() OVER (PARTITION BYopd.id ORDER BY pj.created_at ASC) AS queue_position,
COUNT(*) OVER (PARTITION BYopd.id) AS queue_size
FROM
pending_jobs pj
INNER JOINprovisioner_daemons pd
ON provisioner_tagset_contains(pd.tags, pj.tags) -- Join only on the small pending set
INNER JOINonline_provisioner_daemons opd
ON provisioner_tagset_contains(opd.tags, pj.tags) -- Join only on the small pending set
),
final_jobs AS (
-- Step 4: Compute best queue position and max queue size per job
Expand Down
35 changes: 28 additions & 7 deletionscoderd/templateversions.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -53,7 +53,10 @@ func (api *API) templateVersion(rw http.ResponseWriter, r *http.Request) {
ctx := r.Context()
templateVersion := httpmw.TemplateVersionParam(r)

jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, []uuid.UUID{templateVersion.JobID})
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: []uuid.UUID{templateVersion.JobID},
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
if err != nil || len(jobs) == 0 {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching provisioner job.",
Expand DownExpand Up@@ -182,7 +185,10 @@ func (api *API) patchTemplateVersion(rw http.ResponseWriter, r *http.Request) {
return
}

jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, []uuid.UUID{templateVersion.JobID})
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: []uuid.UUID{templateVersion.JobID},
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
if err != nil || len(jobs) == 0 {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching provisioner job.",
Expand DownExpand Up@@ -733,7 +739,10 @@ func (api *API) fetchTemplateVersionDryRunJob(rw http.ResponseWriter, r *http.Re
return database.GetProvisionerJobsByIDsWithQueuePositionRow{}, false
}

jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, []uuid.UUID{jobUUID})
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: []uuid.UUID{jobUUID},
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
if httpapi.Is404Error(err) {
httpapi.Write(ctx, rw, http.StatusNotFound, codersdk.Response{
Message: fmt.Sprintf("Provisioner job %q not found.", jobUUID),
Expand DownExpand Up@@ -865,7 +874,10 @@ func (api *API) templateVersionsByTemplate(rw http.ResponseWriter, r *http.Reque
for _, version := range versions {
jobIDs = append(jobIDs, version.JobID)
}
jobs, err := store.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs)
jobs, err := store.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: jobIDs,
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching provisioner job.",
Expand DownExpand Up@@ -933,7 +945,10 @@ func (api *API) templateVersionByName(rw http.ResponseWriter, r *http.Request) {
})
return
}
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, []uuid.UUID{templateVersion.JobID})
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: []uuid.UUID{templateVersion.JobID},
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
if err != nil || len(jobs) == 0 {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching provisioner job.",
Expand DownExpand Up@@ -1013,7 +1028,10 @@ func (api *API) templateVersionByOrganizationTemplateAndName(rw http.ResponseWri
})
return
}
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, []uuid.UUID{templateVersion.JobID})
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: []uuid.UUID{templateVersion.JobID},
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
if err != nil || len(jobs) == 0 {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching provisioner job.",
Expand DownExpand Up@@ -1115,7 +1133,10 @@ func (api *API) previousTemplateVersionByOrganizationTemplateAndName(rw http.Res
return
}

jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, []uuid.UUID{previousTemplateVersion.JobID})
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: []uuid.UUID{previousTemplateVersion.JobID},
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
if err != nil || len(jobs) == 0 {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching provisioner job.",
Expand Down
5 changes: 4 additions & 1 deletioncoderd/workspacebuilds.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -797,7 +797,10 @@ func (api *API) workspaceBuildsData(ctx context.Context, workspaceBuilds []datab
for _, build := range workspaceBuilds {
jobIDs = append(jobIDs, build.JobID)
}
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs)
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: jobIDs,
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return workspaceBuildsData{}, xerrors.Errorf("get provisioner jobs: %w", err)
}
Expand Down
Loading

[8]ページ先頭

©2009-2025 Movatter.jp