Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ public interface ISearchParameterOperations
Task UpdateSearchParameterAsync(ITypedElement searchParam, RawResource previousSearchParam, CancellationToken cancellationToken);

/// <summary>
/// This method should be called periodically to get any updates to SearchParameters
/// added to the DB by other service instances.
/// It should also be called when a user starts a reindex job
/// This method should be called periodically to get any updates to SearchParameters added to the DB by other service instances.
/// </summary>
/// <param name="cancellationToken">Cancellation token</param>
/// <param name="forceFullRefresh">When true, forces a full refresh from database instead of incremental updates</param>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,15 +283,14 @@ await SearchParameterConcurrencyManager.ExecuteWithLockAsync(
}

/// <summary>
/// This method should be called periodically to get any updates to SearchParameters
/// added to the DB by other service instances.
/// It should also be called when a user starts a reindex job
/// This method should be called periodically to get any updates to SearchParameters added to the DB by other service instances.
/// </summary>
/// <param name="cancellationToken">Cancellation token</param>
/// <param name="forceFullRefresh">When true, forces a full refresh from database instead of incremental updates</param>
/// <returns>A task.</returns>
public async Task GetAndApplySearchParameterUpdates(CancellationToken cancellationToken = default, bool forceFullRefresh = false)
{
var st = DateTime.UtcNow;
var results = await _searchParameterStatusManager.GetSearchParameterStatusUpdates(cancellationToken, forceFullRefresh ? null : _searchParamLastUpdated);
var statuses = results.Statuses;

Expand Down Expand Up @@ -377,6 +376,10 @@ public async Task GetAndApplySearchParameterUpdates(CancellationToken cancellati
{
_searchParamLastUpdated = results.LastUpdated.Value;
}

var msg = $"Cache in sync={inCache && allHaveResources} Processed params={statuses.Count} SearchParamLastUpdated={_searchParamLastUpdated.Value.ToString("yyyy-MM-dd HH:mm:ss.fff")}";
_logger.LogInformation($"GetAndApplySearchParameterUpdates: {msg}");
await _searchParameterStatusManager.TryLogEvent("GetAndApplySearchParameterUpdates", "Warn", msg, st, cancellationToken);
}

// This should handle racing condition between saving new parameter on one VM and refreshing cache on the other,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public SearchParameterStatusManager(
_logger = logger;
}

internal async Task TryLogEvent(string process, string status, string text, DateTime? startDate, CancellationToken cancellationToken)
{
await _searchParameterStatusDataStore.TryLogEvent(process, status, text, startDate, cancellationToken);
}

