Skip to content
This repository was archived by the owner on Apr 7, 2026. It is now read-only.

Commit 866a8c2

Browse files
authored
fix: rollback transactions that are waiting for tx-id to be returned (#4342)
If a transaction has been started by an async query, and the transaction is closed before the async query has returned the first results and the transaction ID, the transaction would not be rolled back by the client. This would cause locks to be held for longer than they should. This fix adds a check whether the transaction has already sent a request that will start the transaction, and if so, it will add a callback that will rollback the transaction when the transaction ID is returned.
1 parent 2869f74 commit 866a8c2

2 files changed

Lines changed: 166 additions & 0 deletions

File tree

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -614,6 +614,25 @@ ApiFuture<Empty> rollbackAsync() {
614614
getTransactionChannelHint());
615615
session.markUsed(clock.instant());
616616
return apiFuture;
617+
} else if (transactionIdFuture != null) {
618+
ApiFuture<ByteString> transactionIdOrEmptyFuture =
619+
ApiFutures.catching(
620+
transactionIdFuture,
621+
Throwable.class,
622+
input -> ByteString.empty(),
623+
MoreExecutors.directExecutor());
624+
return ApiFutures.transformAsync(
625+
transactionIdOrEmptyFuture,
626+
transactionId ->
627+
transactionId.isEmpty()
628+
? ApiFutures.immediateFuture(Empty.getDefaultInstance())
629+
: rpc.rollbackAsync(
630+
RollbackRequest.newBuilder()
631+
.setSession(session.getName())
632+
.setTransactionId(transactionId)
633+
.build(),
634+
getTransactionChannelHint()),
635+
MoreExecutors.directExecutor());
617636
} else {
618637
return ApiFutures.immediateFuture(Empty.getDefaultInstance());
619638
}
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
import static org.junit.Assert.assertNull;
20+
21+
import com.google.api.core.ApiFuture;
22+
import com.google.cloud.NoCredentials;
23+
import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture;
24+
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
25+
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
26+
import com.google.cloud.spanner.connection.AbstractMockServerTest;
27+
import com.google.cloud.spanner.connection.RandomResultSetGenerator;
28+
import com.google.common.base.Function;
29+
import com.google.spanner.v1.ExecuteSqlRequest;
30+
import com.google.spanner.v1.RollbackRequest;
31+
import io.grpc.ManagedChannelBuilder;
32+
import io.grpc.Status;
33+
import java.util.Objects;
34+
import java.util.concurrent.ExecutorService;
35+
import java.util.concurrent.Executors;
36+
import org.junit.BeforeClass;
37+
import org.junit.Test;
38+
import org.junit.runner.RunWith;
39+
import org.junit.runners.JUnit4;
40+
import org.threeten.bp.Duration;
41+
42+
@RunWith(JUnit4.class)
43+
public class OrphanedTransactionTest extends AbstractMockServerTest {
44+
private static final Statement STATEMENT = Statement.of("SELECT * FROM random");
45+
46+
@BeforeClass
47+
public static void setupReadResult() {
48+
com.google.cloud.spanner.connection.RandomResultSetGenerator generator =
49+
new RandomResultSetGenerator(10);
50+
mockSpanner.putStatementResult(StatementResult.query(STATEMENT, generator.generate()));
51+
}
52+
53+
private Spanner createSpanner() {
54+
return SpannerOptions.newBuilder()
55+
.setProjectId("fake-project")
56+
.setHost("http://localhost:" + getPort())
57+
.setCredentials(NoCredentials.getInstance())
58+
.setChannelConfigurator(ManagedChannelBuilder::usePlaintext)
59+
.setSessionPoolOption(
60+
SessionPoolOptions.newBuilder().setWaitForMinSessions(Duration.ofSeconds(5L)).build())
61+
.build()
62+
.getService();
63+
}
64+
65+
@Test
66+
public void testOrphanedTransaction() throws Exception {
67+
ExecutorService executor = Executors.newCachedThreadPool();
68+
try (Spanner spanner = createSpanner()) {
69+
DatabaseClient client =
70+
spanner.getDatabaseClient(
71+
DatabaseId.of("fake-project", "fake-instance", "fake-database"));
72+
// Freeze the mock server to ensure that the request lands on the mock server before we
73+
// proceed.
74+
mockSpanner.freeze();
75+
AsyncTransactionManager manager = client.transactionManagerAsync();
76+
TransactionContextFuture context = manager.beginAsync();
77+
context.then(
78+
(txn, input) -> {
79+
try (AsyncResultSet resultSet = txn.executeQueryAsync(STATEMENT)) {
80+
resultSet.toListAsync(
81+
(Function<StructReader, Object>)
82+
row -> Objects.requireNonNull(row).getValue(0).getAsString(),
83+
executor);
84+
}
85+
return null;
86+
},
87+
executor);
88+
// Wait for the ExecuteSqlRequest to land on the mock server.
89+
mockSpanner.waitForRequestsToContain(
90+
input ->
91+
input instanceof ExecuteSqlRequest
92+
&& ((ExecuteSqlRequest) input).getSql().equals(STATEMENT.getSql()),
93+
5000L);
94+
// Now close the transaction. This should (eventually) trigger a rollback, even though the
95+
// client has not yet received a transaction ID.
96+
manager.closeAsync();
97+
// Unfreeze the mock server and wait for the Rollback request to be received.
98+
mockSpanner.unfreeze();
99+
mockSpanner.waitForLastRequestToBe(RollbackRequest.class, 5000L);
100+
} finally {
101+
executor.shutdown();
102+
}
103+
}
104+
105+
@Test
106+
public void testOrphanedTransactionWithFailedFirstQuery() throws Exception {
107+
ExecutorService executor = Executors.newCachedThreadPool();
108+
mockSpanner.setExecuteStreamingSqlExecutionTime(
109+
SimulatedExecutionTime.ofException(
110+
Status.INVALID_ARGUMENT.withDescription("table not found").asRuntimeException()));
111+
try (Spanner spanner = createSpanner()) {
112+
DatabaseClient client =
113+
spanner.getDatabaseClient(
114+
DatabaseId.of("fake-project", "fake-instance", "fake-database"));
115+
// Freeze the mock server to ensure that the request lands on the mock server before we
116+
// proceed.
117+
mockSpanner.freeze();
118+
AsyncTransactionManager manager = client.transactionManagerAsync();
119+
TransactionContextFuture context = manager.beginAsync();
120+
context.then(
121+
(txn, input) -> {
122+
try (AsyncResultSet resultSet = txn.executeQueryAsync(STATEMENT)) {
123+
resultSet.toListAsync(
124+
(Function<StructReader, Object>)
125+
row -> Objects.requireNonNull(row).getValue(0).getAsString(),
126+
executor);
127+
}
128+
return null;
129+
},
130+
executor);
131+
// Wait for the ExecuteSqlRequest to land on the mock server.
132+
mockSpanner.waitForRequestsToContain(
133+
input ->
134+
input instanceof ExecuteSqlRequest
135+
&& ((ExecuteSqlRequest) input).getSql().equals(STATEMENT.getSql()),
136+
5000L);
137+
// Now close the transaction. This will not trigger a Rollback, as the statement failed.
138+
// The closeResult will be done when the error for the failed statement is returned to the
139+
// client.
140+
ApiFuture<Void> closeResult = manager.closeAsync();
141+
mockSpanner.unfreeze();
142+
assertNull(closeResult.get());
143+
} finally {
144+
executor.shutdown();
145+
}
146+
}
147+
}

0 commit comments

Comments
 (0)