@@ -1149,7 +1149,119 @@ func getOwnerFromTags(tags map[string]string) string {
1149
1149
return ""
1150
1150
}
1151
1151
1152
- func (q * FakeQuerier )getProvisionerJobsByIDsWithQueuePositionLocked (_ context.Context ,ids []uuid.UUID ) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow ,error ) {
1152
+ // provisionerTagsetContains checks if daemonTags contain all key-value pairs from jobTags
1153
+ func provisionerTagsetContains (daemonTags ,jobTags map [string ]string )bool {
1154
+ for jobKey ,jobValue := range jobTags {
1155
+ if daemonValue ,exists := daemonTags [jobKey ];! exists || daemonValue != jobValue {
1156
+ return false
1157
+ }
1158
+ }
1159
+ return true
1160
+ }
1161
+
1162
+ // GetProvisionerJobsByIDsWithQueuePosition mimics the SQL logic in pure Go
1163
+ func (q * FakeQuerier )getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue (_ context.Context ,jobIDs []uuid.UUID ) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow ,error ) {
1164
+ // Step 1: Filter provisionerJobs based on jobIDs
1165
+ filteredJobs := make (map [uuid.UUID ]database.ProvisionerJob )
1166
+ for _ ,job := range q .provisionerJobs {
1167
+ for _ ,id := range jobIDs {
1168
+ if job .ID == id {
1169
+ filteredJobs [job .ID ]= job
1170
+ }
1171
+ }
1172
+ }
1173
+
1174
+ // Step 2: Identify pending jobs
1175
+ pendingJobs := make (map [uuid.UUID ]database.ProvisionerJob )
1176
+ for _ ,job := range q .provisionerJobs {
1177
+ if job .JobStatus == "pending" {
1178
+ pendingJobs [job .ID ]= job
1179
+ }
1180
+ }
1181
+
1182
+ // Step 3: Identify pending jobs that have a matching provisioner
1183
+ matchedJobs := make (map [uuid.UUID ]struct {})
1184
+ for _ ,job := range pendingJobs {
1185
+ for _ ,daemon := range q .provisionerDaemons {
1186
+ if provisionerTagsetContains (daemon .Tags ,job .Tags ) {
1187
+ matchedJobs [job .ID ]= struct {}{}
1188
+ break
1189
+ }
1190
+ }
1191
+ }
1192
+
1193
+ // Step 4: Rank pending jobs per provisioner
1194
+ jobRanks := make (map [uuid.UUID ][]database.ProvisionerJob )
1195
+ for _ ,job := range pendingJobs {
1196
+ for _ ,daemon := range q .provisionerDaemons {
1197
+ if provisionerTagsetContains (daemon .Tags ,job .Tags ) {
1198
+ jobRanks [daemon .ID ]= append (jobRanks [daemon .ID ],job )
1199
+ }
1200
+ }
1201
+ }
1202
+
1203
+ // Sort jobs per provisioner by CreatedAt
1204
+ for daemonID := range jobRanks {
1205
+ sort .Slice (jobRanks [daemonID ],func (i ,j int )bool {
1206
+ return jobRanks [daemonID ][i ].CreatedAt .Before (jobRanks [daemonID ][j ].CreatedAt )
1207
+ })
1208
+ }
1209
+
1210
+ // Step 5: Compute queue position & max queue size across all provisioners
1211
+ jobQueueStats := make (map [uuid.UUID ]database.GetProvisionerJobsByIDsWithQueuePositionRow )
1212
+ for _ ,jobs := range jobRanks {
1213
+ queueSize := int64 (len (jobs ))// Queue size per provisioner
1214
+ for i ,job := range jobs {
1215
+ queuePosition := int64 (i + 1 )
1216
+
1217
+ // If the job already exists, update only if this queuePosition is better
1218
+ if existing ,exists := jobQueueStats [job .ID ];exists {
1219
+ jobQueueStats [job .ID ]= database.GetProvisionerJobsByIDsWithQueuePositionRow {
1220
+ ID :job .ID ,
1221
+ CreatedAt :job .CreatedAt ,
1222
+ ProvisionerJob :job ,
1223
+ QueuePosition :min (existing .QueuePosition ,queuePosition ),
1224
+ QueueSize :max (existing .QueueSize ,queueSize ),// Take the maximum queue size across provisioners
1225
+ }
1226
+ }else {
1227
+ jobQueueStats [job .ID ]= database.GetProvisionerJobsByIDsWithQueuePositionRow {
1228
+ ID :job .ID ,
1229
+ CreatedAt :job .CreatedAt ,
1230
+ ProvisionerJob :job ,
1231
+ QueuePosition :queuePosition ,
1232
+ QueueSize :queueSize ,
1233
+ }
1234
+ }
1235
+ }
1236
+ }
1237
+
1238
+ // Step 6: Compute the final results with minimal checks
1239
+ var results []database.GetProvisionerJobsByIDsWithQueuePositionRow
1240
+ for _ ,job := range filteredJobs {
1241
+ // If the job has a computed rank, use it
1242
+ if rank ,found := jobQueueStats [job .ID ];found {
1243
+ results = append (results ,rank )
1244
+ }else {
1245
+ // Otherwise, return (0,0) for non-pending jobs and unranked pending jobs
1246
+ results = append (results , database.GetProvisionerJobsByIDsWithQueuePositionRow {
1247
+ ID :job .ID ,
1248
+ CreatedAt :job .CreatedAt ,
1249
+ ProvisionerJob :job ,
1250
+ QueuePosition :0 ,
1251
+ QueueSize :0 ,
1252
+ })
1253
+ }
1254
+ }
1255
+
1256
+ // Step 7: Sort results by CreatedAt
1257
+ sort .Slice (results ,func (i ,j int )bool {
1258
+ return results [i ].CreatedAt .Before (results [j ].CreatedAt )
1259
+ })
1260
+
1261
+ return results ,nil
1262
+ }
1263
+
1264
+ func (q * FakeQuerier )getProvisionerJobsByIDsWithQueuePositionLockedGlobalQueue (_ context.Context ,ids []uuid.UUID ) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow ,error ) {
1153
1265
//WITH pending_jobs AS (
1154
1266
//SELECT
1155
1267
//id, created_at
@@ -4237,7 +4349,7 @@ func (q *FakeQuerier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Conte
4237
4349
if ids == nil {
4238
4350
ids = []uuid.UUID {}
4239
4351
}
4240
- return q .getProvisionerJobsByIDsWithQueuePositionLocked (ctx ,ids )
4352
+ return q .getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue (ctx ,ids )
4241
4353
}
4242
4354
4243
4355
func (q * FakeQuerier )GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner (ctx context.Context ,arg database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerParams ) ([]database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerRow ,error ) {
@@ -4306,7 +4418,7 @@ func (q *FakeQuerier) GetProvisionerJobsByOrganizationAndStatusWithQueuePosition
4306
4418
LIMIT
4307
4419
sqlc.narg('limit')::int;
4308
4420
*/
4309
- rowsWithQueuePosition ,err := q .getProvisionerJobsByIDsWithQueuePositionLocked (ctx ,nil )
4421
+ rowsWithQueuePosition ,err := q .getProvisionerJobsByIDsWithQueuePositionLockedGlobalQueue (ctx ,nil )
4310
4422
if err != nil {
4311
4423
return nil ,err
4312
4424
}