WIP Further reduce memory footprint by collapsing consumer processes and deduplicating shape structs #3230


Draft · alco wants to merge 23 commits into `main` from `alco/consumer-process-slimdown`

Conversation

**@alco** (Member) commented on Oct 5, 2025 · edited

This is a logical continuation of #3198. The separate PR is for ease of review.

Key changes made here:

  • The ConsumerSupervisor tree is collapsed into a single Consumer process (@magnetised's brilliant idea)
  • Stack-wide config is stored in ETS and is looked up directly by Consumer processes when needed. This avoids copying the same config through ShapeCache -> DynamicConsumerSupervisor
  • Shape structs are looked up in ShapeStatus directly by Consumer processes. This significantly reduces the memory footprint of DynamicConsumerSupervisor (which keeps every child's config in its state) for sources that define large shapes (e.g. with long column lists and/or where clauses) and allows us to eliminate more copying by no longer copying the whole ShapeStatus state into every task that starts a Consumer process.
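To make the second bullet concrete, here is a minimal sketch of the ETS-backed config pattern described above. The module and table names (`StackConfig`, `:stack_config`) are illustrative only, not the actual identifiers used in Electric:

```elixir
defmodule StackConfig do
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(opts) do
    # A protected, read-optimised named table owned by this singleton process.
    # Consumers read from it directly, so the shared config is never copied
    # into supervisor child specs or into each consumer's state.
    table = :ets.new(:stack_config, [:named_table, :protected, read_concurrency: true])
    for {key, value} <- opts, do: :ets.insert(table, {key, value})
    {:ok, table}
  end

  # Direct ETS lookup: no message to the owner process; only the single
  # looked-up value is copied onto the caller's heap.
  def fetch!(key) do
    [{^key, value}] = :ets.lookup(:stack_config, key)
    value
  end
end
```

A consumer process would then call `StackConfig.fetch!(:chunk_bytes_threshold)` on demand instead of carrying the value in its own state.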

I've compared overall memory usage of BEAM at 50K shapes between `main`, the previous PR, +collapsed processes, and +removed shape structs from DynamicConsumerSupervisor:

| | OS process memory (RES in htop) | Total BEAM memory | Total processes memory | Total process count | Total consumer startup time |
|---|---|---|---|---|---|
| `main` | 3.76GB | 2.37GB | 1.88GB | 103990 | 11sec |
| "Slimmed down config" PR | 3.21GB | 1.71GB | 1.22GB | 103991 | 11sec |
| With collapsed processes | 2.56GB | 1.44GB | 979MB | 52142 | 9.5sec |
| With slimmed down DynamicConsumerSupervisor | 2.08GB | 1.12GB | 668MB | 52142 | 8.6sec |

I took those figures from the visual progression below that demonstrates preliminary gains (I still have to fix tests and refactor config passing). In all of the following, Electric is started with 50K shapes in its persistent storage; I wait for its uptime to reach 2 minutes, take a screenshot and shut it down with a SIGTERM.
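As a rough back-of-the-envelope cross-check (my own arithmetic, not from the PR): dividing total processes memory by process count gives the average per-process footprint, which drops from roughly 19KB on `main` to about 13KB with both optimisations applied.

```elixir
# Average per-process memory derived from the table above.
# GB/MB are taken as 1024^3 and 1024^2 bytes; results are approximate.
main_avg  = 1.88 * 1024 * 1024 * 1024 / 103_990  # main: ~19 KB per process
final_avg = 668 * 1024 * 1024 / 52_142           # final branch: ~13 KB per process

IO.puts("main: #{Float.round(main_avg / 1024, 1)} KB, final: #{Float.round(final_avg / 1024, 1)} KB")
```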

main:
mem_stats_main

alco/config-slimdown branch, which is the previous PR #3198:
mem_stats_pre_optim

After 43a6b26 (collapsing consumer processes):
mem_stats_without_consumer_sup

After 196a21e (trimming down DynamicConsumerSupervisor's state):
mem_stats_with_less_copying

**@codecov** bot commented on Oct 5, 2025 · edited

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 70.34%. Comparing base (6afd6ca) to head (a535e82).

Additional details and impacted files
```
@@                   Coverage Diff                    @@
##           alco/config-slimdown    #3230      +/-   ##
========================================================
- Coverage                 72.72%   70.34%   -2.39%
========================================================
  Files                       181       24     -157
  Lines                      9662      961    -8701
  Branches                    330       52     -278
========================================================
- Hits                       7027      676    -6351
+ Misses                     2633      285    -2348
+ Partials                      2        0       -2
```
| Flag | Coverage Δ |
|------|------------|
| elixir | 74.47% <ø> (+4.13%) ⬆️ |
| elixir-client | 74.47% <ø> (+0.52%) ⬆️ |
| packages/experimental | ? |
| packages/react-hooks | ? |
| packages/typescript-client | ? |
| packages/y-electric | 55.12% <ø> (ø) |
| postgres-140000 | ? |
| postgres-150000 | ? |
| postgres-170000 | ? |
| postgres-180000 | ? |
| sync-service | ? |
| typescript | 55.12% <ø> (-32.15%) ⬇️ |
| unit-tests | 70.34% <ø> (-2.39%) ⬇️ |

Flags with carried forward coverage won't be shown.


Also remove the dependency injection for the most core function of the Snapshotter module. To preserve existing test logic and so avoid rewriting too much, we patch the function in tests using Repatch.
This is a singleton process per stack, and the code is made easier to follow when function calls to it are statically defined.
Generally speaking, this is unconventional, even within our own codebase. ShapeStatus should be responsible for naming its own ETS tables and encapsulating any other state pertaining to its function. Next, it doesn't need to keep storage in its state because it's only used for saving/restoring from backup. During initialisation, storage is passed to it as an argument. Afterwards, only the backup directory path is kept in the ShapeStatusOwner's state to be used at termination time. Finally, the way Consumer tests are written made it extra difficult to forego dynamic setting of the shape status module, so I left this option for use in tests.
This id is visible in Observer when viewing the supervisor's state.
And fix a flaky consumer test as a result. Specifically, the "restarting a consumer doesn't lower the last known offset when only snapshot is present" test would fail on the line

```elixir
# Restart the shape cache and the consumers
Support.ComponentSetup.with_shape_cache(ctx)
```

because shape removal resulted in a call to AsyncDeleter that failed to find its configuration ETS table.
so as to avoid "spooky action at a distance" that unnecessarily assumes the other process is alive
@alco force-pushed the alco/config-slimdown branch from 3635705 to c5ce1ad on October 8, 2025 14:58
@alco force-pushed the alco/consumer-process-slimdown branch from 196a21e to afd1b5f on October 8, 2025 15:40
@alco force-pushed the alco/config-slimdown branch from e8e17bc to e6fda79 on October 8, 2025 20:07
@alco force-pushed the alco/config-slimdown branch from e6fda79 to 6afd6ca on October 8, 2025 20:11
Instead of copying mostly the same config into all child specs of DynamicConsumerSupervisor, store the shared config in Electric.StackConfig. The shape itself takes up the bulk of the DynamicConsumerSupervisor state's memory footprint. No need to pass it, since it can be looked up from ShapeStatus. This also reduces copying of the shape struct between multiple processes. For instance, we no longer copy the whole ShapeStatus state into every task that starts consumer processes.
@alco force-pushed the alco/consumer-process-slimdown branch from afd1b5f to a535e82 on October 9, 2025 08:43
@magnetised self-assigned this on Oct 9, 2025
alco added a commit that referenced this pull request on Oct 9, 2025:

… of stack configuration sharing between processes (#3198)

### TL;DR

- removing some internal configuration fields which are stack-accessible anyway and are simply passed between processes
- removing some instances of the dependency injection pattern which does little more than obscure inter-module dependencies
- introduce a stack-wide `StackConfig` process + ETS table to keep static configuration for the stack
- strip away all path-related fields from PureFileStorage's struct to reduce per-consumer-process memory footprint

### Background

After I inspected which Elixir processes in Electric were taking up the most memory, cloned configuration options that are repeated for every shape were a clear offender. Most prominent of those is shape storage config that contains multiple paths, all sharing the same prefix.

<details><summary>Here's the state of a single consumer process running in production</summary>

```
%{
  monitors: [],
  buffer: [],
  registry: :"Registry.ShapeChanges:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
  otel_ctx: %{
    :"$__otel_baggage_ctx_key" => %{
      "stack_id" => {"xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", []}
    },
    {:otel_tracer, :span_ctx} => {:span_ctx,
     224916200833174271611003428090384065735, 13323822870313796073, 0,
     {:tracestate, []}, true, true, false,
     {:otel_span_ets, #Function<2.22926739/1 in :otel_tracer_server.on_end/1>}}
  },
  hibernate_after: 30000,
  writer: {Electric.ShapeCache.PureFileStorage,
   {:writer_state,
    {:writer_acc, [], [], 0, [], 0, 0, LogOffset.last_before_real_offsets(),
     LogOffset.last_before_real_offsets(), LogOffset.last_before_real_offsets(),
     LogOffset.last_before_real_offsets(), 0, 0, 0, false,
     {LogOffset.last_before_real_offsets(), []}}, nil, nil,
    #Reference<1.702501262.1906180100.191398>, "latest.0",
    %Electric.ShapeCache.PureFileStorage{
      buffer_ets: nil,
      base_path: "./persistent/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/shapes/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
      data_dir: "./persistent/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/shapes/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/000000000-9999999999999999",
      tmp_dir: "./persistent/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/shapes/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/.tmp/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
      metadata_dir: "./persistent/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/shapes/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/000000000-9999999999999999/metadata",
      log_dir: "./persistent/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/shapes/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/000000000-9999999999999999/log",
      stack_id: "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
      stack_ets: :"Elixir.Electric.ShapeCache.PureFileStorage:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
      stack_task_supervisor: {:via, Registry,
       {:"Electric.ProcessRegistry:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
        {Electric.ShapeCache.PureFileStorage.TaskSupervisor, nil}}},
      shape_handle: "000000000-9999999999999999",
      chunk_bytes_threshold: 10485760,
      flush_period: 1000,
      compaction_config: %{period: 600000, keep_complete_chunks: 2},
      version: 1
    }}},
  storage: {Electric.ShapeCache.PureFileStorage,
   %Electric.ShapeCache.PureFileStorage{
     buffer_ets: nil,
     base_path: "./persistent/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/shapes/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
     data_dir: "./persistent/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/shapes/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/000000000-9999999999999999",
     tmp_dir: "./persistent/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/shapes/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/.tmp/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
     metadata_dir: "./persistent/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/shapes/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/000000000-9999999999999999/metadata",
     log_dir: "./persistent/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/shapes/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/000000000-9999999999999999/log",
     stack_id: "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
     stack_ets: :"Elixir.Electric.ShapeCache.PureFileStorage:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
     stack_task_supervisor: {:via, Registry,
      {:"Electric.ProcessRegistry:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
       {Electric.ShapeCache.PureFileStorage.TaskSupervisor, nil}}},
     shape_handle: "000000000-9999999999999999",
     chunk_bytes_threshold: 10485760,
     flush_period: 1000,
     compaction_config: %{period: 600000, keep_complete_chunks: 2},
     version: 1
   }},
  stack_id: "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
  chunk_bytes_threshold: 10485760,
  inspector: {Electric.Postgres.Inspector.EtsInspector,
   [
     stack_id: "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
     server: {:via, Registry,
      {:"Electric.ProcessRegistry:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
       {Electric.Postgres.Inspector.EtsInspector, nil}}}
   ]},
  shape_status: {Electric.ShapeCache.ShapeStatus,
   %Electric.ShapeCache.ShapeStatus{
     shape_meta_table: :"xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:shape_meta_table",
     storage: {Electric.ShapeCache.PureFileStorage,
      %{
        tmp_dir: "./persistent/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/shapes/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/.tmp/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
        base_path: "./persistent/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/shapes/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
        stack_id: "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
        chunk_bytes_threshold: 10485760,
        stack_ets: :"Elixir.Electric.ShapeCache.PureFileStorage:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
        stack_task_supervisor: {:via, Registry,
         {:"Electric.ProcessRegistry:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
          {Electric.ShapeCache.PureFileStorage.TaskSupervisor, nil}}},
        flush_period: 1000,
        compaction_config: %{period: 600000, keep_complete_chunks: 2}
      }}
   }},
  db_pool: {:via, Registry,
   {:"Electric.ProcessRegistry:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
    {Electric.DbPool, :snapshot}}},
  publication_manager: {Electric.Replication.PublicationManager,
   [stack_id: "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"]},
  shape_handle: "000000000-9999999999999999",
  log_producer: {:via, Registry,
   {:"Electric.ProcessRegistry:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
    {Electric.Replication.ShapeLogCollector, nil}}},
  run_with_conn_fn: &Electric.Shapes.Consumer.Snapshotter.run_with_conn/2,
  create_snapshot_fn: &Electric.Shapes.Consumer.Snapshotter.query_in_readonly_txn/7,
  shape: Shape.new!({00000, "public.table"},
    where: "...",
    columns: [...]
  ),
  latest_offset: LogOffset.last_before_real_offsets(),
  pg_snapshot: %{
    filter_txns?: true,
    xip_list: [],
    xmax: 0,
    xmin: 0
  },
  snapshot_started: true,
  txn_offset_mapping: [],
  materializer_subscribed?: false,
  awaiting_snapshot_start: [],
  hibernate_timer: nil,
  cleaned?: false
}
```

</details>

Reducing the in-memory size of that state is the main focus of this PR.

***N.B.** The size of the pretty-printed state above doesn't directly map to its in-memory size, because the majority of values there are atoms (which are interned in BEAM) and some maps that are printed more than once may actually be stored once in memory. But due to data immutability and each Elixir process having an independent heap, there's not a lot of structural sharing happening.*

### Overview of code changes

The key change is made to the PureFileStorage struct: I've removed all of its fields that can be calculated from the shared options and refactored field accesses into function calls in the storage implementation.

Another significant refactoring is avoiding passing `shape_status_state` everywhere, since it's static per stack. I haven't been able to remove it from `Electric.Shapes.Consumer` because of how pervasively mocks are used in consumer tests.

Repairing tests after doing the refactorings has eaten a large chunk of the development time. That is in large part due to how configuration is passed between modules dynamically, with ad-hoc Nimble schemas, default values being set in multiple places, etc. We should really centralize stack-wide config at some point, which is now possible to introduce gradually with `Electric.StackConfig`.

### Note on the use of ETS for `Electric.StackConfig`

My original goal was to persist the shared configuration into `:persistent_term` because it's very fast for reads and doesn't copy data into process heaps. But there's a dealbreaker in the way `:persistent_term` works: modifying or deleting a value stored in it results in the BEAM VM scheduling a heap-scanning + GC pass for _all running processes_.

### Mem footprint comparisons

Following are comparisons done in different environments.

First, a **comparison of OS stats** when running Electric locally using the `main` branch vs this PR.

<img width="2560" height="3072" alt="main_shared_config_htop_custom_storage_cmp" src="https://github.com/user-attachments/assets/a8398402-ac96-449e-9a2b-4f17db4bd718" />

Based on these numbers, the difference appears to be small. I'm attributing it to the fact that BEAM retains memory it has previously requested from the OS and doesn't immediately release it. Take a look at the **memory usage numbers reported by the BEAM VM** itself below (screenshots are from Erlang's Observer tool):

<img width="1260" height="1438" alt="main_shared_config_sys_stats_custom_storage_cmp" src="https://github.com/user-attachments/assets/27c65fdf-f968-419b-a58d-cd72d96a1a23" />

Here the difference is more pronounced.

Finally, a **comparison of OTEL metrics gathered in the staging env**, based on charts that show memory footprint in relation to the total number of shapes within a 1.5-hour time window.

[`main` branch](https://ui.honeycomb.io/electric-sql-06/environments/pr-846/datasets/electric-region/board-query/cDij6tGm74Z/result/xbb766Tqwxb?vs=hideCompare&cs_2=omitMissingValues):

<img width="2343" height="863" alt="main_staging_mem" src="https://github.com/user-attachments/assets/75625c8b-9f89-42a3-9a57-a690eff82e86" />

[PR branch](https://ui.honeycomb.io/electric-sql-06/environments/pr-846/datasets/electric-region/board-query/cDij6tGm74Z/result/Dd69w6wUvdu?vs=hideCompare&cs_2=omitMissingValues):

<img width="2343" height="863" alt="shared_config_staging_mem" src="https://github.com/user-attachments/assets/29499523-4f4e-4c79-9adf-97667ae5cae9" />

Key points:

- at 65K shapes, `main` shows combined process heap at 2.32GB and the PR branch is at 1.81GB
- when shape expiry eventually brings the number of shapes down to 60K, `main` sits at 2.1GB whereas the PR branch is at 1.73GB

### Takeaways

- In an architecture where a large number of user-defined resources (shapes) are mapped to Elixir processes, we need to be very precise about what to keep in the process state.
- Shared configuration should be centralized as much as possible, as opposed to duplicating it in tens of thousands of processes.
- There's also the cost of copying the same terms from one memory location to another every time a new process is started that uses those terms.
- The configuration options passed to supervisor children are also kept in the supervisor process's memory as part of child specs. This is especially noticeable in the case of DynamicConsumerSupervisor, which ends up using 200-300MB just to keep shared config and shapes that are already stored in ETS tables. This is addressed in #3230.
- The next big improvement in memory efficiency is going to be found in transaction processing and ShapeLogCollector. We need to investigate the impact of streaming transaction data and decoding only the few columns from each successive row that are needed for shape index lookup, before decoding the whole transaction received from Postgres into memory. The rows themselves can be written into shape logs more efficiently, without fully materializing them in memory and passing them around as messages. The ShapeLogCollector's index could eventually be moved to disk, with an in-memory cache for highly active shapes.
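The `:persistent_term` dealbreaker described above can be illustrated with a small sketch (keys and the demo table name are hypothetical; the global-GC behaviour on update/erase is documented in the Erlang `persistent_term` manual):

```elixir
# Reads from :persistent_term do not copy the term onto the caller's heap,
# which makes it attractive for sharing static config:
:persistent_term.put({:stack, :chunk_bytes_threshold}, 10_485_760)
10_485_760 = :persistent_term.get({:stack, :chunk_bytes_threshold})

# ...but overwriting or erasing an existing key forces the VM to scan the
# heaps of *all* running processes (to confirm none still reference the old
# term) — prohibitive with ~50K consumer processes alive:
:persistent_term.put({:stack, :chunk_bytes_threshold}, 20_971_520)

# An ETS table with read_concurrency gives cheap-enough reads (one copy of
# the looked-up value per read) without that global cost on writes:
table = :ets.new(:stack_config_demo, [:set, :public, read_concurrency: true])
:ets.insert(table, {:chunk_bytes_threshold, 10_485_760})
[{_, 10_485_760}] = :ets.lookup(table, :chunk_bytes_threshold)
```

This is the trade-off that motivated backing `Electric.StackConfig` with ETS rather than `:persistent_term`.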
Base automatically changed from alco/config-slimdown to main on October 9, 2025 10:24

Reviewers: no reviews

Assignees: @magnetised

Labels: none yet

Projects: none yet

Milestone: no milestone

Participants (2): @alco, @magnetised
