diff --git a/bookkeeper-proto/pom.xml b/bookkeeper-proto/pom.xml
index d997fbde438..20b1d28a6ea 100644
--- a/bookkeeper-proto/pom.xml
+++ b/bookkeeper-proto/pom.xml
@@ -25,10 +25,6 @@
bookkeeper-protoApache BookKeeper :: Protocols
-
- com.google.protobuf
- protobuf-java
- io.nettynetty-buffer
@@ -42,40 +38,16 @@
- **/BookkeeperProtocol.javatarget/generated-sources/lightproto/****/.checkstyle
-
- org.xolstice.maven.plugins
- protobuf-maven-plugin
- ${protobuf-maven-plugin.version}
-
- com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}
- true
-
- BookkeeperProtocol.proto
-
-
-
-
-
- compile
-
-
-
- io.streamnative.lightprotolightproto-maven-plugin${lightproto-maven-plugin.version}
-
- ${project.basedir}/src/main/proto/DataFormats.proto
- ${project.basedir}/src/main/proto/DbLedgerStorageDataFormats.proto
- generated-sources/lightproto/javatrue
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java
index b0f9636534c..669d453c55a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java
@@ -35,7 +35,7 @@
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
-import org.apache.bookkeeper.proto.BookkeeperProtocol;
+import org.apache.bookkeeper.proto.GetBookieInfoRequest;
import org.apache.commons.collections4.CollectionUtils;
/**
@@ -46,8 +46,8 @@
@CustomLog
public class BookieInfoReader {
private static final long GET_BOOKIE_INFO_REQUEST_FLAGS =
- BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE
- | BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE;
+ GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE
+ | GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE;
private final ScheduledExecutorService scheduler;
private final BookKeeper bk;
@@ -329,8 +329,8 @@ synchronized void getReadWriteBookieInfo() {
}
BookieClient bkc = bk.getBookieClient();
- final long requested = BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE
- | BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE;
+ final long requested = GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE
+ | GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE;
totalSent = 0;
completedCnt = 0;
errorCnt = 0;
@@ -413,8 +413,8 @@ Map getBookieInfo() throws BKException, InterruptedExcepti
final ConcurrentMap map =
new ConcurrentHashMap();
final CountDownLatch latch = new CountDownLatch(1);
- long requested = BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE
- | BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE;
+ long requested = GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE
+ | GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE;
Collection bookies;
bookies = bk.bookieWatcher.getBookies();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AddCompletion.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AddCompletion.java
index 69409138b0c..1ed31b8dd09 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AddCompletion.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AddCompletion.java
@@ -117,7 +117,7 @@ public void setOutstanding() {
@Override
public void handleV2Response(
- long ledgerId, long entryId, BookkeeperProtocol.StatusCode status,
+ long ledgerId, long entryId, StatusCode status,
BookieProtocol.Response response) {
perChannelBookieClient.addEntryOutstanding.dec();
handleResponse(ledgerId, entryId, status);
@@ -125,17 +125,22 @@ public void handleV2Response(
@Override
public void handleV3Response(
- BookkeeperProtocol.Response response) {
+ Response response) {
perChannelBookieClient.addEntryOutstanding.dec();
- BookkeeperProtocol.AddResponse addResponse = response.getAddResponse();
- BookkeeperProtocol.StatusCode status = response.getStatus() == BookkeeperProtocol.StatusCode.EOK
- ? addResponse.getStatus() : response.getStatus();
- handleResponse(addResponse.getLedgerId(), addResponse.getEntryId(),
- status);
+ StatusCode status;
+ if (response.getStatus() == StatusCode.EOK && response.hasAddResponse()) {
+ status = response.getAddResponse().getStatus();
+ } else {
+ // Error responses (e.g. EUA from a rejected auth handshake) may not
+ // carry an AddResponse with ledgerId/entryId populated. Fall back to
+ // the values we recorded from the outgoing request.
+ status = response.getStatus();
+ }
+ handleResponse(ledgerId, entryId, status);
}
private void handleResponse(long ledgerId, long entryId,
- BookkeeperProtocol.StatusCode status) {
+ StatusCode status) {
logEvent(status).log("Got response from bookie");
int rc = convertStatus(status, BKException.Code.WriteException);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
index ed2aa014b18..5bcb44b7ef5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
@@ -22,7 +22,6 @@
import static org.apache.bookkeeper.auth.AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME;
-import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
@@ -40,7 +39,6 @@
import org.apache.bookkeeper.auth.BookieAuthProvider;
import org.apache.bookkeeper.auth.ClientAuthProvider;
import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.NettyChannelUtil;
@@ -90,10 +88,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
BookieProtocol.AuthRequest req = (BookieProtocol.AuthRequest) msg;
assert (req.getOpCode() == BookieProtocol.AUTH);
if (checkAuthPlugin(req.getAuthMessage(), ctx.channel())) {
- byte[] payload = req
- .getAuthMessage()
- .getPayload()
- .toByteArray();
+ byte[] payload = req.getAuthMessage().getPayload();
authProvider.process(AuthToken.wrap(payload),
new AuthResponseCallbackLegacy(req, ctx.channel()));
} else {
@@ -114,26 +109,23 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
} else {
ctx.channel().close();
}
- } else if (msg instanceof BookkeeperProtocol.Request) { // post-PB-client
- BookkeeperProtocol.Request req = (BookkeeperProtocol.Request) msg;
- if (req.getHeader().getOperation() == BookkeeperProtocol.OperationType.AUTH
+ } else if (msg instanceof Request) { // post-PB-client
+ Request req = (Request) msg;
+ if (req.getHeader().getOperation() == OperationType.AUTH
&& req.hasAuthRequest()
&& checkAuthPlugin(req.getAuthRequest(), ctx.channel())) {
- byte[] payload = req
- .getAuthRequest()
- .getPayload()
- .toByteArray();
+ byte[] payload = req.getAuthRequest().getPayload();
authProvider.process(AuthToken.wrap(payload),
new AuthResponseCallback(req, ctx.channel(), authProviderFactory.getPluginName()));
- } else if (req.getHeader().getOperation() == BookkeeperProtocol.OperationType.START_TLS
+ } else if (req.getHeader().getOperation() == OperationType.START_TLS
&& req.hasStartTLSRequest()) {
super.channelRead(ctx, msg);
} else {
- BookkeeperProtocol.Response.Builder builder = BookkeeperProtocol.Response.newBuilder()
- .setHeader(req.getHeader())
- .setStatus(BookkeeperProtocol.StatusCode.EUA);
+ Response response = new Response();
+ response.setHeader().copyFrom(req.getHeader());
+ response.setStatus(StatusCode.EUA);
- NettyChannelUtil.writeAndFlushWithVoidPromise(ctx.channel(), builder.build());
+ NettyChannelUtil.writeAndFlushWithVoidPromise(ctx.channel(), response);
}
} else {
// close the channel, junk coming over it
@@ -171,8 +163,9 @@ public void operationComplete(int rc, AuthToken newam) {
channel.close();
return;
}
- AuthMessage message = AuthMessage.newBuilder().setAuthPluginName(req.authMessage.getAuthPluginName())
- .setPayload(ByteString.copyFrom(newam.getData())).build();
+ AuthMessage message = new AuthMessage()
+ .setAuthPluginName(req.authMessage.getAuthPluginName())
+ .setPayload(newam.getData());
final BookieProtocol.AuthResponse response =
new BookieProtocol.AuthResponse(req.getProtocolVersion(), message);
NettyChannelUtil.writeAndFlushWithVoidPromise(channel, response);
@@ -180,11 +173,11 @@ public void operationComplete(int rc, AuthToken newam) {
}
static class AuthResponseCallback implements AuthCallbacks.GenericCallback {
- final BookkeeperProtocol.Request req;
+ final Request req;
final Channel channel;
final String pluginName;
- AuthResponseCallback(BookkeeperProtocol.Request req, Channel channel, String pluginName) {
+ AuthResponseCallback(Request req, Channel channel, String pluginName) {
this.req = req;
this.channel = channel;
this.pluginName = pluginName;
@@ -192,24 +185,21 @@ static class AuthResponseCallback implements AuthCallbacks.GenericCallback();
private final ClientAuthProvider.Factory authProviderFactory;
- private final ExtensionRegistry registry;
private final ClientConfiguration conf;
private final ClientConfiguration v3Conf;
@@ -116,7 +114,6 @@ public BookieClientImpl(ClientConfiguration conf, EventLoopGroup eventLoopGroup,
this.closed = false;
this.closeLock = new ReentrantReadWriteLock();
this.bookieAddressResolver = bookieAddressResolver;
- this.registry = ExtensionRegistry.newInstance();
this.authProviderFactory = AuthProviderFactoryFactory.newClientAuthProviderFactory(conf);
this.statsLogger = statsLogger;
@@ -192,7 +189,7 @@ public PerChannelBookieClient create(BookieId address, PerChannelBookieClientPoo
clientConfiguration = v3Conf;
}
return new PerChannelBookieClient(clientConfiguration, executor, eventLoopGroup, allocator, address,
- statsLoggerForPCBC, authProviderFactory, registry, pcbcPool,
+ statsLoggerForPCBC, authProviderFactory, pcbcPool,
shFactory, bookieAddressResolver);
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
index bf1ca5168fb..cdb133e3fd8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
@@ -21,7 +21,6 @@
package org.apache.bookkeeper.proto;
import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ExtensionRegistry;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
@@ -109,7 +108,6 @@ class BookieNettyServer {
final InetSocketAddress bindAddress;
final BookieAuthProvider.Factory authProviderFactory;
- final ExtensionRegistry registry = ExtensionRegistry.newInstance();
private final ByteBufAllocator allocator;
@@ -382,8 +380,8 @@ protected void initChannel(SocketChannel ch) throws Exception {
pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
- pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.RequestDecoder(registry));
- pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.ResponseEncoder(registry));
+ pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.RequestDecoder());
+ pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.ResponseEncoder());
pipeline.addLast("bookieAuthHandler", new AuthHandler.ServerSideHandler(
contextHandler.getConnectionPeer(), authProviderFactory));
@@ -440,8 +438,8 @@ protected void initChannel(LocalChannel ch) throws Exception {
pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
- pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.RequestDecoder(registry));
- pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.ResponseEncoder(registry));
+ pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.RequestDecoder());
+ pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.ResponseEncoder());
pipeline.addLast("bookieAuthHandler", new AuthHandler.ServerSideHandler(
contextHandler.getConnectionPeer(), authProviderFactory));
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 403813be6e6..374548aa825 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -20,26 +20,17 @@
*/
package org.apache.bookkeeper.proto;
-import com.google.protobuf.CodedOutputStream;
-import com.google.protobuf.ExtensionRegistry;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.MessageLite;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.ByteBufInputStream;
-import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.util.ReferenceCountUtil;
-import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import lombok.CustomLog;
import org.apache.bookkeeper.proto.BookieProtocol.PacketHeader;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
import org.apache.bookkeeper.proto.checksum.MacDigestManager;
import org.apache.bookkeeper.util.ByteBufList;
@@ -79,14 +70,12 @@ public interface EnDecoder {
* @throws Exception
*/
Object decode(ByteBuf packet) throws Exception;
-
}
/**
* An encoder/decoder for the Bookkeeper protocol before version 3.
*/
public static class RequestEnDeCoderPreV3 implements EnDecoder {
- final ExtensionRegistry extensionRegistry;
//This empty master key is used when an empty password is provided which is the hash of an empty string
private static final byte[] emptyPasswordMasterKey;
@@ -98,10 +87,6 @@ public static class RequestEnDeCoderPreV3 implements EnDecoder {
}
}
- public RequestEnDeCoderPreV3(ExtensionRegistry extensionRegistry) {
- this.extensionRegistry = extensionRegistry;
- }
-
@Override
public Object encode(Object msg, ByteBufAllocator allocator)
throws Exception {
@@ -151,14 +136,13 @@ public Object encode(Object msg, ByteBufAllocator allocator)
r.recycle();
return buf;
} else if (r instanceof BookieProtocol.AuthRequest) {
- BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthRequest) r).getAuthMessage();
+ AuthMessage am = ((BookieProtocol.AuthRequest) r).getAuthMessage();
int totalHeaderSize = 4; // for request type
int totalSize = totalHeaderSize + am.getSerializedSize();
ByteBuf buf = allocator.buffer(totalSize + 4 /* frame size */);
buf.writeInt(totalSize);
buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), r.getOpCode(), r.getFlags()));
- ByteBufOutputStream bufStream = new ByteBufOutputStream(buf);
- am.writeTo(bufStream);
+ am.writeTo(buf);
return buf;
} else {
return msg;
@@ -219,9 +203,10 @@ public Object decode(ByteBuf packet)
requestId, maxCount, maxSize);
}
case BookieProtocol.AUTH:
- BookkeeperProtocol.AuthMessage.Builder builder = BookkeeperProtocol.AuthMessage.newBuilder();
- builder.mergeFrom(new ByteBufInputStream(packet), extensionRegistry);
- return new BookieProtocol.AuthRequest(version, builder.build());
+ AuthMessage am = new AuthMessage();
+ am.parseFrom(packet, packet.readableBytes());
+ am.materialize();
+ return new BookieProtocol.AuthRequest(version, am);
default:
throw new IllegalStateException("Received unknown request op code = " + opCode);
@@ -258,11 +243,6 @@ private static byte[] readMasterKey(ByteBuf packet) {
* A response encoder/decoder for the Bookkeeper protocol before version 3.
*/
public static class ResponseEnDeCoderPreV3 implements EnDecoder {
- final ExtensionRegistry extensionRegistry;
-
- public ResponseEnDeCoderPreV3(ExtensionRegistry extensionRegistry) {
- this.extensionRegistry = extensionRegistry;
- }
private static final int RESPONSE_HEADERS_SIZE = 24;
@@ -340,14 +320,14 @@ public Object encode(Object msg, ByteBufAllocator allocator)
buf.writeLong(r.getEntryId());
return buf;
} else if (msg instanceof BookieProtocol.AuthResponse) {
- BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthResponse) r).getAuthMessage();
+ AuthMessage am = ((BookieProtocol.AuthResponse) r).getAuthMessage();
int payloadSize = 4 + am.getSerializedSize();
int bufferSize = payloadSize + 4 /* frame size */;
ByteBuf buf = allocator.buffer(bufferSize);
buf.writeInt(payloadSize);
buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), r.getOpCode(), (short) 0));
- buf.writeBytes(am.toByteArray());
+ am.writeTo(buf);
return buf;
} else {
log.error().attr("type", msg.getClass().getName()).log("Cannot encode unknown response type");
@@ -400,10 +380,9 @@ public Object decode(ByteBuf buffer)
return new BookieProtocol.BatchedReadResponse(version, rc, ledgerId, entryId, requestId, data == null
? ByteBufList.get() : data.retain());
case BookieProtocol.AUTH:
- ByteBufInputStream bufStream = new ByteBufInputStream(buffer);
- BookkeeperProtocol.AuthMessage.Builder builder = BookkeeperProtocol.AuthMessage.newBuilder();
- builder.mergeFrom(bufStream, extensionRegistry);
- BookkeeperProtocol.AuthMessage am = builder.build();
+ AuthMessage am = new AuthMessage();
+ am.parseFrom(buffer, buffer.readableBytes());
+ am.materialize();
return new BookieProtocol.AuthResponse(version, am);
default:
throw new IllegalStateException("Received unknown response : op code = " + opCode);
@@ -423,21 +402,20 @@ public static void serializeAddResponseInto(int rc, BookieProtocol.ParsedAddRequ
* A request encoder/decoder for the Bookkeeper protocol version 3.
*/
public static class RequestEnDecoderV3 implements EnDecoder {
- final ExtensionRegistry extensionRegistry;
-
- public RequestEnDecoderV3(ExtensionRegistry extensionRegistry) {
- this.extensionRegistry = extensionRegistry;
- }
@Override
public Object decode(ByteBuf packet) throws Exception {
- return BookkeeperProtocol.Request.parseFrom(new ByteBufInputStream(packet), extensionRegistry);
+ Request request = new Request();
+ request.parseFrom(packet, packet.readableBytes());
+ // Lightproto keeps references to the source buffer for lazy field access.
+ // Materialize so the request stays valid after the buffer is released.
+ request.materialize();
+ return request;
}
@Override
public Object encode(Object msg, ByteBufAllocator allocator) throws Exception {
- BookkeeperProtocol.Request request = (BookkeeperProtocol.Request) msg;
- return serializeProtobuf(request, allocator);
+ return serializeProtobuf((Request) msg, allocator);
}
}
@@ -446,49 +424,33 @@ public Object encode(Object msg, ByteBufAllocator allocator) throws Exception {
* A response encoder/decoder for the Bookkeeper protocol version 3.
*/
public static class ResponseEnDecoderV3 implements EnDecoder {
- final ExtensionRegistry extensionRegistry;
-
- public ResponseEnDecoderV3(ExtensionRegistry extensionRegistry) {
- this.extensionRegistry = extensionRegistry;
- }
@Override
public Object decode(ByteBuf packet) throws Exception {
- return BookkeeperProtocol.Response.parseFrom(new ByteBufInputStream(packet),
- extensionRegistry);
+ Response response = new Response();
+ response.parseFrom(packet, packet.readableBytes());
+ // Lightproto keeps references to the source buffer for lazy field access.
+ // Materialize so the response stays valid after the buffer is released.
+ response.materialize();
+ return response;
}
@Override
public Object encode(Object msg, ByteBufAllocator allocator) throws Exception {
- BookkeeperProtocol.Response response = (BookkeeperProtocol.Response) msg;
- return serializeProtobuf(response, allocator);
+ return serializeProtobuf((Response) msg, allocator);
}
}
- private static ByteBuf serializeProtobuf(MessageLite msg, ByteBufAllocator allocator) {
+ private static ByteBuf serializeProtobuf(LightProtoCodec.LightProtoMessage msg, ByteBufAllocator allocator) {
int size = msg.getSerializedSize();
int frameSize = size + 4;
- // Protobuf serialization is the last step of the netty pipeline. We used to allocate
- // a heap buffer while serializing and pass it down to netty library.
- // In AbstractChannel#filterOutboundMessage(), netty copies that data to a direct buffer if
- // it is currently in heap (otherwise skips it and uses it directly).
- // Allocating a direct buffer reducing unnecessary CPU cycles for buffer copies in BK client
- // and also helps alleviate pressure off the GC, since there is less memory churn.
- // Bookies aren't usually CPU bound. This change improves READ_ENTRY code paths by a small factor as well.
+ // Protobuf serialization is the last step of the netty pipeline. We allocate
+ // a direct buffer to avoid an extra copy in netty's AbstractChannel#filterOutboundMessage().
ByteBuf buf = allocator.directBuffer(frameSize, frameSize);
buf.writeInt(size);
-
- try {
- msg.writeTo(CodedOutputStream.newInstance(buf.nioBuffer(buf.writerIndex(), size)));
- } catch (IOException e) {
- // This is in-memory serialization, should not fail
- throw new RuntimeException(e);
- }
-
- // Advance writer idx
- buf.writerIndex(frameSize);
+ msg.writeTo(buf);
return buf;
}
@@ -501,9 +463,9 @@ public static class RequestEncoder extends ChannelOutboundHandlerAdapter {
final EnDecoder reqPreV3;
final EnDecoder reqV3;
- public RequestEncoder(ExtensionRegistry extensionRegistry) {
- reqPreV3 = new RequestEnDeCoderPreV3(extensionRegistry);
- reqV3 = new RequestEnDecoderV3(extensionRegistry);
+ public RequestEncoder() {
+ reqPreV3 = new RequestEnDeCoderPreV3();
+ reqV3 = new RequestEnDecoderV3();
}
@Override
@@ -514,7 +476,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
.log("Encode request to channel");
if (msg instanceof ByteBuf || msg instanceof ByteBufList) {
ctx.write(msg, promise);
- } else if (msg instanceof BookkeeperProtocol.Request) {
+ } else if (msg instanceof Request) {
ctx.write(reqV3.encode(msg, ctx.alloc()), promise);
} else if (msg instanceof BookieProtocol.Request) {
ctx.write(reqPreV3.encode(msg, ctx.alloc()), promise);
@@ -537,9 +499,9 @@ public static class RequestDecoder extends ChannelInboundHandlerAdapter {
final EnDecoder reqV3;
boolean usingV3Protocol;
- RequestDecoder(ExtensionRegistry extensionRegistry) {
- reqPreV3 = new RequestEnDeCoderPreV3(extensionRegistry);
- reqV3 = new RequestEnDecoderV3(extensionRegistry);
+ public RequestDecoder() {
+ reqPreV3 = new RequestEnDeCoderPreV3();
+ reqV3 = new RequestEnDecoderV3();
usingV3Protocol = true;
}
@@ -564,7 +526,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
if (usingV3Protocol) {
try {
result = reqV3.decode(buffer);
- } catch (InvalidProtocolBufferException e) {
+ } catch (RuntimeException e) {
usingV3Protocol = false;
buffer.resetReaderIndex();
result = reqPreV3.decode(buffer);
@@ -587,9 +549,9 @@ public static class ResponseEncoder extends ChannelOutboundHandlerAdapter {
final EnDecoder repPreV3;
final EnDecoder repV3;
- ResponseEncoder(ExtensionRegistry extensionRegistry) {
- repPreV3 = new ResponseEnDeCoderPreV3(extensionRegistry);
- repV3 = new ResponseEnDecoderV3(extensionRegistry);
+ public ResponseEncoder() {
+ repPreV3 = new ResponseEnDeCoderPreV3();
+ repV3 = new ResponseEnDecoderV3();
}
@Override
@@ -601,7 +563,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
if (msg instanceof ByteBuf) {
ctx.write(msg, promise);
- } else if (msg instanceof BookkeeperProtocol.Response) {
+ } else if (msg instanceof Response) {
ctx.write(repV3.encode(msg, ctx.alloc()), promise);
} else if (msg instanceof BookieProtocol.Response) {
ctx.write(repPreV3.encode(msg, ctx.alloc()), promise);
@@ -626,11 +588,9 @@ public static class ResponseDecoder extends ChannelInboundHandlerAdapter {
final boolean tlsEnabled;
boolean usingV3Protocol;
- ResponseDecoder(ExtensionRegistry extensionRegistry,
- boolean useV2Protocol,
- boolean tlsEnabled) {
- this.repPreV3 = new ResponseEnDeCoderPreV3(extensionRegistry);
- this.repV3 = new ResponseEnDecoderV3(extensionRegistry);
+ public ResponseDecoder(boolean useV2Protocol, boolean tlsEnabled) {
+ this.repPreV3 = new ResponseEnDeCoderPreV3();
+ this.repV3 = new ResponseEnDecoderV3();
this.useV2Protocol = useV2Protocol;
this.tlsEnabled = tlsEnabled;
usingV3Protocol = true;
@@ -667,7 +627,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
usingV3Protocol = false;
log.debug("Degrade bookkeeper to v2 after starting TLS.");
}
- } catch (InvalidProtocolBufferException e) {
+ } catch (RuntimeException e) {
usingV3Protocol = false;
buffer.resetReaderIndex();
result = repPreV3.decode(buffer);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 6a93f8d2cc6..657d7fddb1d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -26,7 +26,6 @@
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
import org.apache.bookkeeper.util.ByteBufList;
/**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
index 999cca3a61d..90f0cd72d83 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
@@ -89,7 +89,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- if (!(msg instanceof BookkeeperProtocol.Request || msg instanceof BookieProtocol.Request)) {
+ if (!(msg instanceof Request || msg instanceof BookieProtocol.Request)) {
ctx.fireChannelRead(msg);
return;
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 762f7220b43..2e1b0ab74fc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -27,7 +27,6 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
@@ -316,11 +315,11 @@ public void processRequest(Object msg, BookieRequestHandler requestHandler) {
Channel channel = requestHandler.ctx().channel();
// If we can decode this packet as a Request protobuf packet, process
// it as a version 3 packet. Else, just use the old protocol.
- if (msg instanceof BookkeeperProtocol.Request) {
- BookkeeperProtocol.Request r = (BookkeeperProtocol.Request) msg;
+ if (msg instanceof Request) {
+ Request r = (Request) msg;
restoreMdcContextFromRequest(r);
try {
- BookkeeperProtocol.BKPacketHeader header = r.getHeader();
+ BKPacketHeader header = r.getHeader();
switch (header.getOperation()) {
case ADD_ENTRY:
processAddRequestV3(r, requestHandler);
@@ -335,16 +334,12 @@ public void processRequest(Object msg, BookieRequestHandler requestHandler) {
log.info()
.attr("clientAddress", channel.remoteAddress())
.log("Ignoring auth operation from client");
- BookkeeperProtocol.AuthMessage message = BookkeeperProtocol.AuthMessage
- .newBuilder()
+ Response authResponse = new Response();
+ authResponse.setHeader().copyFrom(r.getHeader());
+ authResponse.setStatus(StatusCode.EOK)
+ .setAuthResponse()
.setAuthPluginName(AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME)
- .setPayload(ByteString.copyFrom(AuthToken.NULL.getData()))
- .build();
- final BookkeeperProtocol.Response authResponse = BookkeeperProtocol.Response
- .newBuilder().setHeader(r.getHeader())
- .setStatus(BookkeeperProtocol.StatusCode.EOK)
- .setAuthResponse(message)
- .build();
+ .setPayload(AuthToken.NULL.getData());
writeAndFlush(channel, authResponse);
break;
case WRITE_LAC:
@@ -364,10 +359,9 @@ public void processRequest(Object msg, BookieRequestHandler requestHandler) {
break;
default:
log.info().attr("operationType", header.getOperation()).log("Unknown operation type");
- final BookkeeperProtocol.Response response =
- BookkeeperProtocol.Response.newBuilder().setHeader(r.getHeader())
- .setStatus(BookkeeperProtocol.StatusCode.EBADREQ)
- .build();
+ Response response = new Response();
+ response.setHeader().copyFrom(r.getHeader());
+ response.setStatus(StatusCode.EBADREQ);
writeAndFlush(channel, response);
if (statsEnabled) {
bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps();
@@ -396,11 +390,9 @@ public void processRequest(Object msg, BookieRequestHandler requestHandler) {
case BookieProtocol.AUTH:
log.info().attr("clientAddress", requestHandler.ctx().channel().remoteAddress())
.log("Ignoring auth operation from client");
- BookkeeperProtocol.AuthMessage message = BookkeeperProtocol.AuthMessage
- .newBuilder()
+ AuthMessage message = new AuthMessage()
.setAuthPluginName(AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME)
- .setPayload(ByteString.copyFrom(AuthToken.NULL.getData()))
- .build();
+ .setPayload(AuthToken.NULL.getData());
final BookieProtocol.AuthResponse response = new BookieProtocol.AuthResponse(
BookieProtocol.CURRENT_PROTOCOL_VERSION, message);
@@ -419,36 +411,37 @@ public void processRequest(Object msg, BookieRequestHandler requestHandler) {
}
}
- private void restoreMdcContextFromRequest(BookkeeperProtocol.Request req) {
+ private void restoreMdcContextFromRequest(Request req) {
if (preserveMdcForTaskExecution) {
MDC.clear();
- for (BookkeeperProtocol.ContextPair pair: req.getRequestContextList()) {
+ for (int i = 0; i < req.getRequestContextsCount(); i++) {
+ ContextPair pair = req.getRequestContextAt(i);
MDC.put(pair.getKey(), pair.getValue());
}
}
}
- private void processWriteLacRequestV3(final BookkeeperProtocol.Request r,
+ private void processWriteLacRequestV3(final Request r,
final BookieRequestHandler requestHandler) {
WriteLacProcessorV3 writeLac = new WriteLacProcessorV3(r, requestHandler, this);
if (null == writeThreadPool) {
writeLac.run();
} else {
- writeThreadPool.executeOrdered(r.getAddRequest().getLedgerId(), writeLac);
+ writeThreadPool.executeOrdered(r.getWriteLacRequest().getLedgerId(), writeLac);
}
}
- private void processReadLacRequestV3(final BookkeeperProtocol.Request r,
+ private void processReadLacRequestV3(final Request r,
final BookieRequestHandler requestHandler) {
ReadLacProcessorV3 readLac = new ReadLacProcessorV3(r, requestHandler, this);
if (null == readThreadPool) {
readLac.run();
} else {
- readThreadPool.executeOrdered(r.getAddRequest().getLedgerId(), readLac);
+ readThreadPool.executeOrdered(r.getReadLacRequest().getLedgerId(), readLac);
}
}
- private void processAddRequestV3(final BookkeeperProtocol.Request r, final BookieRequestHandler requestHandler) {
+ private void processAddRequestV3(final Request r, final BookieRequestHandler requestHandler) {
WriteEntryProcessorV3 write = new WriteEntryProcessorV3(r, requestHandler, this);
final OrderedExecutor threadPool;
@@ -468,21 +461,19 @@ private void processAddRequestV3(final BookkeeperProtocol.Request r, final Booki
.attr("entryId", () -> r.getAddRequest().getEntryId())
.log("Failed to process request to add entry. Too many pending requests");
getRequestStats().getAddEntryRejectedCounter().inc();
- BookkeeperProtocol.AddResponse.Builder addResponse = BookkeeperProtocol.AddResponse.newBuilder()
+ Response resp = new Response();
+ resp.setHeader().copyFrom(write.getHeader());
+ resp.setStatus(StatusCode.ETOOMANYREQUESTS);
+ resp.setAddResponse()
.setLedgerId(r.getAddRequest().getLedgerId())
.setEntryId(r.getAddRequest().getEntryId())
- .setStatus(BookkeeperProtocol.StatusCode.ETOOMANYREQUESTS);
- BookkeeperProtocol.Response.Builder response = BookkeeperProtocol.Response.newBuilder()
- .setHeader(write.getHeader())
- .setStatus(addResponse.getStatus())
- .setAddResponse(addResponse);
- BookkeeperProtocol.Response resp = response.build();
- write.sendResponse(addResponse.getStatus(), resp, requestStats.getAddRequestStats());
+ .setStatus(StatusCode.ETOOMANYREQUESTS);
+ write.sendResponse(StatusCode.ETOOMANYREQUESTS, resp, requestStats.getAddRequestStats());
}
}
}
- private void processForceLedgerRequestV3(final BookkeeperProtocol.Request r,
+ private void processForceLedgerRequestV3(final Request r,
final BookieRequestHandler requestHandler) {
ForceLedgerProcessorV3 forceLedger = new ForceLedgerProcessorV3(r, requestHandler, this);
@@ -501,24 +492,21 @@ private void processForceLedgerRequestV3(final BookkeeperProtocol.Request r,
} catch (RejectedExecutionException e) {
log.debug().attr("ledgerId", () -> r.getForceLedgerRequest().getLedgerId())
.log("Failed to process request to force ledger. Too many pending requests");
- BookkeeperProtocol.ForceLedgerResponse.Builder forceLedgerResponse =
- BookkeeperProtocol.ForceLedgerResponse.newBuilder()
+ Response resp = new Response();
+ resp.setHeader().copyFrom(forceLedger.getHeader());
+ resp.setStatus(StatusCode.ETOOMANYREQUESTS);
+ resp.setForceLedgerResponse()
.setLedgerId(r.getForceLedgerRequest().getLedgerId())
- .setStatus(BookkeeperProtocol.StatusCode.ETOOMANYREQUESTS);
- BookkeeperProtocol.Response.Builder response = BookkeeperProtocol.Response.newBuilder()
- .setHeader(forceLedger.getHeader())
- .setStatus(forceLedgerResponse.getStatus())
- .setForceLedgerResponse(forceLedgerResponse);
- BookkeeperProtocol.Response resp = response.build();
+ .setStatus(StatusCode.ETOOMANYREQUESTS);
forceLedger.sendResponse(
- forceLedgerResponse.getStatus(),
+ StatusCode.ETOOMANYREQUESTS,
resp,
requestStats.getForceLedgerRequestStats());
}
}
}
- private void processReadRequestV3(final BookkeeperProtocol.Request r, final BookieRequestHandler requestHandler) {
+ private void processReadRequestV3(final Request r, final BookieRequestHandler requestHandler) {
ExecutorService fenceThread = null == highPriorityThreadPool ? null :
highPriorityThreadPool.chooseThread(requestHandler.ctx());
@@ -537,7 +525,7 @@ private void processReadRequestV3(final BookkeeperProtocol.Request r, final Book
// gets executed as fast as possible, so bypass the normal readThreadPool
// and execute in highPriorityThreadPool
boolean isHighPriority = RequestUtils.isHighPriority(r)
- || hasFlag(r.getReadRequest(), BookkeeperProtocol.ReadRequest.Flag.FENCE_LEDGER);
+ || hasFlag(r.getReadRequest(), ReadRequest.Flag.FENCE_LEDGER);
if (isHighPriority) {
threadPool = highPriorityThreadPool;
} else {
@@ -555,35 +543,32 @@ private void processReadRequestV3(final BookkeeperProtocol.Request r, final Book
.attr("entryId", () -> r.getReadRequest().getEntryId())
.log("Failed to process request to read entry. Too many pending requests");
getRequestStats().getReadEntryRejectedCounter().inc();
- BookkeeperProtocol.ReadResponse.Builder readResponse = BookkeeperProtocol.ReadResponse.newBuilder()
- .setLedgerId(r.getReadRequest().getLedgerId())
- .setEntryId(r.getReadRequest().getEntryId())
- .setStatus(BookkeeperProtocol.StatusCode.ETOOMANYREQUESTS);
- BookkeeperProtocol.Response.Builder response = BookkeeperProtocol.Response.newBuilder()
- .setHeader(read.getHeader())
- .setStatus(readResponse.getStatus())
- .setReadResponse(readResponse);
- BookkeeperProtocol.Response resp = response.build();
- read.sendResponse(readResponse.getStatus(), resp, requestStats.getReadRequestStats());
+ Response resp = new Response();
+ resp.setHeader().copyFrom(read.getHeader());
+ resp.setStatus(StatusCode.ETOOMANYREQUESTS);
+ resp.setReadResponse()
+ .setLedgerId(r.getReadRequest().getLedgerId())
+ .setEntryId(r.getReadRequest().getEntryId())
+ .setStatus(StatusCode.ETOOMANYREQUESTS);
+ read.sendResponse(StatusCode.ETOOMANYREQUESTS, resp, requestStats.getReadRequestStats());
onReadRequestFinish();
}
}
}
- private void processStartTLSRequestV3(final BookkeeperProtocol.Request r,
+ private void processStartTLSRequestV3(final Request r,
final BookieRequestHandler requestHandler) {
- BookkeeperProtocol.Response.Builder response = BookkeeperProtocol.Response.newBuilder();
- BookkeeperProtocol.BKPacketHeader.Builder header = BookkeeperProtocol.BKPacketHeader.newBuilder();
- header.setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE);
- header.setOperation(r.getHeader().getOperation());
- header.setTxnId(r.getHeader().getTxnId());
- response.setHeader(header.build());
+ Response response = new Response();
+ response.setHeader()
+ .setVersion(ProtocolVersion.VERSION_THREE)
+ .setOperation(r.getHeader().getOperation())
+ .setTxnId(r.getHeader().getTxnId());
final Channel c = requestHandler.ctx().channel();
if (shFactory == null) {
log.error("Got StartTLS request but TLS not configured");
- response.setStatus(BookkeeperProtocol.StatusCode.EBADREQ);
- writeAndFlush(c, response.build());
+ response.setStatus(StatusCode.EBADREQ);
+ writeAndFlush(c, response);
} else {
log.info().attr("channel", c).log("Starting TLS handshake with client");
// there is no need to execute in a different thread as this operation is light
@@ -595,9 +580,8 @@ private void processStartTLSRequestV3(final BookkeeperProtocol.Request r,
c.pipeline().addFirst(TLS_HANDLER_NAME, sslHandler);
}
- response.setStatus(BookkeeperProtocol.StatusCode.EOK);
- BookkeeperProtocol.StartTLSResponse.Builder builder = BookkeeperProtocol.StartTLSResponse.newBuilder();
- response.setStartTLSResponse(builder.build());
+ response.setStatus(StatusCode.EOK);
+ response.setStartTLSResponse();
sslHandler.handshakeFuture().addListener(new GenericFutureListener>() {
@Override
public void operationComplete(Future future) throws Exception {
@@ -619,10 +603,9 @@ public void operationComplete(Future future) throws Exception {
} else {
log.error().exception(future.cause()).log("TLS Handshake failure");
}
- final BookkeeperProtocol.Response errResponse = BookkeeperProtocol.Response.newBuilder()
- .setHeader(r.getHeader())
- .setStatus(BookkeeperProtocol.StatusCode.EIO)
- .build();
+ Response errResponse = new Response();
+ errResponse.setHeader().copyFrom(r.getHeader());
+ errResponse.setStatus(StatusCode.EIO);
writeAndFlush(c, errResponse);
if (statsEnabled) {
bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps();
@@ -630,11 +613,11 @@ public void operationComplete(Future future) throws Exception {
}
}
});
- writeAndFlush(c, response.build());
+ writeAndFlush(c, response);
}
}
- private void processGetBookieInfoRequestV3(final BookkeeperProtocol.Request r,
+ private void processGetBookieInfoRequestV3(final Request r,
final BookieRequestHandler requestHandler) {
GetBookieInfoProcessorV3 getBookieInfo = new GetBookieInfoProcessorV3(r, requestHandler, this);
if (null == readThreadPool) {
@@ -644,7 +627,7 @@ private void processGetBookieInfoRequestV3(final BookkeeperProtocol.Request r,
}
}
- private void processGetListOfEntriesOfLedgerProcessorV3(final BookkeeperProtocol.Request r,
+ private void processGetListOfEntriesOfLedgerProcessorV3(final Request r,
final BookieRequestHandler requestHandler) {
GetListOfEntriesOfLedgerProcessorV3 getListOfEntriesOfLedger =
new GetListOfEntriesOfLedgerProcessorV3(r, requestHandler, this);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ByteStringUtil.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ByteStringUtil.java
deleted file mode 100644
index b26ac7b36ab..00000000000
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ByteStringUtil.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
-
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bookkeeper.proto;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.UnsafeByteOperations;
-import io.netty.buffer.ByteBuf;
-import java.nio.ByteBuffer;
-import org.apache.bookkeeper.util.ByteBufList;
-
-public class ByteStringUtil {
-
- /**
- * Wrap the internal buffers of a ByteBufList into a single ByteString.
- * The lifecycle of the wrapped ByteString is tied to the ByteBufList.
- *
- * @param bufList ByteBufList to wrap
- * @return ByteString wrapping the internal buffers of the ByteBufList
- */
- public static ByteString byteBufListToByteString(ByteBufList bufList) {
- ByteString aggregated = null;
- for (int i = 0; i < bufList.size(); i++) {
- ByteBuf buffer = bufList.getBuffer(i);
- if (buffer.readableBytes() > 0) {
- aggregated = byteBufToByteString(aggregated, buffer);
- }
- }
- return aggregated != null ? aggregated : ByteString.EMPTY;
- }
-
- /**
- * Wrap the internal buffers of a ByteBuf into a single ByteString.
- * The lifecycle of the wrapped ByteString is tied to the ByteBuf.
- *
- * @param byteBuf ByteBuf to wrap
- * @return ByteString wrapping the internal buffers of the ByteBuf
- */
- public static ByteString byteBufToByteString(ByteBuf byteBuf) {
- return byteBufToByteString(null, byteBuf);
- }
-
- // internal method to aggregate a ByteBuf into a single aggregated ByteString
- private static ByteString byteBufToByteString(ByteString aggregated, ByteBuf byteBuf) {
- if (byteBuf.readableBytes() == 0) {
- return ByteString.EMPTY;
- }
- if (byteBuf.nioBufferCount() > 1) {
- for (ByteBuffer nioBuffer : byteBuf.nioBuffers()) {
- ByteString piece = UnsafeByteOperations.unsafeWrap(nioBuffer);
- aggregated = (aggregated == null) ? piece : aggregated.concat(piece);
- }
- } else {
- ByteString piece;
- if (byteBuf.hasArray()) {
- piece = UnsafeByteOperations.unsafeWrap(byteBuf.array(), byteBuf.arrayOffset() + byteBuf.readerIndex(),
- byteBuf.readableBytes());
- } else {
- piece = UnsafeByteOperations.unsafeWrap(byteBuf.nioBuffer());
- }
- aggregated = (aggregated == null) ? piece : aggregated.concat(piece);
- }
- return aggregated;
- }
-}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/CompletionKey.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/CompletionKey.java
index 0d543676e54..1fb2564be6b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/CompletionKey.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/CompletionKey.java
@@ -21,7 +21,6 @@
package org.apache.bookkeeper.proto;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
abstract class CompletionKey {
OperationType operationType;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/CompletionValue.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/CompletionValue.java
index 18bc7206bab..c01103fdcfb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/CompletionValue.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/CompletionValue.java
@@ -89,7 +89,7 @@ void timeout() {
TimeUnit.NANOSECONDS);
errorOut(BKException.Code.TimeoutException);
}
- protected Event logEvent(BookkeeperProtocol.StatusCode status) {
+ protected Event logEvent(StatusCode status) {
return log.debug()
.attr("operation", operationName)
.attr("bookieId", perChannelBookieClient.bookieId)
@@ -98,7 +98,7 @@ protected Event logEvent(BookkeeperProtocol.StatusCode status) {
.attr("status", status);
}
- protected int convertStatus(BookkeeperProtocol.StatusCode status, int defaultStatus) {
+ protected int convertStatus(StatusCode status, int defaultStatus) {
// convert to BKException code
int rcToRet = statusCodeToExceptionCode(status);
if (rcToRet == BKException.Code.UNINITIALIZED) {
@@ -119,7 +119,7 @@ protected int convertStatus(BookkeeperProtocol.StatusCode status, int defaultSta
* @param status
* @return {@link BKException.Code.UNINITIALIZED} if the statuscode is unknown.
*/
- private int statusCodeToExceptionCode(BookkeeperProtocol.StatusCode status) {
+ private int statusCodeToExceptionCode(StatusCode status) {
switch (status) {
case EOK:
return BKException.Code.OK;
@@ -171,13 +171,13 @@ protected void errorOutAndRunCallback(final Runnable callback) {
}
public void handleV2Response(
- long ledgerId, long entryId, BookkeeperProtocol.StatusCode status,
+ long ledgerId, long entryId, StatusCode status,
BookieProtocol.Response response) {
log.warn().attr("response", response).log("Unhandled V2 response");
}
public abstract void handleV3Response(
- BookkeeperProtocol.Response response);
+ Response response);
public void release() {}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/EntryCompletionKey.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/EntryCompletionKey.java
index 09f6413f7f3..f14fbeb2c98 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/EntryCompletionKey.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/EntryCompletionKey.java
@@ -29,7 +29,7 @@ class EntryCompletionKey extends CompletionKey {
long entryId;
static EntryCompletionKey acquireV2Key(long ledgerId, long entryId,
- BookkeeperProtocol.OperationType operationType) {
+ OperationType operationType) {
EntryCompletionKey key = V2_KEY_RECYCLER.get();
key.reset(ledgerId, entryId, operationType);
return key;
@@ -40,7 +40,7 @@ private EntryCompletionKey(Recycler.Handle handle) {
this.recyclerHandle = handle;
}
- void reset(long ledgerId, long entryId, BookkeeperProtocol.OperationType operationType) {
+ void reset(long ledgerId, long entryId, OperationType operationType) {
this.ledgerId = ledgerId;
this.entryId = entryId;
this.operationType = operationType;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerCompletion.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerCompletion.java
index 0e964c06589..34bd0384fb9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerCompletion.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerCompletion.java
@@ -55,11 +55,15 @@ public void errorOut(final int rc) {
}
@Override
- public void handleV3Response(BookkeeperProtocol.Response response) {
- BookkeeperProtocol.ForceLedgerResponse forceLedgerResponse = response.getForceLedgerResponse();
- BookkeeperProtocol.StatusCode status = response.getStatus() == BookkeeperProtocol.StatusCode.EOK
- ? forceLedgerResponse.getStatus() : response.getStatus();
- long ledgerId = forceLedgerResponse.getLedgerId();
+ public void handleV3Response(Response response) {
+ StatusCode status;
+ if (response.getStatus() == StatusCode.EOK && response.hasForceLedgerResponse()) {
+ status = response.getForceLedgerResponse().getStatus();
+ } else {
+ // Error responses may not carry a populated ForceLedgerResponse;
+ // fall back to the request's recorded ledgerId.
+ status = response.getStatus();
+ }
logEvent(status).log("Got response from bookie");
int rc = convertStatus(status, BKException.Code.WriteException);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java
index 0c1a55e02ab..ef0fe82c8fb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java
@@ -27,11 +27,6 @@
import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.net.BookieId;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.ForceLedgerRequest;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.ForceLedgerResponse;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
@CustomLog
@@ -48,11 +43,11 @@ private ForceLedgerResponse getForceLedgerResponse() {
ForceLedgerRequest forceLedgerRequest = request.getForceLedgerRequest();
long ledgerId = forceLedgerRequest.getLedgerId();
- final ForceLedgerResponse.Builder forceLedgerResponse = ForceLedgerResponse.newBuilder().setLedgerId(ledgerId);
+ final ForceLedgerResponse forceLedgerResponse = new ForceLedgerResponse().setLedgerId(ledgerId);
if (!isVersionCompatible()) {
forceLedgerResponse.setStatus(StatusCode.EBADVERSION);
- return forceLedgerResponse.build();
+ return forceLedgerResponse;
}
BookkeeperInternalCallbacks.WriteCallback wcb =
@@ -87,11 +82,10 @@ private ForceLedgerResponse getForceLedgerResponse() {
break;
}
forceLedgerResponse.setStatus(status);
- Response.Builder response = Response.newBuilder()
- .setHeader(getHeader())
- .setStatus(forceLedgerResponse.getStatus())
- .setForceLedgerResponse(forceLedgerResponse);
- Response resp = response.build();
+ Response resp = new Response();
+ resp.setHeader().copyFrom(getHeader());
+ resp.setStatus(forceLedgerResponse.getStatus());
+ resp.setForceLedgerResponse().copyFrom(forceLedgerResponse);
sendResponse(status, resp, requestProcessor.getRequestStats().getForceLedgerRequestStats());
};
StatusCode status = null;
@@ -111,7 +105,7 @@ private ForceLedgerResponse getForceLedgerResponse() {
// doesn't return a response back to the caller.
if (!status.equals(StatusCode.EOK)) {
forceLedgerResponse.setStatus(status);
- return forceLedgerResponse.build();
+ return forceLedgerResponse;
}
return null;
}
@@ -120,11 +114,10 @@ private ForceLedgerResponse getForceLedgerResponse() {
public void run() {
ForceLedgerResponse forceLedgerResponse = getForceLedgerResponse();
if (null != forceLedgerResponse) {
- Response.Builder response = Response.newBuilder()
- .setHeader(getHeader())
- .setStatus(forceLedgerResponse.getStatus())
- .setForceLedgerResponse(forceLedgerResponse);
- Response resp = response.build();
+ Response resp = new Response();
+ resp.setHeader().copyFrom(getHeader());
+ resp.setStatus(forceLedgerResponse.getStatus());
+ resp.setForceLedgerResponse().copyFrom(forceLedgerResponse);
sendResponse(
forceLedgerResponse.getStatus(),
resp,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoCompletion.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoCompletion.java
index 83feb969f1c..497bdf347e2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoCompletion.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoCompletion.java
@@ -64,9 +64,9 @@ public void errorOut(final int rc) {
}
@Override
- public void handleV3Response(BookkeeperProtocol.Response response) {
- BookkeeperProtocol.GetBookieInfoResponse getBookieInfoResponse = response.getGetBookieInfoResponse();
- BookkeeperProtocol.StatusCode status = response.getStatus() == BookkeeperProtocol.StatusCode.EOK
+ public void handleV3Response(Response response) {
+ GetBookieInfoResponse getBookieInfoResponse = response.getGetBookieInfoResponse();
+ StatusCode status = response.getStatus() == StatusCode.EOK
? getBookieInfoResponse.getStatus() : response.getStatus();
long freeDiskSpace = getBookieInfoResponse.getFreeDiskSpace();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
index 875a91fe012..6f7a862a513 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
@@ -24,11 +24,6 @@
import java.util.concurrent.TimeUnit;
import lombok.CustomLog;
import org.apache.bookkeeper.common.util.MathUtils;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoRequest;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoResponse;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
/**
* A processor class for v3 bookie metadata packets.
@@ -46,13 +41,13 @@ private GetBookieInfoResponse getGetBookieInfoResponse() {
GetBookieInfoRequest getBookieInfoRequest = request.getGetBookieInfoRequest();
long requested = getBookieInfoRequest.getRequested();
- GetBookieInfoResponse.Builder getBookieInfoResponse = GetBookieInfoResponse.newBuilder();
+ GetBookieInfoResponse getBookieInfoResponse = new GetBookieInfoResponse();
if (!isVersionCompatible()) {
getBookieInfoResponse.setStatus(StatusCode.EBADVERSION);
requestProcessor.getRequestStats().getGetBookieInfoStats()
.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
- return getBookieInfoResponse.build();
+ return getBookieInfoResponse;
}
log.debug().attr("request", request).log("Received new getBookieInfo request");
@@ -81,7 +76,7 @@ private GetBookieInfoResponse getGetBookieInfoResponse() {
}
getBookieInfoResponse.setStatus(status);
- return getBookieInfoResponse.build();
+ return getBookieInfoResponse;
}
@Override
@@ -91,12 +86,11 @@ public void run() {
}
private void sendResponse(GetBookieInfoResponse getBookieInfoResponse) {
- Response.Builder response = Response.newBuilder()
- .setHeader(getHeader())
- .setStatus(getBookieInfoResponse.getStatus())
- .setGetBookieInfoResponse(getBookieInfoResponse);
- sendResponse(response.getStatus(),
- response.build(),
+ Response response = new Response();
+ response.setHeader().copyFrom(getHeader());
+ response.setStatus(getBookieInfoResponse.getStatus());
+ response.setGetBookieInfoResponse().copyFrom(getBookieInfoResponse);
+ sendResponse(response.getStatus(), response,
requestProcessor.getRequestStats().getGetBookieInfoRequestStats());
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerCompletion.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerCompletion.java
index d500604ae86..4b0cedaba05 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerCompletion.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerCompletion.java
@@ -56,17 +56,17 @@ public void errorOut(final int rc) {
}
@Override
- public void handleV3Response(BookkeeperProtocol.Response response) {
- BookkeeperProtocol.GetListOfEntriesOfLedgerResponse getListOfEntriesOfLedgerResponse = response
+ public void handleV3Response(Response response) {
+ GetListOfEntriesOfLedgerResponse getListOfEntriesOfLedgerResponse = response
.getGetListOfEntriesOfLedgerResponse();
ByteBuf availabilityOfEntriesOfLedgerBuffer = Unpooled.EMPTY_BUFFER;
- BookkeeperProtocol.StatusCode status =
- response.getStatus() == BookkeeperProtocol.StatusCode.EOK ? getListOfEntriesOfLedgerResponse.getStatus()
+ StatusCode status =
+ response.getStatus() == StatusCode.EOK ? getListOfEntriesOfLedgerResponse.getStatus()
: response.getStatus();
if (getListOfEntriesOfLedgerResponse.hasAvailabilityOfEntriesOfLedger()) {
- availabilityOfEntriesOfLedgerBuffer = Unpooled.wrappedBuffer(
- getListOfEntriesOfLedgerResponse.getAvailabilityOfEntriesOfLedger().asReadOnlyByteBuffer());
+ availabilityOfEntriesOfLedgerBuffer =
+ getListOfEntriesOfLedgerResponse.getAvailabilityOfEntriesOfLedgerSlice();
}
logEvent(status).log("Got response from bookie");
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerProcessorV3.java
index b54a368f09c..d8185838003 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerProcessorV3.java
@@ -20,17 +20,11 @@
*/
package org.apache.bookkeeper.proto;
-import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import lombok.CustomLog;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.common.util.MathUtils;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.GetListOfEntriesOfLedgerRequest;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.GetListOfEntriesOfLedgerResponse;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
/**
@@ -51,15 +45,14 @@ public GetListOfEntriesOfLedgerProcessorV3(Request request, BookieRequestHandler
private GetListOfEntriesOfLedgerResponse getListOfEntriesOfLedgerResponse() {
long startTimeNanos = MathUtils.nowInNano();
- GetListOfEntriesOfLedgerResponse.Builder getListOfEntriesOfLedgerResponse = GetListOfEntriesOfLedgerResponse
- .newBuilder();
+ GetListOfEntriesOfLedgerResponse getListOfEntriesOfLedgerResponse = new GetListOfEntriesOfLedgerResponse();
getListOfEntriesOfLedgerResponse.setLedgerId(ledgerId);
if (!isVersionCompatible()) {
getListOfEntriesOfLedgerResponse.setStatus(StatusCode.EBADVERSION);
requestProcessor.getRequestStats().getGetListOfEntriesOfLedgerStats()
.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
- return getListOfEntriesOfLedgerResponse.build();
+ return getListOfEntriesOfLedgerResponse;
}
log.debug().attr("request", request).log("Received new getListOfEntriesOfLedger request");
@@ -69,7 +62,7 @@ private GetListOfEntriesOfLedgerResponse getListOfEntriesOfLedgerResponse() {
availabilityOfEntriesOfLedger = new AvailabilityOfEntriesOfLedger(
requestProcessor.bookie.getListOfEntriesOfLedger(ledgerId));
getListOfEntriesOfLedgerResponse.setAvailabilityOfEntriesOfLedger(
- ByteString.copyFrom(availabilityOfEntriesOfLedger.serializeStateOfEntriesOfLedger()));
+ availabilityOfEntriesOfLedger.serializeStateOfEntriesOfLedger());
} catch (Bookie.NoLedgerException e) {
status = StatusCode.ENOLEDGER;
@@ -93,17 +86,17 @@ private GetListOfEntriesOfLedgerResponse getListOfEntriesOfLedgerResponse() {
}
// Finally set the status and return
getListOfEntriesOfLedgerResponse.setStatus(status);
- return getListOfEntriesOfLedgerResponse.build();
+ return getListOfEntriesOfLedgerResponse;
}
@Override
public void run() {
GetListOfEntriesOfLedgerResponse listOfEntriesOfLedgerResponse = getListOfEntriesOfLedgerResponse();
- Response.Builder response = Response.newBuilder().setHeader(getHeader())
- .setStatus(listOfEntriesOfLedgerResponse.getStatus())
- .setGetListOfEntriesOfLedgerResponse(listOfEntriesOfLedgerResponse);
- Response resp = response.build();
- sendResponse(listOfEntriesOfLedgerResponse.getStatus(), resp,
+ Response response = new Response();
+ response.setHeader().copyFrom(getHeader());
+ response.setStatus(listOfEntriesOfLedgerResponse.getStatus());
+ response.setGetListOfEntriesOfLedgerResponse().copyFrom(listOfEntriesOfLedgerResponse);
+ sendResponse(listOfEntriesOfLedgerResponse.getStatus(), response,
requestProcessor.getRequestStats().getListOfEntriesOfLedgerRequestStats);
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
index 67c82dda977..7e886b6e066 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
@@ -31,9 +31,6 @@
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
import org.apache.bookkeeper.common.util.Watcher;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
/**
* Processor handling long poll read entry request.
@@ -74,7 +71,7 @@ private synchronized boolean shouldReadEntry() {
}
@Override
- protected ReadResponse readEntry(ReadResponse.Builder readResponseBuilder,
+ protected ReadResponse readEntry(ReadResponse readResponseBuilder,
long entryId,
Stopwatch startTimeSw)
throws IOException, BookieException {
@@ -129,10 +126,10 @@ protected ReadResponse readEntry(ReadResponse.Builder readResponseBuilder,
}
private ReadResponse buildErrorResponse(StatusCode statusCode, Stopwatch sw) {
- ReadResponse.Builder builder = ReadResponse.newBuilder()
+ ReadResponse readResponse = new ReadResponse()
.setLedgerId(ledgerId)
.setEntryId(entryId);
- return buildResponse(builder, statusCode, sw);
+ return buildResponse(readResponse, statusCode, sw);
}
private ReadResponse getLongPollReadResponse() {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
index b9f257ceca9..7d234c5ad3c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
@@ -26,10 +26,6 @@
import java.util.concurrent.TimeUnit;
import lombok.CustomLog;
import org.apache.bookkeeper.common.util.MathUtils;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.util.StringUtils;
@@ -125,11 +121,10 @@ protected boolean isVersionCompatible() {
* @return
*/
protected BKPacketHeader getHeader() {
- BKPacketHeader.Builder header = BKPacketHeader.newBuilder();
- header.setVersion(ProtocolVersion.VERSION_THREE);
- header.setOperation(request.getHeader().getOperation());
- header.setTxnId(request.getHeader().getTxnId());
- return header.build();
+ return new BKPacketHeader()
+ .setVersion(ProtocolVersion.VERSION_THREE)
+ .setOperation(request.getHeader().getOperation())
+ .setTxnId(request.getHeader().getTxnId());
}
@Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index c68f7929646..e6380bff599 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -19,13 +19,11 @@
package org.apache.bookkeeper.proto;
import com.google.common.collect.Sets;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.ExtensionRegistry;
-import com.google.protobuf.UnsafeByteOperations;
import io.github.merlimat.slog.Logger;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@@ -106,19 +104,6 @@
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.ForceLedgerRequest;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoRequest;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.GetListOfEntriesOfLedgerRequest;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadRequest;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -330,35 +315,33 @@ enum ConnectionState {
private final PerChannelBookieClientPool pcbcPool;
private final ClientAuthProvider.Factory authProviderFactory;
- private final ExtensionRegistry extRegistry;
private final SecurityHandlerFactory shFactory;
private volatile boolean isWritable = true;
private long lastBookieUnavailableLogTimestamp = 0;
public PerChannelBookieClient(OrderedExecutor executor, EventLoopGroup eventLoopGroup,
BookieId addr, BookieAddressResolver bookieAddressResolver) throws SecurityException {
- this(new ClientConfiguration(), executor, eventLoopGroup, addr, NullStatsLogger.INSTANCE, null, null,
+ this(new ClientConfiguration(), executor, eventLoopGroup, addr, NullStatsLogger.INSTANCE, null,
null, bookieAddressResolver);
}
public PerChannelBookieClient(OrderedExecutor executor, EventLoopGroup eventLoopGroup,
BookieId bookieId,
ClientAuthProvider.Factory authProviderFactory,
- ExtensionRegistry extRegistry, BookieAddressResolver bookieAddressResolver)
+ BookieAddressResolver bookieAddressResolver)
throws SecurityException {
this(new ClientConfiguration(), executor, eventLoopGroup, bookieId,
NullStatsLogger.INSTANCE,
- authProviderFactory, extRegistry, null, bookieAddressResolver);
+ authProviderFactory, null, bookieAddressResolver);
}
public PerChannelBookieClient(ClientConfiguration conf, OrderedExecutor executor,
EventLoopGroup eventLoopGroup, BookieId bookieId,
StatsLogger parentStatsLogger, ClientAuthProvider.Factory authProviderFactory,
- ExtensionRegistry extRegistry,
PerChannelBookieClientPool pcbcPool, BookieAddressResolver bookieAddressResolver)
throws SecurityException {
this(conf, executor, eventLoopGroup, UnpooledByteBufAllocator.DEFAULT, bookieId, NullStatsLogger.INSTANCE,
- authProviderFactory, extRegistry, pcbcPool, null, bookieAddressResolver);
+ authProviderFactory, pcbcPool, null, bookieAddressResolver);
}
public PerChannelBookieClient(ClientConfiguration conf, OrderedExecutor executor,
@@ -366,7 +349,6 @@ public PerChannelBookieClient(ClientConfiguration conf, OrderedExecutor executor
ByteBufAllocator allocator,
BookieId bookieId,
StatsLogger parentStatsLogger, ClientAuthProvider.Factory authProviderFactory,
- ExtensionRegistry extRegistry,
PerChannelBookieClientPool pcbcPool,
SecurityHandlerFactory shFactory,
BookieAddressResolver bookieAddressResolver) throws SecurityException {
@@ -395,7 +377,6 @@ public PerChannelBookieClient(ClientConfiguration conf, OrderedExecutor executor
this.preserveMdcForTaskExecution = conf.getPreserveMdcForTaskExecution();
this.authProviderFactory = authProviderFactory;
- this.extRegistry = extRegistry;
this.shFactory = shFactory;
if (shFactory != null) {
shFactory.init(NodeType.Client, conf, allocator);
@@ -609,10 +590,10 @@ protected void initChannel(Channel ch) throws Exception {
pipeline.addLast("bytebufList", ByteBufList.ENCODER);
pipeline.addLast("lengthbasedframedecoder",
new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
- pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.RequestEncoder(extRegistry));
+ pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.RequestEncoder());
pipeline.addLast(
"bookieProtoDecoder",
- new BookieProtoEncoding.ResponseDecoder(extRegistry, useV2WireProtocol, shFactory != null));
+ new BookieProtoEncoding.ResponseDecoder(useV2WireProtocol, shFactory != null));
pipeline.addLast("authHandler", new AuthHandler.ClientSideHandler(authProviderFactory, txnIdGenerator,
connectionPeer, useV2WireProtocol));
pipeline.addLast("mainhandler", PerChannelBookieClient.this);
@@ -710,26 +691,21 @@ void writeLac(final long ledgerId, final byte[] masterKey, final long lac, ByteB
ctx, ledgerId, this));
// Build the request
- BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+ Request writeLacRequest = new Request();
+ writeLacRequest.setHeader()
.setVersion(ProtocolVersion.VERSION_THREE)
.setOperation(OperationType.WRITE_LAC)
.setTxnId(txnId);
- ByteString body = ByteStringUtil.byteBufListToByteString(toSend);
- toSend.retain();
- Runnable cleanupActionFailedBeforeWrite = toSend::release;
- Runnable cleanupActionAfterWrite = cleanupActionFailedBeforeWrite;
- WriteLacRequest.Builder writeLacBuilder = WriteLacRequest.newBuilder()
+ ByteBuf body = toSend.toByteBuf(PooledByteBufAllocator.DEFAULT);
+ Runnable releaseBody = body::release;
+ writeLacRequest.setWriteLacRequest()
.setLedgerId(ledgerId)
.setLac(lac)
- .setMasterKey(UnsafeByteOperations.unsafeWrap(masterKey))
+ .setMasterKey(masterKey)
.setBody(body);
+ withRequestContext(writeLacRequest);
- final Request writeLacRequest = withRequestContext(Request.newBuilder())
- .setHeader(headerBuilder)
- .setWriteLacRequest(writeLacBuilder)
- .build();
- writeAndFlush(channel, completionKey, writeLacRequest, false, cleanupActionFailedBeforeWrite,
- cleanupActionAfterWrite);
+ writeAndFlush(channel, completionKey, writeLacRequest, false, releaseBody, releaseBody);
}
void forceLedger(final long ledgerId, ForceLedgerCallback cb, Object ctx) {
@@ -749,17 +725,13 @@ void forceLedger(final long ledgerId, ForceLedgerCallback cb, Object ctx) {
ctx, ledgerId, this));
// Build the request
- BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+ Request forceLedgerRequest = new Request();
+ forceLedgerRequest.setHeader()
.setVersion(ProtocolVersion.VERSION_THREE)
.setOperation(OperationType.FORCE_LEDGER)
.setTxnId(txnId);
- ForceLedgerRequest.Builder writeLacBuilder = ForceLedgerRequest.newBuilder()
- .setLedgerId(ledgerId);
-
- final Request forceLedgerRequest = withRequestContext(Request.newBuilder())
- .setHeader(headerBuilder)
- .setForceLedgerRequest(writeLacBuilder)
- .build();
+ forceLedgerRequest.setForceLedgerRequest().setLedgerId(ledgerId);
+ withRequestContext(forceLedgerRequest);
writeAndFlush(channel, completionKey, forceLedgerRequest);
}
@@ -813,38 +785,36 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen
completionKey = new TxnCompletionKey(txnId, OperationType.ADD_ENTRY);
// Build the request and calculate the total size to be included in the packet.
- BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+ Request addEntryRequest = new Request();
+ BKPacketHeader header = addEntryRequest.setHeader()
.setVersion(ProtocolVersion.VERSION_THREE)
.setOperation(OperationType.ADD_ENTRY)
.setTxnId(txnId);
if (((short) options & BookieProtocol.FLAG_HIGH_PRIORITY) == BookieProtocol.FLAG_HIGH_PRIORITY) {
- headerBuilder.setPriority(DEFAULT_HIGH_PRIORITY_VALUE);
+ header.setPriority(DEFAULT_HIGH_PRIORITY_VALUE);
}
ByteBufList bufToSend = (ByteBufList) toSend;
- ByteString body = ByteStringUtil.byteBufListToByteString(bufToSend);
- bufToSend.retain();
- cleanupActionFailedBeforeWrite = bufToSend::release;
+ ByteBuf body = bufToSend.toByteBuf(PooledByteBufAllocator.DEFAULT);
+ cleanupActionFailedBeforeWrite = body::release;
cleanupActionAfterWrite = cleanupActionFailedBeforeWrite;
- AddRequest.Builder addBuilder = AddRequest.newBuilder()
+ AddRequest addRequestMsg = addEntryRequest.setAddRequest()
.setLedgerId(ledgerId)
.setEntryId(entryId)
- .setMasterKey(UnsafeByteOperations.unsafeWrap(masterKey))
+ .setMasterKey(masterKey)
.setBody(body);
if (((short) options & BookieProtocol.FLAG_RECOVERY_ADD) == BookieProtocol.FLAG_RECOVERY_ADD) {
- addBuilder.setFlag(AddRequest.Flag.RECOVERY_ADD);
+ addRequestMsg.setFlag(AddRequest.Flag.RECOVERY_ADD);
}
if (!writeFlags.isEmpty()) {
// add flags only if needed, in order to be able to talk with old bookies
- addBuilder.setWriteFlags(WriteFlag.getWriteFlagsValue(writeFlags));
+ addRequestMsg.setWriteFlags(WriteFlag.getWriteFlagsValue(writeFlags));
}
- request = withRequestContext(Request.newBuilder())
- .setHeader(headerBuilder)
- .setAddRequest(addBuilder)
- .build();
+ withRequestContext(addEntryRequest);
+ request = addEntryRequest;
}
putCompletionKeyValue(completionKey,
@@ -867,16 +837,14 @@ public void readLac(final long ledgerId, ReadLacCallback cb, Object ctx) {
completionKey = new TxnCompletionKey(txnId, OperationType.READ_LAC);
// Build the request and calculate the total size to be included in the packet.
- BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+ Request readLacRequest = new Request();
+ readLacRequest.setHeader()
.setVersion(ProtocolVersion.VERSION_THREE)
.setOperation(OperationType.READ_LAC)
.setTxnId(txnId);
- ReadLacRequest.Builder readLacBuilder = ReadLacRequest.newBuilder()
- .setLedgerId(ledgerId);
- request = withRequestContext(Request.newBuilder())
- .setHeader(headerBuilder)
- .setReadLacRequest(readLacBuilder)
- .build();
+ readLacRequest.setReadLacRequest().setLedgerId(ledgerId);
+ withRequestContext(readLacRequest);
+ request = readLacRequest;
}
putCompletionKeyValue(completionKey,
new ReadLacCompletion(completionKey, cb,
@@ -891,14 +859,12 @@ public void getListOfEntriesOfLedger(final long ledgerId, GetListOfEntriesOfLedg
completionKey, cb, ledgerId, this));
// Build the request.
- BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder().setVersion(ProtocolVersion.VERSION_THREE)
- .setOperation(OperationType.GET_LIST_OF_ENTRIES_OF_LEDGER).setTxnId(txnId);
-
- GetListOfEntriesOfLedgerRequest.Builder getListOfEntriesOfLedgerRequestBuilder =
- GetListOfEntriesOfLedgerRequest.newBuilder().setLedgerId(ledgerId);
-
- final Request getListOfEntriesOfLedgerRequest = Request.newBuilder().setHeader(headerBuilder)
- .setGetListOfEntriesOfLedgerRequest(getListOfEntriesOfLedgerRequestBuilder).build();
+ Request getListOfEntriesOfLedgerRequest = new Request();
+ getListOfEntriesOfLedgerRequest.setHeader()
+ .setVersion(ProtocolVersion.VERSION_THREE)
+ .setOperation(OperationType.GET_LIST_OF_ENTRIES_OF_LEDGER)
+ .setTxnId(txnId);
+ getListOfEntriesOfLedgerRequest.setGetListOfEntriesOfLedgerRequest().setLedgerId(ledgerId);
writeAndFlush(channel, completionKey, getListOfEntriesOfLedgerRequest);
}
@@ -952,20 +918,21 @@ private void readEntryInternal(final long ledgerId,
completionKey = new TxnCompletionKey(txnId, OperationType.READ_ENTRY);
// Build the request and calculate the total size to be included in the packet.
- BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+ Request readEntryRequest = new Request();
+ BKPacketHeader header = readEntryRequest.setHeader()
.setVersion(ProtocolVersion.VERSION_THREE)
.setOperation(OperationType.READ_ENTRY)
.setTxnId(txnId);
if (((short) flags & BookieProtocol.FLAG_HIGH_PRIORITY) == BookieProtocol.FLAG_HIGH_PRIORITY) {
- headerBuilder.setPriority(DEFAULT_HIGH_PRIORITY_VALUE);
+ header.setPriority(DEFAULT_HIGH_PRIORITY_VALUE);
}
- ReadRequest.Builder readBuilder = ReadRequest.newBuilder()
+ ReadRequest readRequestMsg = readEntryRequest.setReadRequest()
.setLedgerId(ledgerId)
.setEntryId(entryId);
if (null != previousLAC) {
- readBuilder = readBuilder.setPreviousLAC(previousLAC);
+ readRequestMsg.setPreviousLAC(previousLAC);
}
if (null != timeOutInMillis) {
@@ -975,7 +942,7 @@ private void readEntryInternal(final long ledgerId,
ledgerId, entryId, null, ctx);
return;
}
- readBuilder = readBuilder.setTimeOut(timeOutInMillis);
+ readRequestMsg.setTimeOut(timeOutInMillis);
}
if (piggyBackEntry) {
@@ -985,24 +952,22 @@ private void readEntryInternal(final long ledgerId,
ledgerId, entryId, null, ctx);
return;
}
- readBuilder = readBuilder.setFlag(ReadRequest.Flag.ENTRY_PIGGYBACK);
+ readRequestMsg.setFlag(ReadRequest.Flag.ENTRY_PIGGYBACK);
}
// Only one flag can be set on the read requests
if (((short) flags & BookieProtocol.FLAG_DO_FENCING) == BookieProtocol.FLAG_DO_FENCING) {
- readBuilder.setFlag(ReadRequest.Flag.FENCE_LEDGER);
+ readRequestMsg.setFlag(ReadRequest.Flag.FENCE_LEDGER);
if (masterKey == null) {
cb.readEntryComplete(BKException.Code.IncorrectParameterException,
ledgerId, entryId, null, ctx);
return;
}
- readBuilder.setMasterKey(ByteString.copyFrom(masterKey));
+ readRequestMsg.setMasterKey(masterKey);
}
- request = withRequestContext(Request.newBuilder())
- .setHeader(headerBuilder)
- .setReadRequest(readBuilder)
- .build();
+ withRequestContext(readEntryRequest);
+ request = readEntryRequest;
}
ReadCompletion readCompletion = new ReadCompletion(completionKey, cb, ctx, ledgerId, entryId, this);
@@ -1062,18 +1027,13 @@ public void getBookieInfo(final long requested, GetBookieInfoCallback cb, Object
completionKey, cb, ctx, this));
// Build the request and calculate the total size to be included in the packet.
- BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+ Request getBookieInfoRequest = new Request();
+ getBookieInfoRequest.setHeader()
.setVersion(ProtocolVersion.VERSION_THREE)
.setOperation(OperationType.GET_BOOKIE_INFO)
.setTxnId(txnId);
-
- GetBookieInfoRequest.Builder getBookieInfoBuilder = GetBookieInfoRequest.newBuilder()
- .setRequested(requested);
-
- final Request getBookieInfoRequest = withRequestContext(Request.newBuilder())
- .setHeader(headerBuilder)
- .setGetBookieInfoRequest(getBookieInfoBuilder)
- .build();
+ getBookieInfoRequest.setGetBookieInfoRequest().setRequested(requested);
+ withRequestContext(getBookieInfoRequest);
writeAndFlush(channel, completionKey, getBookieInfoRequest);
}
@@ -1697,26 +1657,22 @@ private long getTxnId() {
return txnIdGenerator.incrementAndGet();
}
- Request.Builder withRequestContext(Request.Builder builder) {
+ Request withRequestContext(Request request) {
if (preserveMdcForTaskExecution) {
- return appendRequestContext(builder);
+ return appendRequestContext(request);
}
- return builder;
+ return request;
}
- static Request.Builder appendRequestContext(Request.Builder builder) {
+ static Request appendRequestContext(Request request) {
final Map mdcContextMap = MDC.getCopyOfContextMap();
if (mdcContextMap == null || mdcContextMap.isEmpty()) {
- return builder;
+ return request;
}
for (Map.Entry kv : mdcContextMap.entrySet()) {
- final BookkeeperProtocol.ContextPair context = BookkeeperProtocol.ContextPair.newBuilder()
- .setKey(kv.getKey())
- .setValue(kv.getValue())
- .build();
- builder.addRequestContext(context);
+ request.addRequestContext().setKey(kv.getKey()).setValue(kv.getValue());
}
- return builder;
+ return request;
}
ChannelFutureListener contextPreservingListener(ChannelFutureListener listener) {
@@ -1878,15 +1834,15 @@ private void initiateTLS() {
final CompletionKey completionKey = new TxnCompletionKey(txnId, OperationType.START_TLS);
completionObjects.put(completionKey,
new StartTLSCompletion(completionKey, this));
- BookkeeperProtocol.Request.Builder h = withRequestContext(BookkeeperProtocol.Request.newBuilder());
- BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+ Request startTlsRequest = new Request();
+ startTlsRequest.setHeader()
.setVersion(ProtocolVersion.VERSION_THREE)
.setOperation(OperationType.START_TLS)
.setTxnId(txnId);
- h.setHeader(headerBuilder.build());
- h.setStartTLSRequest(BookkeeperProtocol.StartTLSRequest.newBuilder().build());
+ startTlsRequest.setStartTLSRequest();
+ withRequestContext(startTlsRequest);
state = ConnectionState.START_TLS;
- writeAndFlush(channel, completionKey, h.build());
+ writeAndFlush(channel, completionKey, startTlsRequest);
}
protected void failTLS(int rc) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadCompletion.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadCompletion.java
index b34d4e370a6..828bfccf577 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadCompletion.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadCompletion.java
@@ -67,7 +67,7 @@ public void setOutstanding() {
@Override
public void handleV2Response(long ledgerId, long entryId,
- BookkeeperProtocol.StatusCode status,
+ StatusCode status,
BookieProtocol.Response response) {
perChannelBookieClient.readEntryOutstanding.dec();
if (!(response instanceof BookieProtocol.ReadResponse)) {
@@ -79,33 +79,47 @@ public void handleV2Response(long ledgerId, long entryId,
}
@Override
- public void handleV3Response(BookkeeperProtocol.Response response) {
+ public void handleV3Response(Response response) {
perChannelBookieClient.readEntryOutstanding.dec();
- BookkeeperProtocol.ReadResponse readResponse = response.getReadResponse();
- BookkeeperProtocol.StatusCode status = response.getStatus() == BookkeeperProtocol.StatusCode.EOK
- ? readResponse.getStatus() : response.getStatus();
+ long respLedgerId = ledgerId;
+ long respEntryId = entryId;
+ StatusCode status;
ByteBuf buffer = Unpooled.EMPTY_BUFFER;
- if (readResponse.hasBody()) {
- buffer = Unpooled.wrappedBuffer(readResponse.getBody().asReadOnlyByteBuffer());
- }
long maxLAC = INVALID_ENTRY_ID;
- if (readResponse.hasMaxLAC()) {
- maxLAC = readResponse.getMaxLAC();
- }
long lacUpdateTimestamp = -1L;
- if (readResponse.hasLacUpdateTimestamp()) {
- lacUpdateTimestamp = readResponse.getLacUpdateTimestamp();
+ if (response.getStatus() == StatusCode.EOK && response.hasReadResponse()) {
+ ReadResponse readResponse = response.getReadResponse();
+ status = readResponse.getStatus();
+ // For long-poll reads the request entryId is LAST_ADD_CONFIRMED
+ // and the server fills in the actual entry id alongside the body.
+ if (readResponse.hasLedgerId()) {
+ respLedgerId = readResponse.getLedgerId();
+ }
+ if (readResponse.hasEntryId()) {
+ respEntryId = readResponse.getEntryId();
+ }
+ if (readResponse.hasBody()) {
+ buffer = readResponse.getBodySlice();
+ }
+ if (readResponse.hasMaxLAC()) {
+ maxLAC = readResponse.getMaxLAC();
+ }
+ if (readResponse.hasLacUpdateTimestamp()) {
+ lacUpdateTimestamp = readResponse.getLacUpdateTimestamp();
+ }
+ } else {
+ // Error responses may not carry a populated ReadResponse;
+ // fall back to the request's recorded ledgerId/entryId.
+ status = response.getStatus();
}
- handleReadResponse(readResponse.getLedgerId(),
- readResponse.getEntryId(),
- status, buffer, maxLAC, lacUpdateTimestamp);
+ handleReadResponse(respLedgerId, respEntryId, status, buffer, maxLAC, lacUpdateTimestamp);
ReferenceCountUtil.release(
buffer); // meaningless using unpooled, but client may expect to hold the last reference
}
private void handleReadResponse(long ledgerId,
long entryId,
- BookkeeperProtocol.StatusCode status,
+ StatusCode status,
ByteBuf buffer,
long maxLAC, // max known lac piggy-back from bookies
long lacUpdateTimestamp) { // the timestamp when the lac is updated.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
index 118d5a66da6..1bf99cbd0d6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
@@ -18,8 +18,8 @@
package org.apache.bookkeeper.proto;
import com.google.common.base.Stopwatch;
-import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
import io.netty.channel.Channel;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
@@ -31,11 +31,6 @@
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.bookkeeper.common.util.MathUtils;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadRequest;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
import org.apache.bookkeeper.stats.OpStatsLogger;
@CustomLog
@@ -101,7 +96,7 @@ protected Long getPreviousLAC() {
*/
protected void handleReadResultForFenceRead(
final ByteBuf entryBody,
- final ReadResponse.Builder readResponseBuilder,
+ final ReadResponse readResponseBuilder,
final long entryId,
final Stopwatch startTimeSw) {
// reset last phase start time to measure fence result waiting time
@@ -150,7 +145,7 @@ public void onFailure(Throwable t) {
* @return read response or null if it is a fence read operation.
* @throws IOException
*/
- protected ReadResponse readEntry(ReadResponse.Builder readResponseBuilder,
+ protected ReadResponse readEntry(ReadResponse readResponseBuilder,
long entryId,
Stopwatch startTimeSw)
throws IOException, BookieException {
@@ -169,7 +164,7 @@ protected ReadResponse readEntry(ReadResponse.Builder readResponseBuilder,
* @return read response or null if it is a fence read operation.
* @throws IOException
*/
- protected ReadResponse readEntry(ReadResponse.Builder readResponseBuilder,
+ protected ReadResponse readEntry(ReadResponse readResponseBuilder,
long entryId,
boolean readLACPiggyBack,
Stopwatch startTimeSw)
@@ -180,7 +175,7 @@ protected ReadResponse readEntry(ReadResponse.Builder readResponseBuilder,
return null;
} else {
try {
- readResponseBuilder.setBody(ByteString.copyFrom(entryBody.nioBuffer()));
+ readResponseBuilder.setBody(ByteBufUtil.getBytes(entryBody));
if (readLACPiggyBack) {
readResponseBuilder.setEntryId(entryId);
} else {
@@ -189,7 +184,7 @@ protected ReadResponse readEntry(ReadResponse.Builder readResponseBuilder,
}
registerSuccessfulEvent(readStats, startTimeSw);
readResponseBuilder.setStatus(StatusCode.EOK);
- return readResponseBuilder.build();
+ return readResponseBuilder;
} finally {
ReferenceCountUtil.release(entryBody);
}
@@ -200,9 +195,9 @@ protected ReadResponse getReadResponse() {
final Stopwatch startTimeSw = Stopwatch.createStarted();
final Channel channel = requestHandler.ctx().channel();
- final ReadResponse.Builder readResponse = ReadResponse.newBuilder()
- .setLedgerId(ledgerId)
- .setEntryId(entryId);
+ final ReadResponse readResponse = new ReadResponse()
+ .setLedgerId(ledgerId)
+ .setEntryId(entryId);
try {
// handle fence request
if (RequestUtils.isFenceRequest(readRequest)) {
@@ -217,7 +212,7 @@ protected ReadResponse getReadResponse() {
.log("Fence ledger request received without master key");
throw BookieException.create(BookieException.Code.UnauthorizedAccessException);
} else {
- byte[] masterKey = readRequest.getMasterKey().toByteArray();
+ byte[] masterKey = readRequest.getMasterKey();
fenceResult = requestProcessor.bookie.fenceLedger(ledgerId, masterKey);
}
}
@@ -282,11 +277,10 @@ public void run() {
}
if (!isVersionCompatible()) {
- ReadResponse readResponse = ReadResponse.newBuilder()
- .setLedgerId(ledgerId)
- .setEntryId(entryId)
- .setStatus(StatusCode.EBADVERSION)
- .build();
+ ReadResponse readResponse = new ReadResponse()
+ .setLedgerId(ledgerId)
+ .setEntryId(entryId)
+ .setStatus(StatusCode.EBADVERSION);
sendResponse(readResponse);
return;
}
@@ -301,7 +295,7 @@ protected void executeOp() {
}
}
- private void getFenceResponse(ReadResponse.Builder readResponse,
+ private void getFenceResponse(ReadResponse readResponse,
ByteBuf entryBody,
boolean fenceResult) {
StatusCode status;
@@ -310,7 +304,7 @@ private void getFenceResponse(ReadResponse.Builder readResponse,
registerFailedEvent(requestProcessor.getRequestStats().getFenceReadWaitStats(), lastPhaseStartTime);
} else {
status = StatusCode.EOK;
- readResponse.setBody(ByteString.copyFrom(entryBody.nioBuffer()));
+ readResponse.setBody(ByteBufUtil.getBytes(entryBody));
registerSuccessfulEvent(requestProcessor.getRequestStats().getFenceReadWaitStats(), lastPhaseStartTime);
}
@@ -321,7 +315,7 @@ private void getFenceResponse(ReadResponse.Builder readResponse,
readResponse.setStatus(status);
}
- private void sendFenceResponse(ReadResponse.Builder readResponse,
+ private void sendFenceResponse(ReadResponse readResponse,
ByteBuf entryBody,
boolean fenceResult,
Stopwatch startTimeSw) {
@@ -330,26 +324,24 @@ private void sendFenceResponse(ReadResponse.Builder readResponse,
// register fence read stat
registerEvent(!fenceResult, requestProcessor.getRequestStats().getFenceReadEntryStats(), startTimeSw);
// send the fence read response
- sendResponse(readResponse.build());
+ sendResponse(readResponse);
}
protected ReadResponse buildResponse(
- ReadResponse.Builder readResponseBuilder,
+ ReadResponse readResponseBuilder,
StatusCode statusCode,
Stopwatch startTimeSw) {
registerEvent(!statusCode.equals(StatusCode.EOK), readStats, startTimeSw);
readResponseBuilder.setStatus(statusCode);
- return readResponseBuilder.build();
+ return readResponseBuilder;
}
protected void sendResponse(ReadResponse readResponse) {
- Response.Builder response = Response.newBuilder()
- .setHeader(getHeader())
- .setStatus(readResponse.getStatus())
- .setReadResponse(readResponse);
- sendResponse(response.getStatus(),
- response.build(),
- reqStats);
+ Response response = new Response();
+ response.setHeader().copyFrom(getHeader());
+ response.setStatus(readResponse.getStatus());
+ response.setReadResponse().copyFrom(readResponse);
+ sendResponse(response.getStatus(), response, reqStats);
requestProcessor.onReadRequestFinish();
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacCompletion.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacCompletion.java
index f86afdd499f..73c734c6f24 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacCompletion.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacCompletion.java
@@ -61,19 +61,19 @@ public void errorOut(final int rc) {
}
@Override
- public void handleV3Response(BookkeeperProtocol.Response response) {
- BookkeeperProtocol.ReadLacResponse readLacResponse = response.getReadLacResponse();
+ public void handleV3Response(Response response) {
+ ReadLacResponse readLacResponse = response.getReadLacResponse();
ByteBuf lacBuffer = Unpooled.EMPTY_BUFFER;
ByteBuf lastEntryBuffer = Unpooled.EMPTY_BUFFER;
- BookkeeperProtocol.StatusCode status = response.getStatus() == BookkeeperProtocol.StatusCode.EOK
+ StatusCode status = response.getStatus() == StatusCode.EOK
? readLacResponse.getStatus() : response.getStatus();
if (readLacResponse.hasLacBody()) {
- lacBuffer = Unpooled.wrappedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer());
+ lacBuffer = readLacResponse.getLacBodySlice();
}
if (readLacResponse.hasLastEntryBody()) {
- lastEntryBuffer = Unpooled.wrappedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer());
+ lastEntryBuffer = readLacResponse.getLastEntryBodySlice();
}
logEvent(status).log("Got response from bookie");
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
index 0fb27e938e5..58e345a82e4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
@@ -20,8 +20,8 @@
*/
package org.apache.bookkeeper.proto;
-import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
@@ -29,11 +29,6 @@
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.common.util.MathUtils;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacResponse;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
/**
* A read processor for v3 last add confirmed messages.
@@ -52,11 +47,11 @@ private ReadLacResponse getReadLacResponse() {
ReadLacRequest readLacRequest = request.getReadLacRequest();
long ledgerId = readLacRequest.getLedgerId();
- final ReadLacResponse.Builder readLacResponse = ReadLacResponse.newBuilder().setLedgerId(ledgerId);
+ final ReadLacResponse readLacResponse = new ReadLacResponse().setLedgerId(ledgerId);
if (!isVersionCompatible()) {
readLacResponse.setStatus(StatusCode.EBADVERSION);
- return readLacResponse.build();
+ return readLacResponse;
}
log.debug().attr("request", request).log("Received ReadLac request");
@@ -66,7 +61,7 @@ private ReadLacResponse getReadLacResponse() {
try {
lac = requestProcessor.bookie.getExplicitLac(ledgerId);
if (lac != null) {
- readLacResponse.setLacBody(ByteString.copyFrom(lac.nioBuffer()));
+ readLacResponse.setLacBody(ByteBufUtil.getBytes(lac));
}
} catch (Bookie.NoLedgerException e) {
status = StatusCode.ENOLEDGER;
@@ -93,7 +88,7 @@ private ReadLacResponse getReadLacResponse() {
try {
lastEntry = requestProcessor.bookie.readEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
if (lastEntry != null) {
- readLacResponse.setLastEntryBody(ByteString.copyFrom(lastEntry.nioBuffer()));
+ readLacResponse.setLastEntryBody(ByteBufUtil.getBytes(lastEntry));
}
} catch (Bookie.NoLedgerException e) {
status = StatusCode.ENOLEDGER;
@@ -130,7 +125,7 @@ private ReadLacResponse getReadLacResponse() {
}
// Finally set the status and return
readLacResponse.setStatus(status);
- return readLacResponse.build();
+ return readLacResponse;
}
@Override
@@ -140,12 +135,12 @@ public void run() {
}
private void sendResponse(ReadLacResponse readLacResponse) {
- Response.Builder response = Response.newBuilder()
- .setHeader(getHeader())
- .setStatus(readLacResponse.getStatus())
- .setReadLacResponse(readLacResponse);
+ Response response = new Response();
+ response.setHeader().copyFrom(getHeader());
+ response.setStatus(readLacResponse.getStatus());
+ response.setReadLacResponse().copyFrom(readLacResponse);
sendResponse(response.getStatus(),
- response.build(),
+ response,
requestProcessor.getRequestStats().getReadLacRequestStats());
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java
index 9b8533da0c6..210ff41a71a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java
@@ -24,27 +24,27 @@
*/
class RequestUtils {
- public static boolean isFenceRequest(BookkeeperProtocol.ReadRequest readRequest) {
- return hasFlag(readRequest, BookkeeperProtocol.ReadRequest.Flag.FENCE_LEDGER);
+ public static boolean isFenceRequest(ReadRequest readRequest) {
+ return hasFlag(readRequest, ReadRequest.Flag.FENCE_LEDGER);
}
- public static boolean isLongPollReadRequest(BookkeeperProtocol.ReadRequest readRequest) {
+ public static boolean isLongPollReadRequest(ReadRequest readRequest) {
return !isFenceRequest(readRequest) && readRequest.hasPreviousLAC();
}
- public static boolean isHighPriority(BookkeeperProtocol.Request request) {
+ public static boolean isHighPriority(Request request) {
return request.getHeader().getPriority() > 0;
}
- public static boolean shouldPiggybackEntry(BookkeeperProtocol.ReadRequest readRequest) {
- return hasFlag(readRequest, BookkeeperProtocol.ReadRequest.Flag.ENTRY_PIGGYBACK);
+ public static boolean shouldPiggybackEntry(ReadRequest readRequest) {
+ return hasFlag(readRequest, ReadRequest.Flag.ENTRY_PIGGYBACK);
}
- static boolean hasFlag(BookkeeperProtocol.ReadRequest request, BookkeeperProtocol.ReadRequest.Flag flag) {
+ static boolean hasFlag(ReadRequest request, ReadRequest.Flag flag) {
return request.hasFlag() && request.getFlag() == flag;
}
- static boolean hasFlag(BookkeeperProtocol.AddRequest request, BookkeeperProtocol.AddRequest.Flag flag) {
+ static boolean hasFlag(AddRequest request, AddRequest.Flag flag) {
return request.hasFlag() && request.getFlag() == flag;
}
@@ -53,11 +53,11 @@ static boolean hasFlag(BookkeeperProtocol.AddRequest request, BookkeeperProtocol
* masterKey contains the password of the ledger and body is customer data,
* so it is not appropriate to have these in logs or system output.
*/
- public static String toSafeString(BookkeeperProtocol.Request request) {
+ public static String toSafeString(Request request) {
MoreObjects.ToStringHelper stringHelper = MoreObjects.toStringHelper(request);
- BookkeeperProtocol.BKPacketHeader header = request.getHeader();
+ BKPacketHeader header = request.getHeader();
if (request.hasAddRequest()) {
- BookkeeperProtocol.AddRequest addRequest = request.getAddRequest();
+ AddRequest addRequest = request.getAddRequest();
includeHeaderFields(stringHelper, header);
stringHelper.add("ledgerId", addRequest.getLedgerId());
stringHelper.add("entryId", addRequest.getEntryId());
@@ -69,7 +69,7 @@ public static String toSafeString(BookkeeperProtocol.Request request) {
}
return stringHelper.toString();
} else if (request.hasReadRequest()) {
- BookkeeperProtocol.ReadRequest readRequest = request.getReadRequest();
+ ReadRequest readRequest = request.getReadRequest();
includeHeaderFields(stringHelper, header);
stringHelper.add("ledgerId", readRequest.getLedgerId());
stringHelper.add("entryId", readRequest.getEntryId());
@@ -84,13 +84,13 @@ public static String toSafeString(BookkeeperProtocol.Request request) {
}
return stringHelper.toString();
} else if (request.hasWriteLacRequest()) {
- BookkeeperProtocol.WriteLacRequest writeLacRequest = request.getWriteLacRequest();
+ WriteLacRequest writeLacRequest = request.getWriteLacRequest();
includeHeaderFields(stringHelper, header);
stringHelper.add("ledgerId", writeLacRequest.getLedgerId());
stringHelper.add("lac", writeLacRequest.getLac());
return stringHelper.toString();
} else if (request.hasForceLedgerRequest()) {
- BookkeeperProtocol.ForceLedgerRequest forceLedgerRequest = request.getForceLedgerRequest();
+ ForceLedgerRequest forceLedgerRequest = request.getForceLedgerRequest();
includeHeaderFields(stringHelper, header);
stringHelper.add("ledgerId", forceLedgerRequest.getLedgerId());
return stringHelper.toString();
@@ -100,7 +100,7 @@ public static String toSafeString(BookkeeperProtocol.Request request) {
}
private static void includeHeaderFields(MoreObjects.ToStringHelper stringHelper,
- BookkeeperProtocol.BKPacketHeader header) {
+ BKPacketHeader header) {
stringHelper.add("version", header.getVersion());
stringHelper.add("operation", header.getOperation());
stringHelper.add("txnId", header.getTxnId());
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/StartTLSCompletion.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/StartTLSCompletion.java
index a90a1c90351..e537f2082e6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/StartTLSCompletion.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/StartTLSCompletion.java
@@ -52,8 +52,8 @@ public void errorOut(final int rc) {
}
@Override
- public void handleV3Response(BookkeeperProtocol.Response response) {
- BookkeeperProtocol.StatusCode status = response.getStatus();
+ public void handleV3Response(Response response) {
+ StatusCode status = response.getStatus();
logEvent(status).log("Got response from bookie");
@@ -65,7 +65,7 @@ public void handleV3Response(BookkeeperProtocol.Response response) {
if (perChannelBookieClient.state != PerChannelBookieClient.ConnectionState.START_TLS) {
log.error("Connection state changed before TLS response received");
perChannelBookieClient.failTLS(BKException.Code.BookieHandleNotAvailableException);
- } else if (status != BookkeeperProtocol.StatusCode.EOK) {
+ } else if (status != StatusCode.EOK) {
log.error().attr("status", status).log("Client received error during TLS negotiation");
perChannelBookieClient.failTLS(BKException.Code.SecurityException);
} else {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/TxnCompletionKey.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/TxnCompletionKey.java
index bbf64681e1d..b5c6cc6f23a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/TxnCompletionKey.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/TxnCompletionKey.java
@@ -24,7 +24,7 @@
class TxnCompletionKey extends CompletionKey {
final long txnId;
- public TxnCompletionKey(long txnId, BookkeeperProtocol.OperationType operationType) {
+ public TxnCompletionKey(long txnId, OperationType operationType) {
super(operationType);
this.txnId = txnId;
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index 6bebfcc4b9e..7f3116c9d03 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -21,7 +21,6 @@
package org.apache.bookkeeper.proto;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.util.EnumSet;
import java.util.concurrent.TimeUnit;
@@ -31,11 +30,6 @@
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.net.BookieId;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
import org.apache.bookkeeper.stats.OpStatsLogger;
@CustomLog
@@ -54,13 +48,13 @@ private AddResponse getAddResponse() {
long ledgerId = addRequest.getLedgerId();
long entryId = addRequest.getEntryId();
- final AddResponse.Builder addResponse = AddResponse.newBuilder()
+ final AddResponse addResponse = new AddResponse()
.setLedgerId(ledgerId)
.setEntryId(entryId);
if (!isVersionCompatible()) {
addResponse.setStatus(StatusCode.EBADVERSION);
- return addResponse.build();
+ return addResponse;
}
if (requestProcessor.getBookie().isReadOnly()
@@ -68,7 +62,7 @@ private AddResponse getAddResponse() {
&& requestProcessor.getBookie().isAvailableForHighPriorityWrites())) {
log.warn("BookieServer is running as readonly mode, so rejecting the request from the client!");
addResponse.setStatus(StatusCode.EREADONLY);
- return addResponse.build();
+ return addResponse;
}
BookkeeperInternalCallbacks.WriteCallback wcb = new BookkeeperInternalCallbacks.WriteCallback() {
@@ -96,11 +90,10 @@ public void writeComplete(int rc, long ledgerId, long entryId,
break;
}
addResponse.setStatus(status);
- Response.Builder response = Response.newBuilder()
- .setHeader(getHeader())
- .setStatus(addResponse.getStatus())
- .setAddResponse(addResponse);
- Response resp = response.build();
+ Response resp = new Response();
+ resp.setHeader().copyFrom(getHeader());
+ resp.setStatus(addResponse.getStatus());
+ resp.setAddResponse().copyFrom(addResponse);
sendResponse(status, resp, requestProcessor.getRequestStats().getAddRequestStats());
}
};
@@ -112,8 +105,8 @@ public void writeComplete(int rc, long ledgerId, long entryId,
}
final boolean ackBeforeSync = writeFlags.contains(WriteFlag.DEFERRED_SYNC);
StatusCode status = null;
- byte[] masterKey = addRequest.getMasterKey().toByteArray();
- ByteBuf entryToAdd = Unpooled.wrappedBuffer(addRequest.getBody().asReadOnlyByteBuffer());
+ byte[] masterKey = addRequest.getMasterKey();
+ ByteBuf entryToAdd = addRequest.getBodySlice();
try {
if (RequestUtils.hasFlag(addRequest, AddRequest.Flag.RECOVERY_ADD)) {
requestProcessor.getBookie().recoveryAddEntry(entryToAdd, wcb,
@@ -167,7 +160,7 @@ public void writeComplete(int rc, long ledgerId, long entryId,
// doesn't return a response back to the caller.
if (!status.equals(StatusCode.EOK)) {
addResponse.setStatus(status);
- return addResponse.build();
+ return addResponse;
}
return null;
}
@@ -179,11 +172,10 @@ public void run() {
AddResponse addResponse = getAddResponse();
if (null != addResponse) {
// This means there was an error and we should send this back.
- Response.Builder response = Response.newBuilder()
- .setHeader(getHeader())
- .setStatus(addResponse.getStatus())
- .setAddResponse(addResponse);
- Response resp = response.build();
+ Response resp = new Response();
+ resp.setHeader().copyFrom(getHeader());
+ resp.setStatus(addResponse.getStatus());
+ resp.setAddResponse().copyFrom(addResponse);
sendResponse(addResponse.getStatus(), resp,
requestProcessor.getRequestStats().getAddRequestStats());
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacCompletion.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacCompletion.java
index c5339dcd3e1..ffbefb0f9dd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacCompletion.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacCompletion.java
@@ -61,11 +61,15 @@ public void errorOut(final int rc) {
}
@Override
- public void handleV3Response(BookkeeperProtocol.Response response) {
- BookkeeperProtocol.WriteLacResponse writeLacResponse = response.getWriteLacResponse();
- BookkeeperProtocol.StatusCode status = response.getStatus() == BookkeeperProtocol.StatusCode.EOK
- ? writeLacResponse.getStatus() : response.getStatus();
- long ledgerId = writeLacResponse.getLedgerId();
+ public void handleV3Response(Response response) {
+ StatusCode status;
+ if (response.getStatus() == StatusCode.EOK && response.hasWriteLacResponse()) {
+ status = response.getWriteLacResponse().getStatus();
+ } else {
+ // Error responses may not carry a populated WriteLacResponse;
+ // fall back to the request's recorded ledgerId.
+ status = response.getStatus();
+ }
logEvent(status).log("Got response from bookie");
int rc = convertStatus(status, BKException.Code.WriteException);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
index b22fa5b8626..4da2129f874 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
@@ -20,19 +20,13 @@
*/
package org.apache.bookkeeper.proto;
-import io.netty.buffer.Unpooled;
+import io.netty.buffer.ByteBuf;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import lombok.CustomLog;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.net.BookieId;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacResponse;
@CustomLog
@@ -50,17 +44,17 @@ private WriteLacResponse getWriteLacResponse() {
long lac = writeLacRequest.getLac();
long ledgerId = writeLacRequest.getLedgerId();
- final WriteLacResponse.Builder writeLacResponse = WriteLacResponse.newBuilder().setLedgerId(ledgerId);
+ final WriteLacResponse writeLacResponse = new WriteLacResponse().setLedgerId(ledgerId);
if (!isVersionCompatible()) {
writeLacResponse.setStatus(StatusCode.EBADVERSION);
- return writeLacResponse.build();
+ return writeLacResponse;
}
if (requestProcessor.bookie.isReadOnly()) {
log.warn("BookieServer is running as readonly mode, so rejecting the request from the client!");
writeLacResponse.setStatus(StatusCode.EREADONLY);
- return writeLacResponse.build();
+ return writeLacResponse;
}
BookkeeperInternalCallbacks.WriteCallback writeCallback = new BookkeeperInternalCallbacks.WriteCallback() {
@@ -87,21 +81,20 @@ public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Ob
break;
}
writeLacResponse.setStatus(status);
- Response.Builder response = Response.newBuilder()
- .setHeader(getHeader())
- .setStatus(writeLacResponse.getStatus())
- .setWriteLacResponse(writeLacResponse);
- Response resp = response.build();
+ Response resp = new Response();
+ resp.setHeader().copyFrom(getHeader());
+ resp.setStatus(writeLacResponse.getStatus());
+ resp.setWriteLacResponse().copyFrom(writeLacResponse);
sendResponse(status, resp, requestProcessor.getRequestStats().getWriteLacRequestStats());
}
};
StatusCode status = null;
- ByteBuffer lacToAdd = writeLacRequest.getBody().asReadOnlyByteBuffer();
- byte[] masterKey = writeLacRequest.getMasterKey().toByteArray();
+ ByteBuf lacToAdd = writeLacRequest.getBodySlice();
+ byte[] masterKey = writeLacRequest.getMasterKey();
try {
- requestProcessor.bookie.setExplicitLac(Unpooled.wrappedBuffer(lacToAdd),
+ requestProcessor.bookie.setExplicitLac(lacToAdd,
writeCallback, requestHandler, masterKey);
status = StatusCode.EOK;
} catch (BookieException.LedgerFencedAndDeletedException e) {
@@ -149,7 +142,7 @@ public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Ob
requestProcessor.getRequestStats().getWriteLacStats()
.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
writeLacResponse.setStatus(status);
- return writeLacResponse.build();
+ return writeLacResponse;
}
return null;
}
@@ -158,11 +151,10 @@ public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Ob
public void run() {
WriteLacResponse writeLacResponse = getWriteLacResponse();
if (null != writeLacResponse) {
- Response.Builder response = Response.newBuilder()
- .setHeader(getHeader())
- .setStatus(writeLacResponse.getStatus())
- .setWriteLacResponse(writeLacResponse);
- Response resp = response.build();
+ Response resp = new Response();
+ resp.setHeader().copyFrom(getHeader());
+ resp.setStatus(writeLacResponse.getStatus());
+ resp.setWriteLacResponse().copyFrom(writeLacResponse);
sendResponse(
writeLacResponse.getStatus(),
resp,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
index 133a37f0379..3e7440b7974 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
@@ -22,6 +22,8 @@
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
@@ -246,6 +248,47 @@ public static ByteBuf coalesce(ByteBufList list) {
return res;
}
+ /**
+ * Returns a {@link ByteBuf} that views the contents of this {@code ByteBufList}
+ * without copying the bytes.
+ *
+ *
Ownership semantics: this {@code ByteBufList}'s reference count is
+ * not changed by this call — the returned {@link ByteBuf} holds
+ * its own retained references to the underlying buffers. The caller is
+ * responsible for releasing the returned {@link ByteBuf} when it is no longer
+ * needed; this {@code ByteBufList} continues to be owned by its caller and
+ * must be released independently.
+ *
+ *
If this list is empty, {@link Unpooled#EMPTY_BUFFER} is returned.
+ * If it contains exactly one buffer, a retained duplicate of that buffer is
+ * returned to avoid the overhead of wrapping it in a
+ * {@link CompositeByteBuf}.
+ *
+ * @param allocator the {@link ByteBufAllocator} used to allocate the
+ * {@link CompositeByteBuf} when more than one buffer is present
+ * @return a {@link ByteBuf} viewing the contents of this list
+ */
+ public ByteBuf toByteBuf(ByteBufAllocator allocator) {
+ final int size = buffers.size();
+ if (size == 0) {
+ return Unpooled.EMPTY_BUFFER;
+ }
+ if (size == 1) {
+ // Fast path: avoid wrapping a single buffer in a CompositeByteBuf.
+ // retainedDuplicate() gives the caller an independent ref count and
+ // reader/writer indexes.
+ return buffers.get(0).retainedDuplicate();
+ }
+
+ CompositeByteBuf composite = allocator.compositeBuffer(size);
+ for (int i = 0; i < size; i++) {
+ // Retain so that the composite owns its own reference to each buffer,
+ // independent of this ByteBufList's lifecycle.
+ composite.addComponent(true, buffers.get(i).retainedDuplicate());
+ }
+ return composite;
+ }
+
@Override
public ByteBufList retain() {
super.retain();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringEntryFormatter.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringEntryFormatter.java
index 14ef44b7812..a931cfed2b7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringEntryFormatter.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringEntryFormatter.java
@@ -21,8 +21,8 @@
package org.apache.bookkeeper.util;
-import com.google.protobuf.ByteString;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
/**
* A String-based entry formatter.
@@ -30,7 +30,7 @@
public class StringEntryFormatter extends EntryFormatter {
@Override
public void formatEntry(byte[] data) {
- System.out.println(ByteString.copyFrom(data).toStringUtf8());
+ System.out.println(new String(data, StandardCharsets.UTF_8));
}
@Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
index edbe49989d2..8922cd0d632 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
@@ -19,7 +19,8 @@
*/
import java.io.IOException;
-import org.apache.bookkeeper.proto.BookkeeperProtocol;
+import org.apache.bookkeeper.proto.BKPacketHeader;
+import org.apache.bookkeeper.proto.Request;
/**
* Provided utilities for parsing network addresses, ledger-id from node paths
@@ -169,8 +170,8 @@ public static long stringToHierarchicalLedgerId(String...levelNodes) throws IOEx
* @return string representation of request
*/
public static String requestToString(Object request) {
- if (request instanceof BookkeeperProtocol.Request) {
- BookkeeperProtocol.BKPacketHeader header = ((BookkeeperProtocol.Request) request).getHeader();
+ if (request instanceof Request) {
+ BKPacketHeader header = ((Request) request).getHeader();
return String.format("Req(txnId=%d,op=%s,version=%s)",
header.getTxnId(), header.getOperation(),
header.getVersion());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
index c402451588b..f9c7babc643 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
@@ -41,7 +41,7 @@
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookieClientImpl;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
-import org.apache.bookkeeper.proto.BookkeeperProtocol;
+import org.apache.bookkeeper.proto.GetBookieInfoRequest;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.junit.After;
@@ -110,8 +110,8 @@ public void testGetBookieInfoTimeout() throws Exception {
// try to get bookie info from the sleeping bookie. It should fail with timeout error
BookieClient bc = new BookieClientImpl(cConf, eventLoopGroup, UnpooledByteBufAllocator.DEFAULT, executor,
scheduler, NullStatsLogger.INSTANCE, bkc.getBookieAddressResolver());
- long flags = BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE
- | BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE;
+ long flags = GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE
+ | GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE;
class CallbackObj {
int rc;
@@ -133,10 +133,10 @@ public void getBookieInfoComplete(int rc, BookieInfo bInfo, Object ctx) {
CallbackObj obj = (CallbackObj) ctx;
obj.rc = rc;
if (rc == Code.OK) {
- if ((obj.requested & BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE) != 0) {
+ if ((obj.requested & GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE) != 0) {
obj.freeDiskSpace = bInfo.getFreeDiskSpace();
}
- if ((obj.requested & BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE)
+ if ((obj.requested & GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE)
!= 0) {
obj.totalDiskCapacity = bInfo.getTotalDiskSpace();
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplicationWithMock.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplicationWithMock.java
index c618aa4a15b..9f1d7465e9d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplicationWithMock.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplicationWithMock.java
@@ -59,7 +59,7 @@ public void testRecoverLedgerFragmentEntrySendRightRequestWithFlag() throws Exce
doAnswer(invocationOnMock -> {
ByteBuf toSend = invocationOnMock.getArgument(4);
BookieProtoEncoding.RequestEnDeCoderPreV3 deCoderPreV3 =
- new BookieProtoEncoding.RequestEnDeCoderPreV3(null);
+ new BookieProtoEncoding.RequestEnDeCoderPreV3();
toSend.readerIndex(4);
BookieProtocol.ParsedAddRequest request = (BookieProtocol.ParsedAddRequest) deCoderPreV3.decode(toSend);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
index ba75e4b75a6..ad0376dca60 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
@@ -27,9 +27,6 @@
import static org.mockito.Mockito.when;
import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.ExtensionRegistry;
-import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
@@ -40,13 +37,7 @@
import org.apache.bookkeeper.proto.BookieProtoEncoding.ResponseEnDeCoderPreV3;
import org.apache.bookkeeper.proto.BookieProtoEncoding.ResponseEnDecoderV3;
import org.apache.bookkeeper.proto.BookieProtocol.AddResponse;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest.Flag;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
import org.apache.bookkeeper.util.ByteBufList;
-import org.junit.Before;
import org.junit.Test;
/**
@@ -54,13 +45,6 @@
*/
public class BookieProtoEncodingTest {
- private ExtensionRegistry registry;
-
- @Before
- public void setup() {
- this.registry = ExtensionRegistry.newInstance();
- }
-
@Test
public void testV3ResponseDecoderNoFallback() throws Exception {
AddResponse v2Resp = AddResponse.create(
@@ -69,19 +53,16 @@ public void testV3ResponseDecoderNoFallback() throws Exception {
1L,
2L);
- BookkeeperProtocol.Response v3Resp = BookkeeperProtocol.Response.newBuilder()
- .setHeader(BKPacketHeader.newBuilder()
+ Response v3Resp = new Response();
+ v3Resp.setHeader()
.setVersion(ProtocolVersion.VERSION_THREE)
.setTxnId(1L)
- .setOperation(OperationType.ADD_ENTRY)
- .build())
- .setStatus(StatusCode.EOK)
- .setAddResponse(BookkeeperProtocol.AddResponse.newBuilder()
+ .setOperation(OperationType.ADD_ENTRY);
+ v3Resp.setStatus(StatusCode.EOK);
+ v3Resp.setAddResponse()
.setStatus(StatusCode.EOK)
.setLedgerId(1L)
- .setEntryId(2L)
- .build())
- .build();
+ .setEntryId(2L);
List