@@ -1147,7 +1147,119 @@ func getOwnerFromTags(tags map[string]string) string {
1147
1147
return ""
1148
1148
}
1149
1149
1150
- func (q * FakeQuerier )getProvisionerJobsByIDsWithQueuePositionLocked (_ context.Context ,ids []uuid.UUID ) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow ,error ) {
1150
+ // provisionerTagsetContains checks if daemonTags contain all key-value pairs from jobTags
1151
+ func provisionerTagsetContains (daemonTags ,jobTags map [string ]string )bool {
1152
+ for jobKey ,jobValue := range jobTags {
1153
+ if daemonValue ,exists := daemonTags [jobKey ];! exists || daemonValue != jobValue {
1154
+ return false
1155
+ }
1156
+ }
1157
+ return true
1158
+ }
1159
+
1160
+ // GetProvisionerJobsByIDsWithQueuePosition mimics the SQL logic in pure Go
1161
+ func (q * FakeQuerier )getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue (_ context.Context ,jobIDs []uuid.UUID ) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow ,error ) {
1162
+ // Step 1: Filter provisionerJobs based on jobIDs
1163
+ filteredJobs := make (map [uuid.UUID ]database.ProvisionerJob )
1164
+ for _ ,job := range q .provisionerJobs {
1165
+ for _ ,id := range jobIDs {
1166
+ if job .ID == id {
1167
+ filteredJobs [job .ID ]= job
1168
+ }
1169
+ }
1170
+ }
1171
+
1172
+ // Step 2: Identify pending jobs
1173
+ pendingJobs := make (map [uuid.UUID ]database.ProvisionerJob )
1174
+ for _ ,job := range q .provisionerJobs {
1175
+ if job .JobStatus == "pending" {
1176
+ pendingJobs [job .ID ]= job
1177
+ }
1178
+ }
1179
+
1180
+ // Step 3: Identify pending jobs that have a matching provisioner
1181
+ matchedJobs := make (map [uuid.UUID ]struct {})
1182
+ for _ ,job := range pendingJobs {
1183
+ for _ ,daemon := range q .provisionerDaemons {
1184
+ if provisionerTagsetContains (daemon .Tags ,job .Tags ) {
1185
+ matchedJobs [job .ID ]= struct {}{}
1186
+ break
1187
+ }
1188
+ }
1189
+ }
1190
+
1191
+ // Step 4: Rank pending jobs per provisioner
1192
+ jobRanks := make (map [uuid.UUID ][]database.ProvisionerJob )
1193
+ for _ ,job := range pendingJobs {
1194
+ for _ ,daemon := range q .provisionerDaemons {
1195
+ if provisionerTagsetContains (daemon .Tags ,job .Tags ) {
1196
+ jobRanks [daemon .ID ]= append (jobRanks [daemon .ID ],job )
1197
+ }
1198
+ }
1199
+ }
1200
+
1201
+ // Sort jobs per provisioner by CreatedAt
1202
+ for daemonID := range jobRanks {
1203
+ sort .Slice (jobRanks [daemonID ],func (i ,j int )bool {
1204
+ return jobRanks [daemonID ][i ].CreatedAt .Before (jobRanks [daemonID ][j ].CreatedAt )
1205
+ })
1206
+ }
1207
+
1208
+ // Step 5: Compute queue position & max queue size across all provisioners
1209
+ jobQueueStats := make (map [uuid.UUID ]database.GetProvisionerJobsByIDsWithQueuePositionRow )
1210
+ for _ ,jobs := range jobRanks {
1211
+ queueSize := int64 (len (jobs ))// Queue size per provisioner
1212
+ for i ,job := range jobs {
1213
+ queuePosition := int64 (i + 1 )
1214
+
1215
+ // If the job already exists, update only if this queuePosition is better
1216
+ if existing ,exists := jobQueueStats [job .ID ];exists {
1217
+ jobQueueStats [job .ID ]= database.GetProvisionerJobsByIDsWithQueuePositionRow {
1218
+ ID :job .ID ,
1219
+ CreatedAt :job .CreatedAt ,
1220
+ ProvisionerJob :job ,
1221
+ QueuePosition :min (existing .QueuePosition ,queuePosition ),
1222
+ QueueSize :max (existing .QueueSize ,queueSize ),// Take the maximum queue size across provisioners
1223
+ }
1224
+ }else {
1225
+ jobQueueStats [job .ID ]= database.GetProvisionerJobsByIDsWithQueuePositionRow {
1226
+ ID :job .ID ,
1227
+ CreatedAt :job .CreatedAt ,
1228
+ ProvisionerJob :job ,
1229
+ QueuePosition :queuePosition ,
1230
+ QueueSize :queueSize ,
1231
+ }
1232
+ }
1233
+ }
1234
+ }
1235
+
1236
+ // Step 6: Compute the final results with minimal checks
1237
+ var results []database.GetProvisionerJobsByIDsWithQueuePositionRow
1238
+ for _ ,job := range filteredJobs {
1239
+ // If the job has a computed rank, use it
1240
+ if rank ,found := jobQueueStats [job .ID ];found {
1241
+ results = append (results ,rank )
1242
+ }else {
1243
+ // Otherwise, return (0,0) for non-pending jobs and unranked pending jobs
1244
+ results = append (results , database.GetProvisionerJobsByIDsWithQueuePositionRow {
1245
+ ID :job .ID ,
1246
+ CreatedAt :job .CreatedAt ,
1247
+ ProvisionerJob :job ,
1248
+ QueuePosition :0 ,
1249
+ QueueSize :0 ,
1250
+ })
1251
+ }
1252
+ }
1253
+
1254
+ // Step 7: Sort results by CreatedAt
1255
+ sort .Slice (results ,func (i ,j int )bool {
1256
+ return results [i ].CreatedAt .Before (results [j ].CreatedAt )
1257
+ })
1258
+
1259
+ return results ,nil
1260
+ }
1261
+
1262
+ func (q * FakeQuerier )getProvisionerJobsByIDsWithQueuePositionLockedGlobalQueue (_ context.Context ,ids []uuid.UUID ) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow ,error ) {
1151
1263
//WITH pending_jobs AS (
1152
1264
//SELECT
1153
1265
//id, created_at
@@ -4149,7 +4261,7 @@ func (q *FakeQuerier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Conte
4149
4261
if ids == nil {
4150
4262
ids = []uuid.UUID {}
4151
4263
}
4152
- return q .getProvisionerJobsByIDsWithQueuePositionLocked (ctx ,ids )
4264
+ return q .getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue (ctx ,ids )
4153
4265
}
4154
4266
4155
4267
func (q * FakeQuerier )GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner (ctx context.Context ,arg database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerParams ) ([]database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerRow ,error ) {
@@ -4218,7 +4330,7 @@ func (q *FakeQuerier) GetProvisionerJobsByOrganizationAndStatusWithQueuePosition
4218
4330
LIMIT
4219
4331
sqlc.narg('limit')::int;
4220
4332
*/
4221
- rowsWithQueuePosition ,err := q .getProvisionerJobsByIDsWithQueuePositionLocked (ctx ,nil )
4333
+ rowsWithQueuePosition ,err := q .getProvisionerJobsByIDsWithQueuePositionLockedGlobalQueue (ctx ,nil )
4222
4334
if err != nil {
4223
4335
return nil ,err
4224
4336
}