Skip to content

Commit 53ca127

Browse files
committed
[test] Fix 30+ flaky tests across multiple modules
Test-only changes to fix flaky tests. No production behavior changes. Root cause categories fixed: - AsyncGauge shared executor: use dedicated MetricsRepositoryUtils executor - Missing volatile on cross-thread test fields - Utils.sleep() before assertions: replaced with waitForNonDeterministicAssertion - replicationFactor=2 with few servers: reduced to RF=1 where replication isn't tested - Version not waited in all regions: wait in ALL DCs before proceeding - Static DaVinci singleton leak across data provider iterations: @AfterMethod cleanup - Fragile exception message assertions: assert on type not message text - Accumulate poll results instead of expecting all in single poll - Try-finally for test resource cleanup (executor/MetricsRepository) - Topic cleanup interval too aggressive for multi-region tests - Router not yet discovered store version: wrap reads in waitForNonDeterministicAssertion - Mockito inline mock maker for final gRPC classes
1 parent fcaa09c commit 53ca127

83 files changed

Lines changed: 876 additions & 465 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

AGENTS.md

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# Repository Guidelines
2+
3+
## Project Structure & Module Organization
4+
5+
Venice is a multi-module Gradle build. Runtime services sit in `services/*/src`, while client libraries live under
6+
`clients/*/src`. Shared logic and generated schemas are centralized in `internal/` and `specs/`. Integration artefacts
7+
(Docker images, remote runners, connectors) are kept in `integrations/`, `docker/`, and `scripts/`. Tests are co-located
8+
with code, and end-to-end scenarios live in `tests/venice-pulsar-test`. Consult `docs/` for design notes before altering
9+
cross-module contracts.
10+
11+
## Build, Test, and Development Commands
12+
13+
- `./gradlew clean build` – compile every module, run unit/integration tests, and assemble distributables (server,
14+
router, controller).
15+
- `./gradlew test` – execute module unit suites; use `-PskipIntegrationTests=true` when iterating quickly.
16+
- `./gradlew :services:venice-server:installDist` (or similar) – produce runnable binaries for targeted modules.
17+
- `./gradlew spotlessApply` – auto-format Java sources per Eclipse config; the pre-commit hook calls this.
18+
- `./gradlew spotbugsMain` & `./gradlew check` – static analysis plus aggregated quality gate.
19+
20+
## Coding Style & Naming Conventions
21+
22+
Java 8+ code formatted via Spotless using `gradle/spotless/eclipse-java-formatter.xml`. Do not hand-tune indentation;
23+
run Spotless instead. Adopt `UpperCamelCase` for classes, `lowerCamelCase` for fields/methods, and `SNAKE_CASE` only for
24+
constants. Favor `Optional` over `null` checks in new code and keep logging through `VeniceLogger`. Shared config
25+
objects live in `Venice*Config` classes; follow existing naming to keep discovery predictable.
26+
27+
## Testing Guidelines
28+
29+
Unit tests use TestNG plus Mockito; place them in the matching module under `src/test/java`. Name classes `*Test` or
30+
`*IntegrationTest` so Gradle discovers them. Each module defines a `jacocoCoverageThreshold`; add or extend tests to
31+
keep coverage above those guards. For pulsar/kafka end-to-end flows, reuse the harness in `tests/venice-pulsar-test` and
32+
run via `./gradlew integrationTest -PtestProfiles=pulsar`.
33+
34+
## Commit & Pull Request Guidelines
35+
36+
Follow the history pattern: `[scope] Concise summary (#issue)` (for example, `[server] Guard admin topic switch`). Group
37+
related edits into single commits, run `./gradlew clean build` locally, and let the pre-commit hook finish before
38+
pushing. Pull requests should include: purpose and rollout notes, validation evidence (test command output or
39+
screenshots for UI changes), and links to tracking tickets. Flag breaking config changes and update `docs/` or `specs/`
40+
when altering public schemas or protocol fields.
41+
42+
## Environment & Tooling Tips
43+
44+
Run `./gradlew setupWorkspace` once to install repo git hooks and blame ignore rules. Local override configurations
45+
belong in `~/.gradle/gradle.properties`; never commit credentials—use the sample placeholders in `consumer.config` and
46+
`run-remote-cp.sh`. When cutting releases, coordinate with `make_tag.py` and document the tag in the internal release
47+
tracker.

clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -896,6 +896,18 @@ public static DaVinciBackend getBackend() {
896896
}
897897
}
898898

899+
/**
900+
* Nulls out the static DaVinci backend singleton for test cleanup. When ThreadTimeoutException
901+
* interrupts a test, the singleton may leak with stale state (e.g., cache config) that poisons
902+
* subsequent tests. The leaked backend's resources are cleaned up by test framework teardown.
903+
*/
904+
// Visible for testing
905+
public static void resetDaVinciBackendForTests() {
906+
synchronized (AvroGenericDaVinciClient.class) {
907+
daVinciBackend = null;
908+
}
909+
}
910+
899911
@Override
900912
public synchronized void start() {
901913
if (isReady()) {

clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerDaVinciRecordTransformerImplTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,7 @@ public void testMaxBufferSize() throws NoSuchFieldException, IllegalAccessExcept
474474
}
475475

476476
TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> {
477-
verify(bufferIsFullCondition).signal();
477+
verify(bufferIsFullCondition, atLeastOnce()).signal();
478478
});
479479
}
480480

clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import com.linkedin.venice.utils.TestUtils;
7575
import com.linkedin.venice.utils.VeniceProperties;
7676
import com.linkedin.venice.utils.locks.ResourceAutoClosableLockManager;
77+
import com.linkedin.venice.utils.metrics.MetricsRepositoryUtils;
7778
import io.tehuti.metrics.MetricConfig;
7879
import io.tehuti.metrics.MetricsRepository;
7980
import io.tehuti.metrics.stats.AsyncGauge;
@@ -147,7 +148,7 @@ public void setUp() {
147148
mockMetadataRepo,
148149
mockSchemaRepo,
149150
mockLiveClusterConfigRepo,
150-
new MetricsRepository(),
151+
MetricsRepositoryUtils.createSingleThreadedMetricsRepository(),
151152
Optional.empty(),
152153
Optional.empty(),
153154
AvroProtocolDefinition.PARTITION_STATE.getSerializer(),
@@ -469,7 +470,7 @@ public void testStoreIngestionTaskShutdownLastPartition() {
469470
mockMetadataRepo,
470471
mockSchemaRepo,
471472
mockLiveClusterConfigRepo,
472-
new MetricsRepository(),
473+
MetricsRepositoryUtils.createSingleThreadedMetricsRepository(),
473474
Optional.empty(),
474475
Optional.empty(),
475476
AvroProtocolDefinition.PARTITION_STATE.getSerializer(),
@@ -793,7 +794,7 @@ public void testInitParticipantConsumptionTask() {
793794
mockServerConfig,
794795
Optional.of(mockClientConfig),
795796
mockClusterInfoProvider,
796-
new MetricsRepository(),
797+
MetricsRepositoryUtils.createSingleThreadedMetricsRepository(),
797798
mockIcProvider);
798799
assertNotNull(
799800
pct,
@@ -805,7 +806,7 @@ public void testInitParticipantConsumptionTask() {
805806
mockServerConfig,
806807
Optional.of(mockClientConfig),
807808
mockClusterInfoProvider,
808-
new MetricsRepository(),
809+
MetricsRepositoryUtils.createSingleThreadedMetricsRepository(),
809810
mockIcProvider);
810811
assertNull(
811812
pct,
@@ -817,7 +818,7 @@ public void testInitParticipantConsumptionTask() {
817818
mockServerConfig,
818819
Optional.empty(),
819820
mockClusterInfoProvider,
820-
new MetricsRepository(),
821+
MetricsRepositoryUtils.createSingleThreadedMetricsRepository(),
821822
mockIcProvider);
822823
assertNull(pct, "Participant consumption task should not be initialized when client config is not present");
823824
}

clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ParticipantStoreConsumptionTaskTest.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static com.linkedin.venice.stats.OpenTelemetryMetricsSetup.UNKNOWN_STORE_NAME;
44
import static org.mockito.ArgumentMatchers.any;
55
import static org.mockito.ArgumentMatchers.anyDouble;
6+
import static org.mockito.Mockito.atLeastOnce;
67
import static org.mockito.Mockito.doReturn;
78
import static org.mockito.Mockito.doThrow;
89
import static org.mockito.Mockito.mock;
@@ -20,10 +21,12 @@
2021
import com.linkedin.venice.participant.protocol.ParticipantMessageValue;
2122
import com.linkedin.venice.participant.protocol.enums.ParticipantMessageType;
2223
import com.linkedin.venice.utils.SleepStallingMockTime;
24+
import com.linkedin.venice.utils.TestUtils;
2325
import com.linkedin.venice.utils.Utils;
2426
import java.util.HashSet;
2527
import java.util.Set;
2628
import java.util.concurrent.CompletableFuture;
29+
import java.util.concurrent.TimeUnit;
2730
import java.util.function.Function;
2831
import org.testng.annotations.Test;
2932

@@ -215,11 +218,15 @@ public void testOuterCatchRecordsFailedConsumptionWithUnknownStore() {
215218
// Wait for the task to start and reach the sleep stall (heartbeat fires before sleep)
216219
verify(stats, timeout(WAIT).times(1)).recordHeartbeat();
217220

218-
// Now advance time to unblock the sleep — the task will call getIngestingTopicsWithVersionStatusNotOnline
219-
// which throws, hitting the outer catch that records with UNKNOWN_STORE_NAME
220-
time.advanceTime(1);
221-
222-
verify(stats, timeout(WAIT).times(1)).recordKillPushJobFailedConsumption(UNKNOWN_STORE_NAME);
221+
// Advance time to unblock the sleep — the task will call getIngestingTopicsWithVersionStatusNotOnline
222+
// which throws, hitting the outer catch that records with UNKNOWN_STORE_NAME.
223+
// Use waitForNonDeterministicAssertion because there is a small race window: if advanceTime(1)
224+
// is called before the task thread enters sleep(), the time advancement is consumed by the next
225+
// sleep call instead. Retrying advances time again to guarantee the task progresses.
226+
TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, true, () -> {
227+
time.advanceTime(1);
228+
verify(stats, atLeastOnce()).recordKillPushJobFailedConsumption(UNKNOWN_STORE_NAME);
229+
});
223230

224231
// Close the task before the negative assertions so no further loop iterations can fire.
225232
task.close();

0 commit comments

Comments
 (0)