WIP: Further reduce memory footprint by collapsing consumer processes and deduplicating shape structs #3230
Draft
alco wants to merge 23 commits into main from alco/consumer-process-slimdown
Conversation
codecov[bot] commented on Oct 5, 2025 (edited)

**Codecov Report** ✅ All modified and coverable lines are covered by tests.

```
@@              Coverage Diff               @@
##    alco/config-slimdown    #3230   +/-   ##
==============================================
- Coverage            72.72%   70.34%   -2.39%
==============================================
  Files                  181       24     -157
  Lines                 9662      961    -8701
  Branches               330       52     -278
==============================================
- Hits                  7027      676    -6351
+ Misses                2633      285    -2348
+ Partials                 2        0       -2
```
Also remove the dependency injection for the central function of the Snapshotter module. To preserve existing test logic and avoid rewriting too much, we patch the function in tests using Repatch.
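For illustration, here is a hedged sketch of what patching with Repatch can look like in a test. It assumes Repatch's `Repatch.setup/0` and `Repatch.patch/3`; `run_with_conn/2` is one of the Snapshotter functions visible in the consumer state dump further down this page, but the argument semantics and return value used here are assumptions, not the real implementation:

```elixir
# test_helper.exs (one-time setup that Repatch requires before patches can be applied)
# Repatch.setup()

defmodule SnapshotterPatchTest do
  use ExUnit.Case, async: true

  test "snapshot logic runs against a patched connection function" do
    # Redirect the statically-called function for this test process only,
    # so no dependency injection is needed in the production code path.
    Repatch.patch(Electric.Shapes.Consumer.Snapshotter, :run_with_conn, fn _pool, fun ->
      fun.(:fake_conn)
    end)

    assert Electric.Shapes.Consumer.Snapshotter.run_with_conn(nil, & &1) == :fake_conn
  end
end
```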
This is a singleton process per stack, and the code is made easier to follow when function calls to it are statically defined.
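As a sketch of the "singleton process per stack, called statically" shape this describes (the module and registry names below are illustrative assumptions, not the PR's code): registering the process under a name derived from the stack id lets callers invoke it directly, without having a pid or module injected.

```elixir
defmodule StackSingletonSketch do
  use GenServer

  # One instance per stack; the name is derived from the stack id, assuming a
  # Registry named MyApp.ProcessRegistry is started elsewhere in the supervision tree.
  def name(stack_id), do: {:via, Registry, {MyApp.ProcessRegistry, {__MODULE__, stack_id}}}

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: name(opts[:stack_id]))

  # Callers can now write StackSingletonSketch.get(stack_id) statically.
  def get(stack_id), do: GenServer.call(name(stack_id), :get)

  @impl true
  def init(opts), do: {:ok, Map.new(opts)}

  @impl true
  def handle_call(:get, _from, state), do: {:reply, state, state}
end
```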
Generally speaking, this is unconventional, even within our own codebase. ShapeStatus should be responsible for naming its own ETS tables and encapsulating any other state pertaining to its function. Next, it doesn't need to keep storage in its state because it's only used for saving/restoring from backup. During initialisation, storage is passed to it as an argument; afterwards, only the backup directory path is kept in the ShapeStatusOwner's state to be used at termination time. Finally, the way Consumer tests are written made it extra difficult to forego dynamic setting of the shape status module, so I left this option for use in tests.
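A minimal sketch of the "name your own ETS tables" idea, assuming the `:"<stack_id>:shape_meta_table"` naming that shows up in the consumer state dump later on this page; the functions themselves are illustrative, not ShapeStatus's actual API:

```elixir
defmodule ShapeStatusSketch do
  # The table name is derived from the stack id inside the module, so callers
  # never have to pass table names (or storage config) around.
  def table(stack_id), do: :"#{stack_id}:shape_meta_table"

  # Must be called from the process that should own the table (e.g. ShapeStatusOwner).
  def initialize(stack_id),
    do: :ets.new(table(stack_id), [:named_table, :public, :set, read_concurrency: true])

  def put_shape(stack_id, shape_handle, shape),
    do: :ets.insert(table(stack_id), {shape_handle, shape})

  def get_shape(stack_id, shape_handle) do
    case :ets.lookup(table(stack_id), shape_handle) do
      [{^shape_handle, shape}] -> shape
      [] -> nil
    end
  end
end
```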
This id is visible in Observer when viewing the supervisor's state.
And fix a flaky consumer test as a result. Specifically, the "restarting a consumer doesn't lower the last known offset when only snapshot is present" test would fail on the line `Support.ComponentSetup.with_shape_cache(ctx)` (marked with the comment "# Restart the shape cache and the consumers") because shape removal resulted in a call to AsyncDeleter that failed to find its configuration ETS table.
so as to avoid "spooky action at a distance" that unnecessarily assumes the other process is alive
Branch updated from 196a21e to afd1b5f.

Instead of copying mostly the same config into all child specs of DynamicConsumerSupervisor, store the shared config in Electric.StackConfig. The shape itself takes up the bulk of the DynamicConsumerSupervisor state's mem footprint; there is no need to pass it, since it can be looked up from ShapeStatus. This also reduces copying of the shape struct between multiple processes. For instance, we no longer copy the whole ShapeStatus state into every task that starts consumer processes.
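To illustrate the child-spec slimming described in that commit message: the spec can carry only identifiers, with the shape (and shared config) looked up at init inside the consumer process instead of being copied through the supervisor. This reuses the hypothetical ShapeStatusSketch lookup from the earlier sketch; none of this is the PR's exact code:

```elixir
defmodule ConsumerSketch do
  use GenServer

  # Only identifiers go into the child spec, so the supervisor does not retain
  # a copy of the shape struct or the shared configuration for every child.
  def child_spec(%{shape_handle: handle} = args) do
    %{id: {:consumer, handle}, start: {__MODULE__, :start_link, [args]}, restart: :temporary}
  end

  def start_link(args), do: GenServer.start_link(__MODULE__, args)

  @impl true
  def init(%{stack_id: stack_id, shape_handle: handle}) do
    # Resolved once, inside the consumer process, from the stack-wide ETS table.
    shape = ShapeStatusSketch.get_shape(stack_id, handle)
    {:ok, %{stack_id: stack_id, shape_handle: handle, shape: shape}}
  end
end

# Usage: DynamicSupervisor.start_child(sup, {ConsumerSketch, %{stack_id: id, shape_handle: h}})
```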
Branch updated from afd1b5f to a535e82.
alco added a commit that referenced this pull request on Oct 9, 2025:

**… of stack configuration sharing between processes (#3198)**

### TL;DR

- remove some internal configuration fields which are stack-accessible anyway and are simply passed between processes
- remove some instances of the dependency injection pattern which does little more than obscure inter-module dependencies
- introduce a stack-wide `StackConfig` process + ETS table to keep static configuration for the stack
- strip away all path-related fields from PureFileStorage's struct to reduce per-consumer-process memory footprint

### Background

After I inspected which Elixir processes in Electric were taking up the most memory, cloned configuration options that are repeated for every shape were a clear offender. The most prominent of those is the shape storage config, which contains multiple paths that all share the same prefix.

<details>
<summary>Here's the state of a single consumer process running in production</summary>

```
%{
  monitors: [],
  buffer: [],
  registry: :"Registry.ShapeChanges:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
  otel_ctx: %{
    :"$__otel_baggage_ctx_key" => %{
      "stack_id" => {"xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", []}
    },
    {:otel_tracer, :span_ctx} =>
      {:span_ctx, 224916200833174271611003428090384065735, 13323822870313796073, 0,
       {:tracestate, []}, true, true, false,
       {:otel_span_ets, #Function<2.22926739/1 in :otel_tracer_server.on_end/1>}}
  },
  hibernate_after: 30000,
  writer:
    {Electric.ShapeCache.PureFileStorage,
     {:writer_state,
      {:writer_acc, [], [], 0, [], 0, 0, LogOffset.last_before_real_offsets(),
       LogOffset.last_before_real_offsets(), LogOffset.last_before_real_offsets(),
       LogOffset.last_before_real_offsets(), 0, 0, 0, false,
       {LogOffset.last_before_real_offsets(), []}},
      nil, nil, #Reference<1.702501262.1906180100.191398>, "latest.0",
      %Electric.ShapeCache.PureFileStorage{
        buffer_ets: nil,
        base_path: "./persistent/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/shapes/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
        data_dir: "./persistent/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/shapes/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/000000000-9999999999999999",
        tmp_dir: "./persistent/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/shapes/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/.tmp/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
        metadata_dir: "./persistent/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/shapes/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/000000000-9999999999999999/metadata",
        log_dir: "./persistent/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/shapes/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/000000000-9999999999999999/log",
        stack_id: "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
        stack_ets: :"Elixir.Electric.ShapeCache.PureFileStorage:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
        stack_task_supervisor:
          {:via, Registry,
           {:"Electric.ProcessRegistry:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
            {Electric.ShapeCache.PureFileStorage.TaskSupervisor, nil}}},
        shape_handle: "000000000-9999999999999999",
        chunk_bytes_threshold: 10485760,
        flush_period: 1000,
        compaction_config: %{period: 600000, keep_complete_chunks: 2},
        version: 1
      }}},
  storage:
    {Electric.ShapeCache.PureFileStorage,
     %Electric.ShapeCache.PureFileStorage{
       buffer_ets: nil,
       base_path: "./persistent/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/shapes/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
       data_dir: "./persistent/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/shapes/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/000000000-9999999999999999",
       tmp_dir: "./persistent/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/shapes/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/.tmp/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
       metadata_dir: "./persistent/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/shapes/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/000000000-9999999999999999/metadata",
       log_dir: "./persistent/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/shapes/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/000000000-9999999999999999/log",
       stack_id: "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
       stack_ets: :"Elixir.Electric.ShapeCache.PureFileStorage:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
       stack_task_supervisor:
         {:via, Registry,
          {:"Electric.ProcessRegistry:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
           {Electric.ShapeCache.PureFileStorage.TaskSupervisor, nil}}},
       shape_handle: "000000000-9999999999999999",
       chunk_bytes_threshold: 10485760,
       flush_period: 1000,
       compaction_config: %{period: 600000, keep_complete_chunks: 2},
       version: 1
     }},
  stack_id: "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
  chunk_bytes_threshold: 10485760,
  inspector:
    {Electric.Postgres.Inspector.EtsInspector,
     [
       stack_id: "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
       server:
         {:via, Registry,
          {:"Electric.ProcessRegistry:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
           {Electric.Postgres.Inspector.EtsInspector, nil}}}
     ]},
  shape_status:
    {Electric.ShapeCache.ShapeStatus,
     %Electric.ShapeCache.ShapeStatus{
       shape_meta_table: :"xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:shape_meta_table",
       storage:
         {Electric.ShapeCache.PureFileStorage,
          %{
            tmp_dir: "./persistent/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/shapes/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/.tmp/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
            base_path: "./persistent/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/shapes/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
            stack_id: "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
            chunk_bytes_threshold: 10485760,
            stack_ets: :"Elixir.Electric.ShapeCache.PureFileStorage:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
            stack_task_supervisor:
              {:via, Registry,
               {:"Electric.ProcessRegistry:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
                {Electric.ShapeCache.PureFileStorage.TaskSupervisor, nil}}},
            flush_period: 1000,
            compaction_config: %{period: 600000, keep_complete_chunks: 2}
          }}
     }},
  db_pool:
    {:via, Registry,
     {:"Electric.ProcessRegistry:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", {Electric.DbPool, :snapshot}}},
  publication_manager:
    {Electric.Replication.PublicationManager, [stack_id: "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"]},
  shape_handle: "000000000-9999999999999999",
  log_producer:
    {:via, Registry,
     {:"Electric.ProcessRegistry:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
      {Electric.Replication.ShapeLogCollector, nil}}},
  run_with_conn_fn: &Electric.Shapes.Consumer.Snapshotter.run_with_conn/2,
  create_snapshot_fn: &Electric.Shapes.Consumer.Snapshotter.query_in_readonly_txn/7,
  shape: Shape.new!({00000, "public.table"}, where: "...", columns: [...]),
  latest_offset: LogOffset.last_before_real_offsets(),
  pg_snapshot: %{filter_txns?: true, xip_list: [], xmax: 0, xmin: 0},
  snapshot_started: true,
  txn_offset_mapping: [],
  materializer_subscribed?: false,
  awaiting_snapshot_start: [],
  hibernate_timer: nil,
  cleaned?: false
}
```

</details>

Reducing the in-memory size of that state is the main focus of this PR.

***N.B.** The size of the pretty-printed state above doesn't directly map to its in-memory size, because the majority of values there are atoms (which are interned in BEAM) and some maps that are printed more than once may actually be stored once in memory. But due to data immutability and each Elixir process having an independent heap, there isn't a lot of structural sharing happening.*

### Overview of code changes

The key change is made to the PureFileStorage struct: I've removed all of its fields that can be calculated from the shared options and refactored field accesses into function calls in the storage implementation.

Another significant refactoring is avoiding passing `shape_status_state` everywhere, since it's static per stack. I haven't been able to remove it from `Electric.Shapes.Consumer` because of how pervasively mocks are used in consumer tests.

Repairing tests after these refactorings has eaten a large chunk of the development time, in large part due to how configuration is passed between modules dynamically, with ad-hoc Nimble schemas, default values set in multiple places, etc. We should really centralize stack-wide config at some point, which is now possible to introduce gradually with `Electric.StackConfig`.

### Note on the use of ETS for `Electric.StackConfig`

My original goal was to persist the shared configuration into `:persistent_term` because it's very fast for reads and doesn't copy data into process heaps. But there's a dealbreaker in the way `:persistent_term` works: modifying or deleting a value stored in it results in the BEAM VM scheduling a heap-scanning + GC pass for _all running processes_.

### Mem footprint comparisons

The following comparisons were done in different environments.

First, a **comparison of OS stats** when running Electric locally using the `main` branch vs this PR:

<img width="2560" height="3072" alt="main_shared_config_htop_custom_storage_cmp" src="https://github.com/user-attachments/assets/a8398402-ac96-449e-9a2b-4f17db4bd718" />

Based on these numbers, the difference appears to be small. I'm attributing that to the fact that BEAM retains memory it has previously requested from the OS and doesn't immediately release it. Take a look at the **memory usage numbers reported by the BEAM VM** itself below (screenshots are from Erlang's Observer tool):

<img width="1260" height="1438" alt="main_shared_config_sys_stats_custom_storage_cmp" src="https://github.com/user-attachments/assets/27c65fdf-f968-419b-a58d-cd72d96a1a23" />

Here the difference is more pronounced.

Finally, a **comparison of OTEL metrics gathered in the staging env**, based on charts that show memory footprint in relation to the total number of shapes within a 1.5-hour time window.

[`main` branch](https://ui.honeycomb.io/electric-sql-06/environments/pr-846/datasets/electric-region/board-query/cDij6tGm74Z/result/xbb766Tqwxb?vs=hideCompare&cs_2=omitMissingValues):

<img width="2343" height="863" alt="main_staging_mem" src="https://github.com/user-attachments/assets/75625c8b-9f89-42a3-9a57-a690eff82e86" />

[PR branch](https://ui.honeycomb.io/electric-sql-06/environments/pr-846/datasets/electric-region/board-query/cDij6tGm74Z/result/Dd69w6wUvdu?vs=hideCompare&cs_2=omitMissingValues):

<img width="2343" height="863" alt="shared_config_staging_mem" src="https://github.com/user-attachments/assets/29499523-4f4e-4c79-9adf-97667ae5cae9" />

Key points:

- at 65K shapes, `main` shows a combined process heap of 2.32GB while the PR branch is at 1.81GB
- when shape expiry eventually brings the number of shapes down to 60K, `main` sits at 2.1GB whereas the PR branch is at 1.73GB

### Takeaways

- In an architecture where a large number of user-defined resources (shapes) are mapped to Elixir processes, we need to be very precise about what to keep in the process state.
- Shared configuration should be centralized as much as possible, as opposed to being duplicated in tens of thousands of processes.
- There's also the cost of copying the same terms from one memory location to another every time a new process that uses those terms is started.
- The configuration options passed to supervisor children are also kept in the supervisor process's memory as part of its child specs. This is especially noticeable in the case of DynamicConsumerSupervisor, which ends up using 200-300MB just to keep shared config and shapes that are already stored in ETS tables. This is addressed in #3230.
- The next big improvement in memory efficiency is going to be found in transaction processing and ShapeLogCollector. We need to investigate the impact of streaming transaction data and decoding only the few columns from each successive row that are needed for the shape index lookup first, before decoding the whole transaction received from Postgres into memory. The rows themselves can be written into shape logs more efficiently without fully materializing them in memory and passing them around as messages. The ShapeLogCollector's index could eventually be moved to disk, with an in-memory cache for highly active shapes.
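For illustration, a minimal sketch of the per-stack config table described above, using ETS rather than `:persistent_term` (updating or deleting a persistent term triggers a heap scan + GC across all processes, as noted). The API is an assumption made for the sake of the example, not necessarily `Electric.StackConfig`'s real interface:

```elixir
defmodule StackConfigSketch do
  use GenServer

  def table(stack_id), do: :"#{stack_id}:stack_config"

  def start_link(opts),
    do: GenServer.start_link(__MODULE__, opts, name: :"#{__MODULE__}:#{opts[:stack_id]}")

  # Reads hit ETS directly: no GenServer call on the hot path and no copy of
  # the config kept in every consumer's long-lived state.
  def get(stack_id, key, default \\ nil) do
    case :ets.lookup(table(stack_id), key) do
      [{^key, value}] -> value
      [] -> default
    end
  end

  def put(stack_id, key, value), do: :ets.insert(table(stack_id), {key, value})

  @impl true
  def init(opts) do
    stack_id = Keyword.fetch!(opts, :stack_id)
    # The process exists mainly to own the table; read_concurrency suits the
    # many-readers / few-writers pattern of mostly-static configuration.
    :ets.new(table(stack_id), [:named_table, :public, :set, read_concurrency: true])
    {:ok, stack_id}
  end
end
```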
This is a logical continuation of #3198. The separate PR is for ease of review.
Key changes made here:
I've compared the overall mem usage of BEAM at 50K shapes between `main`, the previous PR, +collapsed processes, and +removed shape structs from DynamicConsumerSupervisor. I took those figures from the visual progression below, which demonstrates preliminary gains (I still have to fix tests and refactor config passing). In all of the following, Electric is started with 50K shapes in its persistent storage; I wait for its uptime to reach 2 minutes, take a screenshot, and shut it down with a SIGTERM (see the memory-inspection snippet after the progression).
`main`:

`alco/config-slimdown` branch, which is the previous PR #3198:

After 43a6b26 (collapsing consumer processes):

After 196a21e (trimming down DynamicConsumerSupervisor's state):
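As a hedged aside (not the exact commands behind the screenshots above), per-process heap usage of this kind can be inspected from an attached IEx shell with standard BEAM introspection:

```elixir
# Total memory attributed to process heaps, in bytes.
:erlang.memory(:processes)

# Top 10 processes by memory, with their registered name and initial call.
Process.list()
|> Enum.map(fn pid -> {pid, Process.info(pid, [:memory, :registered_name, :initial_call])} end)
|> Enum.reject(fn {_pid, info} -> is_nil(info) end)
|> Enum.sort_by(fn {_pid, info} -> info[:memory] end, :desc)
|> Enum.take(10)
```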
