Skip to content

Commit 9292ce9

Browse files
committed
[controller] Block pushes during rollback-origin retention; address Copilot comments

Adds `checkRollbackOriginVersionCapacityForNewPush`, which rejects a new push while any ROLLED_BACK (or parent-side rollback-origin PARTIALLY_ONLINE) version is still within its retention window. The check runs on both the parent (`VeniceParentHelixAdmin.incrementVersionIdempotent`) and the child (`VeniceHelixAdmin.addVersion`), so the rejection surfaces synchronously to the VPJ instead of failing only in async admin-message consumption on the child. Integration tests cover both the block (within retention) and the release (after retention expires) paths.

Also addresses Copilot comments on PR #2688:
- #11 `assumeRolledBackIfUnreachable` is now empty for full-cluster rollbacks so unreachable regions don't inflate the ROLLED_BACK count.
- #14 If the region filter contains only unknown regions, skip the parent status update instead of falling through into full-cluster behavior.
- #15 If zero regions confirm ROLLED_BACK, leave parent status unchanged rather than downgrading to PARTIALLY_ONLINE on no evidence.
1 parent 52b8439 commit 9292ce9

5 files changed

Lines changed: 583 additions & 7 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
package com.linkedin.venice.endToEnd;
2+
3+
import static com.linkedin.venice.ConfigKeys.CONTROLLER_ROLLED_BACK_VERSION_RETENTION_MS;
4+
import static com.linkedin.venice.ConfigKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS;
5+
import static com.linkedin.venice.ConfigKeys.DEFAULT_PARTITION_SIZE;
6+
import static com.linkedin.venice.ConfigKeys.TOPIC_CLEANUP_SLEEP_INTERVAL_BETWEEN_TOPIC_LIST_FETCH_MS;
7+
import static com.linkedin.venice.utils.IntegrationTestPushUtils.createStoreForJob;
8+
import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V3_SCHEMA;
9+
import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory;
10+
import static org.testng.Assert.assertFalse;
11+
import static org.testng.Assert.assertTrue;
12+
13+
import com.linkedin.venice.controllerapi.ControllerClient;
14+
import com.linkedin.venice.controllerapi.ControllerResponse;
15+
import com.linkedin.venice.controllerapi.StoreResponse;
16+
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
17+
import com.linkedin.venice.meta.Version;
18+
import com.linkedin.venice.meta.VersionStatus;
19+
import com.linkedin.venice.utils.IntegrationTestPushUtils;
20+
import com.linkedin.venice.utils.TestUtils;
21+
import com.linkedin.venice.utils.TestWriteUtils;
22+
import com.linkedin.venice.utils.Utils;
23+
import java.io.File;
24+
import java.io.IOException;
25+
import java.util.Properties;
26+
import java.util.concurrent.TimeUnit;
27+
import org.testng.annotations.Test;
28+
29+
30+
/**
31+
* Companion to {@link TestPushBlockedByRollbackOriginGuard}: verifies that once the rolled-back
32+
* retention window elapses, a new push is allowed through and becomes the next current version.
33+
*
34+
* <p>Retention is tuned to 5 seconds so the test can wait past it within its lifetime.
35+
*/
36+
public class TestPushAllowedAfterRollbackRetentionExpires extends AbstractMultiRegionTest {
37+
private static final int TEST_TIMEOUT = 180_000; // ms
38+
private static final long ROLLED_BACK_RETENTION_MS = TimeUnit.SECONDS.toMillis(5);
39+
40+
@Override
41+
protected int getNumberOfRegions() {
42+
return 1;
43+
}
44+
45+
@Override
46+
protected int getNumberOfServers() {
47+
return 1;
48+
}
49+
50+
@Override
51+
protected int getReplicationFactor() {
52+
return 1;
53+
}
54+
55+
@Override
56+
protected Properties getExtraControllerProperties() {
57+
Properties controllerProps = new Properties();
58+
controllerProps.put(DEFAULT_MAX_NUMBER_OF_PARTITIONS, 1);
59+
controllerProps.put(DEFAULT_PARTITION_SIZE, 10);
60+
controllerProps
61+
.setProperty(TOPIC_CLEANUP_SLEEP_INTERVAL_BETWEEN_TOPIC_LIST_FETCH_MS, String.valueOf(Long.MAX_VALUE));
62+
controllerProps.put(CONTROLLER_ROLLED_BACK_VERSION_RETENTION_MS, ROLLED_BACK_RETENTION_MS);
63+
return controllerProps;
64+
}
65+
66+
@Test(timeOut = TEST_TIMEOUT)
67+
public void testPushSucceedsAfterRolledBackRetentionExpires() throws IOException, InterruptedException {
68+
File inputDir = getTempDataDirectory();
69+
TestWriteUtils.writeSimpleAvroFileWithStringToV3Schema(inputDir, 100, 100);
70+
String inputDirPath = "file://" + inputDir.getAbsolutePath();
71+
String storeName = Utils.getUniqueString("pushAllowedAfterRetention");
72+
73+
Properties props =
74+
IntegrationTestPushUtils.defaultVPJProps(multiRegionMultiClusterWrapper, inputDirPath, storeName);
75+
try (ControllerClient parentControllerClient = createStoreForJob(
76+
CLUSTER_NAME,
77+
"\"string\"",
78+
NAME_RECORD_V3_SCHEMA.toString(),
79+
props,
80+
new UpdateStoreQueryParams())) {
81+
IntegrationTestPushUtils.runVPJ(props);
82+
TestUtils.waitForNonDeterministicPushCompletion(
83+
Version.composeKafkaTopic(storeName, 1),
84+
parentControllerClient,
85+
30,
86+
TimeUnit.SECONDS);
87+
IntegrationTestPushUtils.runVPJ(props);
88+
TestUtils.waitForNonDeterministicPushCompletion(
89+
Version.composeKafkaTopic(storeName, 2),
90+
parentControllerClient,
91+
30,
92+
TimeUnit.SECONDS);
93+
94+
// Rollback: v1 becomes current, v2 becomes ROLLED_BACK.
95+
ControllerResponse rollbackResponse = parentControllerClient.rollbackToBackupVersion(storeName);
96+
assertFalse(rollbackResponse.isError(), "rollback failed: " + rollbackResponse.getError());
97+
98+
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
99+
StoreResponse resp = parentControllerClient.getStore(storeName);
100+
assertFalse(resp.isError(), "getStore error: " + resp.getError());
101+
assertTrue(
102+
resp.getStore().getVersion(2).isPresent()
103+
&& resp.getStore().getVersion(2).get().getStatus() == VersionStatus.ROLLED_BACK,
104+
"Expected v2 to be ROLLED_BACK after rollback, got: " + resp.getStore().getVersion(2));
105+
});
106+
107+
// Wait past the rolled-back retention so the guard allows the next push through.
108+
// Buffer added to account for clock skew between latestVersionPromoteToCurrentTimestamp
109+
// (set at rollback time) and this wait's wall clock.
110+
Thread.sleep(ROLLED_BACK_RETENTION_MS + TimeUnit.SECONDS.toMillis(2));
111+
112+
// v3 push should now succeed; the rollback-origin guard must not block it past retention.
113+
IntegrationTestPushUtils.runVPJ(props);
114+
TestUtils.waitForNonDeterministicPushCompletion(
115+
Version.composeKafkaTopic(storeName, 3),
116+
parentControllerClient,
117+
60,
118+
TimeUnit.SECONDS);
119+
120+
StoreResponse finalStoreResponse = parentControllerClient.getStore(storeName);
121+
assertFalse(finalStoreResponse.isError());
122+
assertTrue(
123+
finalStoreResponse.getStore().getVersion(3).isPresent(),
124+
"Version 3 should have been created after retention expired");
125+
}
126+
}
127+
}
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package com.linkedin.venice.endToEnd;
2+
3+
import static com.linkedin.venice.ConfigKeys.CONTROLLER_ROLLED_BACK_VERSION_RETENTION_MS;
4+
import static com.linkedin.venice.ConfigKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS;
5+
import static com.linkedin.venice.ConfigKeys.DEFAULT_PARTITION_SIZE;
6+
import static com.linkedin.venice.ConfigKeys.TOPIC_CLEANUP_SLEEP_INTERVAL_BETWEEN_TOPIC_LIST_FETCH_MS;
7+
import static com.linkedin.venice.utils.IntegrationTestPushUtils.createStoreForJob;
8+
import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V3_SCHEMA;
9+
import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory;
10+
import static org.testng.Assert.assertFalse;
11+
import static org.testng.Assert.assertTrue;
12+
import static org.testng.Assert.fail;
13+
14+
import com.linkedin.venice.controllerapi.ControllerClient;
15+
import com.linkedin.venice.controllerapi.ControllerResponse;
16+
import com.linkedin.venice.controllerapi.StoreResponse;
17+
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
18+
import com.linkedin.venice.hadoop.VenicePushJob;
19+
import com.linkedin.venice.meta.Version;
20+
import com.linkedin.venice.meta.VersionStatus;
21+
import com.linkedin.venice.utils.IntegrationTestPushUtils;
22+
import com.linkedin.venice.utils.TestUtils;
23+
import com.linkedin.venice.utils.TestWriteUtils;
24+
import com.linkedin.venice.utils.Utils;
25+
import java.io.File;
26+
import java.io.IOException;
27+
import java.util.Properties;
28+
import java.util.concurrent.TimeUnit;
29+
import org.testng.annotations.Test;
30+
31+
32+
/**
33+
* Integration test for {@link com.linkedin.venice.controller.VeniceHelixAdmin#checkRollbackOriginVersionCapacityForNewPush}.
34+
*
35+
* <p>After a rollback, v2 becomes ROLLED_BACK (on the parent, once propagated) and v1 becomes
36+
* current. A new push within the rolled-back retention window should be rejected with a message
37+
* that tells operators to wait for the rolled-back version to be cleaned up.
38+
*
39+
* <p>Config tuning: the min backup version cleanup delay is left at the default (0) so the
40+
* generic pending-deletion guard does NOT fire first. The rolled-back retention is set to 1 minute
41+
* so the rollback-origin guard reliably fires within the test lifetime.
42+
*
43+
* <p>This test is in its own class (separate from {@link TestPushBlockedWithinMinCleanupDelay}) so
44+
* the two guards can be exercised in isolation with their respective configs.
45+
*/
46+
public class TestPushBlockedByRollbackOriginGuard extends AbstractMultiRegionTest {
47+
private static final int TEST_TIMEOUT = 180_000; // ms
48+
49+
@Override
50+
protected int getNumberOfRegions() {
51+
return 1;
52+
}
53+
54+
@Override
55+
protected int getNumberOfServers() {
56+
return 1;
57+
}
58+
59+
@Override
60+
protected int getReplicationFactor() {
61+
return 1;
62+
}
63+
64+
@Override
65+
protected Properties getExtraControllerProperties() {
66+
Properties controllerProps = new Properties();
67+
controllerProps.put(DEFAULT_MAX_NUMBER_OF_PARTITIONS, 1);
68+
controllerProps.put(DEFAULT_PARTITION_SIZE, 10);
69+
controllerProps
70+
.setProperty(TOPIC_CLEANUP_SLEEP_INTERVAL_BETWEEN_TOPIC_LIST_FETCH_MS, String.valueOf(Long.MAX_VALUE));
71+
// Override the default 24h retention so the rollback-origin guard reliably fires during the test.
72+
// CONTROLLER_BACKUP_VERSION_MIN_CLEANUP_DELAY_MS defaults to 0 in the test harness, which means
73+
// the other (min-delay) guard does not fire here — ensuring this test exercises the rollback-origin
74+
// guard and nothing else.
75+
controllerProps.put(CONTROLLER_ROLLED_BACK_VERSION_RETENTION_MS, TimeUnit.MINUTES.toMillis(30));
76+
return controllerProps;
77+
}
78+
79+
@Test(timeOut = TEST_TIMEOUT)
80+
public void testPushBlockedAfterRollbackWithinRolledBackRetention() throws IOException {
81+
File inputDir = getTempDataDirectory();
82+
TestWriteUtils.writeSimpleAvroFileWithStringToV3Schema(inputDir, 100, 100);
83+
String inputDirPath = "file://" + inputDir.getAbsolutePath();
84+
String storeName = Utils.getUniqueString("pushBlockedByRollbackOrigin");
85+
86+
Properties props =
87+
IntegrationTestPushUtils.defaultVPJProps(multiRegionMultiClusterWrapper, inputDirPath, storeName);
88+
try (ControllerClient parentControllerClient = createStoreForJob(
89+
CLUSTER_NAME,
90+
"\"string\"",
91+
NAME_RECORD_V3_SCHEMA.toString(),
92+
props,
93+
new UpdateStoreQueryParams())) {
94+
// Push v1, v2 so the store has two versions to roll back between.
95+
IntegrationTestPushUtils.runVPJ(props);
96+
TestUtils.waitForNonDeterministicPushCompletion(
97+
Version.composeKafkaTopic(storeName, 1),
98+
parentControllerClient,
99+
30,
100+
TimeUnit.SECONDS);
101+
IntegrationTestPushUtils.runVPJ(props);
102+
TestUtils.waitForNonDeterministicPushCompletion(
103+
Version.composeKafkaTopic(storeName, 2),
104+
parentControllerClient,
105+
30,
106+
TimeUnit.SECONDS);
107+
108+
// Rollback: v1 becomes current, v2 becomes ROLLED_BACK (PR #2688).
109+
ControllerResponse rollbackResponse = parentControllerClient.rollbackToBackupVersion(storeName);
110+
assertFalse(rollbackResponse.isError(), "rollback failed: " + rollbackResponse.getError());
111+
112+
// Wait for the parent's view to reflect v2 = ROLLED_BACK so the rollback-origin guard has
113+
// something to find when the VPJ calls addVersion. Without this wait the parent's in-memory
114+
// state can lag briefly behind the rollback admin message propagation.
115+
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
116+
StoreResponse resp = parentControllerClient.getStore(storeName);
117+
assertFalse(resp.isError(), "getStore error: " + resp.getError());
118+
assertTrue(
119+
resp.getStore().getVersion(2).isPresent()
120+
&& resp.getStore().getVersion(2).get().getStatus() == VersionStatus.ROLLED_BACK,
121+
"Expected v2 to be ROLLED_BACK after rollback, got: " + resp.getStore().getVersion(2));
122+
});
123+
124+
// Attempt v3 push; the parent-level rollback-origin guard should reject it synchronously
125+
// so the VPJ throws with the guard's distinctive message.
126+
try (VenicePushJob job = new VenicePushJob("venice-push-job-v3-" + storeName, props)) {
127+
job.run();
128+
fail("Expected VPJ to fail — rollback-origin guard should block v3 push within retention");
129+
} catch (Exception e) {
130+
String message = e.getMessage();
131+
assertTrue(
132+
message != null && message.contains("Retry after the rolled-back version is cleaned up"),
133+
"Expected rollback-origin guard message, got: " + message);
134+
}
135+
136+
StoreResponse finalStoreResponse = parentControllerClient.getStore(storeName);
137+
assertFalse(finalStoreResponse.isError());
138+
assertFalse(
139+
finalStoreResponse.getStore().getVersion(3).isPresent(),
140+
"Version 3 should NOT have been created after guard rejection");
141+
}
142+
}
143+
}

services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static com.linkedin.venice.meta.VersionStatus.KILLED;
2323
import static com.linkedin.venice.meta.VersionStatus.NOT_CREATED;
2424
import static com.linkedin.venice.meta.VersionStatus.ONLINE;
25+
import static com.linkedin.venice.meta.VersionStatus.PARTIALLY_ONLINE;
2526
import static com.linkedin.venice.meta.VersionStatus.PUSHED;
2627
import static com.linkedin.venice.meta.VersionStatus.ROLLED_BACK;
2728
import static com.linkedin.venice.meta.VersionStatus.STARTED;
@@ -3238,6 +3239,11 @@ private Pair<Boolean, Version> addVersion(
32383239

32393240
// Block the push if backup versions are pending deletion but still within the min cleanup delay.
32403241
checkBackupVersionCleanupCapacityForNewPush(clusterName, storeName, store, store.getBackupStrategy());
3242+
3243+
// Block the push if any rollback-origin version (ROLLED_BACK, or rollback-origin
3244+
// PARTIALLY_ONLINE on the parent) is still within its retention window. Gives fat clients time
3245+
// to switch to the new version.
3246+
checkRollbackOriginVersionCapacityForNewPush(clusterName, storeName, store);
32413247
backupStrategy = store.getBackupStrategy();
32423248
offlinePushStrategy = store.getOffLinePushStrategy();
32433249

@@ -4574,6 +4580,55 @@ static void checkBackupVersionCleanupCapacityForNewPush(
45744580
}
45754581
}
45764582

4583+
/**
4584+
* Throws if starting a new push would violate the retention window for a rollback-origin version.
4585+
* Blocks when a {@code ROLLED_BACK} version exists, or when a {@code PARTIALLY_ONLINE} version
4586+
* sits above the current version (parent controller, region-filtered rollback).
4587+
* The block lifts once {@code latestVersionPromoteToCurrentTimestamp + rolledBackVersionRetentionMs}
4588+
* elapses; past that point, the version will be swept on the next SOP deletion pass.
4589+
*/
4590+
private void checkRollbackOriginVersionCapacityForNewPush(String clusterName, String storeName, Store store) {
4591+
checkRollbackOriginVersionCapacityForNewPush(
4592+
clusterName,
4593+
storeName,
4594+
store,
4595+
multiClusterConfigs.getRolledBackVersionRetentionMs(),
4596+
System.currentTimeMillis());
4597+
}
4598+
4599+
/** Testable variant — config and time passed in. */
4600+
static void checkRollbackOriginVersionCapacityForNewPush(
4601+
String clusterName,
4602+
String storeName,
4603+
Store store,
4604+
long rolledBackVersionRetentionMs,
4605+
long currentTimeMs) {
4606+
long retentionExpiresAt = store.getLatestVersionPromoteToCurrentTimestamp() + rolledBackVersionRetentionMs;
4607+
if (currentTimeMs > retentionExpiresAt) {
4608+
// Past retention — SOP deletion will clean up any rollback-origin versions.
4609+
return;
4610+
}
4611+
int currentVersion = store.getCurrentVersion();
4612+
for (Version v: store.getVersions()) {
4613+
VersionStatus status = v.getStatus();
4614+
// PARTIALLY_ONLINE with number > currentVersion indicates a rollback-origin state
4615+
// (region-filtered rollback on the parent). Push-origin PARTIALLY_ONLINE has number == currentVersion.
4616+
boolean isRollbackOrigin =
4617+
status == ROLLED_BACK || (status == PARTIALLY_ONLINE && v.getNumber() > currentVersion);
4618+
if (isRollbackOrigin) {
4619+
throw new VeniceException(
4620+
String.format(
4621+
"Cannot start new push for store %s in cluster %s: version %d is %s from a rollback; "
4622+
+ "retention expires in %dms. Retry after the rolled-back version is cleaned up.",
4623+
storeName,
4624+
clusterName,
4625+
v.getNumber(),
4626+
status,
4627+
retentionExpiresAt - currentTimeMs));
4628+
}
4629+
}
4630+
}
4631+
45774632
/**
45784633
* For a given store, determine its versions to delete based on the {@linkplain BackupStrategy} settings and execute
45794634
* the deletion in the cluster (including all its resources). It also truncates Kafka topics and Helix resources.
@@ -4599,7 +4654,25 @@ public void retireOldStoreVersions(
45994654
* Clamp to at least 1 since retrieveVersionsToDelete rejects values < 1.
46004655
*/
46014656
int numVersionToPreserve = Math.max(1, minNumberOfStoreVersionsToPreserve - (deleteBackupOnStartPush ? 1 : 0));
4602-
List<Version> versionsToDelete = store.retrieveVersionsToDelete(numVersionToPreserve);
4657+
List<Version> versionsToDelete = new ArrayList<>(store.retrieveVersionsToDelete(numVersionToPreserve));
4658+
4659+
// Include rollback-origin versions whose retention window has elapsed. retrieveVersionsToDelete
4660+
// deliberately skips ROLLED_BACK (handled by StoreBackupVersionCleanupService), but once past
4661+
// retention we can reclaim them at SOP as well.
4662+
long rolledBackRetentionExpiresAt =
4663+
store.getLatestVersionPromoteToCurrentTimestamp() + multiClusterConfigs.getRolledBackVersionRetentionMs();
4664+
if (System.currentTimeMillis() > rolledBackRetentionExpiresAt) {
4665+
int currentVersion = store.getCurrentVersion();
4666+
for (Version v: store.getVersions()) {
4667+
VersionStatus status = v.getStatus();
4668+
boolean isRollbackOrigin =
4669+
status == ROLLED_BACK || (status == PARTIALLY_ONLINE && v.getNumber() > currentVersion);
4670+
if (isRollbackOrigin && !versionsToDelete.contains(v)) {
4671+
versionsToDelete.add(v);
4672+
}
4673+
}
4674+
}
4675+
46034676
if (versionsToDelete.isEmpty()) {
46044677
return;
46054678
}

0 commit comments

Comments
 (0)