Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.proto.BookieAddressResolver;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -168,7 +169,7 @@ private static Optional<EnsemblePlacementPolicyConfig> getEnsemblePlacementPolic

private static Pair<Set<String>, Set<String>> getIsolationGroup(
EnsemblePlacementPolicyConfig ensemblePlacementPolicyConfig) {
MutablePair<Set<String>, Set<String>> pair = new MutablePair<>();
MutablePair<Set<String>, Set<String>> pair = new MutablePair<>(Collections.emptySet(), Collections.emptySet());
String className = IsolatedBookieEnsemblePlacementPolicy.class.getName();
if (ensemblePlacementPolicyConfig.getPolicyClass().getName().equals(className)) {
Map<String, Object> properties = ensemblePlacementPolicyConfig.getProperties();
Expand All @@ -178,13 +179,9 @@ private static Pair<Set<String>, Set<String>> getIsolationGroup(
.castToString(properties.getOrDefault(SECONDARY_ISOLATION_BOOKIE_GROUPS, ""));
if (!primaryIsolationGroupString.isEmpty()) {
pair.setLeft(Sets.newHashSet(primaryIsolationGroupString.split(",")));
} else {
pair.setLeft(Collections.emptySet());
}
if (!secondaryIsolationGroupString.isEmpty()) {
pair.setRight(Sets.newHashSet(secondaryIsolationGroupString.split(",")));
} else {
pair.setRight(Collections.emptySet());
}
}
return pair;
Expand All @@ -193,8 +190,14 @@ private static Pair<Set<String>, Set<String>> getIsolationGroup(
@VisibleForTesting
Set<BookieId> getExcludedBookiesWithIsolationGroups(int ensembleSize,
Pair<Set<String>, Set<String>> isolationGroups) {
Set<String> primaryIsolationGroup = Collections.emptySet();
Set<String> secondaryIsolationGroup = Collections.emptySet();
if (isolationGroups != null) {
primaryIsolationGroup = ObjectUtils.getIfNull(isolationGroups.getLeft(), Collections.emptySet());
secondaryIsolationGroup = ObjectUtils.getIfNull(isolationGroups.getRight(), Collections.emptySet());
}
Set<BookieId> excludedBookies = new HashSet<>();
if (isolationGroups != null && isolationGroups.getLeft().contains(PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP)) {
if (primaryIsolationGroup.contains(PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP)) {
return excludedBookies;
}
try {
Expand All @@ -215,13 +218,7 @@ Set<BookieId> getExcludedBookiesWithIsolationGroups(int ensembleSize,
return excludedBookies;
}
int totalAvailableBookiesInPrimaryGroup = 0;
Set<String> primaryIsolationGroup = Collections.emptySet();
Set<String> secondaryIsolationGroup = Collections.emptySet();
Set<BookieId> primaryGroupBookies = new HashSet<>();
if (isolationGroups != null) {
primaryIsolationGroup = isolationGroups.getLeft();
secondaryIsolationGroup = isolationGroups.getRight();
}
for (String group : allGroups) {
Set<String> bookiesInGroup = allGroupsBookieMapping.get(group).keySet();
if (!primaryIsolationGroup.contains(group)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the problematic behavior for the non-matching policyClass case. After this PR, getIsolationGroup() returns empty primary/secondary groups when the metadata policyClass is not
IsolatedBookieEnsemblePlacementPolicy. With primaryIsolationGroup empty, the condition `!primaryIsolationGroup.contains(group)` is true for every configured group, so all grouped bookies are added to excludedBookies below. That avoids the NPE, but
replaceBookie can still fail with BKNotEnoughBookiesException. A non-matching policyClass should be ignored or fall back to defaultIsolationGroups, not be treated as an empty isolation group.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Shawyeok Please check this. I wonder if there's a test case that would cover the explained scenario?

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.feature.SettableFeatureProvider;
import org.apache.bookkeeper.net.BookieId;
Expand Down Expand Up @@ -839,6 +840,62 @@ public void testGetExcludedBookiesWithIsolationGroups() throws Exception {
assertTrue(blacklist.isEmpty());
}

/**
 * Guards against a {@link NullPointerException} in {@code replaceBookie} when the custom metadata
 * carries an {@link EnsemblePlacementPolicyConfig} whose policy class is something other than
 * {@link IsolatedBookieEnsemblePlacementPolicy}.
 *
 * <p>Before the fix, {@code getIsolationGroup()} handed back a {@code MutablePair} whose left and
 * right values were both {@code null} in that situation, and
 * {@code getExcludedBookiesWithIsolationGroups} then dereferenced it via
 * {@code getLeft().contains(...)}:
 *
 * <pre>
 * java.lang.NullPointerException: Cannot invoke "java.util.Set.contains(Object)"
 * because the return value of "org.apache.commons.lang3.tuple.Pair.getLeft()" is null
 * at IsolatedBookieEnsemblePlacementPolicy.getExcludedBookiesWithIsolationGroups(...)
 * at IsolatedBookieEnsemblePlacementPolicy.getExcludedBookies(...)
 * at IsolatedBookieEnsemblePlacementPolicy.replaceBookie(...)
 * </pre>
 *
 * <p>The test only checks that no {@link NullPointerException} escapes; a
 * {@link BKNotEnoughBookiesException} from {@code replaceBookie} is tolerated.
 */
@Test
public void testReplaceBookieWithNonMatchingPolicyClassShouldNotThrowNPE() throws Exception {
    // Register four bookies under a single isolation group, alternating between two racks.
    Map<String, BookieInfo> groupOneBookies = new HashMap<>();
    groupOneBookies.put(BOOKIE1, BookieInfo.builder().rack("rack0").build());
    groupOneBookies.put(BOOKIE2, BookieInfo.builder().rack("rack1").build());
    groupOneBookies.put(BOOKIE3, BookieInfo.builder().rack("rack0").build());
    groupOneBookies.put(BOOKIE4, BookieInfo.builder().rack("rack1").build());
    Map<String, Map<String, BookieInfo>> groupsToBookies = new HashMap<>();
    groupsToBookies.put("group1", groupOneBookies);

    byte[] serializedMapping = jsonMapper.writeValueAsBytes(groupsToBookies);
    store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, serializedMapping, Optional.empty()).join();

    // Configure the policy under test with "group1" as its isolation group.
    ClientConfiguration clientConf = new ClientConfiguration();
    clientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store);
    clientConf.setProperty(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, "group1");

    IsolatedBookieEnsemblePlacementPolicy policy = new IsolatedBookieEnsemblePlacementPolicy();
    policy.initialize(clientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL,
            NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
    policy.onClusterChanged(writableBookies, readOnlyBookies);

    // Custom metadata deliberately names a different placement policy class. This is the input
    // that used to yield a pair with null left/right values and trigger the NPE.
    EnsemblePlacementPolicyConfig nonMatchingConfig = new EnsemblePlacementPolicyConfig(
            RackawareEnsemblePlacementPolicy.class, Collections.emptyMap());
    Map<String, byte[]> customMetadata = new HashMap<>();
    customMetadata.put(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG,
            nonMatchingConfig.encode());

    BookieId firstBookie = new BookieSocketAddress(BOOKIE1).toBookieId();
    BookieId bookieToReplace = new BookieSocketAddress(BOOKIE2).toBookieId();

    try {
        // A NullPointerException here fails the test; BKNotEnoughBookiesException does not.
        policy.replaceBookie(1, 1, 1, customMetadata,
                Arrays.asList(firstBookie, bookieToReplace), bookieToReplace, null);
    } catch (BKNotEnoughBookiesException ignored) {
        // Tolerated on purpose: this test only guards against the NPE, not against the policy
        // running out of candidate bookies.
    }
}

// The policy gets the bookie info asynchronously before each query or update. When putting the bookie info into
// the metadata store, the cache needs some time to receive the notification and update accordingly.
private void updateBookieInfo(IsolatedBookieEnsemblePlacementPolicy isolationPolicy, byte[] bookieInfo) {
Expand Down
Loading