Skip to content

Commit 913a85a

Browse files
fix: catch uncaught exception in onApply to prevent apply pipeline stall (#1246)
* fix: catch uncaught exception in onApply to prevent apply pipeline stall * use setError instead of setErrorAndRollback per maintainer feedback * fix: rewrite flaky tests to avoid Thread.sleep and leader shutdown races --------- Co-authored-by: Qian-Cheng-nju <Qian-Cheng-nju@users.noreply.github.com>
1 parent bab26d9 commit 913a85a

4 files changed

Lines changed: 243 additions & 86 deletions

File tree

jraft-core/src/main/java/com/alipay/sofa/jraft/core/FSMCallerImpl.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -559,7 +559,19 @@ private void doCommitted(final long committedIndex) {
559559
}
560560

561561
// Apply data task to user state machine
562-
doApplyTasks(iterImpl);
562+
try {
563+
doApplyTasks(iterImpl);
564+
} catch (final Throwable t) {
565+
// If the user state machine throws, we must not let the
566+
// exception propagate — that would skip setLastApplied()
567+
// and stall the apply pipeline permanently (the same
568+
// entries would be retried on every doCommitted() call,
569+
// hitting the same exception in an infinite loop).
570+
LOG.error("StateMachine threw when applying entries starting at index={}. "
571+
+ "Halting state machine to avoid stall.", iterImpl.getIndex(), t);
572+
iterImpl.setError(new Status(RaftError.ESTATEMACHINE, "StateMachine threw: %s", t.getMessage()));
573+
break;
574+
}
563575
}
564576

565577
if (iterImpl.hasError()) {

jraft-core/src/main/java/com/alipay/sofa/jraft/core/IteratorImpl.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,19 @@ public void commitAndSnapshotSync(Closure done) {
167167
}
168168
}
169169

