Skip to content
This repository was archived by the owner on Apr 7, 2026. It is now read-only.

Commit c839974

Browse files
authored
chore: route multi-use read-only txn reads via location-aware cache (#4340)
* chore: route multi-use read-only txn reads via location-aware cache * incorporate changes
1 parent 95ac7a7 commit c839974

3 files changed

Lines changed: 483 additions & 16 deletions

File tree

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,20 +66,28 @@ public void update(CacheUpdate update) {
6666
}
6767

6868
public ChannelEndpoint findServer(ReadRequest.Builder reqBuilder) {
69+
return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction()));
70+
}
71+
72+
public ChannelEndpoint findServer(ReadRequest.Builder reqBuilder, boolean preferLeader) {
6973
recipeCache.computeKeys(reqBuilder);
7074
return fillRoutingHint(
71-
reqBuilder.getTransaction(),
72-
reqBuilder.getDirectedReadOptions(),
75+
preferLeader,
7376
KeyRangeCache.RangeMode.COVERING_SPLIT,
77+
reqBuilder.getDirectedReadOptions(),
7478
reqBuilder.getRoutingHintBuilder());
7579
}
7680

7781
public ChannelEndpoint findServer(ExecuteSqlRequest.Builder reqBuilder) {
82+
return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction()));
83+
}
84+
85+
public ChannelEndpoint findServer(ExecuteSqlRequest.Builder reqBuilder, boolean preferLeader) {
7886
recipeCache.computeKeys(reqBuilder);
7987
return fillRoutingHint(
80-
reqBuilder.getTransaction(),
81-
reqBuilder.getDirectedReadOptions(),
88+
preferLeader,
8289
KeyRangeCache.RangeMode.PICK_RANDOM,
90+
reqBuilder.getDirectedReadOptions(),
8391
reqBuilder.getRoutingHintBuilder());
8492
}
8593

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java

