|
28 | 28 | import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause; |
29 | 29 | import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCode; |
30 | 30 | import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; |
| 31 | +import static org.apache.ignite.internal.util.ExceptionUtils.hasCause; |
| 32 | +import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_MISS_ERR; |
31 | 33 | import static org.hamcrest.MatcherAssert.assertThat; |
32 | 34 | import static org.hamcrest.Matchers.greaterThan; |
33 | 35 | import static org.hamcrest.Matchers.hasSize; |
|
37 | 39 | import static org.junit.jupiter.api.Assertions.assertNotNull; |
38 | 40 | import static org.junit.jupiter.api.Assertions.assertTrue; |
39 | 41 | import static org.junit.jupiter.api.Assertions.fail; |
| 42 | +import static org.junit.jupiter.api.Assumptions.assumeTrue; |
40 | 43 |
|
41 | 44 | import java.time.Instant; |
42 | 45 | import java.time.ZoneId; |
|
51 | 54 | import java.util.stream.IntStream; |
52 | 55 | import org.apache.calcite.rel.core.JoinRelType; |
53 | 56 | import org.apache.ignite.Ignite; |
| 57 | +import org.apache.ignite.internal.app.IgniteImpl; |
54 | 58 | import org.apache.ignite.internal.catalog.commands.CatalogUtils; |
55 | 59 | import org.apache.ignite.internal.catalog.events.CatalogEvent; |
56 | 60 | import org.apache.ignite.internal.catalog.events.CreateTableEventParameters; |
| 61 | +import org.apache.ignite.internal.client.tx.ClientLazyTransaction; |
57 | 62 | import org.apache.ignite.internal.event.EventListener; |
58 | 63 | import org.apache.ignite.internal.sql.BaseSqlIntegrationTest; |
59 | 64 | import org.apache.ignite.internal.sql.ColumnMetadataImpl; |
60 | 65 | import org.apache.ignite.internal.sql.ColumnMetadataImpl.ColumnOriginImpl; |
61 | 66 | import org.apache.ignite.internal.sql.engine.QueryCancelledException; |
62 | 67 | import org.apache.ignite.internal.sql.engine.exec.fsm.QueryInfo; |
63 | 68 | import org.apache.ignite.internal.testframework.IgniteTestUtils; |
| 69 | +import org.apache.ignite.internal.tx.InternalTransaction; |
64 | 70 | import org.apache.ignite.internal.tx.TxManager; |
| 71 | +import org.apache.ignite.internal.tx.TxState; |
| 72 | +import org.apache.ignite.internal.tx.TxStateMeta; |
| 73 | +import org.apache.ignite.internal.tx.message.TxMessageGroup; |
65 | 74 | import org.apache.ignite.internal.util.CompletableFutures; |
66 | 75 | import org.apache.ignite.lang.CancelHandle; |
67 | 76 | import org.apache.ignite.lang.CancellationToken; |
|
83 | 92 | import org.apache.ignite.sql.Statement; |
84 | 93 | import org.apache.ignite.sql.Statement.StatementBuilder; |
85 | 94 | import org.apache.ignite.tx.Transaction; |
| 95 | +import org.apache.ignite.tx.TransactionException; |
86 | 96 | import org.apache.ignite.tx.TransactionOptions; |
| 97 | +import org.awaitility.Awaitility; |
87 | 98 | import org.hamcrest.Matcher; |
88 | 99 | import org.jetbrains.annotations.Nullable; |
89 | 100 | import org.junit.jupiter.api.AfterEach; |
@@ -740,6 +751,175 @@ public void runtimeErrorInQueryCausesTransactionToFail(String query) { |
740 | 751 | "Transaction is already finished due to an error"); |
741 | 752 | } |
742 | 753 |
|
| 754 | + @Test |
| 755 | + public void runtimeErrorReturnsSameTransactionErrorBeforeAndAfterRollbackCompletion() throws Exception { |
| 756 | + sql("CREATE TABLE tst(id INTEGER PRIMARY KEY, val INTEGER)"); |
| 757 | + |
| 758 | + IgniteSql sql = igniteSql(); |
| 759 | + |
| 760 | + Transaction tx = igniteTx().begin(); |
| 761 | + |
| 762 | + // Enlist enough operations to make rollback non-trivial. |
| 763 | + for (int i = 0; i < 100; i++) { |
| 764 | + execute(tx, sql, "INSERT INTO tst VALUES (?, ?)", i, i); |
| 765 | + } |
| 766 | + |
| 767 | + UUID txId = txId(tx); |
| 768 | + |
| 769 | + assertThrowsSqlException( |
| 770 | + Sql.RUNTIME_ERR, |
| 771 | + "Division by zero", |
| 772 | + () -> execute(tx, sql, "SELECT val / 0 FROM tst WHERE id = ?", 0) |
| 773 | + ); |
| 774 | + |
| 775 | + IgniteException[] immediateExceptions = new IgniteException[5]; |
| 776 | + for (int i = 0; i < immediateExceptions.length; i++) { |
| 777 | + immediateExceptions[i] = (IgniteException) assertThrowsWithCause( |
| 778 | + () -> executeForRead(sql, tx, "SELECT * FROM tst WHERE id = ?", 1), |
| 779 | + IgniteException.class |
| 780 | + ); |
| 781 | + } |
| 782 | + |
| 783 | + if (tx instanceof InternalTransaction) { |
| 784 | + assertNotNull(txId, "Expected transaction id for test transaction implementation"); |
| 785 | + |
| 786 | + Awaitility.await() |
| 787 | + .atMost(5, TimeUnit.SECONDS) |
| 788 | + .until(() -> { |
| 789 | + TxStateMeta meta = txManager().stateMeta(txId); |
| 790 | + |
| 791 | + return meta != null && TxState.isFinalState(meta.txState()); |
| 792 | + }); |
| 793 | + } |
| 794 | + |
| 795 | + IgniteException abortedStateException = (IgniteException) assertThrowsWithCause( |
| 796 | + () -> executeForRead(sql, tx, "SELECT * FROM tst WHERE id = ?", 1), |
| 797 | + IgniteException.class |
| 798 | + ); |
| 799 | + |
| 800 | + assertEquals(Transactions.TX_ALREADY_FINISHED_WITH_EXCEPTION_ERR, abortedStateException.code()); |
| 801 | + assertTrue(abortedStateException.getMessage().contains("Transaction is already finished due to an error")); |
| 802 | + |
| 803 | + for (IgniteException immediateException : immediateExceptions) { |
| 804 | + assertEquals(abortedStateException.code(), immediateException.code()); |
| 805 | + assertTrue(immediateException.getMessage().contains("Transaction is already finished due to an error")); |
| 806 | + } |
| 807 | + } |
| 808 | + |
| 809 | + @Test |
| 810 | + public void secondRequestDuringRollbackReturnsFinishedWithExceptionAndPreservesOriginalCause() { |
| 811 | + sql("CREATE TABLE tst(id INTEGER PRIMARY KEY, val INTEGER)"); |
| 812 | + sql("INSERT INTO tst VALUES (0, 1)"); |
| 813 | + |
| 814 | + IgniteSql sql = igniteSql(); |
| 815 | + |
| 816 | + Transaction tx = igniteTx().begin(); |
| 817 | + |
| 818 | + List<IgniteImpl> clusterNodes = CLUSTER.runningNodes() |
| 819 | + .map(node -> unwrapIgniteImpl(node)) |
| 820 | + .collect(toList()); |
| 821 | + |
| 822 | + CompletableFuture<Void> failingRequestStarted = new CompletableFuture<>(); |
| 823 | + CompletableFuture<Void> finishRequestBlocked = new CompletableFuture<>(); |
| 824 | + CompletableFuture<Void> releaseFinishRequest = new CompletableFuture<>(); |
| 825 | + |
| 826 | + for (IgniteImpl clusterNode : clusterNodes) { |
| 827 | + // Install predicates in cluster |
| 828 | + clusterNode.dropMessages((recipientConsistentId, msg) -> { |
| 829 | + if (!failingRequestStarted.isDone()) { |
| 830 | + return false; |
| 831 | + } |
| 832 | + |
| 833 | + if (msg.groupType() != TxMessageGroup.GROUP_TYPE |
| 834 | + || msg.messageType() != TxMessageGroup.TX_FINISH_REQUEST) { |
| 835 | + return false; |
| 836 | + } |
| 837 | + |
| 838 | + finishRequestBlocked.complete(null); |
| 839 | + |
| 840 | + return !releaseFinishRequest.isDone(); |
| 841 | + }); |
| 842 | + } |
| 843 | + |
| 844 | + try { |
| 845 | + CompletableFuture<IgniteException> failingRequestFut = IgniteTestUtils.runAsync(() -> { |
| 846 | + failingRequestStarted.complete(null); |
| 847 | + |
| 848 | + IgniteException ex = assertInstanceOf( |
| 849 | + IgniteException.class, |
| 850 | + assertThrowsWithCause( |
| 851 | + () -> execute(tx, sql, "SELECT val / 0 FROM tst WHERE id = ?", 0), |
| 852 | + IgniteException.class |
| 853 | + ) |
| 854 | + ); |
| 855 | + |
| 856 | + assertTrue(hasCause(ex, "Division by zero", Throwable.class)); |
| 857 | + assertTrue( |
| 858 | + ex.code() == Sql.RUNTIME_ERR || ex.code() == Transactions.TX_ALREADY_FINISHED_WITH_EXCEPTION_ERR, |
| 859 | + "Unexpected code for a request that triggers rollback [code=" + ex.code() + ']' |
| 860 | + ); |
| 861 | + |
| 862 | + return ex; |
| 863 | + }); |
| 864 | + |
| 865 | + Awaitility.await() |
| 866 | + .atMost(5, TimeUnit.SECONDS) |
| 867 | + .until(finishRequestBlocked::isDone); |
| 868 | + |
| 869 | + IgniteException parallelRequestException = assertInstanceOf( |
| 870 | + IgniteException.class, |
| 871 | + assertThrowsWithCause( |
| 872 | + () -> executeForRead(sql, tx, "SELECT * FROM tst WHERE id = ?", 0), |
| 873 | + IgniteException.class |
| 874 | + ) |
| 875 | + ); |
| 876 | + |
| 877 | + assertEquals(Transactions.TX_ALREADY_FINISHED_WITH_EXCEPTION_ERR, parallelRequestException.code()); |
| 878 | + assertTrue(parallelRequestException.getMessage().contains("Transaction is already finished due to an error")); |
| 879 | + assertTrue( |
| 880 | + hasCause(parallelRequestException, "Division by zero", Throwable.class), |
| 881 | + "Expected original rollback cause in user-visible exception chain" |
| 882 | + ); |
| 883 | + |
| 884 | + releaseFinishRequest.complete(null); |
| 885 | + |
| 886 | + IgniteException firstRequestException = await(failingRequestFut); |
| 887 | + |
| 888 | + assertTrue(hasCause(firstRequestException, "Division by zero", Throwable.class)); |
| 889 | + } finally { |
| 890 | + clusterNodes.forEach(IgniteImpl::stopDroppingMessages); |
| 891 | + } |
| 892 | + } |
| 893 | + |
| 894 | + @Test |
| 895 | + public void rollbackWithExceptionCauseIsPropagatedToSubsequentSqlRequest() { |
| 896 | + sql("CREATE TABLE tst(id INTEGER PRIMARY KEY, val INTEGER)"); |
| 897 | + sql("INSERT INTO tst VALUES (?, ?)", 1, 1); |
| 898 | + |
| 899 | + Transaction tx = igniteTx().begin(); |
| 900 | + |
| 901 | + assumeTrue(tx instanceof InternalTransaction, "InternalTransaction is required"); |
| 902 | + |
| 903 | + InternalTransaction internalTx = (InternalTransaction) tx; |
| 904 | + String rollbackCauseMessage = "rollback-cause-primary-replica-changed"; |
| 905 | + TransactionException rollbackCause = new TransactionException(REPLICA_MISS_ERR, rollbackCauseMessage); |
| 906 | + |
| 907 | + await(internalTx.rollbackWithExceptionAsync(rollbackCause)); |
| 908 | + |
| 909 | + IgniteException ex = assertInstanceOf( |
| 910 | + IgniteException.class, |
| 911 | + assertThrowsWithCause( |
| 912 | + () -> executeForRead(igniteSql(), tx, "SELECT * FROM tst WHERE id = ?", 1), |
| 913 | + IgniteException.class |
| 914 | + ) |
| 915 | + ); |
| 916 | + |
| 917 | + assertEquals(Transactions.TX_ALREADY_FINISHED_WITH_EXCEPTION_ERR, ex.code()); |
| 918 | + assertTrue(ex.getMessage().contains("Transaction is already finished due to an error")); |
| 919 | + assertTrue(hasCause(ex, TransactionException.class)); |
| 920 | + assertTrue(hasCause(ex, rollbackCauseMessage, Throwable.class), "Expected rollback cause message in user-visible exception chain"); |
| 921 | + } |
| 922 | + |
743 | 923 | @Test |
744 | 924 | public void testLockIsNotReleasedAfterTxRollback() { |
745 | 925 | IgniteSql sql = igniteSql(); |
@@ -1413,6 +1593,18 @@ protected ResultSet<SqlRow> executeForRead(IgniteSql sql, @Nullable Transaction |
1413 | 1593 |
|
1414 | 1594 | protected abstract ResultSet<SqlRow> executeForRead(IgniteSql sql, @Nullable Transaction tx, Statement statement, Object... args); |
1415 | 1595 |
|
| 1596 | + private static @Nullable UUID txId(Transaction tx) { |
| 1597 | + if (tx instanceof InternalTransaction) { |
| 1598 | + return ((InternalTransaction) tx).id(); |
| 1599 | + } |
| 1600 | + |
| 1601 | + if (tx instanceof ClientLazyTransaction) { |
| 1602 | + return ((ClientLazyTransaction) tx).startedTx().txId(); |
| 1603 | + } |
| 1604 | + |
| 1605 | + return null; |
| 1606 | + } |
| 1607 | + |
1416 | 1608 | protected void checkSqlError( |
1417 | 1609 | int code, |
1418 | 1610 | String msg, |
|
0 commit comments