internal async Task EnsureInitializedAsync(CancellationToken cancellationToken)
{
var updated = new List<SearchParameterInfo>();
Expand Down
2 changes: 1 addition & 1 deletion src/Microsoft.Health.Fhir.Shared.Client/FhirClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ public async Task<FhirResponse<Bundle>> PostBundleAsync(Resource bundle, FhirBun
return await CreateResponseAsync<Bundle>(response);
}

public async Task<(FhirResponse<Parameters> reponse, Uri uri)> PostReindexJobAsync(
public async Task<(FhirResponse<Parameters> Response, Uri Uri)> PostReindexJobAsync(
Parameters parameters,
string uniqueResource = null,
CancellationToken cancellationToken = default)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
using Microsoft.Health.Fhir.Tests.Common;
using Microsoft.Health.Fhir.Tests.Common.FixtureParameters;
using Microsoft.Health.Test.Utilities;
using Newtonsoft.Json;
using Xunit;
using static Hl7.Fhir.Model.Bundle;
using Task = System.Threading.Tasks.Task;

namespace Microsoft.Health.Fhir.Tests.E2E.Rest.Reindex
Expand All @@ -38,14 +40,110 @@ public ReindexTests(HttpIntegrationTestFixture fixture)
_isSql = _fixture.DataStore == DataStore.SqlServer;
}

[Fact]
public async Task Given500SearchParams_WhenReindexCompletes_ThenSearchParamsAreEnabled()
{
await CancelAnyRunningReindexJobsAsync();

const int numberOfSearchParams = 10; // increase to 500 when cache is not updated by API calls and status is saved with resources in a single SQL transaction
const string urlPrefix = "http://my.org/";
var codes = new List<string>();
try
{
for (var i = 0; i < numberOfSearchParams; i++)
{
var code = $"c-id-{i}";
codes.Add(code);
}

var bundle = await CreatePersonSearchParamsAsync();
Assert.Equal(numberOfSearchParams, bundle.Entry.Count);
foreach (var entry in bundle.Entry)
{
Assert.True(entry.Resource as SearchParameter != null, $"actual={JsonConvert.SerializeObject(entry)}");
}

// check by urls
var response = await _fixture.TestFhirClient.SearchAsync($"SearchParameter?_summary=count&url={string.Join(",", codes.Select(_ => $"{urlPrefix}{_}"))}");
Assert.True(response.Resource.Total == numberOfSearchParams, $"Urls expected={numberOfSearchParams} actual={response.Resource.Total}");

var value = await _fixture.TestFhirClient.PostReindexJobAsync(new Parameters { Parameter = [] });
Assert.Equal(HttpStatusCode.Created, value.Response.Response.StatusCode);

await WaitForJobCompletionAsync(value.Uri, TimeSpan.FromSeconds(300));

await Parallel.ForEachAsync(codes, new ParallelOptions { MaxDegreeOfParallelism = 8 }, async (code, cancel) =>
{
await VerifySearchParameterIsEnabledAsync($"Person?{code}=test", code);
});
}
finally
{
await DeletePersonSearchParamsAsync();
}

async Task<Bundle> CreatePersonSearchParamsAsync()
{
var bundle = new Bundle { Type = Bundle.BundleType.Batch, Entry = new List<EntryComponent>() };

#if R5
var resourceTypes = new List<VersionIndependentResourceTypesAll?>();
resourceTypes.Add(Enum.Parse<VersionIndependentResourceTypesAll>("Person"));
#else
var resourceTypes = new List<ResourceType?>();
resourceTypes.Add(Enum.Parse<ResourceType>("Person"));
#endif

foreach (var code in codes)
{
var searchParam = new SearchParameter
{
Id = code,
Url = $"{urlPrefix}{code}",
Name = code,
Code = code,
Status = PublicationStatus.Active,
Type = SearchParamType.Token,
Expression = "Person.id",
Description = "any",
Base = resourceTypes,
};

bundle.Entry.Add(new EntryComponent { Request = new RequestComponent { Method = Bundle.HTTPVerb.PUT, Url = $"SearchParameter/{code}" }, Resource = searchParam });
}

var result = await _fixture.TestFhirClient.PostBundleAsync(bundle, new FhirBundleOptions { BundleProcessingLogic = FhirBundleProcessingLogic.Parallel });
return result;
}

async Task VerifySearchParameterIsEnabledAsync(string searchQuery, string searchParameterCode)
{
var response = await _fixture.TestFhirClient.SearchAsync(searchQuery);
Assert.NotNull(response);
var error = HasNotSupportedError(response.Resource);
Assert.False(error, $"Search param {searchParameterCode} is NOT supported after reindex.");
}

async Task DeletePersonSearchParamsAsync()
{
var bundle = new Bundle { Type = Bundle.BundleType.Batch, Entry = new List<EntryComponent>() };

foreach (var code in codes)
{
bundle.Entry.Add(new EntryComponent { Request = new RequestComponent { Method = Bundle.HTTPVerb.DELETE, Url = $"SearchParameter/{code}" } });
}

await _fixture.TestFhirClient.PostBundleAsync(bundle, new FhirBundleOptions { BundleProcessingLogic = FhirBundleProcessingLogic.Parallel });
}
}

[Fact]
public async Task GivenReindexJobWithConcurrentUpdates_ThenReportedCountsAreLessThanOriginal()
{
await CancelAnyRunningReindexJobsAsync();

var searchParam = new SearchParameter();
var testResources = new List<(string resourceType, string resourceId)>();
(FhirResponse<Parameters> response, Uri jobUri) value = default;

try
{
Expand All @@ -64,18 +162,18 @@ public async Task GivenReindexJobWithConcurrentUpdates_ThenReportedCountsAreLess
],
};

value = await _fixture.TestFhirClient.PostReindexJobAsync(parameters);
Assert.Equal(HttpStatusCode.Created, value.response.Response.StatusCode);
var value = await _fixture.TestFhirClient.PostReindexJobAsync(parameters);
Assert.Equal(HttpStatusCode.Created, value.Response.Response.StatusCode);

var tasks = new[]
{
WaitForJobCompletionAsync(value.jobUri, TimeSpan.FromSeconds(300)),
WaitForJobCompletionAsync(value.Uri, TimeSpan.FromSeconds(300)),
RandomPersonUpdate(testResources),
};
await Task.WhenAll(tasks);

// reported in reindex counts should be less than total resources created
await CheckReportedCounts(value.jobUri, testResources.Count, true);
await CheckReportedCounts(value.Uri, testResources.Count, true);
}
finally
{
Expand All @@ -99,7 +197,6 @@ public async Task GivenReindexJobWithMixedZeroAndNonZeroCountResources_WhenReind
var testResources = new List<(string resourceType, string resourceId)>();
var supplyDeliveryCount = 40 * storageMultiplier;
var personCount = 20 * storageMultiplier;
(FhirResponse<Parameters> response, Uri jobUri) value = default;

try
{
Expand Down Expand Up @@ -148,13 +245,13 @@ public async Task GivenReindexJobWithMixedZeroAndNonZeroCountResources_WhenReind
},
};

