Skip to content

Commit 9ebbaf0

Browse files
committed
🐛 Batch sibling propagation queries to avoid exceeding RavenDB session request limit
When updating user indicators on 30+ endpoints where names match by EndpointId.Name rather than SanitizedName, the previous code issued one extra DB query per endpoint inside the loop to propagate the indicator to sibling documents sharing the same SanitizedName. This exceeded RavenDB's default limit of 30 requests per session. Replace the per-iteration inner queries with a single batched query after the loop, capping total session requests at 2 regardless of input size.
1 parent ab18476 commit 9ebbaf0

2 files changed

Lines changed: 53 additions & 6 deletions

File tree

src/ServiceControl.Persistence.RavenDB/Throughput/LicensingDataStore.cs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,10 @@ public async Task UpdateUserIndicatorOnEndpoints(List<UpdateUserIndicator> userI
190190
.Where(document => document.SanitizedName.In(updates.Keys) || document.EndpointId.Name.In(updates.Keys));
191191

192192
var documents = await query.ToListAsync(cancellationToken);
193+
194+
// Collect sanitized names needing sibling propagation to avoid issuing a query per document in the loop below.
195+
var sanitizedNameToUserIndicator = new Dictionary<string, string>();
196+
193197
foreach (var document in documents)
194198
{
195199
if (updates.TryGetValue(document.SanitizedName, out var newValueFromSanitizedName))
@@ -199,14 +203,25 @@ public async Task UpdateUserIndicatorOnEndpoints(List<UpdateUserIndicator> userI
199203
else if (updates.TryGetValue(document.EndpointId.Name, out var newValueFromEndpoint))
200204
{
201205
document.UserIndicator = newValueFromEndpoint;
202-
//update all that match this sanitized name
203-
var sanitizedMatchingQuery = session.Query<EndpointDocument>()
204-
.Where(sanitizedDocument => sanitizedDocument.SanitizedName == document.SanitizedName && sanitizedDocument.EndpointId.Name != document.EndpointId.Name);
205-
var sanitizedMatchingDocuments = await sanitizedMatchingQuery.ToListAsync(cancellationToken);
206+
sanitizedNameToUserIndicator[document.SanitizedName] = newValueFromEndpoint;
207+
}
208+
}
206209

207-
foreach (var matchingDocumentOnSanitizedName in sanitizedMatchingDocuments)
210+
if (sanitizedNameToUserIndicator.Count > 0)
211+
{
212+
// One batched query for all sibling documents, instead of one query per document.
213+
var sanitizedNames = sanitizedNameToUserIndicator.Keys.ToList();
214+
var alreadyLoadedIds = documents.Select(d => d.Id).ToHashSet();
215+
216+
var siblingDocuments = await session.Query<EndpointDocument>()
217+
.Where(d => d.SanitizedName.In(sanitizedNames))
218+
.ToListAsync(cancellationToken);
219+
220+
foreach (var sibling in siblingDocuments.Where(d => !alreadyLoadedIds.Contains(d.Id)))
221+
{
222+
if (sanitizedNameToUserIndicator.TryGetValue(sibling.SanitizedName, out var indicator))
208223
{
209-
matchingDocumentOnSanitizedName.UserIndicator = newValueFromEndpoint;
224+
sibling.UserIndicator = indicator;
210225
}
211226
}
212227
}

src/ServiceControl.Persistence.Tests/Throughput/EndpointsTests.cs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,38 @@ public async Task Should_update_indicators_on_all_endpoint_sources_when_updated_
199199
Assert.That(foundEndpointMonitoring.UserIndicator, Is.EqualTo(userIndicator));
200200
}
201201

202+
[Test]
203+
public async Task Should_update_user_indicators_on_more_than_30_endpoints_without_hitting_session_request_limit()
204+
{
205+
// Arrange
206+
// Each pair shares a sanitized name but has different raw names.
207+
// Updating by raw name (not sanitized name) triggers a sibling propagation query.
208+
// In the original code, that was one DB query per endpoint, exceeding RavenDB's
209+
// default limit of 30 requests per session when 30+ endpoints are updated at once.
210+
const int endpointCount = 30;
211+
var userIndicator = "someIndicator";
212+
213+
for (var i = 0; i < endpointCount; i++)
214+
{
215+
var sanitizedName = $"Endpoint{i}";
216+
await LicensingDataStore.SaveEndpoint(new Endpoint(sanitizedName, ThroughputSource.Audit) { SanitizedName = sanitizedName }, default);
217+
await LicensingDataStore.SaveEndpoint(new Endpoint($"schema.{sanitizedName}", ThroughputSource.Monitoring) { SanitizedName = sanitizedName }, default);
218+
}
219+
220+
var updates = Enumerable.Range(0, endpointCount)
221+
.Select(i => new UpdateUserIndicator { Name = $"schema.Endpoint{i}", UserIndicator = userIndicator })
222+
.ToList();
223+
224+
// Act - must not throw InvalidOperationException due to exceeding session request limit
225+
await LicensingDataStore.UpdateUserIndicatorOnEndpoints(updates, default);
226+
227+
// Assert
228+
var allEndpoints = (await LicensingDataStore.GetAllEndpoints(true, default)).ToList();
229+
230+
Assert.That(allEndpoints, Has.Count.EqualTo(endpointCount * 2));
231+
Assert.That(allEndpoints, Has.All.Matches<Endpoint>(e => e.UserIndicator == userIndicator));
232+
}
233+
202234
[TestCase(10, 5, false)]
203235
[TestCase(10, 20, true)]
204236
public async Task Should_correctly_report_throughput_existence_for_X_days(int daysSinceLastThroughputEntry, int timeFrameToCheck, bool expectedValue)

0 commit comments

Comments
 (0)