Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commita646478

Browse files
authored
fix: move pubsub publishing out of database transactions to avoid conn exhaustion (#17648)
Database transactions hold onto connections, and `pubsub.Publish` triesto acquire a connection of its own. If the latter is called within atransaction, this can lead to connection exhaustion.I plan two follow-ups to this PR:1. Make connection counts tuneablehttps://github.com/coder/coder/blob/main/cli/server.go#L2360-L2376We will then be able to write tests showing how connection exhaustionoccurs.2. Write a linter/ruleguard to prevent `pubsub.Publish` from beingcalled within a transaction.---------Signed-off-by: Danny Kopping <dannykopping@gmail.com>
1 parent82fdb6a commita646478

File tree

2 files changed

+156
-103
lines changed

2 files changed

+156
-103
lines changed

‎enterprise/coderd/prebuilds/reconcile.go

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,11 @@ type StoreReconciler struct {
4040
registerer prometheus.Registerer
4141
metrics*MetricsCollector
4242

43-
cancelFn context.CancelCauseFunc
44-
running atomic.Bool
45-
stopped atomic.Bool
46-
donechanstruct{}
43+
cancelFn context.CancelCauseFunc
44+
running atomic.Bool
45+
stopped atomic.Bool
46+
donechanstruct{}
47+
provisionNotifyChchan database.ProvisionerJob
4748
}
4849

4950
var_ prebuilds.ReconciliationOrchestrator=&StoreReconciler{}
@@ -56,13 +57,14 @@ func NewStoreReconciler(store database.Store,
5657
registerer prometheus.Registerer,
5758
)*StoreReconciler {
5859
reconciler:=&StoreReconciler{
59-
store:store,
60-
pubsub:ps,
61-
logger:logger,
62-
cfg:cfg,
63-
clock:clock,
64-
registerer:registerer,
65-
done:make(chanstruct{},1),
60+
store:store,
61+
pubsub:ps,
62+
logger:logger,
63+
cfg:cfg,
64+
clock:clock,
65+
registerer:registerer,
66+
done:make(chanstruct{},1),
67+
provisionNotifyCh:make(chan database.ProvisionerJob,10),
6668
}
6769

6870
reconciler.metrics=NewMetricsCollector(store,logger,reconciler)
@@ -100,6 +102,29 @@ func (c *StoreReconciler) Run(ctx context.Context) {
100102
// NOTE: without this atomic bool, Stop might race with Run for the c.cancelFn above.
101103
c.running.Store(true)
102104

105+
// Publish provisioning jobs outside of database transactions.
106+
// A connection is held while a database transaction is active; PGPubsub also tries to acquire a new connection on
107+
// Publish, so we can exhaust available connections.
108+
//
109+
// A single worker dequeues from the channel, which should be sufficient.
110+
// If any messages are missed due to congestion or errors, provisionerdserver has a backup polling mechanism which
111+
// will periodically pick up any queued jobs (see poll(time.Duration) in coderd/provisionerdserver/acquirer.go).
112+
gofunc() {
113+
for {
114+
select {
115+
case<-c.done:
116+
return
117+
case<-ctx.Done():
118+
return
119+
casejob:=<-c.provisionNotifyCh:
120+
err:=provisionerjobs.PostJob(c.pubsub,job)
121+
iferr!=nil {
122+
c.logger.Error(ctx,"failed to post provisioner job to pubsub",slog.Error(err))
123+
}
124+
}
125+
}
126+
}()
127+
103128
for {
104129
select {
105130
// TODO: implement pubsub listener to allow reconciling a specific template imperatively once it has been changed,
@@ -576,10 +601,16 @@ func (c *StoreReconciler) provision(
576601
returnxerrors.Errorf("provision workspace: %w",err)
577602
}
578603

579-
err=provisionerjobs.PostJob(c.pubsub,*provisionerJob)
580-
iferr!=nil {
581-
// Client probably doesn't care about this error, so just log it.
582-
c.logger.Error(ctx,"failed to post provisioner job to pubsub",slog.Error(err))
604+
ifprovisionerJob==nil {
605+
returnnil
606+
}
607+
608+
// Publish provisioner job event outside of transaction.
609+
select {
610+
casec.provisionNotifyCh<-*provisionerJob:
611+
default:// channel full, drop the message; provisioner will pick this job up later with its periodic check, though.
612+
c.logger.Warn(ctx,"provisioner job notification queue full, dropping",
613+
slog.F("job_id",provisionerJob.ID),slog.F("prebuild_id",prebuildID.String()))
583614
}
584615

585616
c.logger.Info(ctx,"prebuild job scheduled",slog.F("transition",transition),

‎enterprise/coderd/prebuilds/reconcile_test.go

Lines changed: 110 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/prometheus/client_golang/prometheus"
12+
"golang.org/x/xerrors"
1213

1314
"github.com/coder/coder/v2/coderd/database/dbtime"
1415
"github.com/coder/coder/v2/coderd/util/slice"
@@ -303,107 +304,128 @@ func TestPrebuildReconciliation(t *testing.T) {
303304
for_,prebuildLatestTransition:=rangetc.prebuildLatestTransitions {
304305
for_,prebuildJobStatus:=rangetc.prebuildJobStatuses {
305306
for_,templateDeleted:=rangetc.templateDeleted {
306-
t.Run(fmt.Sprintf("%s - %s - %s",tc.name,prebuildLatestTransition,prebuildJobStatus),func(t*testing.T) {
307-
t.Parallel()
308-
t.Cleanup(func() {
309-
ift.Failed() {
310-
t.Logf("failed to run test: %s",tc.name)
311-
t.Logf("templateVersionActive: %t",templateVersionActive)
312-
t.Logf("prebuildLatestTransition: %s",prebuildLatestTransition)
313-
t.Logf("prebuildJobStatus: %s",prebuildJobStatus)
307+
for_,useBrokenPubsub:=range []bool{true,false} {
308+
t.Run(fmt.Sprintf("%s - %s - %s - pubsub_broken=%v",tc.name,prebuildLatestTransition,prebuildJobStatus,useBrokenPubsub),func(t*testing.T) {
309+
t.Parallel()
310+
t.Cleanup(func() {
311+
ift.Failed() {
312+
t.Logf("failed to run test: %s",tc.name)
313+
t.Logf("templateVersionActive: %t",templateVersionActive)
314+
t.Logf("prebuildLatestTransition: %s",prebuildLatestTransition)
315+
t.Logf("prebuildJobStatus: %s",prebuildJobStatus)
316+
}
317+
})
318+
clock:=quartz.NewMock(t)
319+
ctx:=testutil.Context(t,testutil.WaitShort)
320+
cfg:= codersdk.PrebuildsConfig{}
321+
logger:=slogtest.Make(
322+
t,&slogtest.Options{IgnoreErrors:true},
323+
).Leveled(slog.LevelDebug)
324+
db,pubSub:=dbtestutil.NewDB(t)
325+
326+
ownerID:=uuid.New()
327+
dbgen.User(t,db, database.User{
328+
ID:ownerID,
329+
})
330+
org,template:=setupTestDBTemplate(t,db,ownerID,templateDeleted)
331+
templateVersionID:=setupTestDBTemplateVersion(
332+
ctx,
333+
t,
334+
clock,
335+
db,
336+
pubSub,
337+
org.ID,
338+
ownerID,
339+
template.ID,
340+
)
341+
preset:=setupTestDBPreset(
342+
t,
343+
db,
344+
templateVersionID,
345+
1,
346+
uuid.New().String(),
347+
)
348+
prebuild:=setupTestDBPrebuild(
349+
t,
350+
clock,
351+
db,
352+
pubSub,
353+
prebuildLatestTransition,
354+
prebuildJobStatus,
355+
org.ID,
356+
preset,
357+
template.ID,
358+
templateVersionID,
359+
)
360+
361+
if!templateVersionActive {
362+
// Create a new template version and mark it as active
363+
// This marks the template version that we care about as inactive
364+
setupTestDBTemplateVersion(ctx,t,clock,db,pubSub,org.ID,ownerID,template.ID)
314365
}
315-
})
316-
clock:=quartz.NewMock(t)
317-
ctx:=testutil.Context(t,testutil.WaitShort)
318-
cfg:= codersdk.PrebuildsConfig{}
319-
logger:=slogtest.Make(
320-
t,&slogtest.Options{IgnoreErrors:true},
321-
).Leveled(slog.LevelDebug)
322-
db,pubSub:=dbtestutil.NewDB(t)
323-
controller:=prebuilds.NewStoreReconciler(db,pubSub,cfg,logger,quartz.NewMock(t),prometheus.NewRegistry())
324-
325-
ownerID:=uuid.New()
326-
dbgen.User(t,db, database.User{
327-
ID:ownerID,
328-
})
329-
org,template:=setupTestDBTemplate(t,db,ownerID,templateDeleted)
330-
templateVersionID:=setupTestDBTemplateVersion(
331-
ctx,
332-
t,
333-
clock,
334-
db,
335-
pubSub,
336-
org.ID,
337-
ownerID,
338-
template.ID,
339-
)
340-
preset:=setupTestDBPreset(
341-
t,
342-
db,
343-
templateVersionID,
344-
1,
345-
uuid.New().String(),
346-
)
347-
prebuild:=setupTestDBPrebuild(
348-
t,
349-
clock,
350-
db,
351-
pubSub,
352-
prebuildLatestTransition,
353-
prebuildJobStatus,
354-
org.ID,
355-
preset,
356-
template.ID,
357-
templateVersionID,
358-
)
359-
360-
if!templateVersionActive {
361-
// Create a new template version and mark it as active
362-
// This marks the template version that we care about as inactive
363-
setupTestDBTemplateVersion(ctx,t,clock,db,pubSub,org.ID,ownerID,template.ID)
364-
}
365-
366-
// Run the reconciliation multiple times to ensure idempotency
367-
// 8 was arbitrary, but large enough to reasonably trust the result
368-
fori:=1;i<=8;i++ {
369-
require.NoErrorf(t,controller.ReconcileAll(ctx),"failed on iteration %d",i)
370-
371-
iftc.shouldCreateNewPrebuild!=nil {
372-
newPrebuildCount:=0
373-
workspaces,err:=db.GetWorkspacesByTemplateID(ctx,template.ID)
374-
require.NoError(t,err)
375-
for_,workspace:=rangeworkspaces {
376-
ifworkspace.ID!=prebuild.ID {
377-
newPrebuildCount++
366+
367+
ifuseBrokenPubsub {
368+
pubSub=&brokenPublisher{Pubsub:pubSub}
369+
}
370+
controller:=prebuilds.NewStoreReconciler(db,pubSub,cfg,logger,quartz.NewMock(t),prometheus.NewRegistry())
371+
372+
// Run the reconciliation multiple times to ensure idempotency
373+
// 8 was arbitrary, but large enough to reasonably trust the result
374+
fori:=1;i<=8;i++ {
375+
require.NoErrorf(t,controller.ReconcileAll(ctx),"failed on iteration %d",i)
376+
377+
iftc.shouldCreateNewPrebuild!=nil {
378+
newPrebuildCount:=0
379+
workspaces,err:=db.GetWorkspacesByTemplateID(ctx,template.ID)
380+
require.NoError(t,err)
381+
for_,workspace:=rangeworkspaces {
382+
ifworkspace.ID!=prebuild.ID {
383+
newPrebuildCount++
384+
}
378385
}
386+
// This test configures a preset that desires one prebuild.
387+
// In cases where new prebuilds should be created, there should be exactly one.
388+
require.Equal(t,*tc.shouldCreateNewPrebuild,newPrebuildCount==1)
379389
}
380-
// This test configures a preset that desires one prebuild.
381-
// In cases where new prebuilds should be created, there should be exactly one.
382-
require.Equal(t,*tc.shouldCreateNewPrebuild,newPrebuildCount==1)
383-
}
384390

385-
iftc.shouldDeleteOldPrebuild!=nil {
386-
builds,err:=db.GetWorkspaceBuildsByWorkspaceID(ctx, database.GetWorkspaceBuildsByWorkspaceIDParams{
387-
WorkspaceID:prebuild.ID,
388-
})
389-
require.NoError(t,err)
390-
if*tc.shouldDeleteOldPrebuild {
391-
require.Equal(t,2,len(builds))
392-
require.Equal(t,database.WorkspaceTransitionDelete,builds[0].Transition)
393-
}else {
394-
require.Equal(t,1,len(builds))
395-
require.Equal(t,prebuildLatestTransition,builds[0].Transition)
391+
iftc.shouldDeleteOldPrebuild!=nil {
392+
builds,err:=db.GetWorkspaceBuildsByWorkspaceID(ctx, database.GetWorkspaceBuildsByWorkspaceIDParams{
393+
WorkspaceID:prebuild.ID,
394+
})
395+
require.NoError(t,err)
396+
if*tc.shouldDeleteOldPrebuild {
397+
require.Equal(t,2,len(builds))
398+
require.Equal(t,database.WorkspaceTransitionDelete,builds[0].Transition)
399+
}else {
400+
require.Equal(t,1,len(builds))
401+
require.Equal(t,prebuildLatestTransition,builds[0].Transition)
402+
}
396403
}
397404
}
398-
}
399-
})
405+
})
406+
}
400407
}
401408
}
402409
}
403410
}
404411
}
405412
}
406413

414+
// brokenPublisher is used to validate that Publish() calls which always fail do not affect the reconciler's behavior,
415+
// since the messages published are not essential but merely advisory.
416+
typebrokenPublisherstruct {
417+
pubsub.Pubsub
418+
}
419+
420+
// Publish deliberately fails.
421+
// I'm explicitly _not_ checking for EventJobPosted (coderd/database/provisionerjobs/provisionerjobs.go) since that
422+
// requires too much knowledge of the underlying implementation.
423+
func (*brokenPublisher)Publish(eventstring,_ []byte)error {
424+
// Mimick some work being done.
425+
<-time.After(testutil.IntervalFast)
426+
returnxerrors.Errorf("failed to publish %q",event)
427+
}
428+
407429
funcTestMultiplePresetsPerTemplateVersion(t*testing.T) {
408430
t.Parallel()
409431

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp