diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordVersionException.java b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordVersionException.java new file mode 100644 index 00000000000..0b20a53a7bc --- /dev/null +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordVersionException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.exception; + +/** + * CAS write to the legacy {@code /phoenix/ha} CRR znode failed (BadVersion or NodeExists); the + * caller can re-read and retry if needed. Analog of {@link StaleHAGroupStoreRecordVersionException}. 
+ */ +public class StaleClusterRoleRecordVersionException extends Exception { + private static final long serialVersionUID = 1L; + + public StaleClusterRoleRecordVersionException(String msg) { + super(msg); + } + + public StaleClusterRoleRecordVersionException(String msg, Throwable cause) { + super(msg, cause); + } +} diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java index 0ba6b312d7b..327db3eaa5e 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.Objects; import java.util.Optional; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; @@ -126,31 +127,50 @@ public enum RegistryType { private final long version; /** - * To handle backward compatibility with old ClusterRoleRecords which had zk1 and zk2 as keys for - * zk urls, This constructor is only being used {@link ClusterRoleRecord#fromJson} when we - * deserialize Cluster Role Record read from ZooKeeper ZNode. If CRR is in old format we will read - * zk1 and zk2 and url1 and url2 will be null and if it is in new format zk1 and zk2 will be null - * in both cases final url is being stored in url1 and url2 url will be stored in normalized forms - * which looks like zk1\\:port1,zk2\\:port2,zk3\\:port3, zk4\\:port4,zk5\\:port5::znode or - * master1\\:port1,master2\\:port2,master3\\:port3, master4\\:port4,master5\\:port5 + * Convenience constructor: defaults {@code registryType} to + * {@link #DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE} (RPC). Use the explicit overload below to write + * {@link RegistryType#ZK} records for the legacy {@code /phoenix/ha} znode. 
* @param haGroupName HighAvailability Group name / CRR name - * @param policy Policy used by give CRR - * @param url1 ZK/HMaster url based on registry type for first cluster - * @param role1 {@link ClusterRole} which describes the current state of first cluster - * @param url2 ZK/HMaster url based on registry type for second cluster - * @param role2 {@link ClusterRole} which describes the current state of second cluster - * @param version version of a given CRR + * @param policy Policy used by the given CRR + * @param url1 ZK/HMaster url for the first cluster + * @param role1 {@link ClusterRole} describing the current state of the first cluster + * @param url2 ZK/HMaster url for the second cluster + * @param role2 {@link ClusterRole} describing the current state of the second cluster + * @param version monotonic version of this CRR + */ + public ClusterRoleRecord(String haGroupName, HighAvailabilityPolicy policy, String url1, + ClusterRole role1, String url2, ClusterRole role2, long version) { + this(haGroupName, policy, DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE, url1, role1, url2, role2, + version); + } + + /** + * Canonical constructor; also the {@code @JsonCreator} entry point so the persisted + * {@code registryType} round-trips correctly. Records persisted before {@code registryType} was + * added as a JSON field pass {@code null} here and default to + * {@link #DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE} (RPC). URLs are normalized to the + * {@code registryType}-specific canonical form for accurate comparisons; the resulting + * {@code url1}/{@code url2} are stored as {@code zk1\:port1,zk2\:port2,...::znode} for ZK or + * {@code master1\:port1,master2\:port2,...} for RPC/MASTER. 
+ * @param haGroupName HighAvailability Group name / CRR name + * @param policy Policy used by the given CRR + * @param registryType {@link RegistryType} for URL normalization; {@code null} defaults to + * {@link #DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE} (RPC) + * @param url1 ZK/HMaster url for the first cluster + * @param role1 {@link ClusterRole} describing the current state of the first cluster + * @param url2 ZK/HMaster url for the second cluster + * @param role2 {@link ClusterRole} describing the current state of the second cluster + * @param version monotonic version of this CRR */ @JsonCreator public ClusterRoleRecord(@JsonProperty("haGroupName") String haGroupName, - @JsonProperty("policy") HighAvailabilityPolicy policy, @JsonProperty("url1") String url1, + @JsonProperty("policy") HighAvailabilityPolicy policy, + @JsonProperty("registryType") RegistryType registryType, @JsonProperty("url1") String url1, @JsonProperty("role1") ClusterRole role1, @JsonProperty("url2") String url2, @JsonProperty("role2") ClusterRole role2, @JsonProperty("version") long version) { this.haGroupName = haGroupName; this.policy = policy; - - // Default registry type is RPC from Consistent Cluster Failover onwards - this.registryType = DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE; + this.registryType = registryType != null ? registryType : DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE; // Do we really need to normalize here ? // We are normalizing to have urls in specific formats for each registryType for getting @@ -221,6 +241,20 @@ public boolean hasSameInfo(ClusterRoleRecord other) { return haGroupName.equals(other.haGroupName) && policy.equals(other.policy); } + /** + * Equality on the six identity/role fields ({@code haGroupName, policy, url1, url2, role1, + * role2}); ignores {@code version} (always bumps) and {@code registryType} (avoids RPC->ZK + * thrash). Returns {@code false} if {@code other} is {@code null}. 
+ */ + public boolean isLogicallyEqualIgnoringVersionAndRegistry(ClusterRoleRecord other) { + if (other == null) { + return false; + } + return Objects.equals(haGroupName, other.haGroupName) && Objects.equals(policy, other.policy) + && Objects.equals(url1, other.url1) && Objects.equals(url2, other.url2) + && role1 == other.role1 && role2 == other.role2; + } + /** Returns true if CRR has any url in UNKNOWN role/state. */ public boolean hasUnknownRole() { return role1 == ClusterRole.UNKNOWN || role2 == ClusterRole.UNKNOWN; diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java index fe299d14fbb..c25336a3d7d 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java @@ -19,6 +19,7 @@ import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT; import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_ROLE_1; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_ROLE_2; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_URL_1; @@ -34,7 +35,11 @@ import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl; import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath; import static org.apache.phoenix.query.QueryServices.HA_GROUP_STORE_SYNC_INTERVAL_SECONDS; +import static org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS; +import static org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS; +import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_ZK; @@ -62,11 +67,13 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.utils.ZKPaths; import org.apache.hadoop.conf.Configuration; import org.apache.phoenix.exception.InvalidClusterRoleTransitionException; +import org.apache.phoenix.exception.StaleClusterRoleRecordVersionException; import org.apache.phoenix.exception.StaleHAGroupStoreRecordVersionException; import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole; import org.apache.phoenix.jdbc.ClusterRoleRecord.RegistryType; @@ -80,6 +87,7 @@ import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; +import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.MoreExecutors; /** * Main implementation of HAGroupStoreClient with peer support. Write-through cache for HAGroupStore @@ -98,8 +106,13 @@ public class HAGroupStoreClient implements Closeable { public static final double ZK_SESSION_TIMEOUT_MULTIPLIER = 1.1; // Maximum jitter in seconds for sync job start time (10 seconds) private static final long SYNC_JOB_MAX_JITTER_SECONDS = 10; + // Exclusive upper bound for initial-delay jitter on the periodic reconciler (0..30s). 
+ private static final long LEGACY_CRR_SYNC_JOB_MAX_JITTER_SECONDS = 31; private PhoenixHAAdmin phoenixHaAdmin; private PhoenixHAAdmin peerPhoenixHaAdmin; + // Admin + NodeCache on /phoenix/ha; null when feature disabled. + private volatile PhoenixHAAdmin legacyHaAdmin; + private volatile NodeCache legacyCrrNodeCache; private static final Logger LOGGER = LoggerFactory.getLogger(HAGroupStoreClient.class); // Map of > private static final Map> instances = @@ -132,6 +145,10 @@ public class HAGroupStoreClient implements Closeable { CopyOnWriteArraySet> targetStateSubscribers = new ConcurrentHashMap<>(); // Scheduled executor for periodic sync job private ScheduledExecutorService syncExecutor; + // Legacy CRR sync state. + private final boolean legacyCrrSyncEnabled; + private final Object legacyCrrSyncLock = new Object(); + private volatile ScheduledExecutorService legacyCrrSyncExecutor; public static HAGroupStoreClient getInstance(Configuration conf, String haGroupName) throws SQLException { @@ -224,6 +241,8 @@ public static List getHAGroupNames(String zkUrl) throws SQLException { conf.getLong(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT) * ZK_SESSION_TIMEOUT_MULTIPLIER); this.rotationTimeMs = conf.getLong(QueryServices.REPLICATION_LOG_ROTATION_TIME_MS_KEY, QueryServicesOptions.DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS); + this.legacyCrrSyncEnabled = conf.getBoolean(PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED, + DEFAULT_PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED); // Custom Event Listener this.peerCustomPathChildrenCacheListener = peerPathChildrenCacheListener; try { @@ -246,6 +265,18 @@ public static List getHAGroupNames(String zkUrl) throws SQLException { // Start periodic sync job startPeriodicSyncJob(); + // Opt-in legacy /phoenix/ha sync. Setup failures propagate so the client is marked + // unhealthy rather than silently dropping the legacy znode out of sync. 
+ if (legacyCrrSyncEnabled && this.isHealthy) { + this.legacyHaAdmin = + new PhoenixHAAdmin(this.zkUrl, conf, PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE); + this.legacyCrrNodeCache = + new NodeCache(this.legacyHaAdmin.getCurator(), toPath(haGroupName)); + this.legacyCrrNodeCache.start(true); + syncLegacyCRRIfRoleChanged(); + startLegacyCrrReconciliation(); + } + } catch (Exception e) { this.isHealthy = false; close(); @@ -886,9 +917,11 @@ private PathChildrenCacheListener createCacheListener(CountDownLatch latch, if (cacheType == ClusterType.LOCAL) { maybeInitializePeerPathChildrenCache(); } + syncLegacyCRRIfRoleChanged(); } break; case CHILD_REMOVED: + // No-op: the legacy /phoenix/ha znode is never deleted by this client. break; case INITIALIZED: latch.countDown(); @@ -985,30 +1018,41 @@ private void closePeerConnection() { */ private void shutdownSyncExecutor() { if (syncExecutor != null) { - syncExecutor.shutdown(); - try { - if (!syncExecutor.awaitTermination(5, TimeUnit.SECONDS)) { - syncExecutor.shutdownNow(); - } - } catch (InterruptedException e) { - syncExecutor.shutdownNow(); - Thread.currentThread().interrupt(); - } + MoreExecutors.shutdownAndAwaitTermination(syncExecutor, 5, TimeUnit.SECONDS); syncExecutor = null; } } + private void shutdownLegacyCrrSyncExecutor() { + if (legacyCrrSyncExecutor != null) { + MoreExecutors.shutdownAndAwaitTermination(legacyCrrSyncExecutor, 5, TimeUnit.SECONDS); + legacyCrrSyncExecutor = null; + } + } + @Override public void close() { try { LOGGER.info("Closing HAGroupStoreClient"); - // Shutdown sync executor + // Executors -> caches -> admins. Null-before-close on legacy resources so a racing + // listener sees either a live or null reference, never half-closed. 
shutdownSyncExecutor(); + shutdownLegacyCrrSyncExecutor(); if (pathChildrenCache != null) { pathChildrenCache.close(); pathChildrenCache = null; } closePeerConnection(); + NodeCache nodeCache = this.legacyCrrNodeCache; + this.legacyCrrNodeCache = null; + if (nodeCache != null) { + nodeCache.close(); + } + PhoenixHAAdmin admin = this.legacyHaAdmin; + this.legacyHaAdmin = null; + if (admin != null) { + admin.close(); + } LOGGER.info("Closed HAGroupStoreClient"); } catch (IOException e) { LOGGER.error("Exception closing HAGroupStoreClient", e); @@ -1047,6 +1091,121 @@ private long validateTransitionAndGetWaitTime(HAGroupStoreRecord.HAGroupState cu return Math.max(0, remainingTime); } + // ========== Legacy /phoenix/ha CRR Sync ========== + + /** + * Derives the combined CRR from local + peer records and CAS-writes it to {@code /phoenix/ha}. + * CAS losses are logged and skipped; the NodeCache watcher and next periodic cycle reconverge. + */ + private void syncLegacyCRRIfRoleChanged() { + if (!legacyCrrSyncEnabled) { + return; + } + if (!isHealthy || legacyHaAdmin == null || legacyCrrNodeCache == null) { + LOGGER.debug( + "Skipping legacy CRR sync for HA group {}: isHealthy={}, legacyHaAdmin={}, " + + "legacyCrrNodeCache={}", + haGroupName, isHealthy, legacyHaAdmin != null, legacyCrrNodeCache != null); + return; + } + synchronized (legacyCrrSyncLock) { + try { + HAGroupStoreRecord local = getHAGroupStoreRecord(); + if (local == null) { + LOGGER.debug("Skipping legacy CRR sync for HA group {}: no local consistentHA record", + haGroupName); + return; + } + HAGroupStoreRecord peer = getHAGroupStoreRecordFromPeer(); + // NodeCache is eventually consistent; on apparent absence, fall back to an + // authoritative ZK read so the equality check and CAS both see consistent state. 
+ Pair snapshot = readLegacyCrrSnapshot(); + if (snapshot.getRight() == null) { + snapshot = legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName); + } + ClusterRoleRecord existing = snapshot.getLeft(); + Stat existingStat = snapshot.getRight(); + ClusterRoleRecord desired = buildDesiredLegacyCrr(local, peer, existing); + if (desired.isLogicallyEqualIgnoringVersionAndRegistry(existing)) { + LOGGER.debug("Legacy CRR for HA group {} already up to date at version {}", haGroupName, + existing.getVersion()); + return; + } + try { + if (existingStat == null) { + legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, desired, + PhoenixHAAdmin.LegacyCrrWriteMode.CREATE_NEW, /* ignored */ 0); + } else { + legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, desired, + PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, existingStat.getVersion()); + } + LOGGER.info("Synced legacy CRR for HA group {} (version {} -> {})", haGroupName, + existing != null ? existing.getVersion() : -1L, desired.getVersion()); + } catch (StaleClusterRoleRecordVersionException stale) { + // CAS lost; next event/periodic cycle reconverges. + LOGGER.info("Legacy CRR CAS lost for HA group {} at expected stat version {}", + haGroupName, existingStat != null ? existingStat.getVersion() : -1); + } + } catch (Exception e) { + LOGGER.warn( + "Legacy CRR sync failed for HA group {}; will be retried by next event/periodic cycle", + haGroupName, e); + } + } + } + + /** Builds the desired legacy CRR, always stamped {@link RegistryType#ZK}. */ + private ClusterRoleRecord buildDesiredLegacyCrr(HAGroupStoreRecord local, HAGroupStoreRecord peer, + ClusterRoleRecord existing) { + ClusterRole role2 = (peer != null) ? peer.getClusterRole() : ClusterRole.UNKNOWN; + long peerAdminVersion = (peer != null) ? peer.getAdminCRRVersion() : 0L; + long baseVersion = Math.max(existing != null ? 
existing.getVersion() : 0L, + Math.max(local.getAdminCRRVersion(), peerAdminVersion)); + return new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.valueOf(local.getPolicy()), + RegistryType.ZK, this.zkUrl, local.getClusterRole(), local.getPeerZKUrl(), role2, + baseVersion + 1); + } + + /** + * NodeCache snapshot of the legacy CRR. {@code (null, null)} on cache miss; callers must confirm + * absence with an authoritative ZK read since the cache is eventually consistent. + */ + private Pair readLegacyCrrSnapshot() { + ChildData current = legacyCrrNodeCache.getCurrentData(); + if (current == null) { + return Pair.of(null, null); + } + ClusterRoleRecord record = ClusterRoleRecord.fromJson(current.getData()).orElse(null); + return Pair.of(record, current.getStat()); + } + + /** Schedules the periodic reconciler; no-op when {@code intervalSec <= 0}. */ + private void startLegacyCrrReconciliation() { + long intervalSec = conf.getLong(PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS, + DEFAULT_PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS); + if (intervalSec <= 0) { + LOGGER.info("Legacy CRR periodic reconciliation disabled (interval={}s) for HA group {}", + intervalSec, haGroupName); + return; + } + legacyCrrSyncExecutor = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "HAGroupStoreClient-LegacyCRRSync-" + haGroupName); + t.setDaemon(true); + return t; + }); + long jitterSec = + ThreadLocalRandom.current().nextLong(0, LEGACY_CRR_SYNC_JOB_MAX_JITTER_SECONDS); + LOGGER.info("Starting legacy CRR reconciliation for HA group {} with initial delay {}s, " + + "then every {}s", haGroupName, jitterSec, intervalSec); + legacyCrrSyncExecutor.scheduleAtFixedRate(() -> { + try { + syncLegacyCRRIfRoleChanged(); + } catch (Throwable t) { + LOGGER.warn("Periodic legacy CRR reconciliation failed for HA group {}", haGroupName, t); + } + }, jitterSec, intervalSec, TimeUnit.SECONDS); + } + // ========== HA Group State Change Subscription 
Methods ========== /** diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java index f0fa7189520..ad15607201c 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java @@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.PairOfSameType; +import org.apache.phoenix.exception.StaleClusterRoleRecordVersionException; import org.apache.phoenix.exception.StaleHAGroupStoreRecordVersionException; import org.apache.phoenix.util.JDBCUtil; import org.apache.zookeeper.CreateMode; @@ -83,6 +84,22 @@ public CuratorFramework getCurator(String zkUrl, Properties properties, String n private static final Logger LOG = LoggerFactory.getLogger(PhoenixHAAdmin.class); + /** ZK's wildcard for {@code setData()/delete()}: bypasses version check. */ + static final int ZK_MATCH_ANY_VERSION = -1; + + /** + * Write mode for {@link #createOrUpdateClusterRoleRecordWithCAS}. The accompanying + * {@code expectedStatVersion} argument is interpreted only for {@link #CAS_WITH_VERSION}. + */ + public enum LegacyCrrWriteMode { + /** Create the znode; no prior version expected. */ + CREATE_NEW, + /** Unconditional overwrite (no CAS). For operator/migration tooling only. */ + FORCE_OVERWRITE, + /** CAS update; {@code expectedStatVersion} must be {@code >= 0}. */ + CAS_WITH_VERSION + } + /** The fully qualified ZK URL for an HBase cluster in format host:port:/hbase */ private final String zkUrl; /** Configuration of this command line tool. */ @@ -524,17 +541,18 @@ public void updateHAGroupStoreRecordInZooKeeper(String haGroupName, } /** - * Gets the HAGroupStoreRecord and Stat from ZooKeeper. + * Gets the HAGroupStoreRecord and Stat from ZooKeeper. 
Reads (record, stat) atomically via + * {@code storingStatIn} so the returned stat version always corresponds to the returned bytes. * @param haGroupName the HA group name - * @return a pair of HAGroupStoreRecord and Stat + * @return a pair of HAGroupStoreRecord and Stat; both {@code null} if the znode does not exist * @throws IOException if any error occurs during the retrieval */ public Pair getHAGroupStoreRecordInZooKeeper(String haGroupName) throws IOException { try { - byte[] data = getCurator().getData().forPath(toPath(haGroupName)); + Stat stat = new Stat(); + byte[] data = getCurator().getData().storingStatIn(stat).forPath(toPath(haGroupName)); HAGroupStoreRecord record = HAGroupStoreRecord.fromJson(data).orElse(null); - Stat stat = getCurator().checkExists().forPath(toPath(haGroupName)); return Pair.of(record, stat); } catch (KeeperException.NoNodeException nne) { LOG.warn("No HAGroupStoreRecord for HA group {} in ZK", haGroupName, nne); @@ -559,6 +577,71 @@ public void deleteHAGroupStoreRecordInZooKeeper(String haGroupName) throws IOExc } } + // ----- Legacy /phoenix/ha ClusterRoleRecord sync helpers ----- + + /** + * Atomic read of (record, stat) on the legacy CRR znode. Returns {@code (null, null)} if the + * znode does not exist. + */ + public Pair getClusterRoleRecordAndStatInZooKeeper(String haGroupName) + throws IOException { + try { + Stat stat = new Stat(); + byte[] data = getCurator().getData().storingStatIn(stat).forPath(toPath(haGroupName)); + ClusterRoleRecord record = ClusterRoleRecord.fromJson(data).orElse(null); + return Pair.of(record, stat); + } catch (KeeperException.NoNodeException nne) { + return Pair.of(null, null); + } catch (Exception e) { + LOG.error("Failed to get ClusterRoleRecord for HA group {}", haGroupName, e); + throw new IOException("Failed to get ClusterRoleRecord for HA group " + haGroupName, e); + } + } + + /** + * Writes {@code newRecord} per {@code mode}. 
{@code expectedStatVersion} is used only for + * {@link LegacyCrrWriteMode#CAS_WITH_VERSION} (must be {@code >= 0}). Both BadVersion and + * NodeExists surface as {@link StaleClusterRoleRecordVersionException}. + */ + public void createOrUpdateClusterRoleRecordWithCAS(String haGroupName, + ClusterRoleRecord newRecord, LegacyCrrWriteMode mode, int expectedStatVersion) + throws IOException, StaleClusterRoleRecordVersionException { + Preconditions.checkNotNull(mode, "mode"); + if (mode == LegacyCrrWriteMode.CAS_WITH_VERSION) { + Preconditions.checkArgument(expectedStatVersion >= 0, + "CAS_WITH_VERSION requires expectedStatVersion >= 0; got " + expectedStatVersion); + } + try { + byte[] data = ClusterRoleRecord.toJson(newRecord); + switch (mode) { + case CREATE_NEW: + getCurator().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) + .forPath(toPath(haGroupName), data); + break; + case FORCE_OVERWRITE: + getCurator().setData().withVersion(ZK_MATCH_ANY_VERSION).forPath(toPath(haGroupName), + data); + break; + case CAS_WITH_VERSION: + getCurator().setData().withVersion(expectedStatVersion).forPath(toPath(haGroupName), + data); + break; + default: + throw new IllegalStateException("Unhandled LegacyCrrWriteMode: " + mode); + } + } catch (KeeperException.BadVersionException e) { + throw new StaleClusterRoleRecordVersionException( + "CAS failed for HA group " + haGroupName + " at expectedStatVersion " + expectedStatVersion, + e); + } catch (KeeperException.NodeExistsException e) { + throw new StaleClusterRoleRecordVersionException( + "Create failed for HA group " + haGroupName + ": node already exists", e); + } catch (Exception e) { + LOG.error("Failed to write ClusterRoleRecord for HA group {}", haGroupName, e); + throw new IOException("Failed to write ClusterRoleRecord for HA group " + haGroupName, e); + } + } + public String getZkUrl() { return zkUrl; } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java index 72075853aa8..8d4a8e4c7ff 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -647,6 +647,15 @@ public interface QueryServices extends SQLCloseable { // HA Group Store sync job interval in seconds String HA_GROUP_STORE_SYNC_INTERVAL_SECONDS = "phoenix.ha.group.store.sync.interval.seconds"; + // Master switch for syncing the legacy /phoenix/ha CRR from /phoenix/consistentHA. + // When false, no legacy znode is read, written, or deleted by HAGroupStoreClient. + String PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED = "phoenix.ha.legacy.crr.sync.enabled"; + + // Periodic reconciliation interval for the legacy /phoenix/ha CRR sync, in seconds. + // 0 disables the periodic loop only; event-driven sync still runs. + String PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS = + "phoenix.ha.legacy.crr.reconciliation.interval.seconds"; + String REPLICATION_LOG_ROTATION_TIME_MS_KEY = "phoenix.replication.log.rotation.time.ms"; /** diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 339b6763a45..3e99b9fff68 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -92,6 +92,8 @@ import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK; import static org.apache.phoenix.query.QueryServices.PHOENIX_ACLS_ENABLED; +import static org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS; +import static 
org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED; import static org.apache.phoenix.query.QueryServices.QUERY_SERVICES_NAME; import static org.apache.phoenix.query.QueryServices.QUEUE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.REGIONSERVER_INFO_PORT_ATTRIB; @@ -517,6 +519,13 @@ public class QueryServicesOptions { // Default HA Group Store sync job interval in seconds (15 minutes = 900 seconds) public static final int DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS = 900; + // Legacy /phoenix/ha CRR sync is opt-in (default off). + public static final boolean DEFAULT_PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED = false; + + // Periodic reconciliation interval for legacy /phoenix/ha CRR sync, in seconds. + // 0 disables the periodic loop only; event-driven sync still runs. + public static final long DEFAULT_PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS = 60L; + public static final long DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS = 60 * 1000L; private final Configuration config; @@ -643,7 +652,10 @@ public static QueryServicesOptions withDefaults() { .setIfUnset(HA_GROUP_STORE_SYNC_INTERVAL_SECONDS, DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS) .setIfUnset(HA_GROUP_STORE_CLIENT_PREWARM_ENABLED, - DEFAULT_HA_GROUP_STORE_CLIENT_PREWARM_ENABLED); + DEFAULT_HA_GROUP_STORE_CLIENT_PREWARM_ENABLED) + .setIfUnset(PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED, DEFAULT_PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED) + .setIfUnset(PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS, + DEFAULT_PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS); // HBase sets this to 1, so we reset it to something more appropriate. 
// Hopefully HBase will change this, because we can't know if a user set diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index edb38da970c..1f57187a2ca 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -4738,9 +4738,9 @@ public MutationState addColumn(PTable table, List origColumnDefs, /** * To check if TTL is defined at any of the child below we are checking it at * {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl#mutateColumn(List, ColumnMutator, int, PTable, PTable, boolean)} - * level where in function - * {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl# validateIfMutationAllowedOnParent(PTable, List, PTableType, long, byte[], byte[], byte[], List, int)} - * we are already traversing through allDescendantViews. + * level where in function + * {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl#validateIfMutationAllowedOnParent(PTable, List, PTableType, long, byte[], byte[], byte[], List, int)} + * we are already traversing through allDescendantViews.
*/ } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java index ba667876740..a532cc5f174 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java @@ -18,9 +18,12 @@ package org.apache.phoenix.jdbc; import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_HA_GROUP_NAME; import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl; import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath; +import static org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS; +import static org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED; import static org.apache.phoenix.replication.reader.ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_ZK; @@ -47,13 +50,16 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.hadoop.conf.Configuration; import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; import org.apache.phoenix.exception.InvalidClusterRoleTransitionException; +import org.apache.phoenix.exception.StaleClusterRoleRecordVersionException; import org.apache.phoenix.util.HAGroupStoreTestUtil; +import 
org.apache.phoenix.util.JDBCUtil; import org.apache.zookeeper.data.Stat; import org.junit.After; import org.junit.Before; @@ -72,6 +78,8 @@ public class HAGroupStoreClientIT extends HABaseIT { private static final Long ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS = 5000L; private PhoenixHAAdmin haAdmin; private PhoenixHAAdmin peerHaAdmin; + // Admin on the legacy /phoenix/ha namespace; used to inspect/seed/corrupt the legacy znode. + private PhoenixHAAdmin legacyHaAdmin; private String zkUrl; private String peerZKUrl; private String masterUrl; @@ -95,8 +103,11 @@ public void before() throws Exception { ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE); peerHaAdmin = new PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(), ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE); + legacyHaAdmin = new PhoenixHAAdmin(CLUSTERS.getHBaseCluster1().getConfiguration(), + PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE); haAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName())); peerHaAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName())); + legacyHaAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName())); zkUrl = getLocalZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration()); // Clean existing records in system table List haGroupNames = HAGroupStoreClient.getHAGroupNames(zkUrl); @@ -117,8 +128,10 @@ public void before() throws Exception { public void after() throws Exception { haAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName())); peerHaAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName())); + legacyHaAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName())); haAdmin.close(); peerHaAdmin.close(); + legacyHaAdmin.close(); } @Test @@ -1203,4 +1216,469 @@ public void testPeriodicSyncJobExecutorStartsAndSyncsData() throws Exception { assertTrue("Sync executor should be shutdown after close", syncExecutor.isShutdown()); } } + + // 
============================================================================================ + // Legacy /phoenix/ha CRR sync tests + // Verify feature-flag gating, derivation, monotonic version, registry-type preservation, + // deletion mirroring, and short-circuit behavior of HAGroupStoreClient's legacy sync path. + // ============================================================================================ + + @Test + public void testLegacyCrrSyncFeatureOffByDefault_NoLegacyZnodeWritten() throws Exception { + String haGroupName = testName.getMethodName(); + Configuration conf = legacyCrrConf(/* legacyEnabled */ false, /* periodicSec */ 60); + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + Pair legacy = readLegacyCrr(haGroupName); + assertNull("Legacy CRR must not exist when feature is off", legacy.getLeft()); + assertNull("Legacy znode stat must be null when feature is off", legacy.getRight()); + } + + @Test + public void testLegacyCrrSyncFeatureOn_InitialSyncCreatesZkRegistryLegacyZnode() + throws Exception { + String haGroupName = testName.getMethodName(); + Configuration conf = legacyCrrConf(true, 60); + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + + Pair legacy = awaitLegacyCrrPresent(haGroupName); + ClusterRoleRecord crr = legacy.getLeft(); + assertEquals("Legacy CRR must use ZK registry type for backward compatibility", + ClusterRoleRecord.RegistryType.ZK, crr.getRegistryType()); + assertEquals(haGroupName, crr.getHaGroupName()); + assertEquals(HighAvailabilityPolicy.FAILOVER, crr.getPolicy()); + // Local cluster role is ACTIVE per System Table seed. 
+ assertEquals(ClusterRoleRecord.ClusterRole.ACTIVE, + crr.getRole(formattedZkUrlFor(ClusterType.LOCAL))); + assertTrue("CRR version must be > 0 after initial sync", crr.getVersion() > 0); + } + + @Test + public void testLegacyCrrSyncRoleChangePropagatesAndIsNewerThanWorks() throws Exception { + String haGroupName = testName.getMethodName(); + Configuration conf = legacyCrrConf(true, 60); + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + + ClusterRoleRecord initial = awaitLegacyCrrPresent(haGroupName).getLeft(); + long initialVersion = initial.getVersion(); + + // ACTIVE_IN_SYNC -> ACTIVE_IN_SYNC_TO_STANDBY (role change ACTIVE -> ACTIVE_TO_STANDBY). + client.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY); + Pair updated = awaitLegacyCrrRole(haGroupName, ClusterType.LOCAL, + ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY); + ClusterRoleRecord updatedCRR = updated.getLeft(); + assertTrue("Version must monotonically increase after role change", + updatedCRR.getVersion() > initialVersion); + assertTrue("isNewerThan must return true for the updated record", + updatedCRR.isNewerThan(initial)); + assertEquals(ClusterRoleRecord.RegistryType.ZK, updatedCRR.getRegistryType()); + } + + @Test + public void testLegacyCrrSyncStateOnlyChangeDoesNotRewriteLegacy() throws Exception { + String haGroupName = testName.getMethodName(); + Configuration conf = legacyCrrConf(true, 60); + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + + Pair initial = awaitLegacyCrrPresent(haGroupName); + int initialZkVersion = initial.getRight().getVersion(); + long initialCrrVersion = initial.getLeft().getVersion(); + + // ACTIVE_IN_SYNC -> ACTIVE_NOT_IN_SYNC: ClusterRole stays ACTIVE. 
+ client.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + Pair after = readLegacyCrr(haGroupName); + assertNotNull(after.getLeft()); + assertEquals("Legacy CRR ZK stat version must not change on state-only transitions", + initialZkVersion, after.getRight().getVersion()); + assertEquals("Legacy CRR logical version must not change on state-only transitions", + initialCrrVersion, after.getLeft().getVersion()); + } + + /** LOCAL CHILD_REMOVED on consistentHA does not delete the legacy znode. */ + @Test + public void testLegacyCrrSyncLocalChildRemovedDoesNotDeleteLegacy() throws Exception { + String haGroupName = testName.getMethodName(); + Configuration conf = legacyCrrConf(true, 60); + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + Pair initial = awaitLegacyCrrPresent(haGroupName); + long initialVersion = initial.getLeft().getVersion(); + + haAdmin.deleteHAGroupStoreRecordInZooKeeper(haGroupName); + + // Wait long enough for any potential event-driven delete to have fired. 
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + Pair after = readLegacyCrr(haGroupName); + assertNotNull("Legacy znode must NOT be deleted on LOCAL CHILD_REMOVED", after.getLeft()); + assertTrue("Legacy CRR version must not regress after LOCAL CHILD_REMOVED", + after.getLeft().getVersion() >= initialVersion); + } + + @Test + public void testLegacyCrrSyncPeriodicDisabledStillSyncsViaEvents() throws Exception { + String haGroupName = testName.getMethodName(); + Configuration conf = legacyCrrConf(true, 0); // periodic disabled + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + ClusterRoleRecord initial = awaitLegacyCrrPresent(haGroupName).getLeft(); + assertEquals("Initial local role should be ACTIVE per @Before seed", + ClusterRoleRecord.ClusterRole.ACTIVE, initial.getRole(formattedZkUrlFor(ClusterType.LOCAL))); + assertEquals("Initial registry type must be ZK", ClusterRoleRecord.RegistryType.ZK, + initial.getRegistryType()); + + client.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY); + ClusterRoleRecord updated = awaitLegacyCrrRole(haGroupName, ClusterType.LOCAL, + ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).getLeft(); + assertTrue("Updated version must monotonically advance past the initial version", + updated.getVersion() > initial.getVersion()); + assertTrue("isNewerThan must return true for the post-event record", + updated.isNewerThan(initial)); + assertEquals("Registry type must remain ZK after an event-driven sync", + ClusterRoleRecord.RegistryType.ZK, updated.getRegistryType()); + } + + /** PEER CHILD_REMOVED on consistentHA does not delete the legacy znode. */ + @Test + public void testLegacyCrrSyncPeerChildRemovedDoesNotDeleteLegacy() throws Exception { + String haGroupName = testName.getMethodName(); + // Seed a peer record so that PEER cache initializes and PEER CHILD_REMOVED can fire later. 
+ HAGroupStoreRecord peerRecord = + new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.STANDBY, 0L, + HighAvailabilityPolicy.FAILOVER.toString(), this.zkUrl, this.peerMasterUrl, this.masterUrl, + CLUSTERS.getHdfsUrl2(), CLUSTERS.getHdfsUrl1(), 0L); + createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, haGroupName, peerRecord); + + Configuration conf = legacyCrrConf(true, 0); // periodic disabled to isolate event behavior + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + + Pair initial = awaitLegacyCrrPresent(haGroupName); + long initialCrrVersion = initial.getLeft().getVersion(); + + peerHaAdmin.deleteHAGroupStoreRecordInZooKeeper(haGroupName); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + Pair after = readLegacyCrr(haGroupName); + assertNotNull("Legacy znode must NOT be deleted on PEER CHILD_REMOVED", after.getLeft()); + assertTrue("Legacy CRR version must not regress after PEER CHILD_REMOVED", + after.getLeft().getVersion() >= initialCrrVersion); + } + + /** + * Each {@link PhoenixHAAdmin.LegacyCrrWriteMode}: error mapping (BadVersion + NodeExists -> + * {@link StaleClusterRoleRecordVersionException}), unconditional FORCE_OVERWRITE, and + * CAS_WITH_VERSION rejecting negative versions. Sequential: ZK serializes versioned writes + * server-side, so the client retry path is identical to a real race. + */ + @Test + public void testLegacyCrrCasErrorMappingAndModeDispatch() throws Exception { + String haGroupName = testName.getMethodName(); + + // Create. 
+ ClusterRoleRecord initial = new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.ACTIVE, + this.peerZKUrl, ClusterRoleRecord.ClusterRole.STANDBY, 1L); + legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, initial, + PhoenixHAAdmin.LegacyCrrWriteMode.CREATE_NEW, 0); + + Pair existing = + legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName); + assertNotNull(existing.getLeft()); + int sharedVersion = existing.getRight().getVersion(); + + // CAS winner. + ClusterRoleRecord writerA = new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, this.zkUrl, + ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, this.peerZKUrl, + ClusterRoleRecord.ClusterRole.STANDBY, 2L); + legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, writerA, + PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, sharedVersion); + + // CAS loser: same expected version -> BadVersion -> StaleClusterRoleRecordVersionException. + ClusterRoleRecord writerB = new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.STANDBY, + this.peerZKUrl, ClusterRoleRecord.ClusterRole.ACTIVE, 2L); + assertThrows(StaleClusterRoleRecordVersionException.class, + () -> legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, writerB, + PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, sharedVersion)); + + Pair winner = + legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName); + assertEquals(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, + winner.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL))); + assertTrue(winner.getRight().getVersion() > sharedVersion); + + // CREATE_NEW on an existing znode -> NodeExists -> StaleClusterRoleRecordVersionException. 
+ ClusterRoleRecord raceCreate = + new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.STANDBY, + this.peerZKUrl, ClusterRoleRecord.ClusterRole.STANDBY, 3L); + assertThrows(StaleClusterRoleRecordVersionException.class, + () -> legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, raceCreate, + PhoenixHAAdmin.LegacyCrrWriteMode.CREATE_NEW, 0)); + + // FORCE_OVERWRITE bypasses CAS and bumps the stat version. + Stat statBeforeOverwrite = + legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName).getRight(); + ClusterRoleRecord overwrite = + new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.STANDBY, + this.peerZKUrl, ClusterRoleRecord.ClusterRole.STANDBY, 4L); + legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, overwrite, + PhoenixHAAdmin.LegacyCrrWriteMode.FORCE_OVERWRITE, 0); + Pair afterOverwrite = + legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName); + assertEquals(ClusterRoleRecord.ClusterRole.STANDBY, + afterOverwrite.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL))); + assertTrue(afterOverwrite.getRight().getVersion() > statBeforeOverwrite.getVersion()); + + // CAS_WITH_VERSION rejects negative expectedStatVersion before any ZK call. 
+ ClusterRoleRecord illegalRecord = + new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.OFFLINE, + this.peerZKUrl, ClusterRoleRecord.ClusterRole.OFFLINE, 5L); + assertThrows(IllegalArgumentException.class, + () -> legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, illegalRecord, + PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, -1)); + assertThrows(IllegalArgumentException.class, + () -> legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, illegalRecord, + PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, Integer.MIN_VALUE)); + } + + /** Peer-side role flip propagates to role2 in the local legacy CRR. */ + @Test + public void testLegacyCrrSyncPeerRoleFlipUpdatesLegacyRole2() throws Exception { + String haGroupName = testName.getMethodName(); + // Seed peer with STANDBY before client starts so the initial sync sees role2=STANDBY. + HAGroupStoreRecord peerStandby = + new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.STANDBY, 0L, + HighAvailabilityPolicy.FAILOVER.toString(), this.zkUrl, this.peerMasterUrl, this.masterUrl, + CLUSTERS.getHdfsUrl2(), CLUSTERS.getHdfsUrl1(), 0L); + createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, haGroupName, peerStandby); + + Configuration conf = legacyCrrConf(true, 0); // periodic disabled to isolate event-driven path + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + awaitLegacyCrrRole(haGroupName, ClusterType.PEER, ClusterRoleRecord.ClusterRole.STANDBY); + + // Flip the peer record to a state whose cluster role is ACTIVE_TO_STANDBY. 
+ HAGroupStoreRecord peerFlipped = new HAGroupStoreRecord("v1.0", haGroupName, + HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY, 0L, + HighAvailabilityPolicy.FAILOVER.toString(), this.zkUrl, this.peerMasterUrl, this.masterUrl, + CLUSTERS.getHdfsUrl2(), CLUSTERS.getHdfsUrl1(), 0L); + createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, haGroupName, peerFlipped); + + Pair after = awaitLegacyCrrRole(haGroupName, ClusterType.PEER, + ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY); + assertEquals("Registry type must remain ZK after a peer-driven role flip", + ClusterRoleRecord.RegistryType.ZK, after.getLeft().getRegistryType()); + } + + /** Absent peer record yields role2=UNKNOWN; converges when the peer record appears. */ + @Test + public void testLegacyCrrSyncPeerAbsentYieldsUnknownAndConvergesOnRecovery() throws Exception { + String haGroupName = testName.getMethodName(); + // No peer record seeded: peer cache is empty so getHAGroupStoreRecordFromPeer() returns null + // and role2 falls through to UNKNOWN. + Configuration conf = legacyCrrConf(true, 0); + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + Pair initial = + awaitLegacyCrrRole(haGroupName, ClusterType.PEER, ClusterRoleRecord.ClusterRole.UNKNOWN); + long initialVersion = initial.getLeft().getVersion(); + + // Peer "recovers" by writing its consistentHA record. The PEER CHILD_ADDED event triggers + // the legacy sync to update role2. 
+ HAGroupStoreRecord peerRecord = + new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.STANDBY, 0L, + HighAvailabilityPolicy.FAILOVER.toString(), this.zkUrl, this.peerMasterUrl, this.masterUrl, + CLUSTERS.getHdfsUrl2(), CLUSTERS.getHdfsUrl1(), 0L); + createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, haGroupName, peerRecord); + + Pair recovered = + awaitLegacyCrrRole(haGroupName, ClusterType.PEER, ClusterRoleRecord.ClusterRole.STANDBY); + assertTrue("Version must bump when role2 transitions UNKNOWN -> STANDBY", + recovered.getLeft().getVersion() > initialVersion); + } + + /** registryType stays ZK across multiple sync cycles (never reverts to RPC). */ + @Test + public void testLegacyCrrSyncRegistryTypePreservedAcrossMultipleCycles() throws Exception { + String haGroupName = testName.getMethodName(); + Configuration conf = legacyCrrConf(true, 0); + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + + Pair initial = awaitLegacyCrrPresent(haGroupName); + assertEquals(ClusterRoleRecord.RegistryType.ZK, initial.getLeft().getRegistryType()); + assertEquals("Initial local role should be ACTIVE per @Before seed", + ClusterRoleRecord.ClusterRole.ACTIVE, + initial.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL))); + long lastVersion = initial.getLeft().getVersion(); + + // Drive a sequence of distinct peer states; each event drives a sync that rewrites the + // legacy znode (or short-circuits if logically equal). Direct ZK writes intentionally + // bypass setHAGroupStatusIfNeeded's transition guard. 
+ HAGroupStoreRecord.HAGroupState[] cycle = + new HAGroupStoreRecord.HAGroupState[] { HAGroupStoreRecord.HAGroupState.STANDBY, + HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY, + HAGroupStoreRecord.HAGroupState.STANDBY_TO_ACTIVE, HAGroupStoreRecord.HAGroupState.OFFLINE, + HAGroupStoreRecord.HAGroupState.STANDBY }; + for (HAGroupStoreRecord.HAGroupState state : cycle) { + HAGroupStoreRecord peer = new HAGroupStoreRecord("v1.0", haGroupName, state, 0L, + HighAvailabilityPolicy.FAILOVER.toString(), this.zkUrl, this.peerMasterUrl, this.masterUrl, + CLUSTERS.getHdfsUrl2(), CLUSTERS.getHdfsUrl1(), 0L); + createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, haGroupName, peer); + Pair after = + awaitLegacyCrrRole(haGroupName, ClusterType.PEER, state.getClusterRole()); + assertEquals( + "Local role must remain ACTIVE across peer-driven cycles (peer state=" + state + ")", + ClusterRoleRecord.ClusterRole.ACTIVE, + after.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL))); + assertEquals("Registry type must remain ZK after a sync cycle (peer state=" + state + ")", + ClusterRoleRecord.RegistryType.ZK, after.getLeft().getRegistryType()); + assertTrue( + "Logical version must monotonically increase across distinct sync cycles (peer state=" + + state + ")", + after.getLeft().getVersion() > lastVersion); + lastVersion = after.getLeft().getVersion(); + } + } + + /** Periodic loop repairs an external divergence with no consistentHA event. 
*/ + @Test + public void testLegacyCrrSyncPeriodicReconciliationRecoversAfterDivergence() throws Exception { + String haGroupName = testName.getMethodName(); + Configuration conf = legacyCrrConf(true, 2); // 2s interval; jitter is 0-30s on first run + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + Pair initial = awaitLegacyCrrPresent(haGroupName); + assertEquals(ClusterRoleRecord.ClusterRole.ACTIVE, + initial.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL))); + + // Externally corrupt the legacy znode; no consistentHA event fires, so only the periodic + // reconciler can recover. + ClusterRoleRecord corrupt = new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.STANDBY, + this.peerZKUrl, ClusterRoleRecord.ClusterRole.ACTIVE, initial.getLeft().getVersion() + 10); + legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, corrupt, + PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, initial.getRight().getVersion()); + Pair corrupted = readLegacyCrr(haGroupName); + assertEquals(ClusterRoleRecord.ClusterRole.STANDBY, + corrupted.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL))); + + // Worst-case wait: jitter up to 30s + 2s interval; allow 40s. 
+ long deadline = System.currentTimeMillis() + 40_000L; + Pair after = readLegacyCrr(haGroupName); + while ( + (after.getLeft() == null || after.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL)) + != ClusterRoleRecord.ClusterRole.ACTIVE) + && System.currentTimeMillis() < deadline + ) { + Thread.sleep(500); + after = readLegacyCrr(haGroupName); + } + assertNotNull(after.getLeft()); + assertEquals(ClusterRoleRecord.ClusterRole.ACTIVE, + after.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL))); + assertTrue(after.getLeft().getVersion() > corrupted.getLeft().getVersion()); + assertEquals(ClusterRoleRecord.RegistryType.ZK, after.getLeft().getRegistryType()); + } + + /** Client overwrites a pre-seeded, logically-stale legacy znode on initial sync. */ + @Test + public void testLegacyCrrSyncOverwritesPreSeededLogicallyStaleZnode() throws Exception { + String haGroupName = testName.getMethodName(); + // Pre-seed role2=OFFLINE; desired CRR is role2=UNKNOWN (no peer record), so logically stale. + ClusterRoleRecord preSeed = new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.ACTIVE, + this.peerZKUrl, ClusterRoleRecord.ClusterRole.OFFLINE, 5L); + legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, preSeed, + PhoenixHAAdmin.LegacyCrrWriteMode.CREATE_NEW, 0); + Pair seeded = readLegacyCrr(haGroupName); + assertNotNull(seeded.getLeft()); + assertEquals(ClusterRoleRecord.ClusterRole.OFFLINE, + seeded.getLeft().getRole(formattedZkUrlFor(ClusterType.PEER))); + + Configuration conf = legacyCrrConf(true, 0); // periodic disabled; rely on initial sync only + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + + // Wait for the initial sync to converge role2 away from OFFLINE. 
+ long deadline = System.currentTimeMillis() + ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS; + Pair after = readLegacyCrr(haGroupName); + while ( + after.getLeft() != null && after.getLeft().getRole(formattedZkUrlFor(ClusterType.PEER)) + == ClusterRoleRecord.ClusterRole.OFFLINE + && System.currentTimeMillis() < deadline + ) { + Thread.sleep(100); + after = readLegacyCrr(haGroupName); + } + assertNotNull(after.getLeft()); + assertEquals(ClusterRoleRecord.RegistryType.ZK, after.getLeft().getRegistryType()); + assertTrue(after.getLeft().getVersion() > seeded.getLeft().getVersion()); + // role2 must no longer be OFFLINE (UNKNOWN vs STANDBY both acceptable; depends on peer + // cache init timing). + assertFalse(after.getLeft().getRole(formattedZkUrlFor(ClusterType.PEER)) + == ClusterRoleRecord.ClusterRole.OFFLINE); + } + + // ---------- Legacy CRR sync test helpers ---------- + + /** Configuration clone with the legacy CRR flag and reconciliation interval set. */ + private Configuration legacyCrrConf(boolean legacyEnabled, long periodicSec) { + Configuration src = CLUSTERS.getHBaseCluster1().getConfiguration(); + Configuration cloned = new Configuration(src); + cloned.setBoolean(PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED, legacyEnabled); + cloned.setLong(PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS, periodicSec); + return cloned; + } + + private Pair readLegacyCrr(String haGroupName) throws IOException { + return legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName); + } + + /** Polls the legacy CRR until {@code condition} matches or the propagation deadline elapses. 
*/ + private Pair awaitLegacyCrr(String haGroupName, + Predicate condition, String description) throws Exception { + long deadline = System.currentTimeMillis() + ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS; + Pair legacy = readLegacyCrr(haGroupName); + while ( + (legacy.getLeft() == null || !condition.test(legacy.getLeft())) + && System.currentTimeMillis() < deadline + ) { + Thread.sleep(100); + legacy = readLegacyCrr(haGroupName); + } + assertNotNull("Legacy znode missing while awaiting: " + description, legacy.getLeft()); + assertTrue("Legacy CRR condition not met within timeout: " + description, + condition.test(legacy.getLeft())); + return legacy; + } + + private Pair awaitLegacyCrrPresent(String haGroupName) throws Exception { + return awaitLegacyCrr(haGroupName, crr -> true, "znode present"); + } + + /** Polls until the LOCAL or PEER role in the legacy CRR matches {@code expectedRole}. */ + private Pair awaitLegacyCrrRole(String haGroupName, + ClusterType clusterType, ClusterRoleRecord.ClusterRole expectedRole) throws Exception { + String url = formattedZkUrlFor(clusterType); + return awaitLegacyCrr(haGroupName, crr -> crr.getRole(url) == expectedRole, + clusterType + " role == " + expectedRole); + } + + /** LOCAL or PEER ZK URL in the canonical ZK-registry form used by the legacy sync. */ + private String formattedZkUrlFor(ClusterType clusterType) { + String raw = (clusterType == ClusterType.LOCAL) ? 
zkUrl : peerZKUrl; + return JDBCUtil.formatUrl(raw, ClusterRoleRecord.RegistryType.ZK); + } } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java index e6e71d86a3f..1dfe3dae960 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java @@ -229,6 +229,53 @@ public void testClusterRoleFromInvalidBytes() { assertEquals(ClusterRole.UNKNOWN, role); } + /** JSON without a {@code registryType} field defaults to RPC on deserialization. */ + @Test + public void testFromJsonWithoutRegistryTypeDefaultsToRpc() throws IOException { + byte[] data = readFile("json/test_role_record_no_registry_type.json"); + Optional opt = ClusterRoleRecord.fromJson(data); + assertTrue(opt.isPresent()); + ClusterRoleRecord record = opt.get(); + assertEquals(ClusterRoleRecord.RegistryType.RPC, record.getRegistryType()); + assertEquals(HighAvailabilityPolicy.FAILOVER, record.getPolicy()); + assertEquals(7L, record.getVersion()); + assertEquals(ClusterRole.ACTIVE, record.getRole1()); + assertEquals(ClusterRole.STANDBY, record.getRole2()); + } + + /** Explicit {@code registryType=RPC} must round-trip as RPC. */ + @Test + public void testFromJsonExplicitRpcRegistryTypeRoundTrips() throws IOException { + Optional opt = + ClusterRoleRecord.fromJson(readFile("json/test_role_record_explicit_rpc.json")); + assertTrue(opt.isPresent()); + assertEquals(ClusterRoleRecord.RegistryType.RPC, opt.get().getRegistryType()); + assertEquals(11L, opt.get().getVersion()); + } + + /** Explicit {@code registryType=ZK} round-trips as ZK. 
*/ + @Test + public void testFromJsonExplicitZkRegistryTypeRoundTrips() throws IOException { + Optional opt = + ClusterRoleRecord.fromJson(readFile("json/test_role_record_explicit_zk.json")); + assertTrue(opt.isPresent()); + assertEquals(ClusterRoleRecord.RegistryType.ZK, opt.get().getRegistryType()); + assertEquals(13L, opt.get().getVersion()); + } + + /** Round-trip: ZK-registry CRR -> JSON -> CRR preserves {@code registryType}. */ + @Test + public void testToFromJsonPreservesZkRegistryTypeAcrossRoundTrip() throws IOException { + ClusterRoleRecord written = new ClusterRoleRecord(testName.getMethodName(), + HighAvailabilityPolicy.FAILOVER, ClusterRoleRecord.RegistryType.ZK, "zk1\\:2181::/hbase", + ClusterRole.ACTIVE, "zk2\\:2181::/hbase", ClusterRole.STANDBY, 42L); + Optional read = + ClusterRoleRecord.fromJson(ClusterRoleRecord.toJson(written)); + assertTrue(read.isPresent()); + assertEquals(ClusterRoleRecord.RegistryType.ZK, read.get().getRegistryType()); + assertEquals(written, read.get()); + } + // Private Helper Methods private ClusterRoleRecord getClusterRoleRecord(String name, HighAvailabilityPolicy policy, diff --git a/phoenix-core/src/test/resources/json/test_role_record_explicit_rpc.json b/phoenix-core/src/test/resources/json/test_role_record_explicit_rpc.json new file mode 100644 index 00000000000..e2b7269b48b --- /dev/null +++ b/phoenix-core/src/test/resources/json/test_role_record_explicit_rpc.json @@ -0,0 +1,10 @@ +{ + "haGroupName" : "testFromJsonExplicitRpcRegistryTypeRoundTrips", + "policy" : "FAILOVER", + "registryType" : "RPC", + "url1" : "url1\\:2181", + "role1" : "ACTIVE", + "url2" : "url2\\:2181", + "role2" : "STANDBY", + "version" : 11 +} diff --git a/phoenix-core/src/test/resources/json/test_role_record_explicit_zk.json b/phoenix-core/src/test/resources/json/test_role_record_explicit_zk.json new file mode 100644 index 00000000000..1e9ce174ef7 --- /dev/null +++ b/phoenix-core/src/test/resources/json/test_role_record_explicit_zk.json @@ -0,0 +1,10 @@ +{ 
+ "haGroupName" : "testFromJsonExplicitZkRegistryTypeRoundTrips", + "policy" : "FAILOVER", + "registryType" : "ZK", + "url1" : "zk1\\:2181::/hbase", + "role1" : "ACTIVE", + "url2" : "zk2\\:2181::/hbase", + "role2" : "STANDBY", + "version" : 13 +} diff --git a/phoenix-core/src/test/resources/json/test_role_record_no_registry_type.json b/phoenix-core/src/test/resources/json/test_role_record_no_registry_type.json new file mode 100644 index 00000000000..517e061b654 --- /dev/null +++ b/phoenix-core/src/test/resources/json/test_role_record_no_registry_type.json @@ -0,0 +1,9 @@ +{ + "haGroupName" : "testFromJsonWithoutRegistryTypeDefaultsToRpc", + "policy" : "FAILOVER", + "url1" : "url1\\:2181", + "role1" : "ACTIVE", + "url2" : "url2\\:2181", + "role2" : "STANDBY", + "version" : 7 +}