Summary
Fixes two memory leaks in the aggregate transform that caused unbounded memory growth with high-cardinality metrics. The first eliminates an unnecessary HashMap clone on every flush; the second prevents prev_map from accumulating stale metric series indefinitely.
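The shape of the two fixes can be sketched roughly as follows. This is a simplified illustration, not the actual Vector source: the `map`/`prev_map` names follow the description above, while the `Aggregate` struct, its value types, and the `flush` signature are all invented for the sketch.

```rust
use std::collections::HashMap;
use std::mem;

/// Simplified stand-in for the aggregate transform's state (illustrative only).
struct Aggregate {
    map: HashMap<String, f64>,      // series accumulated since the last flush
    prev_map: HashMap<String, f64>, // previous values, consulted by Diff mode
}

impl Aggregate {
    fn flush(&mut self) -> HashMap<String, f64> {
        // Fix 1: take ownership of the map instead of cloning it on every
        // flush (before, the equivalent of `self.map.clone()` ran each time).
        let flushed = mem::take(&mut self.map);

        // Fix 2: drop prev_map entries for series that no longer report, so
        // churned (ephemeral) series cannot accumulate indefinitely.
        self.prev_map.retain(|k, _| flushed.contains_key(k));
        for (k, v) in &flushed {
            self.prev_map.insert(k.clone(), *v);
        }
        flushed
    }
}
```

With this shape, `prev_map` is bounded by the number of currently live series rather than every series ever seen, which is exactly the growth pattern the pod-churn test below exercises.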
Vector configuration
```toml
[sources.metrics_input]
type = "exec"
mode = "streaming"
command = ["python3", "/tmp/metric_generator.py"]
decoding.codec = "json"

[transforms.to_metric]
type = "log_to_metric"
inputs = ["metrics_input"]

[[transforms.to_metric.metrics]]
type = "gauge"
field = "value"
name = "{{ name }}"
tags.container = "{{ tags.container }}"
tags.batch = "{{ tags.batch }}"

[transforms.aggregate_diff]
type = "aggregate"
inputs = ["to_metric"]
interval_ms = 500  # Flush every 500ms
mode = "Diff"      # This mode uses prev_map heavily!

[sinks.blackhole]
type = "blackhole"
inputs = ["aggregate_diff"]
print_interval_secs = 10
```
How did you test this PR?
Created a Python script to generate high-cardinality changing metrics (500 unique series per batch, constantly changing to simulate pod churn). Ran both the old and fixed versions with Diff aggregation mode for 10 minutes each, monitoring memory usage every 10 seconds.
Metric generator script (metric_generator.py)
```python
#!/usr/bin/env python3
"""Generate high-cardinality changing metrics to simulate production workload

This mimics container/pod metrics that constantly change (pods being created/destroyed)
"""
import json
import time
import random
import sys


def generate_metrics():
    """Generate metrics with changing series (simulating ephemeral containers)"""
    metric_batch = 0
    while True:
        # Generate a batch of metrics with unique IDs (simulating containers)
        # Old containers "die" and new ones are created
        num_metrics = 500  # 500 unique series per batch
        for i in range(num_metrics):
            # Use batch number to create changing series
            # This simulates containers being created/destroyed
            container_id = f"container_{metric_batch}_{i}"
            metric = {
                "timestamp": time.time(),
                "name": f"cpu_usage_{container_id}",
                "value": random.uniform(0, 100),
                "tags": {
                    "container": container_id,
                    "batch": str(metric_batch),
                },
            }
            print(json.dumps(metric), flush=True)
            time.sleep(0.001)  # 1ms between metrics = 1000 metrics/sec
        metric_batch += 1
        # Every 10 batches, report the total of completely new series created
        if metric_batch % 10 == 0:
            sys.stderr.write(f"Generated {metric_batch * num_metrics} total unique series\n")
            sys.stderr.flush()


if __name__ == "__main__":
    generate_metrics()
```
Test script (test_10min.sh)
```bash
#!/bin/bash
# 10 minute test to show the memory leak accumulation over time

TEST_DURATION=600    # 10 minutes
SAMPLE_INTERVAL=10   # Sample every 10 seconds

test_extended() {
    local BINARY=$1
    local LABEL=$2

    echo "========================================"
    echo "$LABEL - 10 MINUTE TEST"
    echo "========================================"
    echo "Start time: $(date)"
    echo ""

    $BINARY -c /tmp/vector-diff-mode.toml > /tmp/vector_10min_$$.log 2>&1 &
    local PID=$!

    sleep 5
    if ! ps -p $PID > /dev/null; then
        echo "Failed to start! Check /tmp/vector_10min_$$.log"
        cat /tmp/vector_10min_$$.log
        return 1
    fi

    echo "PID: $PID"
    echo "Generating high-cardinality changing metrics in Diff mode..."
    echo ""
    echo "Time(m:s) | Memory(KB) | Growth(KB) | Rate(KB/s)"
    echo "----------|------------|------------|------------"

    local START_MEM=$(ps -o rss= -p $PID | tr -d ' ')
    local PREV_MEM=$START_MEM
    local SAMPLES=0

    for i in $(seq $SAMPLE_INTERVAL $SAMPLE_INTERVAL $TEST_DURATION); do
        if ps -p $PID > /dev/null; then
            MEM=$(ps -o rss= -p $PID | tr -d ' ')
            GROWTH=$((MEM - START_MEM))
            DELTA=$((MEM - PREV_MEM))
            RATE=$((DELTA / SAMPLE_INTERVAL))

            # Format time as minutes:seconds
            MINS=$((i / 60))
            SECS=$((i % 60))

            if [ $SAMPLES -eq 0 ]; then
                printf "%5d:%02d | %10d | %10s | %10s\n" $MINS $SECS "$MEM" "baseline" "-"
            else
                printf "%5d:%02d | %+10d | %+10d | %+10d\n" $MINS $SECS "$MEM" "$GROWTH" "$RATE"
            fi

            PREV_MEM=$MEM
            SAMPLES=$((SAMPLES + 1))
            sleep $SAMPLE_INTERVAL
        else
            echo "Process died! Check /tmp/vector_10min_$$.log"
            cat /tmp/vector_10min_$$.log | tail -30
            return 1
        fi
    done

    local END_MEM=$MEM
    local TOTAL=$((END_MEM - START_MEM))
    local AVG=$((TOTAL / TEST_DURATION))
    local PERCENT=$((TOTAL * 100 / START_MEM))

    echo ""
    echo "========================================="
    echo "FINAL RESULTS for $LABEL:"
    echo "========================================="
    echo "  Start memory: ${START_MEM} KB"
    echo "  End memory:   ${END_MEM} KB"
    echo "  Total growth: ${TOTAL} KB"
    echo "  Average rate: ${AVG} KB/s"
    echo "  Growth %:     ${PERCENT}%"
    echo ""
    echo "  Duration: 10 minutes"
    echo "  End time: $(date)"
    echo ""

    # Extract series generation stats
    echo "Metric generation stats:"
    grep "Generated" /tmp/vector_10min_$$.log 2>/dev/null | tail -1 || echo "  (stats not available)"
    echo ""

    kill $PID 2>/dev/null
    wait $PID 2>/dev/null

    echo "$TOTAL" > /tmp/growth_10min_${LABEL// /_}.txt
}

echo "============================================="
echo "10 MINUTE MEMORY LEAK TEST"
echo "============================================="
echo "This extended test will show the accumulation"
echo "of memory over a longer period, simulating"
echo "the production issue from #23093"
echo ""
echo "Test configuration:"
echo "  - Diff mode (uses prev_map heavily)"
echo "  - 500 unique series per batch"
echo "  - Series constantly changing (simulating pod churn)"
echo "  - Duration: 10 minutes per version"
echo ""
echo "This will take approximately 20 minutes total..."
echo "============================================="
echo ""
echo "Phase 1: Testing OLD Vector (with memory leak)..."
test_extended "../vector/target/release/vector" "OLD"

echo ""
echo "Pausing 5 seconds between tests..."
sleep 5
echo ""
echo "Phase 2: Testing NEW Vector (with fix)..."
test_extended "./target/release/vector" "NEW"

echo ""
echo "============================================="
echo "FINAL COMPARISON - 10 MINUTE TEST"
echo "============================================="

OLD_GROWTH=$(cat /tmp/growth_10min_OLD.txt 2>/dev/null || echo "0")
NEW_GROWTH=$(cat /tmp/growth_10min_NEW.txt 2>/dev/null || echo "0")

echo "OLD version total growth: ${OLD_GROWTH} KB"
echo "NEW version total growth: ${NEW_GROWTH} KB"
echo ""

if [ "$OLD_GROWTH" -gt "$NEW_GROWTH" ]; then
    DIFF=$((OLD_GROWTH - NEW_GROWTH))
    PERCENT=$((DIFF * 100 / OLD_GROWTH))
    echo "✓ MEMORY LEAK FIX CONFIRMED!"
    echo ""
    echo "  Reduction: ${PERCENT}%"
    echo "  Saved: ${DIFF} KB in 10 minutes"
    echo ""
    echo "Projected savings over time:"
    echo "  1 hour:  $((DIFF * 6)) KB (~$((DIFF * 6 / 1024)) MB)"
    echo "  1 day:   $((DIFF * 144)) KB (~$((DIFF * 144 / 1024)) MB)"
    echo "  1 week:  $((DIFF * 1008)) KB (~$((DIFF * 1008 / 1024)) MB)"
    echo "  1 month: $((DIFF * 4320)) KB (~$((DIFF * 4320 / 1024)) MB)"
else
    echo "Both versions showed similar growth."
    echo "The leak may require even longer durations to manifest,"
    echo "or the workload may not be triggering it."
fi

echo ""
echo "Test logs saved in /tmp/vector_10min_*.log"
rm -f /tmp/growth_10min_*.txt
```
Results:
- Old version: 16,464 KB growth (27 KB/s)
- Fixed version: 14,384 KB growth (23 KB/s)
- Improvement: 12.6% reduction in memory growth
All 14 existing aggregate transform unit tests pass.
Change Type
Is this a breaking change?
Does this PR include user facing changes?
References