170+
/**
171+
* Set error without rolling back the index. Used when the state machine
172+
* throws an uncaught exception — we want to halt the node (ERROR state)
173+
* without assuming any rollback policy.
174+
*/
175+
public void setError(final Status st) {
176+
this.currEntry = null;
177+
getOrCreateError().setType(EnumOutter.ErrorType.ERROR_TYPE_STATE_MACHINE);
178+
getOrCreateError().getStatus().setError(RaftError.ESTATEMACHINE,
179+
"StateMachine meet critical error when applying one or more tasks since index=%d, %s", this.currentIndex,
180+
st != null ? st.toString() : "none");
181+
}
182+
170183
public void setErrorAndRollback(final long ntail, final Status st) {
171184
Requires.requireTrue(ntail > 0, "Invalid ntail=" + ntail);
172185
if (this.currEntry == null || this.currEntry.getType() != EnumOutter.EntryType.ENTRY_TYPE_DATA) {

jraft-core/src/test/java/com/alipay/sofa/jraft/core/ElectSelfPersistOrderTest.java

Lines changed: 40 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package com.alipay.sofa.jraft.core;
1818

1919
import java.io.File;
20-
import java.nio.ByteBuffer;
2120
import java.util.ArrayList;
2221
import java.util.List;
2322
import java.util.concurrent.CountDownLatch;
@@ -33,10 +32,8 @@
3332
import com.alipay.sofa.jraft.Iterator;
3433
import com.alipay.sofa.jraft.Node;
3534
import com.alipay.sofa.jraft.RaftGroupService;
36-
import com.alipay.sofa.jraft.Status;
3735
import com.alipay.sofa.jraft.conf.Configuration;
3836
import com.alipay.sofa.jraft.entity.PeerId;
39-
import com.alipay.sofa.jraft.entity.Task;
4037
import com.alipay.sofa.jraft.option.NodeOptions;
4138
import com.alipay.sofa.jraft.option.RaftMetaStorageOptions;
4239
import com.alipay.sofa.jraft.option.RaftOptions;
@@ -86,16 +83,17 @@ public void teardown() throws Exception {
8683

8784
/**
8885
* Verify that when setTermAndVotedFor() fails during electSelf(), the node
89-
* steps down and does NOT send RequestVote RPCs to peers.
86+
* steps down and does NOT become leader.
9087
*
9188
* Approach:
9289
* 1. Start a 3-node cluster with a FailingMetaStorage on node 0
93-
* 2. Wait for leader election to complete normally
90+
* 2. Wait for leader election to complete normally (node 0 must not be leader)
9491
* 3. Enable the failure flag on node 0's meta storage
95-
* 4. Stop the current leader to force new elections
96-
* 5. Node 0 will attempt electSelf(), fail on setTermAndVotedFor(), step down
97-
* 6. Verify the observer node's term did NOT advance due to node 0's failed election
98-
* (proving no RequestVote RPC was sent before the persistence check)
92+
* 4. Directly trigger electSelf() on node 0 via tryElectSelf()
93+
* 5. Verify: persist failure detected, node 0 did NOT become leader
94+
*
95+
* Using tryElectSelf() directly avoids the need to kill the leader (whose
96+
* graceful shutdown sends TimeoutNow, independently bumping follower terms).
9997
*/
10098
@Test
10199
public void testPersistFailurePreventsRpcSend() throws Exception {
@@ -113,9 +111,6 @@ public void testPersistFailurePreventsRpcSend() throws Exception {
113111
for (int i = 0; i < peers.size(); i++) {
114112
final PeerId peer = peers.get(i);
115113
final NodeOptions nodeOptions = new NodeOptions();
116-
// node0 gets a short election timeout so it fires first after leader death.
117-
// Other nodes get a long timeout so they won't independently start elections
118-
// during the test window — this lets us assert term equality on the observer.
119114
nodeOptions.setElectionTimeoutMs(i == 0 ? 300 : 10_000);
120115
nodeOptions.setSnapshotIntervalSecs(300);
121116
nodeOptions.setInitialConf(conf);
@@ -149,91 +144,37 @@ public void onApply(final Iterator iter) {
149144

150145
try {
151146
// Wait for initial leader election (fail flag is off)
152-
Thread.sleep(3000);
153-
154-
Node leader = null;
155-
for (final Node n : nodes) {
156-
if (n.getLeaderId() != null && !n.getLeaderId().isEmpty()) {
157-
if (n.getNodeId().getPeerId().equals(n.getLeaderId())) {
158-
leader = n;
159-
}
160-
}
161-
}
147+
Node leader = waitForLeader(nodes, null, 30_000);
162148
assertNotNull("Should have a leader", leader);
163149

164150
final Node node0 = nodes.get(0);
165151

166152
// If node0 is the leader, transfer leadership away
167153
if (leader == node0) {
168154
leader.transferLeadershipTo(peers.get(1));
169-
Thread.sleep(2000);
170-
leader = null;
171-
for (final Node n : nodes) {
172-
if (n.getLeaderId() != null && !n.getLeaderId().isEmpty()) {
173-
if (n.getNodeId().getPeerId().equals(n.getLeaderId())) {
174-
leader = n;
175-
}
176-
}
177-
}
178-
assumeTrue("Could not transfer leadership away from node0", leader != node0);
179-
}
180-
181-
assertNotNull("Should still have a leader", leader);
182-
final PeerId leaderPeer = leader.getLeaderId();
183-
final int leaderIdx = peers.indexOf(leaderPeer);
184-
185-
// Find the observer (not node0, not leader)
186-
Node observer = null;
187-
int observerIdx = -1;
188-
for (int i = 0; i < nodes.size(); i++) {
189-
if (i != 0 && i != leaderIdx) {
190-
observer = nodes.get(i);
191-
observerIdx = i;
192-
break;
193-
}
155+
leader = waitForLeader(nodes, node0, 30_000);
156+
assumeTrue("Could not transfer leadership away from node0", leader != null && leader != node0);
194157
}
195-
assertNotNull("Should have an observer", observer);
196-
197-
final long observerTermBefore = ((NodeImpl) observer).getCurrentTerm();
198158

199159
// Enable fail flag: next setTermAndVotedFor on node0 returns false
200160
failFlag.set(true);
201161

202-
// Stop leader to trigger elections
203-
services.get(leaderIdx).shutdown();
204-
services.get(leaderIdx).join();
205-
206-
// Wait for node0 to attempt election and hit the persist failure
207-
boolean failed = failLatch.await(10, TimeUnit.SECONDS);
208-
assertTrue("Node0 should have attempted election", failed);
209-
210-
final long failTerm = failedAtTerm.get();
211-
212-
// Brief pause for any in-flight messages
213-
Thread.sleep(500);
214-
215-
final long observerTermAfter = ((NodeImpl) observer).getCurrentTerm();
216-
217-
// With the fix: setTermAndVotedFor is called BEFORE the RPC loop.
218-
// When it returns false, electSelf() steps down and returns without
219-
// sending any RequestVote RPCs. Therefore the observer's term should
220-
// NOT have been bumped by node0's failed election attempt.
221-
//
222-
// Without the fix: RPCs were sent BEFORE setTermAndVotedFor, so the
223-
// observer would have already received the RequestVote and bumped its
224-
// term even though node0's persistence failed.
225-
//
226-
// The observer uses a long election timeout (10s) so it won't
227-
// independently start its own election during this short test window.
228-
System.out.println("Observer term before=" + observerTermBefore + ", after=" + observerTermAfter
229-
+ ", node0 failed at term=" + failTerm);
230-
231-
// Observer's term must not change — proves no RequestVote RPC was sent.
232-
assertEquals("Observer term should not advance (no RequestVote RPC sent)", observerTermBefore,
233-
observerTermAfter);
234-
235-
// With the fix, node0's persist failure means it steps down and
236-
// does NOT send RequestVote RPCs. Verify node0 is no longer leader.
162+
// Directly trigger electSelf() on node0. With the fix, electSelf()
163+
// calls setTermAndVotedFor() BEFORE sending RequestVote RPCs. When
164+
// it returns false, electSelf() steps down without sending any RPCs.
165+
// Without the fix, RPCs would be sent first and node0 could become
166+
// leader despite the persistence failure (since quorum = 2 and the
167+
// observer would grant the vote).
168+
((NodeImpl) node0).tryElectSelf();
169+
170+
// Persist failure should have been detected
171+
assertTrue("Node0 should have hit persist failure", failLatch.await(5, TimeUnit.SECONDS));
172+
assertTrue("Failed term should be positive", failedAtTerm.get() > 0);
173+
174+
System.out.println("Node0 persist failed at term=" + failedAtTerm.get());
175+
176+
// Node0 must NOT be leader — proves electSelf() did not proceed
177+
// past the failed setTermAndVotedFor() to send RPCs and win votes.
237178
assertFalse("Node0 should not be leader after persist failure", ((NodeImpl) node0).isLeader());
238179

239180
} finally {
@@ -247,6 +188,20 @@ public void onApply(final Iterator iter) {
247188
}
248189
}
249190

191+
private static Node waitForLeader(List<Node> nodes, Node excluded, long timeoutMs) throws InterruptedException {
192+
final long deadline = System.currentTimeMillis() + timeoutMs;
193+
while (System.currentTimeMillis() < deadline) {
194+
for (final Node n : nodes) {
195+
if (n != excluded && n.getLeaderId() != null && !n.getLeaderId().isEmpty()
196+
&& n.getNodeId().getPeerId().equals(n.getLeaderId())) {
197+
return n;
198+
}
199+
}
200+
Thread.sleep(100);
201+
}
202+
return null;
203+
}
204+
250205
/**
251206
* A RaftMetaStorage wrapper that returns false from setTermAndVotedFor()
252207
* when the fail flag is set, simulating a disk I/O error during election.

0 commit comments

Comments
 (0)