Skip to content

Commit 221af03

Browse files
authored
Merge pull request #5392 from Particular/batching-fix-6.13
2 parents ab18476 + 9ebbaf0 commit 221af03

2 files changed

Lines changed: 53 additions & 6 deletions

File tree

src/ServiceControl.Persistence.RavenDB/Throughput/LicensingDataStore.cs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,10 @@ public async Task UpdateUserIndicatorOnEndpoints(List<UpdateUserIndicator> userI
190190
.Where(document => document.SanitizedName.In(updates.Keys) || document.EndpointId.Name.In(updates.Keys));
191191

192192
var documents = await query.ToListAsync(cancellationToken);
193+
194+
// Collect sanitized names needing sibling propagation to avoid issuing a query per document in the loop below.
195+
var sanitizedNameToUserIndicator = new Dictionary<string, string>();
196+
193197
foreach (var document in documents)
194198
{
195199
if (updates.TryGetValue(document.SanitizedName, out var newValueFromSanitizedName))
@@ -199,14 +203,25 @@ public async Task UpdateUserIndicatorOnEndpoints(List<UpdateUserIndicator> userI
199203
else if (updates.TryGetValue(document.EndpointId.Name, out var newValueFromEndpoint))
200204
{
201205
document.UserIndicator = newValueFromEndpoint;
202-
//update all that match this sanitized name
203-
var sanitizedMatchingQuery = session.Query<EndpointDocument>()
204-
.Where(sanitizedDocument => sanitizedDocument.SanitizedName == document.SanitizedName && sanitizedDocument.EndpointId.Name != document.EndpointId.Name);
205-
var sanitizedMatchingDocuments = await sanitizedMatchingQuery.ToListAsync(cancellationToken);
206+
sanitizedNameToUserIndicator[document.SanitizedName] = newValueFromEndpoint;
207+
}
208+
}
206209

207-
foreach (var matchingDocumentOnSanitizedName in sanitizedMatchingDocuments)
210+
if (sanitizedNameToUserIndicator.Count > 0)
211+
{
212+
// One batched query for all sibling documents, instead of one query per document.
213+
var sanitizedNames = sanitizedNameToUserIndicator.Keys.ToList();
214+
var alreadyLoadedIds = documents.Select(d => d.Id).ToHashSet();
215+
216+
var siblingDocuments = await session.Query<EndpointDocument>()
217+
.Where(d => d.SanitizedName.In(sanitizedNames))
218+
.ToListAsync(cancellationToken);
219+
220+
foreach (var sibling in siblingDocuments.Where(d => !alreadyLoadedIds.Contains(d.Id)))
221+
{
222+
if (sanitizedNameToUserIndicator.TryGetValue(sibling.SanitizedName, out var indicator))
208223
{
209-
matchingDocumentOnSanitizedName.UserIndicator = newValueFromEndpoint;
224+
sibling.UserIndicator = indicator;
210225
}
211226
}
212227
}

src/ServiceControl.Persistence.Tests/Throughput/EndpointsTests.cs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,38 @@ public async Task Should_update_indicators_on_all_endpoint_sources_when_updated_
199199
Assert.That(foundEndpointMonitoring.UserIndicator, Is.EqualTo(userIndicator));
200200
}
201201

202+
[Test]
203+
public async Task Should_update_user_indicators_on_more_than_30_endpoints_without_hitting_session_request_limit()
204+
{
205+
// Arrange
206+
// Each pair shares a sanitized name but has different raw names.
207+
// Updating by raw name (not sanitized name) triggers a sibling propagation query.
208+
// In the original code, that was one DB query per endpoint, exceeding RavenDB's
209+
// default limit of 30 requests per session when 30+ endpoints are updated at once.
210+
const int endpointCount = 30;
211+
var userIndicator = "someIndicator";
212+
213+
for (var i = 0; i < endpointCount; i++)
214+
{
215+
var sanitizedName = $"Endpoint{i}";
216+
await LicensingDataStore.SaveEndpoint(new Endpoint(sanitizedName, ThroughputSource.Audit) { SanitizedName = sanitizedName }, default);
217+
await LicensingDataStore.SaveEndpoint(new Endpoint($"schema.{sanitizedName}", ThroughputSource.Monitoring) { SanitizedName = sanitizedName }, default);
218+
}
219+
220+
var updates = Enumerable.Range(0, endpointCount)
221+
.Select(i => new UpdateUserIndicator { Name = $"schema.Endpoint{i}", UserIndicator = userIndicator })
222+
.ToList();
223+
224+
// Act - must not throw InvalidOperationException due to exceeding session request limit
225+
await LicensingDataStore.UpdateUserIndicatorOnEndpoints(updates, default);
226+
227+
// Assert
228+
var allEndpoints = (await LicensingDataStore.GetAllEndpoints(true, default)).ToList();
229+
230+
Assert.That(allEndpoints, Has.Count.EqualTo(endpointCount * 2));
231+
Assert.That(allEndpoints, Has.All.Matches<Endpoint>(e => e.UserIndicator == userIndicator));
232+
}
233+
202234
[TestCase(10, 5, false)]
203235
[TestCase(10, 20, true)]
204236
public async Task Should_correctly_report_throughput_existence_for_X_days(int daysSinceLastThroughputEntry, int timeFrameToCheck, bool expectedValue)

0 commit comments

Comments
 (0)