Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 0 additions & 28 deletions bookkeeper-proto/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@
<artifactId>bookkeeper-proto</artifactId>
<name>Apache BookKeeper :: Protocols</name>
<dependencies>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
Expand All @@ -42,40 +38,16 @@
<configuration>
<excludes>
<!-- exclude generated files //-->
<exclude>**/BookkeeperProtocol.java</exclude>
<exclude>target/generated-sources/lightproto/**</exclude>
<exclude>**/.checkstyle</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>${protobuf-maven-plugin.version}</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
<checkStaleness>true</checkStaleness>
<includes>
<include>BookkeeperProtocol.proto</include>
</includes>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>io.streamnative.lightproto</groupId>
<artifactId>lightproto-maven-plugin</artifactId>
<version>${lightproto-maven-plugin.version}</version>
<configuration>
<sources>
<source>${project.basedir}/src/main/proto/DataFormats.proto</source>
<source>${project.basedir}/src/main/proto/DbLedgerStorageDataFormats.proto</source>
</sources>
<targetSourcesSubDir>generated-sources/lightproto/java</targetSourcesSubDir>
<generateTextFormat>true</generateTextFormat>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
import org.apache.bookkeeper.proto.BookkeeperProtocol;
import org.apache.bookkeeper.proto.GetBookieInfoRequest;
import org.apache.commons.collections4.CollectionUtils;