value = await _fixture.TestFhirClient.PostReindexJobAsync(parameters);
var value = await _fixture.TestFhirClient.PostReindexJobAsync(parameters);

Assert.Equal(HttpStatusCode.Created, value.response.Response.StatusCode);
Assert.NotNull(value.jobUri);
Assert.Equal(HttpStatusCode.Created, value.Response.Response.StatusCode);
Assert.NotNull(value.Uri);

// Wait for job to complete (this will wait for all sub-jobs to complete)
var jobStatus = await WaitForJobCompletionAsync(value.jobUri, TimeSpan.FromSeconds(300));
var jobStatus = await WaitForJobCompletionAsync(value.Uri, TimeSpan.FromSeconds(300));
Assert.True(
jobStatus == OperationStatus.Completed,
$"Expected Completed, got {jobStatus}");
Expand All @@ -164,7 +261,7 @@ public async Task GivenReindexJobWithMixedZeroAndNonZeroCountResources_WhenReind
await SearchCreatedResources("SupplyDelivery", supplyDeliveryCount);

// check what reindex job reported
await CheckReportedCounts(value.jobUri, testResources.Count, false);
await CheckReportedCounts(value.Uri, testResources.Count, false);

// Verify search parameter is working for SupplyDelivery (which has data)
// Use the ACTUAL count we got, not the desired count
Expand Down Expand Up @@ -645,27 +742,6 @@ private Person CreatePersonResource(string id, string name)
}
}

/// <summary>
/// Helper method to create and post a resource in a single operation
/// </summary>
private async Task<T> CreateAndPostResourceAsync<T>(string id, string name, Func<string, string, Task<T>> createResourceFunc)
where T : Resource
{
var resource = await createResourceFunc(id, name);

try
{
// Post the resource using the client's CreateAsync method
var response = await _fixture.TestFhirClient.CreateAsync(resource);
return response;
}
catch (Exception ex)
{
System.Diagnostics.Debug.WriteLine($"Failed to create resource {id}: {ex.Message}");
return resource; // Return the original resource even on failure so ID can be tracked
}
}

private async Task<SearchParameter> CreateCustomSearchParameterAsync(string code, string[] baseResourceTypes, string expression, SearchParamType searchParamType = SearchParamType.String)
{
#if R5
Expand Down
Loading