Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.bookkeeper.api.kv.PTable;
import org.apache.bookkeeper.api.kv.Table;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.bookkeeper.clients.grpc.RootRangeServiceFutureStub;
import org.apache.bookkeeper.clients.impl.kv.ByteBufTableImpl;
import org.apache.bookkeeper.clients.impl.kv.PByteBufSimpleTableImpl;
import org.apache.bookkeeper.clients.utils.GrpcUtils;
Expand All @@ -43,8 +44,6 @@
import org.apache.bookkeeper.common.util.SharedResourceManager.Resource;
import org.apache.bookkeeper.stream.proto.StorageType;
import org.apache.bookkeeper.stream.proto.StreamProperties;
import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc;
import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc.RootRangeServiceFutureStub;
import org.apache.bookkeeper.stream.proto.storage.StatusCode;

/**
Expand All @@ -63,7 +62,7 @@ public SimpleStorageClientImpl(String namespaceName,
super(settings);
this.defaultNamespace = namespaceName;
this.rootRangeService = GrpcUtils.configureGrpcStub(
RootRangeServiceGrpc.newFutureStub(channel),
RootRangeServiceFutureStub.newFutureStub(channel),
Optional.empty());
}

Expand All @@ -74,7 +73,7 @@ public SimpleStorageClientImpl(String namespaceName,
super(settings, schedulerResource, channel, false);
this.defaultNamespace = namespaceName;
this.rootRangeService = GrpcUtils.configureGrpcStub(
RootRangeServiceGrpc.newFutureStub(channel),
RootRangeServiceFutureStub.newFutureStub(channel),
Optional.empty());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.bookkeeper.clients.SimpleClientBase;
import org.apache.bookkeeper.clients.SimpleStorageClientImpl;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.bookkeeper.clients.grpc.RootRangeServiceFutureStub;
import org.apache.bookkeeper.clients.utils.ClientResources;
import org.apache.bookkeeper.clients.utils.GrpcUtils;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
Expand All @@ -43,8 +44,6 @@
import org.apache.bookkeeper.stream.proto.NamespaceProperties;
import org.apache.bookkeeper.stream.proto.StreamConfiguration;
import org.apache.bookkeeper.stream.proto.StreamProperties;
import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc;
import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc.RootRangeServiceFutureStub;
import org.apache.bookkeeper.stream.proto.storage.StatusCode;

/**
Expand All @@ -66,7 +65,7 @@ public SimpleStorageAdminClientImpl(StorageClientSettings settings,
Resource<OrderedScheduler> schedulerResource) {
super(settings, schedulerResource);
this.rootRangeService = GrpcUtils.configureGrpcStub(
RootRangeServiceGrpc.newFutureStub(channel),
RootRangeServiceFutureStub.newFutureStub(channel),
Optional.empty());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,31 @@ public class StorageClientImplTest extends GrpcClientTestBase {

private static final String NAMESPACE = "test-namespace";
private static final String STREAM_NAME = "test-stream-name";
private static final StreamProperties STREAM_PROPERTIES = StreamProperties.newBuilder()
.setStreamId(1234L)
.setStreamConf(DEFAULT_STREAM_CONF)
.setStreamName(STREAM_NAME)
.setStorageContainerId(16)
.build();
private static final StreamProperties STREAM_PROPERTIES = newStreamProperties();

private static StreamProperties newStreamProperties() {
StreamProperties props = new StreamProperties()
.setStreamId(1234L)
.setStreamName(STREAM_NAME)
.setStorageContainerId(16);
props.setStreamConf().copyFrom(DEFAULT_STREAM_CONF);
return props;
}

private static StreamProperties newStreamPropsForTable(String name) {
StreamProperties props = new StreamProperties().copyFrom(STREAM_PROPERTIES);
props.setStreamName(name);
StreamConfiguration conf = props.setStreamConf();
conf.copyFrom(DEFAULT_STREAM_CONF).setStorageType(StorageType.TABLE);
return props;
}

private static StreamProperties newStreamPropsAsTable() {
StreamProperties props = new StreamProperties().copyFrom(STREAM_PROPERTIES);
StreamConfiguration conf = props.setStreamConf();
conf.copyFrom(DEFAULT_STREAM_CONF).setStorageType(StorageType.TABLE);
return props;
}

private StorageClientImpl client;

Expand All @@ -82,11 +101,7 @@ protected void doTeardown() {
@SuppressWarnings("unchecked")
@Test
public void testOpenPTable() throws Exception {
StreamProperties streamProps = StreamProperties.newBuilder(STREAM_PROPERTIES)
.setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
.setStorageType(StorageType.TABLE)
.build())
.build();
StreamProperties streamProps = newStreamPropsAsTable();
when(client.getStreamProperties(anyString(), anyString()))
.thenReturn(FutureUtils.value(streamProps));

Expand All @@ -105,21 +120,11 @@ public void testOpenPTable() throws Exception {
@SuppressWarnings("unchecked")
@Test
public void testOpenPTableDifferentNamespace() throws Exception {
StreamProperties tableProps1 = StreamProperties.newBuilder(STREAM_PROPERTIES)
.setStreamName("table1")
.setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
.setStorageType(StorageType.TABLE)
.build())
.build();
StreamProperties tableProps1 = newStreamPropsForTable("table1");
when(client.getStreamProperties(eq(NAMESPACE), eq("table1")))
.thenReturn(FutureUtils.value(tableProps1));

StreamProperties tableProps2 = StreamProperties.newBuilder(STREAM_PROPERTIES)
.setStreamName("table2")
.setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
.setStorageType(StorageType.TABLE)
.build())
.build();
StreamProperties tableProps2 = newStreamPropsForTable("table2");
when(client.getStreamProperties(eq(NAMESPACE), eq("table2")))
.thenReturn(FutureUtils.value(tableProps2));

Expand All @@ -146,11 +151,7 @@ public void testOpenPTableDifferentNamespace() throws Exception {
@SuppressWarnings("unchecked")
@Test
public void testOpenTable() throws Exception {
StreamProperties streamProps = StreamProperties.newBuilder(STREAM_PROPERTIES)
.setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
.setStorageType(StorageType.TABLE)
.build())
.build();
StreamProperties streamProps = newStreamPropsAsTable();
when(client.getStreamProperties(anyString(), anyString()))
.thenReturn(FutureUtils.value(streamProps));

Expand All @@ -171,21 +172,11 @@ public void testOpenTable() throws Exception {
@SuppressWarnings("unchecked")
@Test
public void testOpenTableWithDifferentNamespace() throws Exception {
StreamProperties tableProps1 = StreamProperties.newBuilder(STREAM_PROPERTIES)
.setStreamName("table1")
.setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
.setStorageType(StorageType.TABLE)
.build())
.build();
StreamProperties tableProps1 = newStreamPropsForTable("table1");
when(client.getStreamProperties(eq(NAMESPACE), eq("table1")))
.thenReturn(FutureUtils.value(tableProps1));

StreamProperties tableProps2 = StreamProperties.newBuilder(STREAM_PROPERTIES)
.setStreamName("table2")
.setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
.setStorageType(StorageType.TABLE)
.build())
.build();
StreamProperties tableProps2 = newStreamPropsForTable("table2");
when(client.getStreamProperties(eq(NAMESPACE), eq("table2")))
.thenReturn(FutureUtils.value(tableProps2));

Expand Down Expand Up @@ -216,11 +207,8 @@ public void testOpenTableWithDifferentNamespace() throws Exception {
@SuppressWarnings("unchecked")
@Test
public void testOpenPTableIllegalOp() throws Exception {
StreamProperties streamProps = StreamProperties.newBuilder(STREAM_PROPERTIES)
.setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
.setStorageType(StorageType.STREAM)
.build())
.build();
StreamProperties streamProps = new StreamProperties().copyFrom(STREAM_PROPERTIES);
streamProps.setStreamConf().copyFrom(DEFAULT_STREAM_CONF).setStorageType(StorageType.STREAM);
when(client.getStreamProperties(anyString(), anyString()))
.thenReturn(FutureUtils.value(streamProps));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,32 @@
*/
public class TestStorageAdminClientImpl {

private static final NamespaceConfiguration colConf = NamespaceConfiguration.newBuilder()
.setDefaultStreamConf(DEFAULT_STREAM_CONF)
.build();
private static final NamespaceProperties colProps = NamespaceProperties.newBuilder()
.setNamespaceId(System.currentTimeMillis())
.setNamespaceName("namespace")
.setDefaultStreamConf(DEFAULT_STREAM_CONF)
.build();
private static final StreamProperties streamProps = StreamProperties.newBuilder()
.setStreamId(System.currentTimeMillis())
.setStorageContainerId(System.currentTimeMillis())
.setStreamName("stream_" + System.currentTimeMillis())
.setStreamConf(DEFAULT_STREAM_CONF)
.build();
private static final NamespaceConfiguration colConf = newColConf();
private static final NamespaceProperties colProps = newColProps();
private static final StreamProperties streamProps = newStreamProps();

private static NamespaceConfiguration newColConf() {
NamespaceConfiguration c = new NamespaceConfiguration();
c.setDefaultStreamConf().copyFrom(DEFAULT_STREAM_CONF);
return c;
}

private static NamespaceProperties newColProps() {
NamespaceProperties p = new NamespaceProperties()
.setNamespaceId(System.currentTimeMillis())
.setNamespaceName("namespace");
p.setDefaultStreamConf().copyFrom(DEFAULT_STREAM_CONF);
return p;
}

private static StreamProperties newStreamProps() {
StreamProperties p = new StreamProperties()
.setStreamId(System.currentTimeMillis())
.setStorageContainerId(System.currentTimeMillis())
.setStreamName("stream_" + System.currentTimeMillis());
p.setStreamConf().copyFrom(DEFAULT_STREAM_CONF);
return p;
}

@Rule
public TestName testName = new TestName();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.bookkeeper.clients.grpc;

import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.stub.AbstractStub;
import io.grpc.stub.ClientCalls;
import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc;

/**
* ListenableFuture-returning stub for MetaRangeService.
*
* <p>The lightproto gRPC plugin does not generate FutureStub classes, so this adapter
* provides a drop-in replacement that uses {@link ClientCalls#futureUnaryCall} directly.
*/
public final class MetaRangeServiceFutureStub extends AbstractStub<MetaRangeServiceFutureStub> {

public static MetaRangeServiceFutureStub newFutureStub(Channel channel) {
return new MetaRangeServiceFutureStub(channel, CallOptions.DEFAULT);
}

private MetaRangeServiceFutureStub(Channel channel, CallOptions callOptions) {
super(channel, callOptions);
}

@Override
protected MetaRangeServiceFutureStub build(Channel channel, CallOptions callOptions) {
return new MetaRangeServiceFutureStub(channel, callOptions);
}

public ListenableFuture<GetActiveRangesResponse> getActiveRanges(GetActiveRangesRequest request) {
return ClientCalls.futureUnaryCall(
getChannel().newCall(MetaRangeServiceGrpc.getGetActiveRangesMethod(), getCallOptions()),
request);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.bookkeeper.clients.grpc;

import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.stub.AbstractStub;
import io.grpc.stub.ClientCalls;
import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceRequest;
import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceResponse;
import org.apache.bookkeeper.stream.proto.storage.CreateStreamRequest;
import org.apache.bookkeeper.stream.proto.storage.CreateStreamResponse;
import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceRequest;
import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceResponse;
import org.apache.bookkeeper.stream.proto.storage.DeleteStreamRequest;
import org.apache.bookkeeper.stream.proto.storage.DeleteStreamResponse;
import org.apache.bookkeeper.stream.proto.storage.GetNamespaceRequest;
import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc;

/**
* ListenableFuture-returning stub for RootRangeService.
*
* <p>The lightproto gRPC plugin does not generate FutureStub classes, so this adapter
* provides a drop-in replacement that uses {@link ClientCalls#futureUnaryCall} directly.
*/
public final class RootRangeServiceFutureStub extends AbstractStub<RootRangeServiceFutureStub> {

public static RootRangeServiceFutureStub newFutureStub(Channel channel) {
return new RootRangeServiceFutureStub(channel, CallOptions.DEFAULT);
}

private RootRangeServiceFutureStub(Channel channel, CallOptions callOptions) {
super(channel, callOptions);
}

@Override
protected RootRangeServiceFutureStub build(Channel channel, CallOptions callOptions) {
return new RootRangeServiceFutureStub(channel, callOptions);
}

public ListenableFuture<CreateNamespaceResponse> createNamespace(CreateNamespaceRequest request) {
return ClientCalls.futureUnaryCall(
getChannel().newCall(RootRangeServiceGrpc.getCreateNamespaceMethod(), getCallOptions()),
request);
}

public ListenableFuture<DeleteNamespaceResponse> deleteNamespace(DeleteNamespaceRequest request) {
return ClientCalls.futureUnaryCall(
getChannel().newCall(RootRangeServiceGrpc.getDeleteNamespaceMethod(), getCallOptions()),
request);
}

public ListenableFuture<GetNamespaceResponse> getNamespace(GetNamespaceRequest request) {
return ClientCalls.futureUnaryCall(
getChannel().newCall(RootRangeServiceGrpc.getGetNamespaceMethod(), getCallOptions()),
request);
}

public ListenableFuture<CreateStreamResponse> createStream(CreateStreamRequest request) {
return ClientCalls.futureUnaryCall(
getChannel().newCall(RootRangeServiceGrpc.getCreateStreamMethod(), getCallOptions()),
request);
}

public ListenableFuture<DeleteStreamResponse> deleteStream(DeleteStreamRequest request) {
return ClientCalls.futureUnaryCall(
getChannel().newCall(RootRangeServiceGrpc.getDeleteStreamMethod(), getCallOptions()),
request);
}

public ListenableFuture<GetStreamResponse> getStream(GetStreamRequest request) {
return ClientCalls.futureUnaryCall(
getChannel().newCall(RootRangeServiceGrpc.getGetStreamMethod(), getCallOptions()),
request);
}
}
Loading
Loading