@@ -40,10 +40,11 @@ type StoreReconciler struct {
4040registerer prometheus.Registerer
4141metrics * MetricsCollector
4242
43- cancelFn context.CancelCauseFunc
44- running atomic.Bool
45- stopped atomic.Bool
46- done chan struct {}
43+ cancelFn context.CancelCauseFunc
44+ running atomic.Bool
45+ stopped atomic.Bool
46+ done chan struct {}
47+ provisionNotifyCh chan * database.ProvisionerJob
4748}
4849
4950var _ prebuilds.ReconciliationOrchestrator = & StoreReconciler {}
@@ -56,13 +57,14 @@ func NewStoreReconciler(store database.Store,
5657registerer prometheus.Registerer ,
5758)* StoreReconciler {
5859reconciler := & StoreReconciler {
59- store :store ,
60- pubsub :ps ,
61- logger :logger ,
62- cfg :cfg ,
63- clock :clock ,
64- registerer :registerer ,
65- done :make (chan struct {},1 ),
60+ store :store ,
61+ pubsub :ps ,
62+ logger :logger ,
63+ cfg :cfg ,
64+ clock :clock ,
65+ registerer :registerer ,
66+ done :make (chan struct {},1 ),
67+ provisionNotifyCh :make (chan * database.ProvisionerJob ,100 ),
6668}
6769
6870reconciler .metrics = NewMetricsCollector (store ,logger ,reconciler )
@@ -100,6 +102,29 @@ func (c *StoreReconciler) Run(ctx context.Context) {
100102// NOTE: without this atomic bool, Stop might race with Run for the c.cancelFn above.
101103c .running .Store (true )
102104
105+ // Publish provisioning jobs outside of database transactions.
106+ // PGPubsub tries to acquire a new connection on Publish. A connection is held while a database transaction is active,
107+ // so we can exhaust available connections.
108+ go func () {
109+ for {
110+ select {
111+ case <- c .done :
112+ return
113+ case <- ctx .Done ():
114+ return
115+ case job := <- c .provisionNotifyCh :
116+ if job == nil {
117+ continue
118+ }
119+
120+ err := provisionerjobs .PostJob (c .pubsub ,* job )
121+ if err != nil {
122+ c .logger .Error (ctx ,"failed to post provisioner job to pubsub" ,slog .Error (err ))
123+ }
124+ }
125+ }
126+ }()
127+
103128for {
104129select {
105130// TODO: implement pubsub listener to allow reconciling a specific template imperatively once it has been changed,
@@ -571,10 +596,12 @@ func (c *StoreReconciler) provision(
571596return xerrors .Errorf ("provision workspace: %w" ,err )
572597}
573598
574- err = provisionerjobs .PostJob (c .pubsub ,* provisionerJob )
575- if err != nil {
576- // Client probably doesn't care about this error, so just log it.
577- c .logger .Error (ctx ,"failed to post provisioner job to pubsub" ,slog .Error (err ))
599+ // Publish provisioner job event outside of transaction.
600+ select {
601+ case c .provisionNotifyCh <- provisionerJob :
602+ default :// channel full, drop the message; provisioner will pick this job up later with its periodic check, though.
603+ c .logger .Warn (ctx ,"provisioner job notification queue full, dropping" ,
604+ slog .F ("job_id" ,provisionerJob .ID ),slog .F ("prebuild_id" ,prebuildID .String ()))
578605}
579606
580607c .logger .Info (ctx ,"prebuild job scheduled" ,slog .F ("transition" ,transition ),