Skip to content

Commit ef1490f

Browse files
authored
IGNITE-28102 Jdbc. Fix client observable timestamp skew in multistatement query cases (#7949)
1 parent 7a10df1 commit ef1490f

8 files changed

Lines changed: 496 additions & 42 deletions

File tree

modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1119,7 +1119,7 @@ private CompletableFuture<ResponseWriter> processOperation(
11191119
);
11201120

11211121
case ClientOp.SQL_CURSOR_NEXT_RESULT_SET:
1122-
return ClientSqlCursorNextResultRequest.process(in, resources, partitionOperationsExecutor, metrics);
1122+
return ClientSqlCursorNextResultRequest.process(partitionOperationsExecutor, in, resources, metrics, tsTracker);
11231123

11241124
case ClientOp.OPERATION_CANCEL:
11251125
return ClientOperationCancelRequest.process(in, cancelHandles);

modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@
3636
import org.apache.ignite.internal.binarytuple.BinaryTupleContainer;
3737
import org.apache.ignite.internal.binarytuple.BinaryTupleParser;
3838
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
39+
import org.apache.ignite.internal.client.proto.ClientOp;
3940
import org.apache.ignite.internal.client.sql.QueryModifier;
41+
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
4042
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
4143
import org.apache.ignite.internal.lang.IgniteInternalException;
4244
import org.apache.ignite.internal.sql.SqlCommon;
@@ -268,6 +270,7 @@ static CompletableFuture<ResponseWriter> writeResultSetAsync(
268270
ClientResourceRegistry resources,
269271
AsyncResultSetImpl asyncResultSet,
270272
ClientHandlerMetricSource metrics,
273+
HybridTimestampTracker parentTsTracker,
271274
int pageSize,
272275
boolean includePartitionAwarenessMeta,
273276
boolean sqlDirectTxMappingSupported,
@@ -277,7 +280,7 @@ static CompletableFuture<ResponseWriter> writeResultSetAsync(
277280
) {
278281
try {
279282
Long nextResultResourceId = sqlMultiStatementSupported && asyncResultSet.cursor().hasNextResult()
280-
? saveNextResultResource(asyncResultSet.cursor().nextResult(), pageSize, resources, executor)
283+
? saveNextResultResource(asyncResultSet.cursor().nextResult(), pageSize, resources, parentTsTracker, executor)
281284
: null;
282285

283286
if ((asyncResultSet.hasRowSet() && asyncResultSet.hasMorePages())) {
@@ -317,10 +320,11 @@ private static Long saveNextResultResource(
317320
CompletableFuture<AsyncSqlCursor<InternalSqlRow>> nextResultFuture,
318321
int pageSize,
319322
ClientResourceRegistry resources,
323+
HybridTimestampTracker parentTsTracker,
320324
Executor executor
321325
) throws IgniteInternalCheckedException {
322326
ClientResource resource = new ClientResource(
323-
new CursorWithPageSize(nextResultFuture, pageSize),
327+
new NextCursorContext(parentTsTracker, pageSize, nextResultFuture),
324328
() -> nextResultFuture.thenAccept(cur -> iterateThroughResultsAndCloseThem(cur, executor))
325329
);
326330

@@ -414,22 +418,33 @@ private static void packPartitionAwarenessMeta(
414418
}
415419
}
416420

417-
/** Holder of the cursor future and page size. */
418-
static class CursorWithPageSize {
421+
/** Holder of the context for future result set retrieval. */
422+
static class NextCursorContext {
419423
private final CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture;
420424
private final int pageSize;
421-
422-
CursorWithPageSize(CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture, int pageSize) {
423-
this.cursorFuture = cursorFuture;
425+
private final HybridTimestampTracker parentTsTracker;
426+
427+
NextCursorContext(
428+
HybridTimestampTracker parentTsTracker,
429+
int pageSize,
430+
CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture
431+
) {
432+
this.parentTsTracker = parentTsTracker;
424433
this.pageSize = pageSize;
434+
this.cursorFuture = cursorFuture;
425435
}
426436

427-
CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture() {
428-
return cursorFuture;
437+
/** Tracker of the request that initiated query processing (i.e. {@link ClientOp#SQL_EXEC}). */
438+
HybridTimestampTracker parentTsTracker() {
439+
return parentTsTracker;
429440
}
430441

431442
int pageSize() {
432443
return pageSize;
433444
}
445+
446+
CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture() {
447+
return cursorFuture;
448+
}
434449
}
435450
}

modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextResultRequest.java

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@
2323
import org.apache.ignite.client.handler.ClientResource;
2424
import org.apache.ignite.client.handler.ClientResourceRegistry;
2525
import org.apache.ignite.client.handler.ResponseWriter;
26-
import org.apache.ignite.client.handler.requests.sql.ClientSqlCommon.CursorWithPageSize;
26+
import org.apache.ignite.client.handler.requests.sql.ClientSqlCommon.NextCursorContext;
2727
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
28+
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
2829
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
2930
import org.apache.ignite.internal.sql.api.AsyncResultSetImpl;
3031
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
@@ -37,43 +38,54 @@ public class ClientSqlCursorNextResultRequest {
3738
/**
3839
* Processes the request.
3940
*
41+
* @param operationExecutor Operation executor.
4042
* @param in Unpacker.
43+
* @param resources Resource bundle.
44+
* @param metrics Client metrics.
45+
* @param requestTsTracker TS tracker attached to current request processing.
4146
* @return Future representing result of operation.
4247
*/
4348
public static CompletableFuture<ResponseWriter> process(
49+
Executor operationExecutor,
4450
ClientMessageUnpacker in,
4551
ClientResourceRegistry resources,
46-
Executor operationExecutor,
47-
ClientHandlerMetricSource metrics
52+
ClientHandlerMetricSource metrics,
53+
HybridTimestampTracker requestTsTracker
4854
) throws IgniteInternalCheckedException {
4955
long resourceId = in.unpackLong();
5056
ClientResource resource = resources.remove(resourceId);
51-
CursorWithPageSize cursorWithPageSize = resource.get(CursorWithPageSize.class);
52-
int pageSize = cursorWithPageSize.pageSize();
57+
NextCursorContext nextCursorContext = resource.get(NextCursorContext.class);
58+
HybridTimestampTracker parentTsTracker = nextCursorContext.parentTsTracker();
59+
int pageSize = nextCursorContext.pageSize();
5360

54-
CompletableFuture<ResponseWriter> f = cursorWithPageSize.cursorFuture()
61+
CompletableFuture<ResponseWriter> f = nextCursorContext.cursorFuture()
5562
.thenComposeAsync(cur -> cur.requestNextAsync(pageSize)
5663
.thenApply(batchRes -> new AsyncResultSetImpl<SqlRow>(
5764
cur,
5865
batchRes,
5966
pageSize
6067
)
61-
).thenCompose(asyncResultSet ->
62-
ClientSqlCommon.writeResultSetAsync(
63-
resources,
64-
asyncResultSet,
65-
metrics,
66-
pageSize,
67-
false,
68-
false,
69-
true,
70-
false,
71-
operationExecutor)
72-
).thenApply(rsWriter -> rsWriter), operationExecutor);
68+
).thenCompose(asyncResultSet -> {
69+
// For multi-statement DML operations, this will help us keep the client's timestamp tracker up to date and
70+
// ensure client reads are consistent with the latest updates.
71+
requestTsTracker.update(parentTsTracker.get());
72+
73+
return ClientSqlCommon.writeResultSetAsync(
74+
resources,
75+
asyncResultSet,
76+
metrics,
77+
parentTsTracker,
78+
pageSize,
79+
false,
80+
false,
81+
true,
82+
false,
83+
operationExecutor);
84+
}).thenApply(rsWriter -> rsWriter), operationExecutor);
7385

7486
f.whenCompleteAsync((r, t) -> {
7587
if (t != null) {
76-
cursorWithPageSize.cursorFuture().thenAccept(cur -> closeRemainingCursors(cur, false, operationExecutor));
88+
nextCursorContext.cursorFuture().thenAccept(cur -> closeRemainingCursors(cur, false, operationExecutor));
7789
}
7890
}, operationExecutor);
7991

modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public static CompletableFuture<ResponseWriter> process(
9898
ClockService clockService,
9999
NotificationSender notificationSender,
100100
@Nullable String username,
101-
boolean sqlMultistatementsSupported,
101+
boolean sqlMultistatementSupported,
102102
boolean sqlPartitionAwarenessQualifiedNameSupported,
103103
Consumer<SqlQueryType> queryTypeListener
104104
) {
@@ -122,7 +122,7 @@ public static CompletableFuture<ResponseWriter> process(
122122
resIdHolder
123123
);
124124

125-
ClientSqlProperties props = new ClientSqlProperties(in, sqlMultistatementsSupported);
125+
ClientSqlProperties props = new ClientSqlProperties(in, sqlMultistatementSupported);
126126
String statement = in.unpackString();
127127
Object[] arguments = readArgsNotNull(in);
128128

@@ -147,10 +147,11 @@ public static CompletableFuture<ResponseWriter> process(
147147
resources,
148148
asyncResultSet,
149149
metrics,
150+
timestampTracker,
150151
props.pageSize(),
151152
includePartitionAwarenessMeta,
152153
sqlDirectTxMappingSupported,
153-
sqlMultistatementsSupported,
154+
sqlMultistatementSupported,
154155
sqlPartitionAwarenessQualifiedNameSupported,
155156
operationExecutor))
156157
.thenApply(rsWriter -> out -> {

modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcMultiStatementSelfTest.java

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@
1919

2020
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
2121
import static org.apache.ignite.jdbc.util.JdbcTestUtils.assertThrowsSqlException;
22+
import static org.apache.ignite.jdbc.util.RowsProjectionMatcher.hasValuesInAnyOrder;
23+
import static org.apache.ignite.jdbc.util.RowsProjectionMatcher.hasValuesOrder;
24+
import static org.apache.ignite.jdbc.util.StatementResultCheck.isResultSet;
25+
import static org.apache.ignite.jdbc.util.StatementResultCheck.isUpdateCounter;
26+
import static org.apache.ignite.jdbc.util.StatementResultCheck.noMoreResults;
2227
import static org.hamcrest.MatcherAssert.assertThat;
2328
import static org.hamcrest.Matchers.containsString;
2429
import static org.hamcrest.Matchers.is;
@@ -35,9 +40,11 @@
3540
import java.sql.ResultSet;
3641
import java.sql.SQLException;
3742
import java.sql.Statement;
43+
import java.util.List;
3844
import java.util.concurrent.TimeUnit;
3945
import org.apache.ignite.internal.jdbc.JdbcStatement;
4046
import org.apache.ignite.internal.sql.SqlCommon;
47+
import org.apache.ignite.jdbc.util.StatementResultCheck;
4148
import org.awaitility.Awaitility;
4249
import org.junit.jupiter.api.AfterEach;
4350
import org.junit.jupiter.api.BeforeEach;
@@ -281,18 +288,26 @@ public void statementMustCloseAllDependentCursors() throws SQLException {
281288

282289
@Test
283290
public void testMixedDmlQueryExecute() throws Exception {
284-
boolean res = stmt.execute("INSERT INTO TEST_TX VALUES (6, 5, '5'); DELETE FROM TEST_TX WHERE ID=6; SELECT 1;");
285-
assertFalse(res);
286-
assertEquals(1, getResultSetSize());
291+
assertStatementResults("INSERT INTO TEST_TX VALUES (6, 5, '5'); DELETE FROM TEST_TX WHERE ID=6; SELECT 1;",
292+
isUpdateCounter(1),
293+
isUpdateCounter(1),
294+
isResultSet(rs -> rs.getInt(1), hasValuesOrder(1)),
295+
noMoreResults()
296+
);
287297

288-
res = stmt.execute("SELECT 1; INSERT INTO TEST_TX VALUES (7, 5, '5'); DELETE FROM TEST_TX WHERE ID=6;");
289-
assertEquals(true, res);
290-
assertEquals(1, getResultSetSize());
298+
assertStatementResults("SELECT 1; INSERT INTO TEST_TX VALUES (7, 5, '5'); DELETE FROM TEST_TX WHERE ID=6;",
299+
isResultSet(rs -> rs.getInt(1), hasValuesOrder(1)),
300+
isUpdateCounter(1),
301+
isUpdateCounter(0),
302+
noMoreResults()
303+
);
291304

292-
// empty results set in the middle
293-
res = stmt.execute("SELECT * FROM TEST_TX; INSERT INTO TEST_TX VALUES (6, 6, '6'); SELECT * FROM TEST_TX;");
294-
assertEquals(true, res);
295-
assertEquals(11, getResultSetSize());
305+
assertStatementResults("SELECT * FROM TEST_TX; INSERT INTO TEST_TX VALUES (6, 6, '6'); SELECT * FROM TEST_TX;",
306+
isResultSet(rs -> rs.getInt(1), hasValuesInAnyOrder(1, 2, 3, 4, 7)),
307+
isUpdateCounter(1),
308+
isResultSet(rs -> rs.getInt(1), hasValuesInAnyOrder(1, 2, 3, 4, 6, 7)),
309+
noMoreResults()
310+
);
296311
}
297312

298313
@Test
@@ -780,6 +795,18 @@ private int getResultSetSize() throws SQLException {
780795
return size;
781796
}
782797

798+
/** Verifies that after query execution statement returns results that satisfy provided assertions. */
799+
private void assertStatementResults(String query, StatementResultCheck... resultCheck) throws SQLException {
800+
List<StatementResultCheck> checks = List.of(resultCheck);
801+
802+
stmt.execute(query);
803+
804+
for (StatementResultCheck check : checks) {
805+
check.check(stmt);
806+
stmt.getMoreResults();
807+
}
808+
}
809+
783810
private boolean checkNoMoreResults() throws SQLException {
784811
boolean more = stmt.getMoreResults();
785812
int updCnt = stmt.getUpdateCount();
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.jdbc.util;
19+
20+
import java.sql.ResultSet;
21+
import java.sql.SQLException;
22+
import java.util.ArrayList;
23+
import java.util.List;
24+
25+
/**
26+
* Functional interface for extracting a value from the current row of a {@link ResultSet}.
27+
*
28+
* @param <T> Type of the extracted value.
29+
*/
30+
@FunctionalInterface
31+
public interface RowColumnProjection<T> {
32+
/** Extracts a value from the current row of {@code rs}. */
33+
T extract(ResultSet rs) throws SQLException;
34+
35+
/** Drains result set to list by projecting each record with provided extractor. */
36+
static <T> List<T> projectRowsColumn(ResultSet rs, RowColumnProjection<T> extractor) throws SQLException {
37+
List<T> result = new ArrayList<>();
38+
39+
while (rs.next()) {
40+
result.add(extractor.extract(rs));
41+
}
42+
43+
return result;
44+
}
45+
}

0 commit comments

Comments
 (0)