Lines changed: 77 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import com.google.api.core.InternalApi;
2020
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
21+
import com.google.common.cache.Cache;
22+
import com.google.common.cache.CacheBuilder;
2123
import com.google.protobuf.ByteString;
2224
import com.google.spanner.v1.BeginTransactionRequest;
2325
import com.google.spanner.v1.CommitRequest;
@@ -51,6 +53,7 @@
5153
*/
5254
@InternalApi
5355
final class KeyAwareChannel extends ManagedChannel {
56+
private static final long MAX_TRACKED_READ_ONLY_TRANSACTIONS = 100_000L;
5457
private static final String STREAMING_READ_METHOD = "google.spanner.v1.Spanner/StreamingRead";
5558
private static final String STREAMING_SQL_METHOD =
5659
"google.spanner.v1.Spanner/ExecuteStreamingSql";
@@ -67,6 +70,11 @@ final class KeyAwareChannel extends ManagedChannel {
6770
private final Map<String, SoftReference<ChannelFinder>> channelFinders =
6871
new ConcurrentHashMap<>();
6972
private final Map<ByteString, String> transactionAffinities = new ConcurrentHashMap<>();
73+
// Maps read-only transaction IDs to their preferLeader value.
74+
// Strong reads → true (prefer leader), Stale reads → false (any replica).
75+
// Bounded to prevent unbounded growth if application code does not close read-only transactions.
76+
private final Cache<ByteString, Boolean> readOnlyTxPreferLeader =
77+
CacheBuilder.newBuilder().maximumSize(MAX_TRACKED_READ_ONLY_TRANSACTIONS).build();
7078

7179
private KeyAwareChannel(
7280
InstantiatingGrpcChannelProvider channelProvider,
@@ -184,12 +192,34 @@ private void clearAffinity(ByteString transactionId) {
184192
return;
185193
}
186194
transactionAffinities.remove(transactionId);
195+
readOnlyTxPreferLeader.invalidate(transactionId);
187196
}
188197

189198
void clearTransactionAffinity(ByteString transactionId) {
190199
clearAffinity(transactionId);
191200
}
192201

202+
private boolean isReadOnlyTransaction(ByteString transactionId) {
203+
return transactionId != null
204+
&& !transactionId.isEmpty()
205+
&& readOnlyTxPreferLeader.getIfPresent(transactionId) != null;
206+
}
207+
208+
@Nullable
209+
private Boolean readOnlyPreferLeader(ByteString transactionId) {
210+
if (transactionId == null || transactionId.isEmpty()) {
211+
return null;
212+
}
213+
return readOnlyTxPreferLeader.getIfPresent(transactionId);
214+
}
215+
216+
private void trackReadOnlyTransaction(ByteString transactionId, boolean preferLeader) {
217+
if (transactionId == null || transactionId.isEmpty()) {
218+
return;
219+
}
220+
readOnlyTxPreferLeader.put(transactionId, preferLeader);
221+
}
222+
193223
private void recordAffinity(
194224
ByteString transactionId, @Nullable ChannelEndpoint endpoint, boolean allowDefault) {
195225
if (transactionId == null || transactionId.isEmpty() || endpoint == null) {
@@ -250,6 +280,8 @@ static final class KeyAwareClientCall<RequestT, ResponseT>
250280
@Nullable private Boolean pendingMessageCompression;
251281
@Nullable private io.grpc.Status cancelledStatus;
252282
@Nullable private Metadata cancelledTrailers;
283+
private boolean isReadOnlyBegin;
284+
private boolean readOnlyIsStrong;
253285
private final Object lock = new Object();
254286

255287
KeyAwareClientCall(
@@ -307,12 +339,14 @@ public void sendMessage(RequestT message) {
307339

308340
if (message instanceof ReadRequest) {
309341
ReadRequest.Builder reqBuilder = ((ReadRequest) message).toBuilder();
342+
maybeTrackReadOnlyBegin(reqBuilder.getTransaction());
310343
RoutingDecision routing = routeFromRequest(reqBuilder);
311344
finder = routing.finder;
312345
endpoint = routing.endpoint;
313346
message = (RequestT) reqBuilder.build();
314347
} else if (message instanceof ExecuteSqlRequest) {
315348
ExecuteSqlRequest.Builder reqBuilder = ((ExecuteSqlRequest) message).toBuilder();
349+
maybeTrackReadOnlyBegin(reqBuilder.getTransaction());
316350
RoutingDecision routing = routeFromRequest(reqBuilder);
317351
finder = routing.finder;
318352
endpoint = routing.endpoint;
@@ -325,7 +359,12 @@ public void sendMessage(RequestT message) {
325359
finder = parentChannel.getOrCreateChannelFinder(databaseId);
326360
endpoint = finder.findServer(reqBuilder);
327361
}
328-
allowDefaultAffinity = true;
362+
if (reqBuilder.hasOptions() && reqBuilder.getOptions().hasReadOnly()) {
363+
isReadOnlyBegin = true;
364+
readOnlyIsStrong = reqBuilder.getOptions().getReadOnly().getStrong();
365+
} else {
366+
allowDefaultAffinity = true;
367+
}
329368
message = (RequestT) reqBuilder.build();
330369
} else if (message instanceof CommitRequest) {
331370
CommitRequest request = (CommitRequest) message;
@@ -483,32 +522,52 @@ void maybeClearAffinity() {
483522
parentChannel.clearAffinity(transactionIdToClear);
484523
}
485524

525+
private void maybeTrackReadOnlyBegin(TransactionSelector selector) {
526+
if (selector.getSelectorCase() == TransactionSelector.SelectorCase.BEGIN
527+
&& selector.getBegin().hasReadOnly()) {
528+
isReadOnlyBegin = true;
529+
readOnlyIsStrong = selector.getBegin().getReadOnly().getStrong();
530+
}
531+
}
532+
486533
private RoutingDecision routeFromRequest(ReadRequest.Builder reqBuilder) {
487534
String databaseId = parentChannel.extractDatabaseIdFromSession(reqBuilder.getSession());
488535
ByteString transactionId = transactionIdFromSelector(reqBuilder.getTransaction());
489-
ChannelEndpoint endpoint = parentChannel.affinityEndpoint(transactionId);
536+
// Skip affinity for read-only transactions so each read routes independently.
537+
boolean isReadOnly = parentChannel.isReadOnlyTransaction(transactionId);
538+
ChannelEndpoint endpoint = isReadOnly ? null : parentChannel.affinityEndpoint(transactionId);
490539
ChannelFinder finder = null;
491540
if (databaseId != null) {
492541
finder = parentChannel.getOrCreateChannelFinder(databaseId);
493-
ChannelEndpoint routed = finder.findServer(reqBuilder);
494-
if (endpoint == null) {
495-
endpoint = routed;
496-
}
542+
}
543+
if (databaseId != null && endpoint == null) {
544+
Boolean preferLeaderOverride = parentChannel.readOnlyPreferLeader(transactionId);
545+
ChannelEndpoint routed =
546+
preferLeaderOverride != null
547+
? finder.findServer(reqBuilder, preferLeaderOverride)
548+
: finder.findServer(reqBuilder);
549+
endpoint = routed;
497550
}
498551
return new RoutingDecision(finder, endpoint);
499552
}
500553

501554
private RoutingDecision routeFromRequest(ExecuteSqlRequest.Builder reqBuilder) {
502555
String databaseId = parentChannel.extractDatabaseIdFromSession(reqBuilder.getSession());
503556
ByteString transactionId = transactionIdFromSelector(reqBuilder.getTransaction());
504-
ChannelEndpoint endpoint = parentChannel.affinityEndpoint(transactionId);
557+
// Skip affinity for read-only transactions so each query routes independently.
558+
boolean isReadOnly = parentChannel.isReadOnlyTransaction(transactionId);
559+
ChannelEndpoint endpoint = isReadOnly ? null : parentChannel.affinityEndpoint(transactionId);
505560
ChannelFinder finder = null;
506561
if (databaseId != null) {
507562
finder = parentChannel.getOrCreateChannelFinder(databaseId);
508-
ChannelEndpoint routed = finder.findServer(reqBuilder);
509-
if (endpoint == null) {
510-
endpoint = routed;
511-
}
563+
}
564+
if (databaseId != null && endpoint == null) {
565+
Boolean preferLeaderOverride = parentChannel.readOnlyPreferLeader(transactionId);
566+
ChannelEndpoint routed =
567+
preferLeaderOverride != null
568+
? finder.findServer(reqBuilder, preferLeaderOverride)
569+
: finder.findServer(reqBuilder);
570+
endpoint = routed;
512571
}
513572
return new RoutingDecision(finder, endpoint);
514573
}
@@ -554,7 +613,13 @@ public void onMessage(ResponseT message) {
554613
transactionId = transactionIdFromTransaction(response);
555614
}
556615
if (transactionId != null) {
557-
call.maybeRecordAffinity(transactionId);
616+
if (call.isReadOnlyBegin) {
617+
// Track the read-only transaction so subsequent reads skip affinity
618+
// and route independently based on key-based routing.
619+
call.parentChannel.trackReadOnlyTransaction(transactionId, call.readOnlyIsStrong);
620+
} else if (!call.parentChannel.isReadOnlyTransaction(transactionId)) {
621+
call.maybeRecordAffinity(transactionId);
622+
}
558623
}
559624
super.onMessage(message);
560625
}

0 commit comments

Comments
 (0)