[test] Fix 30+ flaky tests across multiple modules#2696
[test] Fix 30+ flaky tests across multiple modules#2696sushantmane wants to merge 1 commit intolinkedin:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
This PR targets test stability by eliminating common sources of nondeterminism across the Venice codebase (unit + integration tests), primarily around async metric computation, cross-thread visibility, timing assumptions, and cluster resource sizing.
Changes:
- Replace ad-hoc
new MetricsRepository()usage withMetricsRepositoryUtilssingle-threaded configs and add waits where async gauges compute values. - Remove fixed sleeps / single-poll assumptions and replace with deterministic retry/wait utilities.
- Adjust integration test cluster configs/timeouts and add cleanup hooks to prevent cross-test state leakage (notably DaVinci backend singleton).
Reviewed changes
Copilot reviewed 86 out of 86 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| services/venice-server/src/test/java/com/linkedin/venice/stats/VersionedDIVStatsReporterTest.java | Switch to single-threaded metrics repo to avoid async-gauge executor flakiness. |
| services/venice-server/src/test/java/com/linkedin/venice/stats/ServerReadQuotaUsageStatsTest.java | Use single-threaded metrics repo for consistent async gauge behavior. |
| services/venice-server/src/test/java/com/linkedin/venice/stats/AggVersionedDIVStatsTest.java | Use single-threaded metrics repo in setup. |
| services/venice-server/src/test/java/com/linkedin/venice/stats/AggServerReadQuotaUsageStatsTest.java | Use single-threaded metrics repo and wait for async gauge computations before asserting. |
| services/venice-server/src/test/java/com/linkedin/venice/stats/AggServerHttpRequestStatsTest.java | Use single-threaded metrics repos for both normal and KV profiling metric sets. |
| services/venice-server/src/test/java/com/linkedin/venice/listener/ServerReadMetadataRepositoryTest.java | Use single-threaded metrics repo for deterministic metric evaluation. |
| services/venice-server/src/test/java/com/linkedin/venice/listener/ServerConnectionStatsHandlerTest.java | Avoid Mockito thread-safety races by using a backed Attribute and explicit activation waiting. |
| services/venice-server/src/test/java/com/linkedin/venice/cleaner/BackupVersionOptimizationServiceTest.java | Add a delay to avoid same-millis timestamp collisions affecting optimization logic. |
| services/venice-router/src/test/java/com/linkedin/venice/router/TestMetaDataHandler.java | Use single-threaded metrics repo to avoid async gauge executor sharing. |
| services/venice-controller/src/test/java/com/linkedin/venice/controller/TestStoreBackupVersionCleanupService.java | Avoid strict boundary races by subtracting an extra millisecond from retention thresholds. |
| services/venice-controller/src/test/java/com/linkedin/venice/controller/TestProtocolVersionAutoDetectionService.java | Configure VeniceMetricsRepository with single-threaded Tehuti metric config for determinism. |
| services/venice-controller/src/test/java/com/linkedin/venice/controller/stats/SystemStoreHealthCheckStatsOtelTest.java | Use single-threaded Tehuti metric config and repo for OTel + Tehuti tests. |
| services/venice-controller/src/test/java/com/linkedin/venice/controller/stats/ProtocolVersionAutoDetectionStatsOtelTest.java | Use single-threaded Tehuti metric config and repo for OTel + Tehuti tests. |
| services/venice-controller/src/test/java/com/linkedin/venice/controller/stats/PartitionHealthStatsOtelTest.java | Use single-threaded Tehuti repo for NPE-safety validations. |
| services/venice-controller/src/test/java/com/linkedin/venice/controller/stats/ErrorPartitionStatsOtelTest.java | Use single-threaded Tehuti metric config/repo for stability. |
| services/venice-controller/src/test/java/com/linkedin/venice/controller/stats/DisabledPartitionStatsOtelTest.java | Use single-threaded Tehuti metric config/repo for stability. |
| services/venice-controller/src/test/java/com/linkedin/venice/controller/stats/AdminConsumptionStatsOtelTest.java | Use single-threaded Tehuti metric config/repo for stability. |
| services/venice-controller/src/test/java/com/linkedin/venice/controller/stats/AddVersionLatencyStatsOtelTest.java | Use single-threaded Tehuti metric config/repo for stability. |
| services/venice-controller/src/test/java/com/linkedin/venice/controller/lingeringjob/HeartbeatBasedCheckerStatsOtelTest.java | Use single-threaded Tehuti metric config/repo for stability. |
| services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTaskTest.java | Add latch sequencing to make concurrent-counter assertions deterministic. |
| services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java | Wait for both persisted metadata and in-memory state to update before asserting execution id. |
| services/venice-controller/src/test/java/com/linkedin/venice/controller/AbstractTestVeniceParentHelixAdmin.java | Disable additional background delay-driven routines that interfere with mocking/re-mocking. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/storagenode/StorageNodeComputeTest.java | Increase test timeout for long-running ZSTD-with-dict permutations. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/router/TestRead.java | Move readiness checks inside loop and reset metric baselines per configuration toggle. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/pubsub/adapter/kafka/producer/ApacheKafkaProducerAdapterITest.java | Remove fragile Kafka-version-dependent exception message assertions. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceClusterWrapper.java | Retry system-store empty pushes within wait loops to avoid store-not-ready races. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/IntegrationTestUtils.java | Increase wait budget for participant message store initialization. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/FastClientIndividualFeatureConfigurationTest.java | Wrap repeated batch-get assertions in nondeterministic wait to handle transient variability. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigrationMultiRegion.java | Adjust cluster sizing/replication and timeouts; add explicit test timeout. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java | Reduce RF and wait for router discovery before initial reads. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestLeaderReplicaFailover.java | Reorder waits to match causal pipeline and increase timeout for Helix/ZK propagation. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestGlobalRtDiv.java | Increase payload-per-message limits to still force chunking while keeping manifest within bounds. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestDeferredVersionSwapWithSequentialRolloutWithDvc.java | Update store config to reduce swap wait and tighten overall assertion timeout. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestBatch.java | Replace fixed sleep with nondeterministic wait for version swap + dictionary download. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestAdminOperationWithPreviousVersion.java | Increase push completion wait budget for heavily loaded shared clusters. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClusterFixture.java | Reduce RF to 1 and document why for DaVinci-focused tests. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClusterAgnosticTest.java | Reduce server count and RF to match what the test is actually validating. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java | Increase timeouts, reduce RF, and reset DaVinci backend between methods to prevent singleton leakage. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientSubscribeTest.java | Increase timeouts and add @AfterMethod backend reset to avoid cascade failures across data-provider iterations. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientRecordTransformerFilterTest.java | Force blob transfer via config and remove filesystem snapshot existence assertions. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DataRecoveryTest.java | Reduce topic cleanup aggressiveness for multi-region stability. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/ActiveActiveReplicationForHybridTest.java | Wait for version availability across all DCs to avoid producer start races. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestParentControllerWithMultiDataCenter.java | Increase test timeout budget. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestHolisticSeverHealthCheck.java | Use nondeterministic wait to handle cross-thread visibility of mocked repositories. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/multitaskscheduler/StoreMigrationManagerIntegrationTest.java | Increase wait budget for pause state to be observed. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/AbstractTestVeniceHelixAdmin.java | Enable concurrent init routines to reduce sequential startup delays/timeouts. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/TestFlinkMaterializedViewEndToEndWithMultipleConsumers.java | Accumulate message counts across polls instead of assuming a single poll returns all messages. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/client/store/TestD2ServiceDiscovery.java | Wrap discovery assertion in nondeterministic wait to handle transient service discovery timing. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/davinci/notifier/LeaderErrorNotifier.java | Add volatile and reorder flag update to avoid cross-thread visibility and ZK-roundtrip coupling. |
| internal/venice-common/src/test/java/com/linkedin/venice/pubsub/manager/TopicManagerStatsTest.java | Use single-threaded metrics repo to avoid async-gauge executor sharing. |
| internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/consumer/ApacheKafkaConsumerAdapterTest.java | Replace nanoTime-based ordering checks with lock-held sequence counters. |
| internal/venice-common/src/test/java/com/linkedin/venice/meta/RetryManagerTest.java | Use single-threaded metrics repo for deterministic stats behavior. |
| internal/venice-client-common/src/test/java/com/linkedin/venice/stats/ThreadPoolStatsTest.java | Use single-threaded metrics repo to avoid shared executor flakes. |
| internal/venice-client-common/src/test/java/com/linkedin/venice/stats/ThreadPoolStatsOtelTest.java | Use single-threaded Tehuti metric config when validating OTel + Tehuti behavior. |
| internal/venice-client-common/src/test/java/com/linkedin/venice/stats/routing/HelixGroupStatsTest.java | Use single-threaded metrics repo for deterministic metric calculations. |
| internal/venice-client-common/src/test/java/com/linkedin/venice/stats/OpenTelemetryMetricsSetupTest.java | Remove/adjust tests around dimensions to match updated dimension expectations. |
| internal/venice-client-common/src/test/java/com/linkedin/venice/stats/dimensions/VeniceMetricsDimensionsTest.java | Update expected dimension-name formatting to reflect removed dimensions. |
| internal/venice-client-common/src/test/java/com/linkedin/venice/stats/AbstractVeniceStatsTest.java | Use single-threaded metrics repos/configs to avoid shared executor interference. |
| internal/alpini/common/alpini-common-base/src/test/java/com/linkedin/alpini/base/registry/TestResourceRegistry.java | Increase future-get timeouts to reduce timing sensitivity. |
| clients/venice-producer/src/test/java/com/linkedin/venice/producer/VeniceProducerMetricsTest.java | Use single-threaded metrics repo throughout to avoid async gauge flakiness. |
| clients/venice-producer/src/test/java/com/linkedin/venice/producer/PartitionedProducerExecutorTest.java | Fix parallelism check to avoid races by using a latch that confirms both tasks observed each other. |
| clients/venice-producer/src/test/java/com/linkedin/venice/producer/online/OnlineVeniceProducerTest.java | Use single-threaded metrics repo throughout producer tests. |
| clients/venice-client/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker | Enable Mockito inline mock maker to support mocking final gRPC classes. |
| clients/venice-client/src/test/java/com/linkedin/venice/fastclient/RetriableAvroGenericStoreClientTest.java | Increase timeouts and tune TimeoutProcessor tick/threads to reduce retry-timing sensitivity. |
| clients/venice-client/src/test/java/com/linkedin/venice/fastclient/meta/RequestBasedMetadataTest.java | Increase timeout budget to account for construction/fetch/close overhead on CI/JDK 17. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ServerMetadataServiceStatsOtelTest.java | Use single-threaded Tehuti metric config/repo for OTel stability. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ParticipantStoreConsumptionStatsTest.java | Use single-threaded Tehuti metric config/repo for OTel stability. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ParticipantStateTransitionStatsTest.java | Use dedicated AsyncGaugeExecutor + teardown to avoid shared-executor contention/leaks. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/KafkaConsumerServiceOtelMetricEntityTest.java | Update expected dimension sets to align with current metric entity definitions. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/IngestionStatsTest.java | Use dedicated AsyncGaugeExecutor in MetricConfig to avoid shared-executor contention. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/HeartbeatMonitoringServiceStatsOtelTest.java | Use single-threaded Tehuti metric config/repo for OTel stability. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/DIVStatsReporterTest.java | Use single-threaded metrics repo/config to avoid shared async-gauge executor flakes. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/DaVinciRecordTransformerStatsTest.java | Use dedicated AsyncGaugeExecutor and ensure proper close/cleanup. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/BlobTransferStatsTest.java | Use dedicated AsyncGaugeExecutor and ensure proper close/cleanup. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/AggVersionedStorageEngineStatsTest.java | Use single-threaded metrics repo for deterministic stats updates. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/AggVersionedBlobTransferStatsTest.java | Provide a dedicated Tehuti executor config when validating dual Tehuti+OTel recording. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/AdaptiveThrottlingServiceStatsTest.java | Use single-threaded Tehuti metric config/repo for OTel stability. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/NativeMetadataRepositoryTest.java | Guard background refresh scheduler interactions and explicitly stop scheduler before unsubscribe assertions. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/TestRemoteIngestionRepairService.java | Avoid background thread races by not starting service and registering tasks deterministically. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java | Increase timeouts, isolate executors per test, avoid Mockito cross-thread stubbing, and reduce shutdown blocking. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ParticipantStoreConsumptionTaskTest.java | Make time-advance + verification deterministic under races via nondeterministic wait retries. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java | Use single-threaded metrics repository instances to avoid shared executor flakes. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceTest.java | Use stable pubsub alias and adjust tests to avoid timing assumptions. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegatorTest.java | Use stable pubsub alias to avoid metric/dimension assumptions. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerDaVinciRecordTransformerImplTest.java | Loosen strict single-signal assumption to atLeastOnce() for condition signaling under concurrency. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java | Add a test-only backend reset helper to prevent singleton leakage between tests. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| MetricsRepository metricsRepository = MetricsRepositoryUtils.createSingleThreadedMetricsRepository(); | ||
| MockTehutiReporter reporter = new MockTehutiReporter(); | ||
| metricsRepository.addReporter(reporter); |
There was a problem hiding this comment.
MetricsRepositoryUtils.createSingleThreadedMetricsRepository() creates a MetricsRepository with a dedicated AsyncGaugeExecutor. This test never closes the repository (or reporter), which can leak executor threads across the suite and reintroduce flakiness. Consider wrapping the repository in try-with-resources or adding a finally block to close both the MetricsRepository and MockTehutiReporter.
| // Sleep to ensure the next recordReadUsage timestamp is strictly after | ||
| // recordDatabaseOptimization timestamp. Without this, both can land on the same | ||
| // millisecond, causing whetherToOptimize() to return false permanently | ||
| // (lastOptimizationTimestamp >= lastReadUsageTimestamp). | ||
| Thread.sleep(NO_READ_THRESHOLD_MS_FOR_DATABASE_OPTIMIZATION + 1); | ||
|
|
There was a problem hiding this comment.
The Thread.sleep(NO_READ_THRESHOLD_MS_FOR_DATABASE_OPTIMIZATION + 1) relies on System.currentTimeMillis() advancing with sub-10ms granularity. On platforms with coarse clock resolution, the optimization and read-usage timestamps can still land in the same millisecond, keeping this test flaky. Prefer waiting until System.currentTimeMillis() is strictly greater than the captured optimization timestamp (loop/assert) or refactor the service to use an injectable clock in tests.
| /** | ||
| * Nulls out the static DaVinci backend singleton for test cleanup. When ThreadTimeoutException | ||
| * interrupts a test, the singleton may leak with stale state (e.g., cache config) that poisons | ||
| * subsequent tests. The leaked backend's resources are cleaned up by test framework teardown. | ||
| */ | ||
| // Visible for testing | ||
| public static void resetDaVinciBackendForTests() { | ||
| synchronized (AvroGenericDaVinciClient.class) { | ||
| daVinciBackend = null; | ||
| } | ||
| } |
There was a problem hiding this comment.
resetDaVinciBackendForTests() drops the static ReferenceCounted<DaVinciBackend> without releasing/closing it. This can leak backend resources/threads and can also cause NPEs if any in-flight cleanup later calls getBackend() or daVinciBackend.release(). Consider swapping the reference to a local variable under the lock and force-closing the backend safely (or releasing down to zero) before nulling, and/or guarding getBackend() against a null daVinciBackend with a clearer failure mode.
| @Test | ||
| public void testRecordLatency() { | ||
| MetricsRepository metricsRepository = new MetricsRepository(); | ||
| MetricsRepository metricsRepository = MetricsRepositoryUtils.createSingleThreadedMetricsRepository(); | ||
| String pubSubClusterAddress = "venice.kafka.dc-1.linkedin.com:12345"; | ||
| TopicManagerStats stats = | ||
| new TopicManagerStats(metricsRepository, pubSubClusterAddress, VeniceComponent.CONTROLLER); |
There was a problem hiding this comment.
MetricsRepositoryUtils.createSingleThreadedMetricsRepository() uses a dedicated AsyncGaugeExecutor, but this test never closes the MetricsRepository. To avoid leaking executor threads across tests, close the repository in a finally block or use try-with-resources.
| public void testGetReadQuotaUsageRatio() { | ||
| MetricsRepository metricsRepository = new MetricsRepository(); | ||
| MetricsRepository metricsRepository = MetricsRepositoryUtils.createSingleThreadedMetricsRepository(); | ||
| String storeName = "test-store"; | ||
| ServerReadQuotaUsageStats stats = | ||
| new ServerReadQuotaUsageStats(metricsRepository, storeName, new SystemTime(), null); |
There was a problem hiding this comment.
This test creates a single-threaded MetricsRepository via MetricsRepositoryUtils, but never closes it. Since the utility builds a dedicated AsyncGaugeExecutor, consider closing the repository (and any reporters) in a finally block / try-with-resources to prevent leaking metric-measurement threads across the test suite.
| @Test | ||
| public void testThreadPoolStatsReporterCanReport() { | ||
| MetricsRepository metricsRepository = new MetricsRepository(); | ||
| MetricsRepository metricsRepository = MetricsRepositoryUtils.createSingleThreadedMetricsRepository(); | ||
| MockTehutiReporter reporter = new MockTehutiReporter(); | ||
| metricsRepository.addReporter(reporter); |
There was a problem hiding this comment.
The MetricsRepository created via MetricsRepositoryUtils.createSingleThreadedMetricsRepository() is not closed. To avoid leaking the dedicated async-gauge executor thread(s) created by the repository, close it in a teardown/finally block or use try-with-resources.
dab1473 to
53ca127
Compare
53ca127 to
6a4d41f
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 82 out of 82 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| @@ -1265,7 +1270,7 @@ private StoreIngestionTaskFactory.Builder getIngestionTaskFactoryBuilder( | |||
| mockIngestionThrottler, | |||
| kafkaClusterBasedRecordThrottler, | |||
| mockMetricRepo, | |||
| "test-region", | |||
| inMemoryLocalKafkaBroker.getPubSubBrokerAddress(), | |||
| 1000, | |||
There was a problem hiding this comment.
The pubsubRegionAlias argument passed to KafkaConsumerService.ConsumerAssignmentStrategy.constructor.construct(...) is now a broker address (host:port). This value is used to build metric name prefixes (e.g., kafka_consumer_service_for_ + alias), so using an address (with :) can produce invalid/unexpected metric names and is semantically different from a region alias. It also looks inconsistent that the remote consumer service is still constructed with the local broker address at line 1273. Recommend passing a stable region alias string (as before) or, if uniqueness is needed, use a sanitized alias and ensure the remote service uses the remote alias/address consistently.
6a4d41f to
6a3057f
Compare
Test-only changes to fix flaky tests. No production behavior changes. Root cause categories fixed: - AsyncGauge shared executor: use dedicated MetricsRepositoryUtils executor - Missing volatile on cross-thread test fields - Utils.sleep() before assertions: replaced with waitForNonDeterministicAssertion - replicationFactor=2 with few servers: reduced to RF=1 where replication isn't tested - Version not waited in all regions: wait in ALL DCs before proceeding - Static DaVinci singleton leak across data provider iterations: @AfterMethod cleanup - Fragile exception message assertions: assert on type not message text - Accumulate poll results instead of expecting all in single poll - Try-finally for test resource cleanup (executor/MetricsRepository) - Topic cleanup interval too aggressive for multi-region tests - Router not yet discovered store version: wrap reads in waitForNonDeterministicAssertion - Mockito inline mock maker for final gRPC classes
6a3057f to
a776051
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 83 out of 83 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // Do not start the service to avoid the background thread racing with pollRepairTasks() calls below. | ||
| // Instead, register tasks directly into the internal map. | ||
| RemoteIngestionRepairService repairService = new RemoteIngestionRepairService(1000000); | ||
| repairService.start(); | ||
| StoreIngestionTask mockBrockenIngestionTask = Mockito.mock(StoreIngestionTask.class); | ||
| StoreIngestionTask mockBrokenIngestionTask = Mockito.mock(StoreIngestionTask.class); | ||
| StoreIngestionTask mockWorkingIngestionTask = Mockito.mock(StoreIngestionTask.class); | ||
| repairService.registerRepairTask(mockBrockenIngestionTask, () -> { | ||
|
|
||
| Runnable brokenTask = () -> { | ||
| throw new VeniceException("AAAAHHH!!!!"); | ||
| }); | ||
| repairService.registerRepairTask(mockWorkingIngestionTask, () -> {}); | ||
| repairService.registerRepairTask(mockBrockenIngestionTask, () -> {/* This task works and should clear */}); | ||
| repairService.stop(); | ||
| }; | ||
| Runnable workingTask1 = () -> {}; | ||
| Runnable workingTask2 = () -> {/* This task works and should clear */}; | ||
|
|
||
| repairService.getIngestionRepairTasks() | ||
| .computeIfAbsent(mockBrokenIngestionTask, k -> new LinkedBlockingDeque<>()) | ||
| .offer(brokenTask); | ||
| repairService.getIngestionRepairTasks() | ||
| .computeIfAbsent(mockWorkingIngestionTask, k -> new LinkedBlockingDeque<>()) | ||
| .offer(workingTask1); | ||
| repairService.getIngestionRepairTasks() | ||
| .computeIfAbsent(mockBrokenIngestionTask, k -> new LinkedBlockingDeque<>()) | ||
| .offer(workingTask2); |
There was a problem hiding this comment.
This test no longer exercises the public lifecycle/API (start(), registerRepairTask(), stop()) and instead mutates getIngestionRepairTasks() directly, bypassing the thread-alive guard and the queueing behavior in registerRepairTask(). That reduces coverage for the class’ main contract and could miss regressions. A less racy alternative is to keep using registerRepairTask() but avoid background polling by not starting the service thread and invoking pollRepairTasks() directly (after making registerRepairTask() test-friendly), or start with a very long sleep interval and explicitly control when polling happens.
| import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_CLUSTER_NAME; | ||
| import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_DRAINER_TYPE; | ||
| import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REGION_NAME; | ||
| import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REQUEST_METHOD; | ||
| import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_ROUTE_NAME; | ||
| import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_STORE_NAME; |
There was a problem hiding this comment.
The removed assertions for setRegionName() (dimension + attributes) and the removed null-guard test for addCustomDimension(...) reduce coverage for behavior that still exists in OpenTelemetryMetricsSetup.Builder (see OpenTelemetryMetricsSetup.java:151 and :197-203). If those tests were removed due to flakiness, it would help to reintroduce them in a parallel-safe way (e.g., add the missing VENICE_REGION_NAME stub in setUp() and keep the assertions), so regressions in region dimension handling and null validation are still caught.
Summary
Test-only changes to fix flaky tests. No production behavior changes (except one test-visible static method on
AvroGenericDaVinciClient).85 test files changed across all modules.
Root cause categories fixed
MetricsRepositoryUtilsexecutor instead ofnew MetricsRepository()volatileon cross-thread test fields (NativeMetadataRepositoryTest, LeaderErrorNotifier)Utils.sleep()before assertions: replaced withwaitForNonDeterministicAssertion(TestBatch ZSTD)replicationFactor=2with few servers: reduced to RF=1 where replication isn't being tested (DaVinciClusterFixture, DaVinciClientTest, TestStoreMigration)@AfterMethodcleanup (DaVinciClientSubscribeTest, DaVinciClientTest)waitForNonDeterministicAssertion(TestStoreMigration)Test plan