From 217e62b2b2df79dc81be35d24722a54f481ce1a5 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 7 May 2026 16:27:08 -0700 Subject: [PATCH 1/3] Fix LedgerHandle.batchReadUnconfirmedAsync: use slog log instead of LOG The batchReadUnconfirmedAsync method added in #4739 calls LOG.error(...), but LedgerHandle was migrated to slog and only has a lowercase `log` field. Master fails to compile. Convert the call to the slog builder style used elsewhere in the file. --- .../main/java/org/apache/bookkeeper/client/LedgerHandle.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index e9e63282ffd..0a1005f6ce3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -982,7 +982,10 @@ public CompletableFuture batchReadAsync(long startEntry, int maxC public CompletableFuture batchReadUnconfirmedAsync(long startEntry, int maxCount, long maxSize) { // Little sanity check if (startEntry < 0 || maxCount < 0 || maxSize < 0) { - LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{} when batch read", ledgerId, startEntry); + log.error() + .attr("ledgerId", ledgerId) + .attr("startEntry", startEntry) + .log("IncorrectParameterException when batch read"); return FutureUtils.exception(new BKIncorrectParameterException()); } From 5f41b362ce394e809408b77912290cdb9e81ce74 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 7 May 2026 16:07:05 -0700 Subject: [PATCH 2/3] Migrate stream non-gRPC protos to LightProto Migrates the stream module's non-gRPC proto definitions to LightProto: - stream/statelib/src/main/proto/kv.proto (KV state-store commands) - stream/proto/src/main/proto/cluster.proto (cluster metadata/assignment) Both files use proto3, including oneof (kv.proto) and map (cluster.proto), which LightProto handles directly. For stream/proto, cluster.proto is moved to a parallel proto-lightproto/ directory so the existing protobuf-maven-plugin keeps generating the rest of the protos (which still depend on the gRPC service stubs). Java sources that touched the migrated types are updated to use the LightProto mutable-instance API instead of the protobuf-java builder pattern. This change is independent of the other in-flight LightProto work in bookkeeper-proto and bookkeeper-server. gRPC service migration is left for a follow-up. --- pom.xml | 1 + stream/proto/pom.xml | 27 ++++++++++ .../{proto => proto-lightproto}/cluster.proto | 0 stream/statelib/pom.xml | 30 ++++++----- .../statelib/impl/kv/KVCommandProcessor.java | 15 +++--- .../bookkeeper/statelib/impl/kv/KVUtils.java | 52 +++++------------- .../statelib/impl/kv/RocksdbKVAsyncStore.java | 18 +++---- .../cluster/InMemClusterMetadataStore.java | 6 +-- .../impl/cluster/ZkClusterMetadataStore.java | 36 ++++++++----- .../sc/DefaultStorageContainerController.java | 54 +++++++++---------- .../impl/sc/ZkStorageContainerManager.java | 37 ++++++++----- .../InMemClusterMetadataStoreTest.java | 48 ++++++----------- .../cluster/ZkClusterMetadataStoreTest.java | 40 +++++--------- ...DefaultStorageContainerControllerTest.java | 52 ++++++++++-------- .../sc/ZkStorageContainerManagerTest.java | 38 ++++--------- 15 files changed, 215 insertions(+), 239 deletions(-) rename stream/proto/src/main/{proto => proto-lightproto}/cluster.proto (100%) diff --git a/pom.xml b/pom.xml index c7ae8ee8786..3a9f2a9af1d 100644 --- a/pom.xml +++ b/pom.xml @@ -220,6 +220,7 @@ 3.10.1 1.7.1 0.6.1 + 0.7.0 9.3 4.7.3.2 3.6.0 diff --git a/stream/proto/pom.xml b/stream/proto/pom.xml index 108d69aca63..d4e0fe6844d 100644 --- a/stream/proto/pom.xml +++ b/stream/proto/pom.xml @@ -69,6 +69,15 @@ + + org.apache.rat + apache-rat-plugin + + + target/generated-sources/lightproto/** + + + org.apache.maven.plugins maven-compiler-plugin @@ -96,6 +105,24 @@ + + io.streamnative.lightproto + lightproto-maven-plugin + ${lightproto-maven-plugin.version} + + + ${project.basedir}/src/main/proto-lightproto/cluster.proto + + generated-sources/lightproto/java + + + + + generate + + + + org.apache.maven.plugins maven-jar-plugin diff --git a/stream/proto/src/main/proto/cluster.proto b/stream/proto/src/main/proto-lightproto/cluster.proto similarity index 100% rename from stream/proto/src/main/proto/cluster.proto rename to stream/proto/src/main/proto-lightproto/cluster.proto diff --git a/stream/statelib/pom.xml b/stream/statelib/pom.xml index 73c1b349c12..804c22bad51 100644 --- a/stream/statelib/pom.xml +++ b/stream/statelib/pom.xml @@ -61,8 +61,8 @@ rocksdbjni - com.google.protobuf - protobuf-java + io.netty + netty-buffer org.apache.distributedlog @@ -104,25 +104,27 @@ - - - kr.motd.maven - os-maven-plugin - ${os-maven-plugin.version} - - - org.xolstice.maven.plugins - protobuf-maven-plugin - ${protobuf-maven-plugin.version} + org.apache.rat + apache-rat-plugin - com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier} + + target/generated-sources/lightproto/** + + + + + io.streamnative.lightproto + lightproto-maven-plugin + ${lightproto-maven-plugin.version} + + generated-sources/lightproto/java - compile + generate diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/KVCommandProcessor.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/KVCommandProcessor.java index 1eb9009a4c5..d5351069044 100644 --- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/KVCommandProcessor.java +++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/KVCommandProcessor.java @@ -20,7 +20,6 @@ import static org.apache.bookkeeper.statelib.impl.kv.KVUtils.newCommand; -import com.google.protobuf.InvalidProtocolBufferException; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import lombok.AccessLevel; @@ -48,8 +47,8 @@ public void applyCommand(long revision, ByteBuf cmdBuf, RocksdbKVStore store) { PutRequest putReq = command.getPutReq(); - byte[] keyBytes = putReq.getKey().toByteArray(); - byte[] valBytes = putReq.getValue().toByteArray(); + byte[] keyBytes = putReq.getKey(); + byte[] valBytes = putReq.getValue(); @Cleanup("release") ByteBuf serializedValBuf = KVUtils.serialize(valBytes, revision); byte[] serializedValBytes = ByteBufUtil.getBytes(serializedValBuf); store.put(keyBytes, serializedValBytes, revision); @@ -83,8 +82,8 @@ private void applyPutCommand(long revision, Command command, RocksdbKVStore store) { PutRequest putReq = command.getPutReq(); - byte[] keyBytes = putReq.getKey().toByteArray(); - byte[] valBytes = putReq.getValue().toByteArray(); + byte[] keyBytes = putReq.getKey(); + byte[] valBytes = putReq.getValue(); @Cleanup("release") ByteBuf serializedValBuf = KVUtils.serialize(valBytes, revision); byte[] serializedValBytes = ByteBufUtil.getBytes(serializedValBuf); store.putIfAbsent(keyBytes, serializedValBytes, revision); @@ -92,7 +91,7 @@ private void applyPutIfAbsentCommand(long revision, Command command, RocksdbKVSt private void applyDeleteCommand(long revision, Command command, RocksdbKVStore store) { DeleteRequest delReq = command.getDelReq(); - byte[] keyBytes = delReq.getKey().toByteArray(); + byte[] keyBytes = delReq.getKey(); store.delete(keyBytes, revision); } } diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/KVUtils.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/KVUtils.java index 337b403e136..12d8e52e3e9 100644 --- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/KVUtils.java +++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/KVUtils.java @@ -18,22 +18,14 @@ */ package org.apache.bookkeeper.statelib.impl.kv; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.UnsafeByteOperations; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufOutputStream; import io.netty.buffer.PooledByteBufAllocator; import io.netty.util.ReferenceCountUtil; -import java.io.IOException; import lombok.AccessLevel; import lombok.CustomLog; import lombok.NoArgsConstructor; import org.apache.bookkeeper.common.coder.Coder; import org.apache.bookkeeper.proto.statestore.kv.Command; -import org.apache.bookkeeper.proto.statestore.kv.DeleteRequest; -import org.apache.bookkeeper.proto.statestore.kv.NopRequest; -import org.apache.bookkeeper.proto.statestore.kv.PutIfAbsentRequest; -import org.apache.bookkeeper.proto.statestore.kv.PutRequest; /** * Utils for kv stores. @@ -42,9 +34,13 @@ @NoArgsConstructor(access = AccessLevel.PRIVATE) final class KVUtils { - static final Command NOP_CMD = Command.newBuilder() - .setNopReq(NopRequest.newBuilder().build()) - .build(); + static final Command NOP_CMD = newNopCommand(); + + private static Command newNopCommand() { + Command cmd = new Command(); + cmd.setNopReq(); + return cmd; + } static ByteBuf serialize(ByteBuf valBuf, long revision) { int serializedSize = valBuf.readableBytes() + Long.BYTES; @@ -68,41 +64,21 @@ static V deserialize(Coder valCoder, return valCoder.decode(valBuf); } - static Command newCommand(ByteBuf cmdBuf) throws InvalidProtocolBufferException { - return Command.parseFrom(cmdBuf.nioBuffer()); + static Command newCommand(ByteBuf cmdBuf) { + Command cmd = new Command(); + cmd.parseFrom(cmdBuf, cmdBuf.readableBytes()); + return cmd; } - static ByteBuf newCommandBuf(Command cmd) throws IOException { + static ByteBuf newCommandBuf(Command cmd) { ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(cmd.getSerializedSize()); try { - cmd.writeTo(new ByteBufOutputStream(buf)); - } catch (IOException e) { + cmd.writeTo(buf); + } catch (RuntimeException e) { ReferenceCountUtil.release(buf); throw e; } return buf; } - static PutRequest newPutRequest(byte[] keyBytes, - byte[] valBytes) { - return PutRequest.newBuilder() - .setKey(UnsafeByteOperations.unsafeWrap(keyBytes)) - .setValue(UnsafeByteOperations.unsafeWrap(valBytes)) - .build(); - } - - static PutIfAbsentRequest newPutIfAbsentRequest(byte[] keyBytes, - byte[] valBytes) { - return PutIfAbsentRequest.newBuilder() - .setKey(UnsafeByteOperations.unsafeWrap(keyBytes)) - .setValue(UnsafeByteOperations.unsafeWrap(valBytes)) - .build(); - } - - static DeleteRequest newDeleteRequest(byte[] keyBytes) { - return DeleteRequest.newBuilder() - .setKey(UnsafeByteOperations.unsafeWrap(keyBytes)) - .build(); - } - } diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVAsyncStore.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVAsyncStore.java index 63150d11d5e..dc982a9e9bc 100644 --- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVAsyncStore.java +++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVAsyncStore.java @@ -22,7 +22,6 @@ import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.util.ReferenceCountUtil; -import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; import org.apache.bookkeeper.common.coder.Coder; @@ -91,9 +90,8 @@ public CompletableFuture put(K key, V value) { byte[] keyBytes = keyCoder.encode(key); byte[] valBytes = valCoder.encode(value); - Command command = Command.newBuilder() - .setPutReq(KVUtils.newPutRequest(keyBytes, valBytes)) - .build(); + Command command = new Command(); + command.setPutReq().setKey(keyBytes).setValue(valBytes); return writeCommandReturnTxId(command).thenApplyAsync((revision) -> { ByteBuf serializedBuf = KVUtils.serialize(valBytes, revision); try { @@ -112,9 +110,8 @@ public CompletableFuture putIfAbsent(K key, V value) { byte[] keyBytes = keyCoder.encode(key); byte[] valBytes = valCoder.encode(value); - Command command = Command.newBuilder() - .setPutIfAbsentReq(KVUtils.newPutIfAbsentRequest(keyBytes, valBytes)) - .build(); + Command command = new Command(); + command.setPutIfAbsentReq().setKey(keyBytes).setValue(valBytes); return writeCommandReturnTxId(command).thenApplyAsync((revision) -> { ByteBuf serializedBuf = KVUtils.serialize(valBytes, revision); try { @@ -135,9 +132,8 @@ public CompletableFuture putIfAbsent(K key, V value) { public CompletableFuture delete(K key) { byte[] keyBytes = keyCoder.encode(key); - Command command = Command.newBuilder() - .setDelReq(KVUtils.newDeleteRequest(keyBytes)) - .build(); + Command command = new Command(); + command.setDelReq().setKey(keyBytes); return writeCommandReturnTxId(command).thenApplyAsync((revision) -> { byte[] prevValue = localStore.delete(keyBytes, revision); if (null == prevValue) { @@ -171,7 +167,7 @@ private CompletableFuture writeCommandReturnTxId(Command command) { try { ByteBuf cmdBuf = KVUtils.newCommandBuf(command); return writeCommandBufReturnTxId(cmdBuf); - } catch (IOException e) { + } catch (RuntimeException e) { return FutureUtils.exception(e); } } diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java index bea4b23091e..645ced4505d 100644 --- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java +++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java @@ -56,10 +56,8 @@ synchronized int getNumWatchers() { @Override public synchronized boolean initializeCluster(int numStorageContainers, Optional segmentStorePath) { - this.metadata = ClusterMetadata.newBuilder() - .setNumStorageContainers(numStorageContainers) - .build(); - this.assignmentData = ClusterAssignmentData.newBuilder().build(); + this.metadata = new ClusterMetadata().setNumStorageContainers(numStorageContainers); + this.assignmentData = new ClusterAssignmentData(); return true; } diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java index e583fc29597..4f9f8f53c0d 100644 --- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java +++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java @@ -25,7 +25,6 @@ import static org.apache.bookkeeper.stream.storage.StorageConstants.getStoragePath; import static org.apache.bookkeeper.stream.storage.StorageConstants.getWritableServersPath; -import com.google.protobuf.InvalidProtocolBufferException; import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -95,11 +94,8 @@ public void close() { @Override public boolean initializeCluster(int numStorageContainers, Optional segmentStorePath) { - ClusterMetadata metadata = ClusterMetadata.newBuilder() - .setNumStorageContainers(numStorageContainers) - .build(); - ClusterAssignmentData assignmentData = ClusterAssignmentData.newBuilder() - .build(); + ClusterMetadata metadata = new ClusterMetadata().setNumStorageContainers(numStorageContainers); + ClusterAssignmentData assignmentData = new ClusterAssignmentData(); try { // we are using dlog for the storage backend, so we need to initialize the dlog namespace BKDLConfig dlogConfig = new BKDLConfig( @@ -130,10 +126,16 @@ public boolean initializeCluster(int numStorageContainers, Optional segm public ClusterAssignmentData getClusterAssignmentData() { try { byte[] data = client.getData().forPath(zkClusterAssignmentPath); - return ClusterAssignmentData.parseFrom(data); - } catch (InvalidProtocolBufferException ie) { - throw new StorageRuntimeException("The cluster assignment data from zookeeper @" - + zkClusterAssignmentPath + " is corrupted", ie); + ClusterAssignmentData assignmentData = new ClusterAssignmentData(); + try { + assignmentData.parseFrom(data); + } catch (RuntimeException ie) { + throw new StorageRuntimeException("The cluster assignment data from zookeeper @" + + zkClusterAssignmentPath + " is corrupted", ie); + } + return assignmentData; + } catch (StorageRuntimeException e) { + throw e; } catch (Exception e) { throw new StorageRuntimeException("Failed to fetch cluster assignment data from zookeeper @" + zkClusterAssignmentPath, e); @@ -193,10 +195,16 @@ public void unwatchClusterAssignmentData(Consumer watcher) { public ClusterMetadata getClusterMetadata() { try { byte[] data = client.getData().forPath(zkClusterMetadataPath); - return ClusterMetadata.parseFrom(data); - } catch (InvalidProtocolBufferException ie) { - throw new StorageRuntimeException("The cluster metadata from zookeeper @" - + zkClusterMetadataPath + " is corrupted", ie); + ClusterMetadata metadata = new ClusterMetadata(); + try { + metadata.parseFrom(data); + } catch (RuntimeException ie) { + throw new StorageRuntimeException("The cluster metadata from zookeeper @" + + zkClusterMetadataPath + " is corrupted", ie); + } + return metadata; + } catch (StorageRuntimeException e) { + throw e; } catch (Exception e) { throw new StorageRuntimeException("Failed to fetch cluster metadata from zookeeper @" + zkClusterMetadataPath, e); diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/DefaultStorageContainerController.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/DefaultStorageContainerController.java index ecafa8fcf46..327499e065f 100644 --- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/DefaultStorageContainerController.java +++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/DefaultStorageContainerController.java @@ -81,14 +81,15 @@ public ClusterAssignmentData computeIdealState(ClusterMetadata clusterMetadata, // 1. get current server assignments Map> currentServerAssignments; try { - currentServerAssignments = currentState.getServersMap() - .entrySet() - .stream() - .collect(Collectors.toMap(e1 -> { - return BookieId.parse(e1.getKey()); - }, - e2 -> e2.getValue().getContainersList().stream().collect(Collectors.toSet()) - )); + currentServerAssignments = Maps.newHashMap(); + Map> finalAssignments = currentServerAssignments; + currentState.forEachServers((server, assignment) -> { + Set containers = Sets.newHashSet(); + for (int i = 0; i < assignment.getContainersCount(); i++) { + containers.add(assignment.getContainerAt(i)); + } + finalAssignments.put(BookieId.parse(server), containers); + }); } catch (UncheckedExecutionException uee) { log.warn() .attr("currentState", currentState) @@ -181,15 +182,14 @@ public ClusterAssignmentData computeIdealState(ClusterMetadata clusterMetadata, } // 9. the new ideal state is computed, finalize it - Map newAssignmentMap = Maps.newHashMap(); - assignmentQueue.forEach(assignment -> newAssignmentMap.put( - assignment.getKey().toString(), - ServerAssignmentData.newBuilder() - .addAllContainers(assignment.getValue()) - .build())); - return ClusterAssignmentData.newBuilder() - .putAllServers(newAssignmentMap) - .build(); + ClusterAssignmentData result = new ClusterAssignmentData(); + for (Pair> assignment : assignmentQueue) { + ServerAssignmentData server = result.putServers(assignment.getKey().toString()); + for (long container : assignment.getValue()) { + server.addContainer(container); + } + } + return result; } static ClusterAssignmentData initializeIdealState(ClusterMetadata clusterMetadata, @@ -202,24 +202,20 @@ static ClusterAssignmentData initializeIdealState(ClusterMetadata clusterMetadat int numTotalContainers = (int) clusterMetadata.getNumStorageContainers(); int numContainersPerServer = numTotalContainers / currentCluster.size(); - Map assignmentMap = Maps.newHashMap(); + ClusterAssignmentData result = new ClusterAssignmentData(); for (int serverIdx = 0; serverIdx < serverList.size(); serverIdx++) { BookieId server = serverList.get(serverIdx); + ServerAssignmentData assignmentData = result.putServers(server.toString()); int finalServerIdx = serverIdx; - ServerAssignmentData assignmentData = ServerAssignmentData.newBuilder() - .addAllContainers( - LongStream.rangeClosed(0, numContainersPerServer).boxed() - .map(j -> j * numServers + finalServerIdx) - .filter(containerId -> containerId < numTotalContainers) - .collect(Collectors.toSet())) - .build(); - assignmentMap.put(server.toString(), assignmentData); + LongStream.rangeClosed(0, numContainersPerServer) + .map(j -> j * numServers + finalServerIdx) + .filter(containerId -> containerId < numTotalContainers) + .distinct() + .forEach(assignmentData::addContainer); } - return ClusterAssignmentData.newBuilder() - .putAllServers(assignmentMap) - .build(); + return result; } } diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManager.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManager.java index 6868103a580..02d1d69c1df 100644 --- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManager.java +++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManager.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.time.Duration; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -33,7 +34,6 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -import java.util.stream.Collectors; import lombok.AccessLevel; import lombok.CustomLog; import lombok.Getter; @@ -59,6 +59,8 @@ public class ZkStorageContainerManager extends AbstractLifecycleComponent implements StorageContainerManager, Consumer { + private static final ServerAssignmentData EMPTY_SERVER_ASSIGNMENT = new ServerAssignmentData(); + private final Endpoint endpoint; private final ClusterMetadataStore metadataStore; private final StorageContainerRegistry registry; @@ -161,11 +163,9 @@ private boolean refreshMyAssignment() { return false; } - Map newAssignmentMap = clusterAssignmentData.getServersMap().entrySet() - .stream() - .collect(Collectors.toMap( - e -> NetUtils.parseEndpoint(e.getKey()), - e -> e.getValue())); + Map newAssignmentMap = new HashMap<>(); + clusterAssignmentData.forEachServers( + (server, assignment) -> newAssignmentMap.put(NetUtils.parseEndpoint(server), assignment)); Set oldAssignedServers = clusterAssignmentMap.keySet(); Set newAssignedServers = newAssignmentMap.keySet(); @@ -191,7 +191,9 @@ private void processServersJoined(Set serversJoined, serversJoined.forEach(ep -> { ServerAssignmentData sad = newAssignmentMap.get(ep); if (null != sad) { - sad.getContainersList().forEach(container -> containerAssignmentMap.put(container, ep)); + for (int i = 0; i < sad.getContainersCount(); i++) { + containerAssignmentMap.put(sad.getContainerAt(i), ep); + } } }); } @@ -204,7 +206,9 @@ private void processServersLeft(Set serversLeft, serversLeft.forEach(ep -> { ServerAssignmentData sad = oldAssignmentMap.get(ep); if (null != sad) { - sad.getContainersList().forEach(container -> containerAssignmentMap.remove(container, ep)); + for (int i = 0; i < sad.getContainersCount(); i++) { + containerAssignmentMap.remove(sad.getContainerAt(i), ep); + } } }); } @@ -214,8 +218,8 @@ private void processServersAssignmentChanged(Set commonServers, Map newAssignmentMap) { commonServers.forEach(ep -> { - ServerAssignmentData oldSad = oldAssignmentMap.getOrDefault(ep, ServerAssignmentData.getDefaultInstance()); - ServerAssignmentData newSad = newAssignmentMap.getOrDefault(ep, ServerAssignmentData.getDefaultInstance()); + ServerAssignmentData oldSad = oldAssignmentMap.getOrDefault(ep, EMPTY_SERVER_ASSIGNMENT); + ServerAssignmentData newSad = newAssignmentMap.getOrDefault(ep, EMPTY_SERVER_ASSIGNMENT); if (oldSad.equals(newSad)) { return; @@ -225,8 +229,12 @@ private void processServersAssignmentChanged(Set commonServers, .attr("oldAssignment", oldSad) .attr("newAssignment", newSad) .log("Server assignment is changed"); - oldSad.getContainersList().forEach(container -> containerAssignmentMap.remove(container, ep)); - newSad.getContainersList().forEach(container -> containerAssignmentMap.put(container, ep)); + for (int i = 0; i < oldSad.getContainersCount(); i++) { + containerAssignmentMap.remove(oldSad.getContainerAt(i), ep); + } + for (int i = 0; i < newSad.getContainersCount(); i++) { + containerAssignmentMap.put(newSad.getContainerAt(i), ep); + } } }); @@ -239,7 +247,10 @@ private void stopContainers() { } private void processMyAssignment(ServerAssignmentData myAssignment) { - Set assignedContainerSet = myAssignment.getContainersList().stream().collect(Collectors.toSet()); + Set assignedContainerSet = Sets.newHashSet(); + for (int i = 0; i < myAssignment.getContainersCount(); i++) { + assignedContainerSet.add(myAssignment.getContainerAt(i)); + } Set liveContainerSet = Sets.newHashSet(liveContainers.keySet()); Set containersToStart = diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStoreTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStoreTest.java index 447f934822c..cba8a53c00e 100644 --- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStoreTest.java +++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStoreTest.java @@ -59,10 +59,10 @@ public void teardown() { @Test public void testUninitialized() { assertEquals( - ClusterMetadata.newBuilder().setNumStorageContainers(NUM_STORAGE_CONTAINERS).build(), + new ClusterMetadata().setNumStorageContainers(NUM_STORAGE_CONTAINERS), store.getClusterMetadata()); assertEquals( - ClusterAssignmentData.newBuilder().build(), + new ClusterAssignmentData(), store.getClusterAssignmentData()); } @@ -71,47 +71,37 @@ public void testInitialize() { int numStorageContainers = 2048; store.initializeCluster(numStorageContainers); assertEquals( - ClusterMetadata.newBuilder().setNumStorageContainers(numStorageContainers).build(), + new ClusterMetadata().setNumStorageContainers(numStorageContainers), store.getClusterMetadata()); assertEquals( - ClusterAssignmentData.newBuilder().build(), + new ClusterAssignmentData(), store.getClusterAssignmentData()); } @Test public void testUpdateClusterMetadata() { int numStorageContainers = 4096; - ClusterMetadata metadata = ClusterMetadata.newBuilder() - .setNumStorageContainers(numStorageContainers) - .build(); + ClusterMetadata metadata = new ClusterMetadata().setNumStorageContainers(numStorageContainers); store.updateClusterMetadata(metadata); assertEquals(metadata, store.getClusterMetadata()); } @Test public void testUpdateClusterAssignmentData() { - ClusterAssignmentData assignmentData = ClusterAssignmentData.newBuilder() - .putServers( - "server-0", - ServerAssignmentData.newBuilder() - .addContainers(1L) - .addContainers(2L) - .build()) - .build(); + ClusterAssignmentData assignmentData = new ClusterAssignmentData(); + ServerAssignmentData server0 = assignmentData.putServers("server-0"); + server0.addContainer(1L); + server0.addContainer(2L); store.updateClusterAssignmentData(assignmentData); assertEquals(assignmentData, store.getClusterAssignmentData()); } @Test public void testWatchClusterAssignmentData() { - ClusterAssignmentData assignmentData = ClusterAssignmentData.newBuilder() - .putServers( - "server-0", - ServerAssignmentData.newBuilder() - .addContainers(1L) - .addContainers(2L) - .build()) - .build(); + ClusterAssignmentData assignmentData = new ClusterAssignmentData(); + ServerAssignmentData server0 = assignmentData.putServers("server-0"); + server0.addContainer(1L); + server0.addContainer(2L); @Cleanup("shutdown") ExecutorService executor = Executors.newSingleThreadExecutor(); @@ -129,14 +119,10 @@ public void testWatchClusterAssignmentData() { @Test public void testUnwatchClusterAssignmentData() throws Exception { - ClusterAssignmentData assignmentData = ClusterAssignmentData.newBuilder() - .putServers( - "server-0", - ServerAssignmentData.newBuilder() - .addContainers(1L) - .addContainers(2L) - .build()) - .build(); + ClusterAssignmentData assignmentData = new ClusterAssignmentData(); + ServerAssignmentData server0 = assignmentData.putServers("server-0"); + server0.addContainer(1L); + server0.addContainer(2L); @Cleanup("shutdown") ExecutorService executor = Executors.newSingleThreadExecutor(); diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStoreTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStoreTest.java index bcf34331e98..4e87451d617 100644 --- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStoreTest.java +++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStoreTest.java @@ -114,37 +114,27 @@ public void testInitialize() { @Test public void testUpdateClusterMetadata() { int numStorageContainers = 4096; - ClusterMetadata metadata = ClusterMetadata.newBuilder() - .setNumStorageContainers(numStorageContainers) - .build(); + ClusterMetadata metadata = new ClusterMetadata().setNumStorageContainers(numStorageContainers); store.updateClusterMetadata(metadata); assertEquals(metadata, store.getClusterMetadata()); } @Test public void testUpdateClusterAssignmentData() { - ClusterAssignmentData assignmentData = ClusterAssignmentData.newBuilder() - .putServers( - "server-0", - ServerAssignmentData.newBuilder() - .addContainers(1L) - .addContainers(2L) - .build()) - .build(); + ClusterAssignmentData assignmentData = new ClusterAssignmentData(); + ServerAssignmentData server0 = assignmentData.putServers("server-0"); + server0.addContainer(1L); + server0.addContainer(2L); store.updateClusterAssignmentData(assignmentData); assertEquals(assignmentData, store.getClusterAssignmentData()); } @Test public void testWatchClusterAssignmentData() { - ClusterAssignmentData assignmentData = ClusterAssignmentData.newBuilder() - .putServers( - "server-0", - ServerAssignmentData.newBuilder() - .addContainers(1L) - .addContainers(2L) - .build()) - .build(); + ClusterAssignmentData assignmentData = new ClusterAssignmentData(); + ServerAssignmentData server0 = assignmentData.putServers("server-0"); + server0.addContainer(1L); + server0.addContainer(2L); @Cleanup("shutdown") ExecutorService executor = Executors.newSingleThreadExecutor(); @@ -162,14 +152,10 @@ public void testWatchClusterAssignmentData() { @Test public void testUnwatchClusterAssignmentData() throws Exception { - ClusterAssignmentData assignmentData = ClusterAssignmentData.newBuilder() - .putServers( - "server-0", - ServerAssignmentData.newBuilder() - .addContainers(1L) - .addContainers(2L) - .build()) - .build(); + ClusterAssignmentData assignmentData = new ClusterAssignmentData(); + ServerAssignmentData server0 = assignmentData.putServers("server-0"); + server0.addContainer(1L); + server0.addContainer(2L); @Cleanup("shutdown") ExecutorService executor = Executors.newSingleThreadExecutor(); diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/DefaultStorageContainerControllerTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/DefaultStorageContainerControllerTest.java index f6befb0487e..009bbd9926f 100644 --- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/DefaultStorageContainerControllerTest.java +++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/DefaultStorageContainerControllerTest.java @@ -57,16 +57,26 @@ public class DefaultStorageContainerControllerTest { public DefaultStorageContainerControllerTest() { this.controller = new DefaultStorageContainerController(); - this.clusterMetadata = ClusterMetadata.newBuilder() - .setNumStorageContainers(NUM_STORAGE_CONTAINERS) - .build(); - this.currentAssignment = ClusterAssignmentData.newBuilder() - .putServers("default-server", ServerAssignmentData.newBuilder() - .addContainers(0L) - .addContainers(1L) - .addContainers(3L) - .build()) - .build(); + this.clusterMetadata = new ClusterMetadata().setNumStorageContainers(NUM_STORAGE_CONTAINERS); + this.currentAssignment = new ClusterAssignmentData(); + ServerAssignmentData defaultServer = currentAssignment.putServers("default-server"); + defaultServer.addContainer(0L); + defaultServer.addContainer(1L); + defaultServer.addContainer(3L); + } + + private static List containersOf(ServerAssignmentData serverData) { + List containers = Lists.newArrayListWithCapacity(serverData.getContainersCount()); + for (int i = 0; i < serverData.getContainersCount(); i++) { + containers.add(serverData.getContainerAt(i)); + } + return containers; + } + + private static Map serversMap(ClusterAssignmentData assignment) { + Map map = new java.util.LinkedHashMap<>(); + assignment.forEachServers(map::put); + return map; } @Test @@ -129,7 +139,7 @@ private static void verifyAssignmentData(ClusterAssignmentData newAssignment, int numContainersPerServer = NUM_STORAGE_CONTAINERS / numServers; int serverIdx = 0; - for (Map.Entry entry : newAssignment.getServersMap().entrySet()) { + for (Map.Entry entry : serversMap(newAssignment).entrySet()) { log.info().attr("server", entry.getKey()).attr("assignment", entry.getValue()) .log("Check assignment for server"); @@ -139,7 +149,7 @@ private static void verifyAssignmentData(ClusterAssignmentData newAssignment, ServerAssignmentData serverData = entry.getValue(); assertEquals(numContainersPerServer, serverData.getContainersCount()); - List containers = Lists.newArrayList(serverData.getContainersList()); + List containers = containersOf(serverData); Collections.sort(containers); assignedContainers.addAll(containers); @@ -173,7 +183,7 @@ private static void verifyAssignmentDataWhenHasMoreServers(ClusterAssignmentData int numEmptyServers = 0; int numAssignedServers = 0; int serverIdx = 0; - for (Map.Entry entry : newAssignment.getServersMap().entrySet()) { + for (Map.Entry entry : serversMap(newAssignment).entrySet()) { log.info().attr("server", entry.getKey()).attr("assignment", entry.getValue()) .log("Check assignment for server"); @@ -188,7 +198,7 @@ private static void verifyAssignmentDataWhenHasMoreServers(ClusterAssignmentData } else { ++numEmptyServers; } - List containers = Lists.newArrayList(serverData.getContainersList()); + List containers = containersOf(serverData); Collections.sort(containers); assignedContainers.addAll(containers); @@ -209,7 +219,7 @@ private static void verifyAssignmentDataWhenHasMoreServers(ClusterAssignmentData @Test public void testComputeIdealStateFromEmptyAssignment() throws Exception { - ClusterAssignmentData emptyAssignment = ClusterAssignmentData.newBuilder().build(); + ClusterAssignmentData emptyAssignment = new ClusterAssignmentData(); int numServers = 8; Set currentCluster = newCluster(numServers); @@ -224,7 +234,7 @@ public void testComputeIdealStateFromEmptyAssignment() throws Exception { @Test public void testComputeIdealStateIfClusterUnchanged() throws Exception { - ClusterAssignmentData emptyAssignment = ClusterAssignmentData.newBuilder().build(); + ClusterAssignmentData emptyAssignment = new ClusterAssignmentData(); int numServers = 8; Set currentCluster = newCluster(numServers); @@ -245,7 +255,7 @@ public void testComputeIdealStateIfClusterUnchanged() throws Exception { @Test public void testComputeIdealStateWhenHostsRemoved() throws Exception { - ClusterAssignmentData emptyAssignment = ClusterAssignmentData.newBuilder().build(); + ClusterAssignmentData emptyAssignment = new ClusterAssignmentData(); int numServers = 8; Set currentCluster = newCluster(numServers); @@ -268,7 +278,7 @@ public void testComputeIdealStateWhenHostsRemoved() throws Exception { @Test public void testComputeIdealStateWhenHostsAdded() throws Exception { - ClusterAssignmentData emptyAssignment = ClusterAssignmentData.newBuilder().build(); + ClusterAssignmentData emptyAssignment = new ClusterAssignmentData(); int numServers = 4; Set currentCluster = newCluster(numServers); @@ -291,7 +301,7 @@ public void testComputeIdealStateWhenHostsAdded() throws Exception { @Test public void testComputeIdealStateWhenHostsRemovedAdded() throws Exception { - ClusterAssignmentData emptyAssignment = ClusterAssignmentData.newBuilder().build(); + ClusterAssignmentData emptyAssignment = new ClusterAssignmentData(); int numServers = 4; Set currentCluster = newCluster(numServers); @@ -318,7 +328,7 @@ public void testComputeIdealStateWhenHostsRemovedAdded() throws Exception { @Test public void testComputeIdealStateWhenHasMoreServers() throws Exception { - ClusterAssignmentData emptyAssignment = ClusterAssignmentData.newBuilder().build(); + ClusterAssignmentData emptyAssignment = new ClusterAssignmentData(); int numServers = 2 * NUM_STORAGE_CONTAINERS; Set currentCluster = newCluster(numServers); @@ -332,7 +342,7 @@ public void testComputeIdealStateWhenHasMoreServers() throws Exception { @Test public void testComputeIdealStateWhenScaleToMoreServers() throws Exception { - ClusterAssignmentData emptyAssignment = ClusterAssignmentData.newBuilder().build(); + ClusterAssignmentData emptyAssignment = new ClusterAssignmentData(); int numServers = 4; Set currentCluster = newCluster(numServers); diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManagerTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManagerTest.java index bd33392a571..ebd324ae157 100644 --- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManagerTest.java +++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManagerTest.java @@ -150,13 +150,8 @@ public void testBasicOps() throws Exception { .thenReturn(mockSc2); // update assignment map - ClusterAssignmentData cad = ClusterAssignmentData.newBuilder() - .putServers( - NetUtils.endpointToString(myEndpoint), - ServerAssignmentData.newBuilder() - .addContainers(containerId) - .build()) - .build(); + ClusterAssignmentData cad = new ClusterAssignmentData(); + cad.putServers(NetUtils.endpointToString(myEndpoint)).addContainer(containerId); clusterMetadataStore.updateClusterAssignmentData(cad); // notify the container to complete startup @@ -170,13 +165,8 @@ public void testBasicOps() throws Exception { // update assignment map to remove containerId and add containerId2 - ClusterAssignmentData newCad = ClusterAssignmentData.newBuilder() - .putServers( - NetUtils.endpointToString(myEndpoint), - ServerAssignmentData.newBuilder() - .addContainers(22L) - .build()) - .build(); + ClusterAssignmentData newCad = new ClusterAssignmentData(); + newCad.putServers(NetUtils.endpointToString(myEndpoint)).addContainer(22L); clusterMetadataStore.updateClusterAssignmentData(newCad); // notify the container1 to stop and container2 to start @@ -210,13 +200,8 @@ public void testShutdownPendingStartStorageContainer() throws Exception { .thenReturn(mockSc); // update assignment map - ClusterAssignmentData cad = ClusterAssignmentData.newBuilder() - .putServers( - NetUtils.endpointToString(myEndpoint), - ServerAssignmentData.newBuilder() - .addContainers(containerId) - .build()) - .build(); + ClusterAssignmentData cad = new ClusterAssignmentData(); + cad.putServers(NetUtils.endpointToString(myEndpoint)).addContainer(containerId); clusterMetadataStore.updateClusterAssignmentData(cad); // wait until container start is called @@ -226,7 +211,7 @@ public void testShutdownPendingStartStorageContainer() throws Exception { assertTrue(scManager.getPendingStartStopContainers().contains(containerId)); // now shutting the manager down - cad = ClusterAssignmentData.newBuilder().build(); + cad = new ClusterAssignmentData(); clusterMetadataStore.updateClusterAssignmentData(cad); // the container should not be stopped since it is pending starting. @@ -282,13 +267,8 @@ public void testStartContainerOnFailures() throws Exception { scManager.start(); // update assignment map - ClusterAssignmentData cad = ClusterAssignmentData.newBuilder() - .putServers( - NetUtils.endpointToString(myEndpoint), - ServerAssignmentData.newBuilder() - .addContainers(containerId) - .build()) - .build(); + ClusterAssignmentData cad = new ClusterAssignmentData(); + cad.putServers(NetUtils.endpointToString(myEndpoint)).addContainer(containerId); clusterMetadataStore.updateClusterAssignmentData(cad); // wait until container start is called and verify it is not started. From 30687097585125e34151ce99b07d9ffddc932aee Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 7 May 2026 17:48:20 -0700 Subject: [PATCH 3/3] Fix checkstyle: remove unused ServerAssignmentData import --- .../stream/storage/impl/sc/ZkStorageContainerManagerTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManagerTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManagerTest.java index ebd324ae157..69cdf762576 100644 --- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManagerTest.java +++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManagerTest.java @@ -37,7 +37,6 @@ import org.apache.bookkeeper.common.testing.MoreAsserts; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData; -import org.apache.bookkeeper.stream.proto.cluster.ServerAssignmentData; import org.apache.bookkeeper.stream.proto.common.Endpoint; import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer; import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerFactory;