6
6
"encoding/json"
7
7
"errors"
8
8
"fmt"
9
- "math"
10
9
"strings"
11
10
"sync"
12
11
"sync/atomic"
@@ -300,6 +299,16 @@ func (c *StoreReconciler) ReconcileAll(ctx context.Context) error {
300
299
return xerrors .Errorf ("reconcile prebuild membership: %w" ,err )
301
300
}
302
301
302
+ // If the number of pending prebuilds is greater than the number allowed, skip reconciliation
303
+ sched ,err := NewJobsScheduler (c .logger ,snapshot .TotalPendingCount (),c .executeReconciliationAction )
304
+ if err != nil {
305
+ logger .Info (ctx ,"number of pending prebuilds is greater than the number allowed, skipping reconciliation" )
306
+ return nil
307
+ }
308
+
309
+ // Collect preset actions
310
+ resultsCh := make (chan PresetActions ,len (snapshot .Presets ))
311
+
303
312
var eg errgroup.Group
304
313
// Reconcile presets in parallel. Each preset in its own goroutine.
305
314
for _ ,preset := range snapshot .Presets {
@@ -311,7 +320,7 @@ func (c *StoreReconciler) ReconcileAll(ctx context.Context) error {
311
320
312
321
eg .Go (func ()error {
313
322
// Pass outer context.
314
- err = c .ReconcilePreset (ctx ,* ps )
323
+ actions , err : =c .ReconcilePreset (ctx ,* ps )
315
324
if err != nil {
316
325
logger .Error (
317
326
ctx ,
@@ -321,12 +330,31 @@ func (c *StoreReconciler) ReconcileAll(ctx context.Context) error {
321
330
)
322
331
}
323
332
// DO NOT return error otherwise the tx will end.
324
- return nil
333
+ select {
334
+ case <- ctx .Done ():
335
+ return nil
336
+ case resultsCh <- PresetActions {Preset :* ps ,Actions :actions }:
337
+ return nil
338
+ }
325
339
})
326
340
}
327
341
328
- // Release lock only when all preset reconciliation goroutines are finished.
329
- return eg .Wait ()
342
+ // Wait for all goroutines to finish
343
+ _ = eg .Wait ()
344
+ close (resultsCh )
345
+
346
+ results := make ([]PresetActions ,0 ,len (snapshot .Presets ))
347
+ for r := range resultsCh {
348
+ results = append (results ,r )
349
+ }
350
+
351
+ // Scheduler executes reconciliation actions
352
+ if err := sched .Run (ctx ,results );err != nil {
353
+ logger .Error (ctx ,"scheduler returned errors" ,slog .Error (err ))
354
+ return err
355
+ }
356
+
357
+ return nil
330
358
})
331
359
if err != nil {
332
360
logger .Error (ctx ,"failed to reconcile" ,slog .Error (err ))
@@ -442,7 +470,7 @@ func (c *StoreReconciler) SnapshotState(ctx context.Context, store database.Stor
442
470
return & state ,err
443
471
}
444
472
445
- func (c * StoreReconciler )ReconcilePreset (ctx context.Context ,ps prebuilds.PresetSnapshot )error {
473
+ func (c * StoreReconciler )ReconcilePreset (ctx context.Context ,ps prebuilds.PresetSnapshot )([] * prebuilds. ReconciliationActions , error ) {
446
474
logger := c .logger .With (
447
475
slog .F ("template_id" ,ps .Preset .TemplateID .String ()),
448
476
slog .F ("template_name" ,ps .Preset .TemplateName ),
@@ -463,36 +491,37 @@ func (c *StoreReconciler) ReconcilePreset(ctx context.Context, ps prebuilds.Pres
463
491
PresetID :ps .Preset .ID ,
464
492
})
465
493
if err != nil {
466
- return xerrors .Errorf ("failed to update preset prebuild status: %w" ,err )
494
+ return nil , xerrors .Errorf ("failed to update preset prebuild status: %w" ,err )
467
495
}
468
496
}
469
497
470
498
state := ps .CalculateState ()
499
+ logger .Debug (ctx ,"calculated reconciliation state for preset" ,
500
+ slog .F ("desired" ,state .Desired ),
501
+ slog .F ("actual" ,state .Actual ),
502
+ slog .F ("extraneous" ,state .Extraneous ),
503
+ slog .F ("starting" ,state .Starting ),
504
+ slog .F ("stopping" ,state .Stopping ),
505
+ slog .F ("deleting" ,state .Deleting ),
506
+ slog .F ("eligible" ,state .Eligible ))
507
+
471
508
actions ,err := c .CalculateActions (ctx ,ps )
472
509
if err != nil {
473
510
logger .Error (ctx ,"failed to calculate actions for preset" ,slog .Error (err ))
474
- return err
475
- }
476
-
477
- fields := []any {
478
- slog .F ("desired" ,state .Desired ),slog .F ("actual" ,state .Actual ),
479
- slog .F ("extraneous" ,state .Extraneous ),slog .F ("starting" ,state .Starting ),
480
- slog .F ("stopping" ,state .Stopping ),slog .F ("deleting" ,state .Deleting ),
481
- slog .F ("eligible" ,state .Eligible ),
511
+ return nil ,err
482
512
}
483
513
484
- levelFn := logger .Debug
485
- levelFn (ctx ,"calculated reconciliation state for preset" ,fields ... )
514
+ return actions ,nil
486
515
487
- var multiErr multierror.Error
488
- for _ ,action := range actions {
489
- err = c .executeReconciliationAction (ctx ,logger ,ps ,action )
490
- if err != nil {
491
- logger .Error (ctx ,"failed to execute action" ,"type" ,action .ActionType ,slog .Error (err ))
492
- multiErr .Errors = append (multiErr .Errors ,err )
493
- }
494
- }
495
- return multiErr .ErrorOrNil ()
516
+ // var multiErr multierror.Error
517
+ // for _, action := range actions {
518
+ // err = c.executeReconciliationAction(ctx, logger, ps, action)
519
+ // if err != nil {
520
+ // logger.Error(ctx, "failed to execute action", "type", action.ActionType, slog.Error(err))
521
+ // multiErr.Errors = append(multiErr.Errors, err)
522
+ // }
523
+ // }
524
+ // return multiErr.ErrorOrNil()
496
525
}
497
526
498
527
func (c * StoreReconciler )CalculateActions (ctx context.Context ,snapshot prebuilds.PresetSnapshot ) ([]* prebuilds.ReconciliationActions ,error ) {
@@ -574,8 +603,6 @@ func (c *StoreReconciler) executeReconciliationAction(ctx context.Context, logge
574
603
levelFn (ctx ,"calculated reconciliation action for preset" ,fields ... )
575
604
576
605
switch {
577
- case action .ActionType == prebuilds .ActionTypeBackoff :
578
- levelFn = logger .Warn
579
606
// Log at info level when there's a change to be effected.
580
607
case action .ActionType == prebuilds .ActionTypeCreate && action .Create > 0 :
581
608
levelFn = logger .Info
@@ -584,16 +611,6 @@ func (c *StoreReconciler) executeReconciliationAction(ctx context.Context, logge
584
611
}
585
612
586
613
switch action .ActionType {
587
- case prebuilds .ActionTypeBackoff :
588
- // If there is anything to backoff for (usually a cycle of failed prebuilds), then log and bail out.
589
- levelFn (ctx ,"template prebuild state retrieved, backing off" ,
590
- append (fields ,
591
- slog .F ("backoff_until" ,action .BackoffUntil .Format (time .RFC3339 )),
592
- slog .F ("backoff_secs" ,math .Round (action .BackoffUntil .Sub (c .clock .Now ()).Seconds ())),
593
- )... )
594
-
595
- return nil
596
-
597
614
case prebuilds .ActionTypeCreate :
598
615
// Unexpected things happen (i.e. bugs or bitflips); let's defend against disastrous outcomes.
599
616
// See https://blog.robertelder.org/causes-of-bit-flips-in-computer-memory/.
@@ -662,7 +679,7 @@ func (c *StoreReconciler) createPrebuiltWorkspace(ctx context.Context, prebuiltW
662
679
OrganizationID :template .OrganizationID ,
663
680
TemplateID :template .ID ,
664
681
Name :name ,
665
- LastUsedAt :c . clock . Now () ,
682
+ LastUsedAt :now ,
666
683
AutomaticUpdates :database .AutomaticUpdatesNever ,
667
684
AutostartSchedule : sql.NullString {},
668
685
Ttl : sql.NullInt64 {},