Skip to content

Commit bab26d9

Browse files
authored
fix: base read-index failure threshold (#1249)
1 parent 5adff8a commit bab26d9

2 files changed

Lines changed: 72 additions & 6 deletions

File tree

jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1514,19 +1514,19 @@ private class ReadIndexHeartbeatResponseClosure extends RpcResponseClosureAdapte
15141514
final ReadIndexResponse.Builder respBuilder;
15151515
final RpcResponseClosure<ReadIndexResponse> closure;
15161516
final int quorum;
1517-
final int failPeersThreshold;
1517+
final int expectedFollowerResponses;
15181518
int ackSuccess;
15191519
int ackFailures;
15201520
boolean isDone;
15211521

15221522
public ReadIndexHeartbeatResponseClosure(final RpcResponseClosure<ReadIndexResponse> closure,
15231523
final ReadIndexResponse.Builder rb, final int quorum,
1524-
final int peersCount) {
1524+
final int expectedFollowerResponses) {
15251525
super();
15261526
this.closure = closure;
15271527
this.respBuilder = rb;
15281528
this.quorum = quorum;
1529-
this.failPeersThreshold = peersCount % 2 == 0 ? (quorum - 1) : quorum;
1529+
this.expectedFollowerResponses = expectedFollowerResponses;
15301530
this.ackSuccess = 0;
15311531
this.ackFailures = 0;
15321532
this.isDone = false;
@@ -1548,7 +1548,8 @@ public synchronized void run(final Status status) {
15481548
this.closure.setResponse(this.respBuilder.build());
15491549
this.closure.run(Status.OK());
15501550
this.isDone = true;
1551-
} else if (this.ackFailures >= this.failPeersThreshold) {
1551+
// The extra 1 is the leader's own vote, so only follower failures are counted here.
1552+
} else if (this.ackFailures > this.expectedFollowerResponses + 1 - this.quorum) {
15521553
this.respBuilder.setSuccess(false);
15531554
this.closure.setResponse(this.respBuilder.build());
15541555
this.closure.run(Status.OK());
@@ -1654,8 +1655,14 @@ private void readLeader(final ReadIndexRequest request, final ReadIndexResponse.
16541655
final List<PeerId> peers = this.conf.getConf().getPeers();
16551656
Requires.requireTrue(peers != null && !peers.isEmpty(), "Empty peers");
16561657
final List<PeerId> targetPeers = filterPeersForReadIndex(peers);
1658+
int expectedResponses = 0;
1659+
for (final PeerId peer : targetPeers) {
1660+
if (!peer.equals(this.serverId)) {
1661+
expectedResponses++;
1662+
}
1663+
}
16571664
final ReadIndexHeartbeatResponseClosure heartbeatDone = new ReadIndexHeartbeatResponseClosure(closure,
1658-
respBuilder, quorum, peers.size());
1665+
respBuilder, quorum, expectedResponses);
16591666
// Send heartbeat requests to followers
16601667
for (final PeerId peer : targetPeers) {
16611668
if (peer.equals(this.serverId)) {
@@ -1691,7 +1698,6 @@ private List<PeerId> filterPeersForReadIndex(final List<PeerId> allPeers) {
16911698
if (peer.equals(this.serverId)) {
16921699
continue;
16931700
}
1694-
16951701
final ThreadId rid = this.replicatorGroup.getReplicator(peer);
16961702
if (rid == null) {
16971703
continue;

jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.alipay.sofa.jraft.core;
1818

1919
import java.io.File;
20+
import java.lang.reflect.Constructor;
2021
import java.nio.ByteBuffer;
2122
import java.util.ArrayList;
2223
import java.util.Arrays;
@@ -72,6 +73,10 @@
7273
import com.alipay.sofa.jraft.option.BootstrapOptions;
7374
import com.alipay.sofa.jraft.option.NodeOptions;
7475
import com.alipay.sofa.jraft.option.RaftOptions;
76+
import com.alipay.sofa.jraft.rpc.RpcRequests.AppendEntriesResponse;
77+
import com.alipay.sofa.jraft.rpc.RpcRequests.ReadIndexResponse;
78+
import com.alipay.sofa.jraft.rpc.RpcResponseClosure;
79+
import com.alipay.sofa.jraft.rpc.RpcResponseClosureAdapter;
7580
import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
7681
import com.alipay.sofa.jraft.rpc.RpcServer;
7782
import com.alipay.sofa.jraft.storage.SnapshotThrottle;
@@ -1454,6 +1459,61 @@ public void run(final Status status, final long index, final byte[] reqCtx) {
14541459
cluster.stopAll();
14551460
}
14561461

1462+
/**
 * Drives a ReadIndexHeartbeatResponseClosure directly (built via reflection) with
 * quorum = 3 and only 2 expected follower responses, and checks that the read-index
 * request is reported as failed as soon as a single follower heartbeat fails.
 *
 * Sequence exercised: one successful heartbeat ack must not complete the user
 * closure; the following failed ack must complete it with Status OK, a response
 * whose success flag is false, and the index (11) originally put in the builder.
 */
@Test
1463+
public void testReadIndexHeartbeatResponseClosureFailsWhenFilteredRecipientsCannotReachQuorum() throws Exception {
1464+
final NodeImpl node = new NodeImpl();
1465+
try {
1466+
// Single-element arrays let the anonymous closure below record what it was called with.
final Status[] callbackStatus = new Status[1];
1467+
final ReadIndexResponse[] callbackResponse = new ReadIndexResponse[1];
1468+
final RpcResponseClosure<ReadIndexResponse> done = new RpcResponseClosureAdapter<ReadIndexResponse>() {
1469+
1470+
@Override
1471+
public void run(final Status status) {
1472+
callbackStatus[0] = status;
1473+
callbackResponse[0] = getResponse();
1474+
}
1475+
};
1476+
1477+
// Arguments: quorum = 3, expectedFollowerResponses = 2.
final RpcResponseClosureAdapter<AppendEntriesResponse> heartbeatDone = newReadIndexHeartbeatResponseClosure(
1478+
node, done, ReadIndexResponse.newBuilder().setIndex(11), 3, 2);
1479+
1480+
// First follower acks successfully: the outcome is still undecided, so `done` must not have run.
heartbeatDone.setResponse(AppendEntriesResponse.newBuilder().setTerm(1).setSuccess(true).build());
1481+
heartbeatDone.run(Status.OK());
1482+
assertNull(callbackStatus[0]);
1483+
assertNull(callbackResponse[0]);
1484+
1485+
// Second follower reports failure: `done` must now run with an OK status but success == false.
heartbeatDone.setResponse(AppendEntriesResponse.newBuilder().setTerm(1).setSuccess(false).build());
1486+
heartbeatDone.run(Status.OK());
1487+
assertNotNull(callbackStatus[0]);
1488+
assertTrue(callbackStatus[0].isOk());
1489+
assertNotNull(callbackResponse[0]);
1490+
assertFalse(callbackResponse[0].getSuccess());
1491+
assertEquals(11, callbackResponse[0].getIndex());
1492+
} finally {
1493+
// NOTE(review): presumably undoes a counter increment made by `new NodeImpl()` so the
// global node count is back to 0 for later tests - confirm against the NodeImpl constructor.
assertEquals(0, NodeImpl.GLOBAL_NUM_NODES.decrementAndGet());
1494+
}
1495+
}
1496+
1497+
/**
 * Reflectively instantiates NodeImpl's nested class named
 * "ReadIndexHeartbeatResponseClosure", which the test does not reference by type.
 *
 * @param node                      passed as the first constructor argument
 * @param done                      closure forwarded to the constructor
 * @param respBuilder               response builder forwarded to the constructor
 * @param quorum                    quorum value forwarded to the constructor
 * @param expectedFollowerResponses follower-response count forwarded to the constructor
 * @return the new closure instance, cast to RpcResponseClosureAdapter&lt;AppendEntriesResponse&gt;
 * @throws Exception if the constructor lookup or the reflective instantiation fails
 */
@SuppressWarnings("unchecked")
1498+
private RpcResponseClosureAdapter<AppendEntriesResponse> newReadIndexHeartbeatResponseClosure(final NodeImpl node,
1499+
final RpcResponseClosure<ReadIndexResponse> done,
1500+
final ReadIndexResponse.Builder respBuilder,
1501+
final int quorum,
1502+
final int expectedFollowerResponses)
1503+
throws Exception {
1504+
// Locate the nested class by its simple name among NodeImpl's declared classes.
for (final Class<?> innerClass : NodeImpl.class.getDeclaredClasses()) {
1505+
if ("ReadIndexHeartbeatResponseClosure".equals(innerClass.getSimpleName())) {
1506+
// The leading NodeImpl.class parameter is matched by the `node` argument below;
// presumably it is the enclosing-instance parameter of a non-static inner class - confirm.
final Constructor<?> constructor = innerClass.getDeclaredConstructor(NodeImpl.class,
1507+
RpcResponseClosure.class, ReadIndexResponse.Builder.class, int.class, int.class);
1508+
constructor.setAccessible(true);
1509+
return (RpcResponseClosureAdapter<AppendEntriesResponse>) constructor.newInstance(node, done,
1510+
respBuilder, quorum, expectedFollowerResponses);
1511+
}
1512+
}
1513+
// Reached only when no nested class with that name exists.
fail("ReadIndexHeartbeatResponseClosure not found");
1514+
return null;
1515+
}
1516+
14571517
@Test
14581518
public void testReadIndexTimeout() throws Exception {
14591519
final List<PeerId> peers = TestUtils.generatePeers(3);

0 commit comments

Comments
 (0)