Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitb85ba58

Browse files
fix(coderd/database): consider tag sets when calculating queue position (#16685)
Relates to#15843## PR Contents- Reimplementation of the `GetProvisionerJobsByIDsWithQueuePosition` SQLquery to **take into account** provisioner job tags and provisionerdaemon tags.- Unit tests covering different **tag sets**, **job statuses**, and**job ordering** scenarios.## Notes- The original row order is preserved by introducing the `ordinality`field.- Unnecessary rows are filtered as early as possible to ensure thatexpensive joins operate on a smaller dataset.- A "fake" join with `provisioner_jobs` is added at the end to ensure`sqlc.embed` compiles successfully.- **Backward compatibility is preserved**—only the SQL query has beenupdated, while the Go code remains unchanged.
1 parent7637d39 commitb85ba58

File tree

7 files changed

+658
-67
lines changed

7 files changed

+658
-67
lines changed

‎coderd/database/dbmem/dbmem.go

Lines changed: 115 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1149,7 +1149,119 @@ func getOwnerFromTags(tags map[string]string) string {
11491149
return""
11501150
}
11511151

1152-
func (q*FakeQuerier)getProvisionerJobsByIDsWithQueuePositionLocked(_ context.Context,ids []uuid.UUID) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow,error) {
1152+
// provisionerTagsetContains checks if daemonTags contain all key-value pairs from jobTags
1153+
funcprovisionerTagsetContains(daemonTags,jobTagsmap[string]string)bool {
1154+
forjobKey,jobValue:=rangejobTags {
1155+
ifdaemonValue,exists:=daemonTags[jobKey];!exists||daemonValue!=jobValue {
1156+
returnfalse
1157+
}
1158+
}
1159+
returntrue
1160+
}
1161+
1162+
// GetProvisionerJobsByIDsWithQueuePosition mimics the SQL logic in pure Go
1163+
func (q*FakeQuerier)getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue(_ context.Context,jobIDs []uuid.UUID) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow,error) {
1164+
// Step 1: Filter provisionerJobs based on jobIDs
1165+
filteredJobs:=make(map[uuid.UUID]database.ProvisionerJob)
1166+
for_,job:=rangeq.provisionerJobs {
1167+
for_,id:=rangejobIDs {
1168+
ifjob.ID==id {
1169+
filteredJobs[job.ID]=job
1170+
}
1171+
}
1172+
}
1173+
1174+
// Step 2: Identify pending jobs
1175+
pendingJobs:=make(map[uuid.UUID]database.ProvisionerJob)
1176+
for_,job:=rangeq.provisionerJobs {
1177+
ifjob.JobStatus=="pending" {
1178+
pendingJobs[job.ID]=job
1179+
}
1180+
}
1181+
1182+
// Step 3: Identify pending jobs that have a matching provisioner
1183+
matchedJobs:=make(map[uuid.UUID]struct{})
1184+
for_,job:=rangependingJobs {
1185+
for_,daemon:=rangeq.provisionerDaemons {
1186+
ifprovisionerTagsetContains(daemon.Tags,job.Tags) {
1187+
matchedJobs[job.ID]=struct{}{}
1188+
break
1189+
}
1190+
}
1191+
}
1192+
1193+
// Step 4: Rank pending jobs per provisioner
1194+
jobRanks:=make(map[uuid.UUID][]database.ProvisionerJob)
1195+
for_,job:=rangependingJobs {
1196+
for_,daemon:=rangeq.provisionerDaemons {
1197+
ifprovisionerTagsetContains(daemon.Tags,job.Tags) {
1198+
jobRanks[daemon.ID]=append(jobRanks[daemon.ID],job)
1199+
}
1200+
}
1201+
}
1202+
1203+
// Sort jobs per provisioner by CreatedAt
1204+
fordaemonID:=rangejobRanks {
1205+
sort.Slice(jobRanks[daemonID],func(i,jint)bool {
1206+
returnjobRanks[daemonID][i].CreatedAt.Before(jobRanks[daemonID][j].CreatedAt)
1207+
})
1208+
}
1209+
1210+
// Step 5: Compute queue position & max queue size across all provisioners
1211+
jobQueueStats:=make(map[uuid.UUID]database.GetProvisionerJobsByIDsWithQueuePositionRow)
1212+
for_,jobs:=rangejobRanks {
1213+
queueSize:=int64(len(jobs))// Queue size per provisioner
1214+
fori,job:=rangejobs {
1215+
queuePosition:=int64(i+1)
1216+
1217+
// If the job already exists, update only if this queuePosition is better
1218+
ifexisting,exists:=jobQueueStats[job.ID];exists {
1219+
jobQueueStats[job.ID]= database.GetProvisionerJobsByIDsWithQueuePositionRow{
1220+
ID:job.ID,
1221+
CreatedAt:job.CreatedAt,
1222+
ProvisionerJob:job,
1223+
QueuePosition:min(existing.QueuePosition,queuePosition),
1224+
QueueSize:max(existing.QueueSize,queueSize),// Take the maximum queue size across provisioners
1225+
}
1226+
}else {
1227+
jobQueueStats[job.ID]= database.GetProvisionerJobsByIDsWithQueuePositionRow{
1228+
ID:job.ID,
1229+
CreatedAt:job.CreatedAt,
1230+
ProvisionerJob:job,
1231+
QueuePosition:queuePosition,
1232+
QueueSize:queueSize,
1233+
}
1234+
}
1235+
}
1236+
}
1237+
1238+
// Step 6: Compute the final results with minimal checks
1239+
varresults []database.GetProvisionerJobsByIDsWithQueuePositionRow
1240+
for_,job:=rangefilteredJobs {
1241+
// If the job has a computed rank, use it
1242+
ifrank,found:=jobQueueStats[job.ID];found {
1243+
results=append(results,rank)
1244+
}else {
1245+
// Otherwise, return (0,0) for non-pending jobs and unranked pending jobs
1246+
results=append(results, database.GetProvisionerJobsByIDsWithQueuePositionRow{
1247+
ID:job.ID,
1248+
CreatedAt:job.CreatedAt,
1249+
ProvisionerJob:job,
1250+
QueuePosition:0,
1251+
QueueSize:0,
1252+
})
1253+
}
1254+
}
1255+
1256+
// Step 7: Sort results by CreatedAt
1257+
sort.Slice(results,func(i,jint)bool {
1258+
returnresults[i].CreatedAt.Before(results[j].CreatedAt)
1259+
})
1260+
1261+
returnresults,nil
1262+
}
1263+
1264+
func (q*FakeQuerier)getProvisionerJobsByIDsWithQueuePositionLockedGlobalQueue(_ context.Context,ids []uuid.UUID) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow,error) {
11531265
//WITH pending_jobs AS (
11541266
//SELECT
11551267
//id, created_at
@@ -4237,7 +4349,7 @@ func (q *FakeQuerier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Conte
42374349
ifids==nil {
42384350
ids= []uuid.UUID{}
42394351
}
4240-
returnq.getProvisionerJobsByIDsWithQueuePositionLocked(ctx,ids)
4352+
returnq.getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue(ctx,ids)
42414353
}
42424354

42434355
func (q*FakeQuerier)GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner(ctx context.Context,arg database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerParams) ([]database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerRow,error) {
@@ -4306,7 +4418,7 @@ func (q *FakeQuerier) GetProvisionerJobsByOrganizationAndStatusWithQueuePosition
43064418
LIMIT
43074419
sqlc.narg('limit')::int;
43084420
*/
4309-
rowsWithQueuePosition,err:=q.getProvisionerJobsByIDsWithQueuePositionLocked(ctx,nil)
4421+
rowsWithQueuePosition,err:=q.getProvisionerJobsByIDsWithQueuePositionLockedGlobalQueue(ctx,nil)
43104422
iferr!=nil {
43114423
returnnil,err
43124424
}

‎coderd/database/dump.sql

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more aboutcustomizing how changed files appear on GitHub.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
DROPINDEX idx_provisioner_jobs_status;
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
CREATEINDEXidx_provisioner_jobs_statusON provisioner_jobs USING btree (job_status);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp