@@ -1149,7 +1149,119 @@ func getOwnerFromTags(tags map[string]string) string {
11491149return ""
11501150}
11511151
1152- func (q * FakeQuerier )getProvisionerJobsByIDsWithQueuePositionLocked (_ context.Context ,ids []uuid.UUID ) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow ,error ) {
1152+ // provisionerTagsetContains checks if daemonTags contain all key-value pairs from jobTags
1153+ func provisionerTagsetContains (daemonTags ,jobTags map [string ]string )bool {
1154+ for jobKey ,jobValue := range jobTags {
1155+ if daemonValue ,exists := daemonTags [jobKey ];! exists || daemonValue != jobValue {
1156+ return false
1157+ }
1158+ }
1159+ return true
1160+ }
1161+
1162+ // GetProvisionerJobsByIDsWithQueuePosition mimics the SQL logic in pure Go
1163+ func (q * FakeQuerier )getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue (_ context.Context ,jobIDs []uuid.UUID ) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow ,error ) {
1164+ // Step 1: Filter provisionerJobs based on jobIDs
1165+ filteredJobs := make (map [uuid.UUID ]database.ProvisionerJob )
1166+ for _ ,job := range q .provisionerJobs {
1167+ for _ ,id := range jobIDs {
1168+ if job .ID == id {
1169+ filteredJobs [job .ID ]= job
1170+ }
1171+ }
1172+ }
1173+
1174+ // Step 2: Identify pending jobs
1175+ pendingJobs := make (map [uuid.UUID ]database.ProvisionerJob )
1176+ for _ ,job := range q .provisionerJobs {
1177+ if job .JobStatus == "pending" {
1178+ pendingJobs [job .ID ]= job
1179+ }
1180+ }
1181+
1182+ // Step 3: Identify pending jobs that have a matching provisioner
1183+ matchedJobs := make (map [uuid.UUID ]struct {})
1184+ for _ ,job := range pendingJobs {
1185+ for _ ,daemon := range q .provisionerDaemons {
1186+ if provisionerTagsetContains (daemon .Tags ,job .Tags ) {
1187+ matchedJobs [job .ID ]= struct {}{}
1188+ break
1189+ }
1190+ }
1191+ }
1192+
1193+ // Step 4: Rank pending jobs per provisioner
1194+ jobRanks := make (map [uuid.UUID ][]database.ProvisionerJob )
1195+ for _ ,job := range pendingJobs {
1196+ for _ ,daemon := range q .provisionerDaemons {
1197+ if provisionerTagsetContains (daemon .Tags ,job .Tags ) {
1198+ jobRanks [daemon .ID ]= append (jobRanks [daemon .ID ],job )
1199+ }
1200+ }
1201+ }
1202+
1203+ // Sort jobs per provisioner by CreatedAt
1204+ for daemonID := range jobRanks {
1205+ sort .Slice (jobRanks [daemonID ],func (i ,j int )bool {
1206+ return jobRanks [daemonID ][i ].CreatedAt .Before (jobRanks [daemonID ][j ].CreatedAt )
1207+ })
1208+ }
1209+
1210+ // Step 5: Compute queue position & max queue size across all provisioners
1211+ jobQueueStats := make (map [uuid.UUID ]database.GetProvisionerJobsByIDsWithQueuePositionRow )
1212+ for _ ,jobs := range jobRanks {
1213+ queueSize := int64 (len (jobs ))// Queue size per provisioner
1214+ for i ,job := range jobs {
1215+ queuePosition := int64 (i + 1 )
1216+
1217+ // If the job already exists, update only if this queuePosition is better
1218+ if existing ,exists := jobQueueStats [job .ID ];exists {
1219+ jobQueueStats [job .ID ]= database.GetProvisionerJobsByIDsWithQueuePositionRow {
1220+ ID :job .ID ,
1221+ CreatedAt :job .CreatedAt ,
1222+ ProvisionerJob :job ,
1223+ QueuePosition :min (existing .QueuePosition ,queuePosition ),
1224+ QueueSize :max (existing .QueueSize ,queueSize ),// Take the maximum queue size across provisioners
1225+ }
1226+ }else {
1227+ jobQueueStats [job .ID ]= database.GetProvisionerJobsByIDsWithQueuePositionRow {
1228+ ID :job .ID ,
1229+ CreatedAt :job .CreatedAt ,
1230+ ProvisionerJob :job ,
1231+ QueuePosition :queuePosition ,
1232+ QueueSize :queueSize ,
1233+ }
1234+ }
1235+ }
1236+ }
1237+
1238+ // Step 6: Compute the final results with minimal checks
1239+ var results []database.GetProvisionerJobsByIDsWithQueuePositionRow
1240+ for _ ,job := range filteredJobs {
1241+ // If the job has a computed rank, use it
1242+ if rank ,found := jobQueueStats [job .ID ];found {
1243+ results = append (results ,rank )
1244+ }else {
1245+ // Otherwise, return (0,0) for non-pending jobs and unranked pending jobs
1246+ results = append (results , database.GetProvisionerJobsByIDsWithQueuePositionRow {
1247+ ID :job .ID ,
1248+ CreatedAt :job .CreatedAt ,
1249+ ProvisionerJob :job ,
1250+ QueuePosition :0 ,
1251+ QueueSize :0 ,
1252+ })
1253+ }
1254+ }
1255+
1256+ // Step 7: Sort results by CreatedAt
1257+ sort .Slice (results ,func (i ,j int )bool {
1258+ return results [i ].CreatedAt .Before (results [j ].CreatedAt )
1259+ })
1260+
1261+ return results ,nil
1262+ }
1263+
1264+ func (q * FakeQuerier )getProvisionerJobsByIDsWithQueuePositionLockedGlobalQueue (_ context.Context ,ids []uuid.UUID ) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow ,error ) {
11531265//WITH pending_jobs AS (
11541266//SELECT
11551267//id, created_at
@@ -4237,7 +4349,7 @@ func (q *FakeQuerier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Conte
42374349if ids == nil {
42384350ids = []uuid.UUID {}
42394351}
4240- return q .getProvisionerJobsByIDsWithQueuePositionLocked (ctx ,ids )
4352+ return q .getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue (ctx ,ids )
42414353}
42424354
42434355func (q * FakeQuerier )GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner (ctx context.Context ,arg database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerParams ) ([]database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerRow ,error ) {
@@ -4306,7 +4418,7 @@ func (q *FakeQuerier) GetProvisionerJobsByOrganizationAndStatusWithQueuePosition
43064418LIMIT
43074419sqlc.narg('limit')::int;
43084420*/
4309- rowsWithQueuePosition ,err := q .getProvisionerJobsByIDsWithQueuePositionLocked (ctx ,nil )
4421+ rowsWithQueuePosition ,err := q .getProvisionerJobsByIDsWithQueuePositionLockedGlobalQueue (ctx ,nil )
43104422if err != nil {
43114423return nil ,err
43124424}