
[BLOCKED/WIP] performance: Add batch InjectApplicationMessages to support high throughput#2222

Open
RicoSuter wants to merge 1 commit into dotnet:master from RicoSuter:feature/batch-apis

Conversation

@RicoSuter

@RicoSuter RicoSuter commented Nov 24, 2025

This PR is a proposal to add an API to do batched message publish on the server side (InjectApplicationMessages). It’s the only way I can get my benchmark working with my “digital twin” synchronization library.

Am I doing something fundamentally wrong or would you consider merging this proposal?

I think we should try to simplify the code (e.g. route the single-method calls into the batch method) and add tests before merging. The other optimizations (subscription cache, promise cache) are nice to have but probably not game changing (more benchmarks can be done without them).

The MQTT PR where the only change is using single vs batched API can be found here:
https://github.com/RicoSuter/Namotion.Interceptor/pull/108/files#diff-192178ef0628084daa8939a2268ae10733b0426da036edfb4e9d4048199fd8e3R206

Benchmarks:

  • Client: 40k topic subscriptions
  • Client: Publishes 20k messages/s to server
  • Server: Publishes 20k messages/s to client
  • Which leads to 1.2 million messages/minute per direction

(Tested on a MacBook Pro M4 Max)

  • The non-batched version (see below) has no chance of keeping up (it reaches 400k of the 1.2 million messages, and one side eventually disconnects/dies):
    • Server has extreme memory allocation rates
    • Client only receives 300k of the 1.2 million messages
    • Extremely high latency (>10s)
    • Essentially, without this optimization the scenario is not feasible

With batch APIs:

// Local project reference version with this PR:
if (messageCount > 0)
{
    await server.InjectApplicationMessages(
        new ArraySegment<InjectedMqttApplicationMessage>(messages, 0, messageCount),
        cancellationToken).ConfigureAwait(false);
}
===========================================================================================================================================
Client Benchmark - 1 minute - [2025-11-24 21:48:18.257]

Total processed changes:         1200201
Process memory:                  359.14 MB (204.26 MB in .NET heap)
Avg allocations over last 60s:   149.06 MB/s

Metric                               Avg        P50        P90        P95        P99      P99.9        Max        Min     StdDev      Count
-------------------------------------------------------------------------------------------------------------------------------------------
Modifications (changes/s)       20042.92   20135.90   20606.66   20730.81   20946.96   20946.96   20946.96   19142.20     442.52          -
Processing latency (ms)             0.00       0.00       0.00       0.01       0.01       0.03      99.82       0.00       0.12    1200201
End-to-end latency (ms)             8.97       8.43      14.63      17.12      24.13      74.26     112.07       0.15       5.60    1200201

===========================================================================================================================================
Server Benchmark - 1 minute - [2025-11-24 21:47:57.869]

Total processed changes:         1199212
Process memory:                  344.98 MB (198.41 MB in .NET heap)
Avg allocations over last 60s:   160.45 MB/s

Metric                               Avg        P50        P90        P95        P99      P99.9        Max        Min     StdDev      Count
-------------------------------------------------------------------------------------------------------------------------------------------
Modifications (changes/s)       19924.19   19980.00   20714.90   20941.27   23884.76   23884.76   23884.76   15138.86    1067.25          -
Processing latency (ms)             0.00       0.00       0.00       0.01       0.01       0.03      15.38      -0.00       0.08    1199212
End-to-end latency (ms)            12.56      10.89      21.59      25.75      36.40      63.13      76.25       0.23       7.03    1199212

Baseline (current NuGet version with non-batch APIs):

// Current version on NuGet:
for (var i = 0; i < messageCount; i++)
{
    await server.InjectApplicationMessage(messages[i], cancellationToken).ConfigureAwait(false);
}
===========================================================================================================================================
Client Benchmark - 1 minute - [2025-11-24 21:32:12.267]

Total processed changes:         269787
Process memory:                  308.7 MB (163.82 MB in .NET heap)
Avg allocations over last 60s:   62.83 MB/s

Metric                               Avg        P50        P90        P95        P99      P99.9        Max        Min     StdDev      Count
-------------------------------------------------------------------------------------------------------------------------------------------
Modifications (changes/s)        4506.57    4560.50    5579.78    5631.68    5791.27    5791.27    5791.27    2239.58     707.81          -
Processing latency (ms)             0.01       0.00       0.01       0.01       0.02       0.20      17.76       0.00       0.13     269787
End-to-end latency (ms)         60062.48   69404.52  117908.58  126513.16  133555.64  135001.25  135205.29     981.69   44304.32     269787

===========================================================================================================================================
Server Benchmark - 1 minute - [2025-11-24 21:32:00.559]

Total processed changes:         172518
Process memory:                  1828.53 MB (431.89 MB in .NET heap)
Avg allocations over last 61s:   1569.19 MB/s

Metric                               Avg        P50        P90        P95        P99      P99.9        Max        Min     StdDev      Count
-------------------------------------------------------------------------------------------------------------------------------------------
Modifications (changes/s)        2860.22    2944.60    3419.65    3471.32    3659.01    3659.01    3659.01    1231.97     465.98          -
Processing latency (ms)             0.03       0.00       0.01       0.02       1.77       2.38      11.27       0.00       0.29     172518
End-to-end latency (ms)         78845.78   78978.60  112728.62  118324.46  122899.01  123869.42  123995.07   34021.92   23549.03     172518

This benchmark can easily be tried out:
Clone the PR from the other repo, and switch to project references in Namotion.Interceptor.Mqtt:
RicoSuter/Namotion.Interceptor#108
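The gap between the two variants above is essentially the classic per-item vs. batched handoff cost: the baseline pays one async dispatch per message, while the batch API amortizes it. A minimal, MQTTnet-independent sketch of the same pattern using System.Threading.Channels (all names here are illustrative, not part of this PR):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Illustrative only: the same batching idea, independent of MQTTnet.
// Instead of paying one async handoff per message, the consumer awaits
// once, then synchronously drains everything already buffered and
// processes it as a single batch.
static class BatchedConsumer
{
    public static async Task ConsumeInBatchesAsync(
        ChannelReader<string> reader,
        Func<IReadOnlyList<string>, Task> processBatch)
    {
        var batch = new List<string>(capacity: 1024);

        while (await reader.WaitToReadAsync().ConfigureAwait(false))
        {
            batch.Clear();

            // Only the first wait is asynchronous; the drain loop is sync.
            while (reader.TryRead(out var message))
            {
                batch.Add(message);
            }

            if (batch.Count > 0)
            {
                await processBatch(batch).ConfigureAwait(false);
            }
        }
    }
}
```

Under sustained load, each `processBatch` call then handles hundreds of messages for a single await, which is the effect the `InjectApplicationMessages` overload aims for on the server side.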

@RicoSuter
Author

@dotnet-policy-service agree

public Task<MqttPacket> WaitAsync()
{
    // Lazy initialization - only allocate when actually needed
    _promise ??= new AsyncTaskCompletionSource<MqttPacket>();
Collaborator


Do we maybe need to lock here? This method is exposed externally, so it might happen that the promise gets allocated twice. Probably a Lazy<> would help here, but I don't know whether Lazy would introduce new overhead that mitigates the initial optimization.
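One lock-free alternative to both a lock and Lazy<> is Interlocked.CompareExchange, which lets two racing callers each allocate but guarantees only one instance "wins". A self-contained sketch using plain TaskCompletionSource as a stand-in for MQTTnet's AsyncTaskCompletionSource (the type and member names here are illustrative):

```csharp
using System.Threading;
using System.Threading.Tasks;

public sealed class PacketAwaitable<TPacket>
{
    TaskCompletionSource<TPacket>? _promise;

    public Task<TPacket> WaitAsync()
    {
        // Fast path: promise already allocated by a previous caller.
        var promise = Volatile.Read(ref _promise);
        if (promise == null)
        {
            // Race concurrent initializers; only one allocation is
            // published, the loser's instance is discarded and collected.
            var fresh = new TaskCompletionSource<TPacket>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            promise = Interlocked.CompareExchange(ref _promise, fresh, null) ?? fresh;
        }

        return promise.Task;
    }

    public void Complete(TPacket packet)
    {
        // Ensure the promise exists even if Complete races WaitAsync.
        _ = WaitAsync();
        _promise!.TrySetResult(packet);
    }
}
```

The trade-off vs. Lazy<> is a possible wasted allocation under contention, but no wrapper object on the hot path, which may matter given that the whole point of the change is to avoid allocations.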

@chkr1011
Collaborator

chkr1011 commented Feb 7, 2026

I had a look at your changes. The idea behind this sounds reasonable, but I still need to investigate a little more because several different optimizations are made here. For example, the fact that the QoS 0 packet is reused makes sense to me, but I wonder whether the injection of multiple application messages is an important strategy to apply.

@RicoSuter RicoSuter changed the title performance: Add batch InjectApplicationMessages to support high throughput [BLOCKED/WIP] performance: Add batch InjectApplicationMessages to support high throughput Feb 16, 2026
@RicoSuter
Author

RicoSuter commented Feb 16, 2026

@chkr1011 thanks for having a look. This PR is just one part of a bigger investigation I'm doing into high throughput. I'm still working on a PR, but I think it's too big to be accepted here; you can have a look at it here: RicoSuter#1

I need to split it up into smaller chunks so it is acceptable here...

The first PR is a no-brainer fix: #2235

