Commit 148442e

chore: sync release v1.63.1 to main branch (#6504)
# Description

Syncing patch release v1.63.1 to main branch

**↓↓ Please review and edit commit overrides before merging ↓↓**

BEGIN_COMMIT_OVERRIDE
fix: worker job buffer increased memory requirements causing excessive cpu for garbage collection (#6500)
END_COMMIT_OVERRIDE

---------

Co-authored-by: Aris Tzoumas <atzoum@gmail.com>
1 parent 488ba89, commit 148442e

10 files changed, +75 -53 lines changed
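
For context on the override message above, here is a back-of-the-envelope sketch, in Go, of how per-worker buffered job channels add up in memory; the per-job size is a hypothetical stand-in, and the worker/channel defaults are the ones visible in router/handle_lifecycle.go below.

```go
// Back-of-the-envelope estimate (hypothetical element size) of the job
// buffer footprint for one destination: workers x channel capacity x bytes
// per buffered element. A buffered channel preallocates its ring buffer up
// front, so this memory exists even while the buffers sit mostly empty, and
// when the element type contains pointers the garbage collector scans it too.
package main

import "fmt"

func main() {
	const (
		noOfWorkers        = 64   // router default per destination type
		noOfJobsPerChannel = 1000 // default per-worker channel capacity
		jobSlotBytes       = 512  // hypothetical size of one buffered element
	)
	totalBytes := noOfWorkers * noOfJobsPerChannel * jobSlotBytes
	fmt.Printf("worst-case buffer footprint ≈ %d MiB\n", totalBytes>>20) // about 31 MiB
}
```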

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
@@ -1,5 +1,12 @@
 # Changelog
 
+## [1.63.1](https://github.com/rudderlabs/rudder-server/compare/v1.63.0...v1.63.1) (2025-11-17)
+
+
+### Bug Fixes
+
+* worker job buffer increased memory requirements causing excessive cpu for garbage collection ([#6500](https://github.com/rudderlabs/rudder-server/issues/6500)) ([26843ca](https://github.com/rudderlabs/rudder-server/commit/26843ca38685ed7cb80a2376d2bb140af34ba777))
+
 ## [1.63.0](https://github.com/rudderlabs/rudder-server/compare/v1.62.0...v1.63.0) (2025-11-11)
 
 

router/handle_lifecycle.go

Lines changed: 9 additions & 0 deletions
@@ -108,6 +108,14 @@ func (rt *Handle) Setup(
 	rt.noOfWorkers = getRouterConfigInt("noOfWorkers", destType, 64)
 	rt.maxNoOfJobsPerChannel = getRouterConfigInt("maxNoOfJobsPerChannel", destType, 10000)
 	rt.noOfJobsPerChannel = getRouterConfigInt("noOfJobsPerChannel", destType, 1000)
+	if rt.noOfJobsPerChannel > rt.maxNoOfJobsPerChannel { // if noOfJobsPerChannel is more than max, set it as the new max
+		rt.logger.Warnn("noOfJobsPerChannel is more than maxNoOfJobsPerChannel, setting maxNoOfJobsPerChannel to noOfJobsPerChannel value",
+			obskit.DestinationType(rt.destType),
+			logger.NewIntField("maxNoOfJobsPerChannel", int64(rt.maxNoOfJobsPerChannel)),
+			logger.NewIntField("noOfJobsPerChannel", int64(rt.noOfJobsPerChannel)),
+		)
+		rt.maxNoOfJobsPerChannel = rt.noOfJobsPerChannel
+	}
 	// Explicitly control destination types for which we want to support batching
 	// Avoiding stale configurations still having KAFKA batching enabled to cause issues with later versions of rudder-server
 	batchingSupportedDestinations := config.GetStringSliceVar([]string{"AM"}, "Router.batchingSupportedDestinations")
@@ -336,6 +344,7 @@ func (rt *Handle) setupReloadableVars() {
 	rt.reloadableConfig.oauthV2ExpirationTimeDiff = config.GetReloadableDurationVar(5, time.Minute, getRouterConfigKeys("oauth.expirationTimeDiff", rt.destType)...)
 	rt.reloadableConfig.enableExperimentalBufferSizeCalculator = config.GetReloadableBoolVar(false, getRouterConfigKeys("enableExperimentalBufferSizeCalculator", rt.destType)...)
 	rt.reloadableConfig.experimentalBufferSizeScalingFactor = config.GetReloadableFloat64Var(2.0, getRouterConfigKeys("experimentalBufferSizeScalingFactor", rt.destType)...)
+	rt.reloadableConfig.experimentalBufferSizeMinimum = config.GetReloadableIntVar(500, 1, getRouterConfigKeys("experimentalBufferSizeMinimum", rt.destType)...)
 	rt.diagnosisTickerTime = config.GetDurationVar(60, time.Second, "Diagnostics.routerTimePeriod", "Diagnostics.routerTimePeriodInS")
 	rt.netClientTimeout = config.GetDurationVar(10, time.Second,
 		"Router."+rt.destType+".httpTimeout",

router/partition_worker.go

Lines changed: 1 addition & 0 deletions
@@ -55,6 +55,7 @@ func newPartitionWorker(ctx context.Context, rt *Handle, partition string) *part
 			workLoopThroughput,
 			rt.reloadableConfig.experimentalBufferSizeScalingFactor,
 			rt.noOfJobsPerChannel,
+			rt.reloadableConfig.experimentalBufferSizeMinimum,
 		),
 		&workerBufferStats{
 			onceEvery: kitsync.NewOnceEvery(5 * time.Second),

router/types.go

Lines changed: 1 addition & 0 deletions
@@ -73,4 +73,5 @@ type reloadableConfig struct {
 	oauthV2ExpirationTimeDiff              config.ValueLoader[time.Duration]
 	enableExperimentalBufferSizeCalculator config.ValueLoader[bool]    // whether to use the experimental worker buffer size calculator or not
 	experimentalBufferSizeScalingFactor    config.ValueLoader[float64] // scaling factor to scale up the buffer size in the experimental calculator
+	experimentalBufferSizeMinimum          config.ValueLoader[int]     // minimum buffer size in the experimental calculator
 }

router/worker_batch_loop.go

Lines changed: 2 additions & 2 deletions
@@ -13,7 +13,7 @@ type workerBatchLoop struct {
 	ctx                      context.Context                   // context for managing the lifecycle of the loop
 	jobsBatchTimeout         config.ValueLoader[time.Duration] // timeout for processing jobs in a batch
 	noOfJobsToBatchInAWorker config.ValueLoader[int]           // maximum number of jobs to batch in a worker before processing
-	inputCh                  <-chan workerJob                  // channel to receive jobs for processing
+	inputCh                  <-chan *workerJob                 // channel to receive jobs for processing
 	enableBatching           bool                              // whether to enable batching of jobs
 	batchTransform           func(routerJobs []types.RouterJobT) []types.DestinationJobT // function to transform router jobs into destination jobs in batch mode
 	transform                func(routerJobs []types.RouterJobT) []types.DestinationJobT // function to transform router jobs into destination jobs in non-batch mode
@@ -69,7 +69,7 @@ func (wl *workerBatchLoop) runLoop() {
 				doProcessRouterJobs()
 			}
 			start := time.Now()
-			if routerJob := wl.acceptWorkerJob(workerJob); routerJob != nil {
+			if routerJob := wl.acceptWorkerJob(*workerJob); routerJob != nil {
 				routerJobs = append(routerJobs, *routerJob)
 				if wl.noOfJobsToBatchInAWorker.Load() <= len(routerJobs) {
 					doProcessRouterJobs() // process the batch if it reaches the limit

router/worker_batch_loop_test.go

Lines changed: 13 additions & 13 deletions
@@ -21,7 +21,7 @@ import (
 func TestWorkerBatchLoop(t *testing.T) {
 	t.Run("no batching and no transform at router, acceptWorkerJob returning always nil", func(t *testing.T) {
 		// Setup
-		inputCh := make(chan workerJob, 10)
+		inputCh := make(chan *workerJob, 10)
 		batchSize := 5
 
 		var processedJobs []types.DestinationJobT
@@ -75,7 +75,7 @@ func TestWorkerBatchLoop(t *testing.T) {
 			params := &routerutils.JobParameters{
 				TransformAt: "processor", // Not "router"
 			}
-			inputCh <- workerJob{job: job, parameters: params}
+			inputCh <- &workerJob{job: job, parameters: params}
 		}
 
 		// Wait a bit to ensure jobs are processed and some timeouts are triggered too
@@ -104,7 +104,7 @@ func TestWorkerBatchLoop(t *testing.T) {
 
 	t.Run("no batching and transform at router, then switch to no router transform", func(t *testing.T) {
 		// Setup
-		inputCh := make(chan workerJob, 10)
+		inputCh := make(chan *workerJob, 10)
 		batchSize := 5
 
 		var processedJobs []types.DestinationJobT
@@ -184,7 +184,7 @@ func TestWorkerBatchLoop(t *testing.T) {
 			params := &routerutils.JobParameters{
 				TransformAt: "router",
 			}
-			inputCh <- workerJob{job: job, parameters: params}
+			inputCh <- &workerJob{job: job, parameters: params}
 		}
 
 		// Send a job with transform_at != "router" - this should trigger batch processing
@@ -195,7 +195,7 @@ func TestWorkerBatchLoop(t *testing.T) {
 		params := &routerutils.JobParameters{
 			TransformAt: "processor", // Not "router"
 		}
-		inputCh <- workerJob{job: job, parameters: params}
+		inputCh <- &workerJob{job: job, parameters: params}
 
 		// Wait a bit to ensure processing
 		time.Sleep(200 * time.Millisecond)
@@ -226,7 +226,7 @@ func TestWorkerBatchLoop(t *testing.T) {
 
 	t.Run("with batching enabled", func(t *testing.T) {
 		// Setup
-		inputCh := make(chan workerJob, 10)
+		inputCh := make(chan *workerJob, 10)
 		batchSize := 3
 
 		var processedJobs []types.DestinationJobT
@@ -292,7 +292,7 @@ func TestWorkerBatchLoop(t *testing.T) {
 			params := &routerutils.JobParameters{
 				TransformAt: "router",
 			}
-			inputCh <- workerJob{job: job, parameters: params}
+			inputCh <- &workerJob{job: job, parameters: params}
 		}
 
 		// Wait for batch processing
@@ -306,7 +306,7 @@ func TestWorkerBatchLoop(t *testing.T) {
 		params := &routerutils.JobParameters{
 			TransformAt: "router",
 		}
-		inputCh <- workerJob{job: job, parameters: params}
+		inputCh <- &workerJob{job: job, parameters: params}
 
 		// Wait for timeout to trigger processing of remaining jobs
 		time.Sleep(200*time.Millisecond + 50*time.Millisecond)
@@ -338,7 +338,7 @@ func TestWorkerBatchLoop(t *testing.T) {
 
 	t.Run("jobsBatchTimeout resets even when no jobs during timeout", func(t *testing.T) {
 		// Setup
-		inputCh := make(chan workerJob, 10)
+		inputCh := make(chan *workerJob, 10)
 		batchSize := 5
 
 		var timeoutFireCount int
@@ -400,7 +400,7 @@ func TestWorkerBatchLoop(t *testing.T) {
 		params := &routerutils.JobParameters{
 			TransformAt: "router",
 		}
-		inputCh <- workerJob{job: job, parameters: params}
+		inputCh <- &workerJob{job: job, parameters: params}
 
 		// Wait longer than timeout to ensure timeout fires
 		time.Sleep(60 * time.Millisecond)
@@ -433,7 +433,7 @@ func TestWorkerBatchLoop(t *testing.T) {
 
 	t.Run("batch processing when batch size is reached", func(t *testing.T) {
 		// Setup
-		inputCh := make(chan workerJob, 10)
+		inputCh := make(chan *workerJob, 10)
 		batchSize := 2 // Small batch size for testing
 
 		var processedBatches [][]types.DestinationJobT
@@ -493,7 +493,7 @@ func TestWorkerBatchLoop(t *testing.T) {
 			params := &routerutils.JobParameters{
 				TransformAt: "router",
 			}
-			inputCh <- workerJob{job: job, parameters: params}
+			inputCh <- &workerJob{job: job, parameters: params}
 		}
 
 		// Wait for processing
@@ -508,7 +508,7 @@ func TestWorkerBatchLoop(t *testing.T) {
 			params := &routerutils.JobParameters{
 				TransformAt: "router",
 			}
-			inputCh <- workerJob{job: job, parameters: params}
+			inputCh <- &workerJob{job: job, parameters: params}
 		}
 
 		// Wait for processing

router/worker_buffer.go

Lines changed: 5 additions & 5 deletions
@@ -13,7 +13,7 @@ import (
 type workerBuffer struct {
 	maxCapacity    int
 	targetCapacity func() int
-	jobs           chan workerJob
+	jobs           chan *workerJob
 
 	stats *workerBufferStats
 	mu    sync.RWMutex
@@ -36,7 +36,7 @@ func newWorkerBuffer(maxCapacity int, targetCapacity func() int, stats *workerBu
 	wb := &workerBuffer{
 		maxCapacity:    maxCapacity,
 		targetCapacity: targetCapacity,
-		jobs:           make(chan workerJob, maxCapacity),
+		jobs:           make(chan *workerJob, maxCapacity),
 		stats:          stats,
 	}
 	return wb
@@ -50,13 +50,13 @@ func newSimpleWorkerBuffer(capacity int) *workerBuffer {
 	wb := &workerBuffer{
 		maxCapacity:    capacity,
 		targetCapacity: func() int { return capacity },
-		jobs:           make(chan workerJob, capacity),
+		jobs:           make(chan *workerJob, capacity),
 		stats:          nil,
 	}
 	return wb
 }
 
-func (wb *workerBuffer) Jobs() <-chan workerJob {
+func (wb *workerBuffer) Jobs() <-chan *workerJob {
 	return wb.jobs
 }
 
@@ -127,7 +127,7 @@ func (rs *reservedSlot) Use(wj workerJob) {
 	rs.wb.mu.Lock()
 	defer rs.wb.mu.Unlock()
 	rs.wb.reservations--
-	rs.wb.jobs <- wj
+	rs.wb.jobs <- &wj
 }
 
 // Release releases the reserved slot from the worker's buffer
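
To illustrate why switching the channel element from workerJob values to *workerJob pointers reduces memory, here is a rough, self-contained comparison of the ring buffer each channel variant allocates at creation time; jobStandIn is a hypothetical placeholder and only approximates the real struct's size.

```go
// A buffered channel allocates element-size x capacity up front. With a
// pointer element type each slot is one machine word, and job structs are
// only allocated while they are actually in flight.
package main

import (
	"fmt"
	"unsafe"
)

// jobStandIn is a hypothetical placeholder for workerJob; the real struct
// may be larger or smaller.
type jobStandIn struct {
	payload    [512]byte
	parameters *struct{ TransformAt string }
}

func main() {
	const capacity = 10000 // e.g. the maxNoOfJobsPerChannel default

	valueBuf := uintptr(capacity) * unsafe.Sizeof(jobStandIn{})         // value-typed slots
	pointerBuf := uintptr(capacity) * unsafe.Sizeof((*jobStandIn)(nil)) // one word per slot

	fmt.Printf("chan jobStandIn  buffer ≈ %d KiB\n", valueBuf>>10)   // roughly 5 MiB
	fmt.Printf("chan *jobStandIn buffer ≈ %d KiB\n", pointerBuf>>10) // under 100 KiB
}
```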

router/worker_buffer_calculator.go

Lines changed: 6 additions & 3 deletions
@@ -44,12 +44,13 @@ func newExperimentalBufferSizeCalculator(
 	noOfJobsToBatchInAWorker config.ValueLoader[int], // number of jobs that a worker can batch together
 	workLoopThroughput metric.SimpleMovingAverage, // sliding average of work loop throughput
 	scalingFactor config.ValueLoader[float64], // scaling factor to scale up the buffer size
+	minBufferSize config.ValueLoader[int], // minimum buffer size
 ) bufferSizeCalculator {
 	return func() int {
-		const minBufferSize = 1
+		const one = 1
 		m1 := workLoopThroughput.Load() // at least the average throughput of the work loop
 		if m1 < 1 { // if there is no throughput yet, the throughput is less than 1 per second, set buffer to minBufferSize
-			return minBufferSize
+			return one
 		}
 		m2 := float64(jobQueryBatchSize.Load() / noOfWorkers) // at least the average number of jobs per worker during pickup
 		m3 := float64(noOfJobsToBatchInAWorker.Load())        // at least equal to the number of jobs to batch in a worker
@@ -59,7 +60,7 @@ func newExperimentalBufferSizeCalculator(
 			// calculate the maximum of the three metrics to determine the buffer size
 			math.Max(
 				math.Max(math.Max(m1, m2), m3)*scalingFactor.Load(), // scale up to provide some buffer
-				minBufferSize, // ensure buffer size is at least minBufferSize
+				math.Max(one, float64(minBufferSize.Load())), // ensure buffer size is at least one or the configured minimum
 			)))
 	}
 }
@@ -74,13 +75,15 @@ func newBufferSizeCalculatorSwitcher(
 	workLoopThroughput metric.SimpleMovingAverage, // sliding average of work loop throughput
 	scalingFactor config.ValueLoader[float64], // scaling factor to scale up the buffer size
 	noOfJobsPerChannel int, // number of jobs per channel
+	minBufferSize config.ValueLoader[int], // minimum buffer size
 ) bufferSizeCalculator {
 	new := newExperimentalBufferSizeCalculator(
 		jobQueryBatchSize,
 		noOfWorkers,
 		noOfJobsToBatchInAWorker,
 		workLoopThroughput,
 		scalingFactor,
+		minBufferSize,
 	)
 	legacy := newStandardBufferSizeCalculator(
 		noOfJobsToBatchInAWorker,
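
A simplified, self-contained sketch of the experimental calculation after this patch, with plain values standing in for the config loaders and the moving average: the buffer is the largest of the work-loop throughput, the average pickup per worker, and the per-worker batch size, scaled by the factor and floored at max(1, configured minimum). The final rounding shown here is an assumption, and the default minimum of 500 comes from handle_lifecycle.go above.

```go
// Simplified sketch of the experimental buffer size formula, with plain
// values instead of reloadable config loaders and the moving average.
package main

import (
	"fmt"
	"math"
)

func experimentalBufferSize(
	workLoopThroughput float64, // m1: sliding average of work loop throughput
	jobQueryBatchSize, noOfWorkers int, // m2 inputs: pickup batch size spread over workers
	noOfJobsToBatchInAWorker int, // m3: jobs a worker batches together
	scalingFactor float64, // headroom multiplier
	minBufferSize int, // configured floor (default 500 in this patch)
) int {
	const one = 1
	if workLoopThroughput < 1 { // no throughput measured yet
		return one
	}
	m2 := float64(jobQueryBatchSize / noOfWorkers)
	m3 := float64(noOfJobsToBatchInAWorker)
	return int(math.Ceil(math.Max(
		math.Max(math.Max(workLoopThroughput, m2), m3)*scalingFactor,
		math.Max(one, float64(minBufferSize)),
	)))
}

func main() {
	// e.g. 120 jobs/s throughput, 10000-job pickups across 64 workers,
	// batch size 100, scaling factor 2.0, configured minimum 500
	fmt.Println(experimentalBufferSize(120, 10000, 64, 100, 2.0, 500)) // prints 500
}
```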

0 commit comments