/**
Expand All @@ -46,8 +46,8 @@
@CustomLog
public class BookieInfoReader {
private static final long GET_BOOKIE_INFO_REQUEST_FLAGS =
BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE
| BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE;
GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE
| GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE;

private final ScheduledExecutorService scheduler;
private final BookKeeper bk;
Expand Down Expand Up @@ -329,8 +329,8 @@ synchronized void getReadWriteBookieInfo() {
}

BookieClient bkc = bk.getBookieClient();
final long requested = BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE
| BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE;
final long requested = GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE
| GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE;
totalSent = 0;
completedCnt = 0;
errorCnt = 0;
Expand Down Expand Up @@ -413,8 +413,8 @@ Map<BookieId, BookieInfo> getBookieInfo() throws BKException, InterruptedExcepti
final ConcurrentMap<BookieId, BookieInfo> map =
new ConcurrentHashMap<BookieId, BookieInfo>();
final CountDownLatch latch = new CountDownLatch(1);
long requested = BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE
| BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE;
long requested = GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE
| GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE;

Collection<BookieId> bookies;
bookies = bk.bookieWatcher.getBookies();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,25 +117,30 @@ public void setOutstanding() {

@Override
public void handleV2Response(
long ledgerId, long entryId, BookkeeperProtocol.StatusCode status,
long ledgerId, long entryId, StatusCode status,
BookieProtocol.Response response) {
perChannelBookieClient.addEntryOutstanding.dec();
handleResponse(ledgerId, entryId, status);
}

@Override
public void handleV3Response(
BookkeeperProtocol.Response response) {
Response response) {
perChannelBookieClient.addEntryOutstanding.dec();
BookkeeperProtocol.AddResponse addResponse = response.getAddResponse();
BookkeeperProtocol.StatusCode status = response.getStatus() == BookkeeperProtocol.StatusCode.EOK
? addResponse.getStatus() : response.getStatus();
handleResponse(addResponse.getLedgerId(), addResponse.getEntryId(),
status);
StatusCode status;
if (response.getStatus() == StatusCode.EOK && response.hasAddResponse()) {
status = response.getAddResponse().getStatus();
} else {
// Error responses (e.g. EUA from a rejected auth handshake) may not
// carry an AddResponse with ledgerId/entryId populated. Fall back to
// the values we recorded from the outgoing request.
status = response.getStatus();
}
handleResponse(ledgerId, entryId, status);
}

private void handleResponse(long ledgerId, long entryId,
BookkeeperProtocol.StatusCode status) {
StatusCode status) {
logEvent(status).log("Got response from bookie");

int rc = convertStatus(status, BKException.Code.WriteException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import static org.apache.bookkeeper.auth.AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME;

import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
Expand All @@ -40,7 +39,6 @@
import org.apache.bookkeeper.auth.BookieAuthProvider;
import org.apache.bookkeeper.auth.ClientAuthProvider;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.NettyChannelUtil;

Expand Down Expand Up @@ -90,10 +88,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
BookieProtocol.AuthRequest req = (BookieProtocol.AuthRequest) msg;
assert (req.getOpCode() == BookieProtocol.AUTH);
if (checkAuthPlugin(req.getAuthMessage(), ctx.channel())) {
byte[] payload = req
.getAuthMessage()
.getPayload()
.toByteArray();
byte[] payload = req.getAuthMessage().getPayload();
authProvider.process(AuthToken.wrap(payload),
new AuthResponseCallbackLegacy(req, ctx.channel()));
} else {
Expand All @@ -114,26 +109,23 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
} else {
ctx.channel().close();
}
} else if (msg instanceof BookkeeperProtocol.Request) { // post-PB-client
BookkeeperProtocol.Request req = (BookkeeperProtocol.Request) msg;
if (req.getHeader().getOperation() == BookkeeperProtocol.OperationType.AUTH
} else if (msg instanceof Request) { // post-PB-client
Request req = (Request) msg;
if (req.getHeader().getOperation() == OperationType.AUTH
&& req.hasAuthRequest()
&& checkAuthPlugin(req.getAuthRequest(), ctx.channel())) {
byte[] payload = req
.getAuthRequest()
.getPayload()
.toByteArray();
byte[] payload = req.getAuthRequest().getPayload();
authProvider.process(AuthToken.wrap(payload),
new AuthResponseCallback(req, ctx.channel(), authProviderFactory.getPluginName()));
} else if (req.getHeader().getOperation() == BookkeeperProtocol.OperationType.START_TLS
} else if (req.getHeader().getOperation() == OperationType.START_TLS
&& req.hasStartTLSRequest()) {
super.channelRead(ctx, msg);
} else {
BookkeeperProtocol.Response.Builder builder = BookkeeperProtocol.Response.newBuilder()
.setHeader(req.getHeader())
.setStatus(BookkeeperProtocol.StatusCode.EUA);
Response response = new Response();
response.setHeader().copyFrom(req.getHeader());
response.setStatus(StatusCode.EUA);

NettyChannelUtil.writeAndFlushWithVoidPromise(ctx.channel(), builder.build());
NettyChannelUtil.writeAndFlushWithVoidPromise(ctx.channel(), response);
}
} else {
// close the channel, junk coming over it
Expand Down Expand Up @@ -171,45 +163,43 @@ public void operationComplete(int rc, AuthToken newam) {
channel.close();
return;
}
AuthMessage message = AuthMessage.newBuilder().setAuthPluginName(req.authMessage.getAuthPluginName())
.setPayload(ByteString.copyFrom(newam.getData())).build();
AuthMessage message = new AuthMessage()
.setAuthPluginName(req.authMessage.getAuthPluginName())
.setPayload(newam.getData());
final BookieProtocol.AuthResponse response =
new BookieProtocol.AuthResponse(req.getProtocolVersion(), message);
NettyChannelUtil.writeAndFlushWithVoidPromise(channel, response);
}
}

static class AuthResponseCallback implements AuthCallbacks.GenericCallback<AuthToken> {
final BookkeeperProtocol.Request req;
final Request req;
final Channel channel;
final String pluginName;

AuthResponseCallback(BookkeeperProtocol.Request req, Channel channel, String pluginName) {
AuthResponseCallback(Request req, Channel channel, String pluginName) {
this.req = req;
this.channel = channel;
this.pluginName = pluginName;
}

@Override
public void operationComplete(int rc, AuthToken newam) {
BookkeeperProtocol.Response.Builder builder = BookkeeperProtocol.Response.newBuilder()
.setHeader(req.getHeader());
Response response = new Response();
response.setHeader().copyFrom(req.getHeader());

if (rc != BKException.Code.OK) {
log.error("Error processing auth message, closing connection");

builder.setStatus(BookkeeperProtocol.StatusCode.EUA);
NettyChannelUtil.writeAndFlushWithClosePromise(
channel, builder.build()
);
response.setStatus(StatusCode.EUA);
NettyChannelUtil.writeAndFlushWithClosePromise(channel, response);
return;
} else {
AuthMessage message = AuthMessage.newBuilder().setAuthPluginName(pluginName)
.setPayload(ByteString.copyFrom(newam.getData())).build();
builder.setStatus(BookkeeperProtocol.StatusCode.EOK).setAuthResponse(message);
NettyChannelUtil.writeAndFlushWithVoidPromise(
channel, builder.build()
);
response.setStatus(StatusCode.EOK)
.setAuthResponse()
.setAuthPluginName(pluginName)
.setPayload(newam.getData());
NettyChannelUtil.writeAndFlushWithVoidPromise(channel, response);
}
}
}
Expand Down Expand Up @@ -272,8 +262,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception

if (authenticated) {
super.channelRead(ctx, msg);
} else if (msg instanceof BookkeeperProtocol.Response) {
BookkeeperProtocol.Response resp = (BookkeeperProtocol.Response) msg;
} else if (msg instanceof Response) {
Response resp = (Response) msg;
if (null == resp.getHeader().getOperation()) {
log.info()
.attr("message", msg)
Expand All @@ -286,11 +276,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
super.channelRead(ctx, msg);
break;
case AUTH:
if (resp.getStatus() != BookkeeperProtocol.StatusCode.EOK) {
authenticationError(ctx, resp.getStatus().getNumber());
if (resp.getStatus() != StatusCode.EOK) {
authenticationError(ctx, resp.getStatus().getValue());
} else {
assert (resp.hasAuthResponse());
BookkeeperProtocol.AuthMessage am = resp.getAuthResponse();
AuthMessage am = resp.getAuthResponse();
if (AUTHENTICATION_DISABLED_PLUGIN_NAME.equals(am.getAuthPluginName())){
SocketAddress remote = ctx.channel().remoteAddress();
log.info().attr("client", remote)
Expand All @@ -299,7 +289,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
cb.operationComplete(BKException.Code.OK, null);
return;
}
byte[] payload = am.getPayload().toByteArray();
byte[] payload = am.getPayload();
authProvider.process(AuthToken.wrap(payload), new AuthRequestCallback(ctx,
authProviderFactory.getPluginName()));
}
Expand All @@ -321,7 +311,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
if (resp.errorCode != BookieProtocol.EOK) {
authenticationError(ctx, resp.errorCode);
} else {
BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthResponse) resp).authMessage;
AuthMessage am = ((BookieProtocol.AuthResponse) resp).authMessage;
if (AUTHENTICATION_DISABLED_PLUGIN_NAME.equals(am.getAuthPluginName())) {
SocketAddress remote = ctx.channel().remoteAddress();
log.info().attr("client", remote)
Expand All @@ -330,7 +320,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
cb.operationComplete(BKException.Code.OK, null);
return;
}
byte[] payload = am.getPayload().toByteArray();
byte[] payload = am.getPayload();
authProvider.process(AuthToken.wrap(payload), new AuthRequestCallback(ctx,
authProviderFactory.getPluginName()));
}
Expand All @@ -353,12 +343,12 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
if (authenticated) {
super.write(ctx, msg, promise);
super.flush(ctx);
} else if (msg instanceof BookkeeperProtocol.Request) {
} else if (msg instanceof Request) {
// let auth messages through, queue the rest
BookkeeperProtocol.Request req = (BookkeeperProtocol.Request) msg;
Request req = (Request) msg;
if (req.getHeader().getOperation()
== BookkeeperProtocol.OperationType.AUTH
|| req.getHeader().getOperation() == BookkeeperProtocol.OperationType.START_TLS) {
== OperationType.AUTH
|| req.getHeader().getOperation() == OperationType.START_TLS) {
super.write(ctx, msg, promise);
super.flush(ctx);
} else {
Expand Down Expand Up @@ -421,22 +411,23 @@ public void operationComplete(int rc, AuthToken newam) {
return;
}

AuthMessage message = AuthMessage.newBuilder().setAuthPluginName(pluginName)
.setPayload(ByteString.copyFrom(newam.getData())).build();
AuthMessage message = new AuthMessage()
.setAuthPluginName(pluginName)
.setPayload(newam.getData());

if (isUsingV2Protocol) {
final BookieProtocol.AuthRequest msg =
new BookieProtocol.AuthRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, message);
NettyChannelUtil.writeAndFlushWithVoidPromise(channel, msg);
} else {
// V3 protocol
BookkeeperProtocol.BKPacketHeader header = BookkeeperProtocol.BKPacketHeader.newBuilder()
.setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE)
.setOperation(BookkeeperProtocol.OperationType.AUTH).setTxnId(newTxnId()).build();
BookkeeperProtocol.Request.Builder builder = BookkeeperProtocol.Request.newBuilder()
.setHeader(header)
.setAuthRequest(message);
NettyChannelUtil.writeAndFlushWithVoidPromise(channel, builder.build());
Request request = new Request();
request.setHeader()
.setVersion(ProtocolVersion.VERSION_THREE)
.setOperation(OperationType.AUTH)
.setTxnId(newTxnId());
request.setAuthRequest().copyFrom(message);
NettyChannelUtil.writeAndFlushWithVoidPromise(channel, request);
}
}
}
Expand Down
Loading
Loading