This PR proposes an API for batched message publishing on the server side (InjectApplicationMessages). It’s the only way I can get my benchmark working with my “digital twin” synchronization library.
Am I doing something fundamentally wrong or would you consider merging this proposal?
I think we should try to simplify the code (e.g. route the single-method calls into the batch method) and add tests before merging. The other optimizations (subscription cache, promise cache) are nice to have but probably not game-changing (more benchmarks can be done without them).
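As I read the diff, the proposed server-side surface is roughly the following. The signature is reconstructed from the call site in the benchmark snippet below, so treat it as an approximation rather than the exact declaration in the PR:

```csharp
// Approximate shape of the proposed batch overload on the MQTTnet server,
// inferred from the call site; parameter names are guesses.
public Task InjectApplicationMessages(
    ArraySegment<InjectedMqttApplicationMessage> messages,
    CancellationToken cancellationToken = default);
```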
The MQTT PR, where the only change is using the single vs. the batched API, can be found here:
https://github.com/RicoSuter/Namotion.Interceptor/pull/108/files#diff-192178ef0628084daa8939a2268ae10733b0426da036edfb4e9d4048199fd8e3R206
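For context, the caller-side change in the MQTT PR boils down to “collect, then flush”: pending messages are drained into a reusable buffer and handed to the server in one call instead of awaiting each publish individually. A rough sketch of that pattern, assuming the MQTTnet server type and the `InjectApplicationMessages` overload proposed here (the channel/buffer plumbing is illustrative, not the actual PR code):

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using MQTTnet.Server;

// Illustrative producer loop: drain queued messages into a buffer,
// then publish the whole batch with a single awaited call.
static async Task PumpAsync(
    MqttServer server,
    ChannelReader<InjectedMqttApplicationMessage> reader,
    CancellationToken cancellationToken)
{
    var messages = new InjectedMqttApplicationMessage[1024]; // buffer size is arbitrary

    while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
    {
        var messageCount = 0;
        while (messageCount < messages.Length && reader.TryRead(out var message))
        {
            messages[messageCount++] = message;
        }

        // One batched call per drained buffer instead of one await per message:
        if (messageCount > 0)
        {
            await server.InjectApplicationMessages(
                new ArraySegment<InjectedMqttApplicationMessage>(messages, 0, messageCount),
                cancellationToken).ConfigureAwait(false);
        }
    }
}
```

The point of the batch call is that the server can take its internal locks and wake subscriber sessions once per buffer rather than once per message, which is where the allocation and latency differences below come from.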
Benchmarks:
- Client: 40k topic subscriptions
- Client: Publishes 20k messages/s to server
- Server: Publishes 20k messages/s to client
- Which adds up to 1.2 million messages per minute per direction
(Tested on a MacBook Pro with M4 Max)
- The non-batched version (see baseline below) has no chance of keeping up (it reaches only ~400k of the 1.2 million messages, and one side eventually disconnects/dies):
- Server shows extreme memory allocations per second
- Client receives only ~300k of the 1.2 million messages
- Extremely high latency (>10 s)
- Essentially, without this optimization the scenario is not feasible
With batch APIs:
```csharp
// Local project reference version with this PR:
if (messageCount > 0)
{
    await server.InjectApplicationMessages(
        new ArraySegment<InjectedMqttApplicationMessage>(messages, 0, messageCount),
        cancellationToken).ConfigureAwait(false);
}
```

```
===========================================================================================================================================
Client Benchmark - 1 minute - [2025-11-24 21:48:18.257]
Total processed changes: 1200201
Process memory: 359.14 MB (204.26 MB in .NET heap)
Avg allocations over last 60s: 149.06 MB/s

Metric                        Avg       P50       P90       P95       P99       P99.9     Max       Min       StdDev    Count
-------------------------------------------------------------------------------------------------------------------------------------------
Modifications (changes/s)     20042.92  20135.90  20606.66  20730.81  20946.96  20946.96  20946.96  19142.20  442.52    -
Processing latency (ms)       0.00      0.00      0.00      0.01      0.01      0.03      99.82     0.00      0.12      1200201
End-to-end latency (ms)       8.97      8.43      14.63     17.12     24.13     74.26     112.07    0.15      5.60      1200201

===========================================================================================================================================
Server Benchmark - 1 minute - [2025-11-24 21:47:57.869]
Total processed changes: 1199212
Process memory: 344.98 MB (198.41 MB in .NET heap)
Avg allocations over last 60s: 160.45 MB/s

Metric                        Avg       P50       P90       P95       P99       P99.9     Max       Min       StdDev    Count
-------------------------------------------------------------------------------------------------------------------------------------------
Modifications (changes/s)     19924.19  19980.00  20714.90  20941.27  23884.76  23884.76  23884.76  15138.86  1067.25   -
Processing latency (ms)       0.00      0.00      0.00      0.01      0.01      0.03      15.38     -0.00     0.08      1199212
End-to-end latency (ms)       12.56     10.89     21.59     25.75     36.40     63.13     76.25     0.23      7.03      1199212
```
Baseline (current NuGet version with non-batch APIs):
```csharp
// Current version on NuGet:
for (var i = 0; i < messageCount; i++)
{
    await server.InjectApplicationMessage(messages[i], cancellationToken).ConfigureAwait(false);
}
```

```
===========================================================================================================================================
Client Benchmark - 1 minute - [2025-11-24 21:32:12.267]
Total processed changes: 269787
Process memory: 308.7 MB (163.82 MB in .NET heap)
Avg allocations over last 60s: 62.83 MB/s

Metric                        Avg       P50       P90        P95        P99        P99.9      Max        Min       StdDev    Count
-------------------------------------------------------------------------------------------------------------------------------------------
Modifications (changes/s)     4506.57   4560.50   5579.78    5631.68    5791.27    5791.27    5791.27    2239.58   707.81    -
Processing latency (ms)       0.01      0.00      0.01       0.01       0.02       0.20       17.76      0.00      0.13      269787
End-to-end latency (ms)       60062.48  69404.52  117908.58  126513.16  133555.64  135001.25  135205.29  981.69    44304.32  269787

===========================================================================================================================================
Server Benchmark - 1 minute - [2025-11-24 21:32:00.559]
Total processed changes: 172518
Process memory: 1828.53 MB (431.89 MB in .NET heap)
Avg allocations over last 61s: 1569.19 MB/s

Metric                        Avg       P50       P90        P95        P99        P99.9      Max        Min       StdDev    Count
-------------------------------------------------------------------------------------------------------------------------------------------
Modifications (changes/s)     2860.22   2944.60   3419.65    3471.32    3659.01    3659.01    3659.01    1231.97   465.98    -
Processing latency (ms)       0.03      0.00      0.01       0.02       1.77       2.38       11.27      0.00      0.29      172518
End-to-end latency (ms)       78845.78  78978.60  112728.62  118324.46  122899.01  123869.42  123995.07  34021.92  23549.03  172518
```
This benchmark can easily be tried out: clone the PR from the other repo and switch to project references in Namotion.Interceptor.Mqtt:
RicoSuter/Namotion.Interceptor#108