Skip to content

Commit eb4d96c

Browse files
authored
IGNITE-28483 Sql. Simplify close method in DistributedQueryManager (#7951)
1 parent 234a393 commit eb4d96c

2 files changed

Lines changed: 13 additions & 53 deletions

File tree

modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1143,7 +1143,17 @@ private void handleError(Throwable ex, String initiatorNode, long fragmentId) {
11431143
} catch (Exception e) {
11441144
LOG.info("Unable to send error message", e);
11451145

1146-
close(CancellationReason.CANCEL);
1146+
try {
1147+
messageService.send(
1148+
initiatorNode,
1149+
FACTORY.queryCloseMessage()
1150+
.queryId(executionId.queryId())
1151+
.executionToken(executionId.executionToken())
1152+
.build()
1153+
);
1154+
} finally {
1155+
close(CancellationReason.CANCEL);
1156+
}
11471157
}
11481158
}
11491159

@@ -1327,19 +1337,13 @@ private CompletableFuture<Void> close(CancellationReason reason) {
13271337
return cancelFut;
13281338
}
13291339

1330-
CompletableFuture<Void> start = new CompletableFuture<>();
1331-
13321340
CompletableFuture<Void> stage;
13331341

13341342
if (coordinator) {
1335-
stage = start.thenCompose(ignored -> closeRootNode(reason))
1343+
stage = closeRootNode(reason)
13361344
.thenCompose(ignored -> awaitFragmentInitialisationAndClose());
13371345
} else {
1338-
stage = start.thenCompose(ignored -> messageService.send(coordinatorNodeName, FACTORY.queryCloseMessage()
1339-
.queryId(executionId.queryId())
1340-
.executionToken(executionId.executionToken())
1341-
.build()).exceptionally(ignore -> null))
1342-
.thenCompose(ignored -> closeLocalFragments());
1346+
stage = closeLocalFragments();
13431347
}
13441348

13451349
stage.whenComplete((r, e) -> {
@@ -1354,8 +1358,6 @@ private CompletableFuture<Void> close(CancellationReason reason) {
13541358
cancelFut.complete(null);
13551359
}).thenRun(() -> localFragments.forEach(f -> f.context().cancel()));
13561360

1357-
start.completeAsync(() -> null, taskExecutor);
1358-
13591361
return cancelFut;
13601362
}
13611363

modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1235,48 +1235,6 @@ public void ddlExecutionUpdatesObservableTime() {
12351235
assertThat(txCtx.observableTime(), equalTo(expectedCatalogActivationTimestamp));
12361236
}
12371237

1238-
@Test
1239-
public void coordinatorIgnoresRemoteCloseErrorFromNodeOnCoordinator() throws InterruptedException {
1240-
ExecutionService execService = executionServices.get(0);
1241-
1242-
nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
1243-
1244-
var expectedEx = new RuntimeException("Test error");
1245-
var queryClosed = new CountDownLatch(nodeNames.size() - 1);
1246-
1247-
String coordinatorNode = nodeNames.get(0);
1248-
testCluster.node(coordinatorNode).interceptor((senderNode, msg, original) -> {
1249-
if (msg instanceof QueryStartRequest) {
1250-
QueryStartRequest queryStart = (QueryStartRequest) msg;
1251-
1252-
String nodeName = senderNode.name();
1253-
testCluster.node(coordinatorNode).messageService().send(nodeName, new SqlQueryMessagesFactory().queryStartResponse()
1254-
.queryId(queryStart.queryId())
1255-
.fragmentId(queryStart.fragmentId())
1256-
.error(expectedEx)
1257-
.build()
1258-
);
1259-
} else {
1260-
original.onMessage(senderNode, msg);
1261-
}
1262-
1263-
if (msg instanceof QueryCloseMessage) {
1264-
queryClosed.countDown();
1265-
return CompletableFuture.failedFuture(new RuntimeException("Test exception: failed to close"));
1266-
} else {
1267-
return nullCompletedFuture();
1268-
}
1269-
});
1270-
1271-
SqlOperationContext ctx = createContext();
1272-
QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
1273-
1274-
RuntimeException actualException = assertWillThrow(execService.executePlan(plan, ctx), RuntimeException.class);
1275-
assertEquals(expectedEx, actualException);
1276-
1277-
queryClosed.await();
1278-
}
1279-
12801238
@Test
12811239
public void coordinatorIgnoresRemoteCloseErrorOnNode() throws InterruptedException {
12821240
ExecutionService execService = executionServices.get(0);

0 commit comments

Comments
 (0)