Skip to content

Commit dc1216e

Browse files
authored
feat(spanner): add connection properties for min/max RPCs for DCP (#12951)
Adds connection properties for min/max RPCs per channel for the dynamic channel pool. Also adds a connection property for concurrentStreamsLowWaterMark.
1 parent d3b76d9 commit dc1216e

5 files changed

Lines changed: 162 additions & 11 deletions

File tree

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,12 @@
2626
import static com.google.cloud.spanner.connection.ConnectionProperties.CREDENTIALS_URL;
2727
import static com.google.cloud.spanner.connection.ConnectionProperties.DATABASE_ROLE;
2828
import static com.google.cloud.spanner.connection.ConnectionProperties.DATA_BOOST_ENABLED;
29+
import static com.google.cloud.spanner.connection.ConnectionProperties.DCP_CONCURRENT_STREAMS_LOW_WATERMARK;
2930
import static com.google.cloud.spanner.connection.ConnectionProperties.DCP_INITIAL_CHANNELS;
3031
import static com.google.cloud.spanner.connection.ConnectionProperties.DCP_MAX_CHANNELS;
32+
import static com.google.cloud.spanner.connection.ConnectionProperties.DCP_MAX_RPC_PER_CHANNEL;
3133
import static com.google.cloud.spanner.connection.ConnectionProperties.DCP_MIN_CHANNELS;
34+
import static com.google.cloud.spanner.connection.ConnectionProperties.DCP_MIN_RPC_PER_CHANNEL;
3235
import static com.google.cloud.spanner.connection.ConnectionProperties.DIALECT;
3336
import static com.google.cloud.spanner.connection.ConnectionProperties.ENABLE_API_TRACING;
3437
import static com.google.cloud.spanner.connection.ConnectionProperties.ENABLE_DIRECT_ACCESS;
@@ -165,6 +168,9 @@ public class ConnectionOptions {
165168
static final Integer DEFAULT_DCP_MIN_CHANNELS = null;
166169
static final Integer DEFAULT_DCP_MAX_CHANNELS = null;
167170
static final Integer DEFAULT_DCP_INITIAL_CHANNELS = null;
171+
static final Integer DEFAULT_DCP_MIN_RPC_PER_CHANNEL = null;
172+
static final Integer DEFAULT_DCP_MAX_RPC_PER_CHANNEL = null;
173+
static final Integer DEFAULT_DCP_CONCURRENT_STREAMS_LOW_WATERMARK = null;
168174
static final String DEFAULT_ENDPOINT = null;
169175
static final String DEFAULT_CHANNEL_PROVIDER = null;
170176
static final String DEFAULT_DATABASE_ROLE = null;
@@ -277,6 +283,16 @@ public class ConnectionOptions {
277283
/** Name of the 'dcpInitialChannels' connection property. */
278284
public static final String DCP_INITIAL_CHANNELS_PROPERTY_NAME = "dcpInitialChannels";
279285

286+
/** Name of the 'dcpMinRpcPerChannel' connection property. */
287+
public static final String DCP_MIN_RPC_PER_CHANNEL_PROPERTY_NAME = "dcpMinRpcPerChannel";
288+
289+
/** Name of the 'dcpMaxRpcPerChannel' connection property. */
290+
public static final String DCP_MAX_RPC_PER_CHANNEL_PROPERTY_NAME = "dcpMaxRpcPerChannel";
291+
292+
/** Name of the 'dcpConcurrentStreamsLowWatermark' connection property. */
293+
public static final String DCP_CONCURRENT_STREAMS_LOW_WATERMARK_PROPERTY_NAME =
294+
"dcpConcurrentStreamsLowWatermark";
295+
280296
/** Name of the 'endpoint' connection property. */
281297
public static final String ENDPOINT_PROPERTY_NAME = "endpoint";
282298

@@ -1041,6 +1057,21 @@ public Integer getDcpInitialChannels() {
10411057
return getInitialConnectionPropertyValue(DCP_INITIAL_CHANNELS);
10421058
}
10431059

1060+
/** The minimum number of RPCs per channel in the dynamic channel pool. */
1061+
public Integer getDcpMinRpcPerChannel() {
1062+
return getInitialConnectionPropertyValue(DCP_MIN_RPC_PER_CHANNEL);
1063+
}
1064+
1065+
/** The maximum number of RPCs per channel in the dynamic channel pool. */
1066+
public Integer getDcpMaxRpcPerChannel() {
1067+
return getInitialConnectionPropertyValue(DCP_MAX_RPC_PER_CHANNEL);
1068+
}
1069+
1070+
/** The concurrent streams low watermark in the dynamic channel pool. */
1071+
public Integer getDcpConcurrentStreamsLowWatermark() {
1072+
return getInitialConnectionPropertyValue(DCP_CONCURRENT_STREAMS_LOW_WATERMARK);
1073+
}
1074+
10441075
/** Calls the getChannelProvider() method from the supplied class. */
10451076
public TransportChannelProvider getChannelProvider() {
10461077
String channelProvider = getInitialConnectionPropertyValue(CHANNEL_PROVIDER);

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionProperties.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,12 @@
2929
import static com.google.cloud.spanner.connection.ConnectionOptions.CREDENTIALS_PROVIDER_PROPERTY_NAME;
3030
import static com.google.cloud.spanner.connection.ConnectionOptions.DATABASE_ROLE_PROPERTY_NAME;
3131
import static com.google.cloud.spanner.connection.ConnectionOptions.DATA_BOOST_ENABLED_PROPERTY_NAME;
32+
import static com.google.cloud.spanner.connection.ConnectionOptions.DCP_CONCURRENT_STREAMS_LOW_WATERMARK_PROPERTY_NAME;
3233
import static com.google.cloud.spanner.connection.ConnectionOptions.DCP_INITIAL_CHANNELS_PROPERTY_NAME;
3334
import static com.google.cloud.spanner.connection.ConnectionOptions.DCP_MAX_CHANNELS_PROPERTY_NAME;
35+
import static com.google.cloud.spanner.connection.ConnectionOptions.DCP_MAX_RPC_PER_CHANNEL_PROPERTY_NAME;
3436
import static com.google.cloud.spanner.connection.ConnectionOptions.DCP_MIN_CHANNELS_PROPERTY_NAME;
37+
import static com.google.cloud.spanner.connection.ConnectionOptions.DCP_MIN_RPC_PER_CHANNEL_PROPERTY_NAME;
3538
import static com.google.cloud.spanner.connection.ConnectionOptions.DDL_IN_TRANSACTION_MODE_PROPERTY_NAME;
3639
import static com.google.cloud.spanner.connection.ConnectionOptions.DEFAULT_AUTOCOMMIT;
3740
import static com.google.cloud.spanner.connection.ConnectionOptions.DEFAULT_AUTO_BATCH_DML;
@@ -45,9 +48,12 @@
4548
import static com.google.cloud.spanner.connection.ConnectionOptions.DEFAULT_CREDENTIALS;
4649
import static com.google.cloud.spanner.connection.ConnectionOptions.DEFAULT_DATABASE_ROLE;
4750
import static com.google.cloud.spanner.connection.ConnectionOptions.DEFAULT_DATA_BOOST_ENABLED;
51+
import static com.google.cloud.spanner.connection.ConnectionOptions.DEFAULT_DCP_CONCURRENT_STREAMS_LOW_WATERMARK;
4852
import static com.google.cloud.spanner.connection.ConnectionOptions.DEFAULT_DCP_INITIAL_CHANNELS;
4953
import static com.google.cloud.spanner.connection.ConnectionOptions.DEFAULT_DCP_MAX_CHANNELS;
54+
import static com.google.cloud.spanner.connection.ConnectionOptions.DEFAULT_DCP_MAX_RPC_PER_CHANNEL;
5055
import static com.google.cloud.spanner.connection.ConnectionOptions.DEFAULT_DCP_MIN_CHANNELS;
56+
import static com.google.cloud.spanner.connection.ConnectionOptions.DEFAULT_DCP_MIN_RPC_PER_CHANNEL;
5157
import static com.google.cloud.spanner.connection.ConnectionOptions.DEFAULT_DDL_IN_TRANSACTION_MODE;
5258
import static com.google.cloud.spanner.connection.ConnectionOptions.DEFAULT_DEFAULT_SEQUENCE_KIND;
5359
import static com.google.cloud.spanner.connection.ConnectionOptions.DEFAULT_DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE;
@@ -504,6 +510,34 @@ public class ConnectionProperties {
504510
DEFAULT_DCP_INITIAL_CHANNELS,
505511
NonNegativeIntegerConverter.INSTANCE,
506512
Context.STARTUP);
513+
static final ConnectionProperty<Integer> DCP_MIN_RPC_PER_CHANNEL =
514+
create(
515+
DCP_MIN_RPC_PER_CHANNEL_PROPERTY_NAME,
516+
"The minimum number of desired RPCs per channel in the dynamic channel pool. Only used when "
517+
+ "enableDynamicChannelPool is true. The default is "
518+
+ DEFAULT_DCP_MIN_RPC_PER_CHANNEL
519+
+ ".",
520+
DEFAULT_DCP_MIN_RPC_PER_CHANNEL,
521+
NonNegativeIntegerConverter.INSTANCE,
522+
Context.STARTUP);
523+
static final ConnectionProperty<Integer> DCP_MAX_RPC_PER_CHANNEL =
524+
create(
525+
DCP_MAX_RPC_PER_CHANNEL_PROPERTY_NAME,
526+
"The maximum number of desired RPCs per channel in the dynamic channel pool. Only used when "
527+
+ "enableDynamicChannelPool is true. The default is "
528+
+ DEFAULT_DCP_MAX_RPC_PER_CHANNEL
529+
+ ".",
530+
DEFAULT_DCP_MAX_RPC_PER_CHANNEL,
531+
NonNegativeIntegerConverter.INSTANCE,
532+
Context.STARTUP);
533+
static final ConnectionProperty<Integer> DCP_CONCURRENT_STREAMS_LOW_WATERMARK =
534+
create(
535+
DCP_CONCURRENT_STREAMS_LOW_WATERMARK_PROPERTY_NAME,
536+
"The concurrent streams low watermark in the dynamic channel pool. Only used when "
537+
+ "enableDynamicChannelPool is true.",
538+
DEFAULT_DCP_CONCURRENT_STREAMS_LOW_WATERMARK,
539+
NonNegativeIntegerConverter.INSTANCE,
540+
Context.STARTUP);
507541
static final ConnectionProperty<String> CHANNEL_PROVIDER =
508542
create(
509543
CHANNEL_PROVIDER_PROPERTY_NAME,

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SpannerPool.java

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,9 @@ static class SpannerPoolKey {
159159
private final Integer dcpMinChannels;
160160
private final Integer dcpMaxChannels;
161161
private final Integer dcpInitialChannels;
162+
private final Integer dcpMinRpcPerChannel;
163+
private final Integer dcpMaxRpcPerChannel;
164+
private final Integer dcpConcurrentStreamsLowWatermark;
162165
private final boolean usePlainText;
163166
private final String userAgent;
164167
private final String databaseRole;
@@ -202,6 +205,9 @@ private SpannerPoolKey(ConnectionOptions options) throws IOException {
202205
this.dcpMinChannels = options.getDcpMinChannels();
203206
this.dcpMaxChannels = options.getDcpMaxChannels();
204207
this.dcpInitialChannels = options.getDcpInitialChannels();
208+
this.dcpMinRpcPerChannel = options.getDcpMinRpcPerChannel();
209+
this.dcpMaxRpcPerChannel = options.getDcpMaxRpcPerChannel();
210+
this.dcpConcurrentStreamsLowWatermark = options.getDcpConcurrentStreamsLowWatermark();
205211
this.usePlainText = options.isUsePlainText();
206212
this.userAgent = options.getUserAgent();
207213
this.routeToLeader = options.isRouteToLeader();
@@ -234,6 +240,10 @@ public boolean equals(Object o) {
234240
&& Objects.equals(this.dcpMinChannels, other.dcpMinChannels)
235241
&& Objects.equals(this.dcpMaxChannels, other.dcpMaxChannels)
236242
&& Objects.equals(this.dcpInitialChannels, other.dcpInitialChannels)
243+
&& Objects.equals(this.dcpMinRpcPerChannel, other.dcpMinRpcPerChannel)
244+
&& Objects.equals(this.dcpMaxRpcPerChannel, other.dcpMaxRpcPerChannel)
245+
&& Objects.equals(
246+
this.dcpConcurrentStreamsLowWatermark, other.dcpConcurrentStreamsLowWatermark)
237247
&& Objects.equals(this.databaseRole, other.databaseRole)
238248
&& Objects.equals(this.usePlainText, other.usePlainText)
239249
&& Objects.equals(this.userAgent, other.userAgent)
@@ -265,6 +275,9 @@ public int hashCode() {
265275
this.dcpMinChannels,
266276
this.dcpMaxChannels,
267277
this.dcpInitialChannels,
278+
this.dcpMinRpcPerChannel,
279+
this.dcpMaxRpcPerChannel,
280+
this.dcpConcurrentStreamsLowWatermark,
268281
this.usePlainText,
269282
this.databaseRole,
270283
this.userAgent,
@@ -442,7 +455,10 @@ Spanner createSpanner(SpannerPoolKey key, ConnectionOptions options) {
442455
// Build custom GcpChannelPoolOptions if any DCP-specific options are set.
443456
if (key.dcpMinChannels != null
444457
|| key.dcpMaxChannels != null
445-
|| key.dcpInitialChannels != null) {
458+
|| key.dcpInitialChannels != null
459+
|| key.dcpMinRpcPerChannel != null
460+
|| key.dcpMaxRpcPerChannel != null
461+
|| key.dcpConcurrentStreamsLowWatermark != null) {
446462
// Build GcpChannelPoolOptions from scratch with custom values or Spanner defaults.
447463
// Note: GcpChannelPoolOptions does not have a toBuilder() method, so we must
448464
// construct from scratch using SpannerOptions defaults for unspecified values.
@@ -458,19 +474,32 @@ Spanner createSpanner(SpannerPoolKey key, ConnectionOptions options) {
458474
key.dcpInitialChannels != null
459475
? key.dcpInitialChannels
460476
: SpannerOptions.DEFAULT_DYNAMIC_POOL_INITIAL_SIZE;
461-
GcpChannelPoolOptions poolOptions =
477+
478+
int minRpc =
479+
key.dcpMinRpcPerChannel != null
480+
? key.dcpMinRpcPerChannel
481+
: SpannerOptions.DEFAULT_DYNAMIC_POOL_MIN_RPC;
482+
int maxRpc =
483+
key.dcpMaxRpcPerChannel != null
484+
? key.dcpMaxRpcPerChannel
485+
: SpannerOptions.DEFAULT_DYNAMIC_POOL_MAX_RPC;
486+
487+
GcpChannelPoolOptions.Builder poolOptionsBuilder =
462488
GcpChannelPoolOptions.newBuilder()
463489
.setMinSize(minChannels)
464490
.setMaxSize(maxChannels)
465491
.setInitSize(initChannels)
466492
.setDynamicScaling(
467-
SpannerOptions.DEFAULT_DYNAMIC_POOL_MIN_RPC,
468-
SpannerOptions.DEFAULT_DYNAMIC_POOL_MAX_RPC,
469-
SpannerOptions.DEFAULT_DYNAMIC_POOL_SCALE_DOWN_INTERVAL)
493+
minRpc, maxRpc, SpannerOptions.DEFAULT_DYNAMIC_POOL_SCALE_DOWN_INTERVAL)
470494
.setAffinityKeyLifetime(SpannerOptions.DEFAULT_DYNAMIC_POOL_AFFINITY_KEY_LIFETIME)
471-
.setCleanupInterval(SpannerOptions.DEFAULT_DYNAMIC_POOL_CLEANUP_INTERVAL)
472-
.build();
473-
builder.setGcpChannelPoolOptions(poolOptions);
495+
.setCleanupInterval(SpannerOptions.DEFAULT_DYNAMIC_POOL_CLEANUP_INTERVAL);
496+
497+
if (key.dcpConcurrentStreamsLowWatermark != null) {
498+
poolOptionsBuilder.setConcurrentStreamsLowWatermark(
499+
key.dcpConcurrentStreamsLowWatermark);
500+
}
501+
502+
builder.setGcpChannelPoolOptions(poolOptionsBuilder.build());
474503
}
475504
} else {
476505
// Explicitly disable DCP when enableDynamicChannelPool=false.

java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/GrpcGcpMockServerTest.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.cloud.spanner.Spanner;
2525
import com.google.cloud.spanner.Statement;
2626
import com.google.cloud.spanner.connection.StatementResult.ResultType;
27+
import java.util.Objects;
2728
import org.junit.After;
2829
import org.junit.Before;
2930
import org.junit.Test;
@@ -56,7 +57,7 @@ private void verifyConnectionWorks(Connection connection) {
5657
@Test
5758
public void testDisableGrpcGcp() {
5859
try (Connection connection = createConnection(";enableGrpcGcp=false")) {
59-
Spanner spanner = ((ConnectionImpl) connection).getSpanner();
60+
Spanner spanner = connection.getSpanner();
6061
assertFalse(spanner.getOptions().isGrpcGcpExtensionEnabled());
6162
verifyConnectionWorks(connection);
6263
}
@@ -65,7 +66,7 @@ public void testDisableGrpcGcp() {
6566
@Test
6667
public void testEnableGrpcGcp() {
6768
try (Connection connection = createConnection(";enableGrpcGcp=true")) {
68-
Spanner spanner = ((ConnectionImpl) connection).getSpanner();
69+
Spanner spanner = connection.getSpanner();
6970
assertTrue(spanner.getOptions().isGrpcGcpExtensionEnabled());
7071
verifyConnectionWorks(connection);
7172
}
@@ -74,10 +75,33 @@ public void testEnableGrpcGcp() {
7475
@Test
7576
public void testDefaultGrpcGcp() {
7677
try (Connection connection = createConnection()) {
77-
Spanner spanner = ((ConnectionImpl) connection).getSpanner();
78+
Spanner spanner = connection.getSpanner();
7879
// Default should be true in SpannerOptions
7980
assertTrue(spanner.getOptions().isGrpcGcpExtensionEnabled());
8081
verifyConnectionWorks(connection);
8182
}
8283
}
84+
85+
@Test
86+
public void testDcpNewSettings() {
87+
try (Connection connection =
88+
createConnection(
89+
";enableDynamicChannelPool=true;dcpMinRpcPerChannel=10;dcpMaxRpcPerChannel=100;dcpConcurrentStreamsLowWatermark=5")) {
90+
Spanner spanner = connection.getSpanner();
91+
assertTrue(spanner.getOptions().isGrpcGcpExtensionEnabled());
92+
assertEquals(
93+
10,
94+
Objects.requireNonNull(spanner.getOptions().getGcpChannelPoolOptions())
95+
.getMinRpcPerChannel());
96+
assertEquals(
97+
100,
98+
Objects.requireNonNull(spanner.getOptions().getGcpChannelPoolOptions())
99+
.getMaxRpcPerChannel());
100+
assertEquals(
101+
5,
102+
Objects.requireNonNull(spanner.getOptions().getGcpChannelPoolOptions())
103+
.getConcurrentStreamsLowWatermark());
104+
verifyConnectionWorks(connection);
105+
}
106+
}
83107
}

java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SpannerPoolTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -742,6 +742,39 @@ public void testDynamicChannelPoolWithAllSettings() {
742742
.build()));
743743
}
744744

745+
@Test
746+
public void testDynamicChannelPoolWithNewSettings() {
747+
SpannerPoolKey keyWithNewDcpSettings =
748+
SpannerPoolKey.of(
749+
ConnectionOptions.newBuilder()
750+
.setUri(
751+
"cloudspanner:/projects/p/instances/i/databases/d"
752+
+ "?enableDynamicChannelPool=true;dcpMinRpcPerChannel=10;dcpMaxRpcPerChannel=100;dcpConcurrentStreamsLowWatermark=5")
753+
.setCredentials(NoCredentials.getInstance())
754+
.build());
755+
SpannerPoolKey keyWithDifferentMinRpc =
756+
SpannerPoolKey.of(
757+
ConnectionOptions.newBuilder()
758+
.setUri(
759+
"cloudspanner:/projects/p/instances/i/databases/d"
760+
+ "?enableDynamicChannelPool=true;dcpMinRpcPerChannel=20;dcpMaxRpcPerChannel=100;dcpConcurrentStreamsLowWatermark=5")
761+
.setCredentials(NoCredentials.getInstance())
762+
.build());
763+
764+
assertNotEquals(keyWithNewDcpSettings, keyWithDifferentMinRpc);
765+
766+
// Same configuration should be equal
767+
assertEquals(
768+
keyWithNewDcpSettings,
769+
SpannerPoolKey.of(
770+
ConnectionOptions.newBuilder()
771+
.setUri(
772+
"cloudspanner:/projects/p/instances/i/databases/d"
773+
+ "?enableDynamicChannelPool=true;dcpMinRpcPerChannel=10;dcpMaxRpcPerChannel=100;dcpConcurrentStreamsLowWatermark=5")
774+
.setCredentials(NoCredentials.getInstance())
775+
.build()));
776+
}
777+
745778
@Test
746779
public void testExplicitlyDisabledDynamicChannelPool() {
747780
SpannerPoolKey keyWithoutDcpSetting =

0 commit comments

Comments
 (0)