1 引言
在 Java 生态中,JRaft 无疑是一个值得关注的生产级 Raft 算法实现。JRaft 并非 Raft 协议的简单翻译,它在保证协议正确性的基础上,提供了丰富的功能增强和性能优化。接下来的几篇文章我们将探讨 JRaft 如何实现 Raft 协议的核心机制,并重点解析其为解决实际生产环境问题而设计的独特特性与优化。
2 Idea 启动配置
在 JRaft 的 Example 中,有一个计数 Counter 的例子,需要启动三个 Raft 节点构成集群,可以在 Idea 进行如下配置快速启动:
这样就可以通过 Compound 一键启动三个 Raft 节点。
3 预投票与优先级选举
预投票机制是一种对标准 Raft 选举的优化,主要用于解决网络分区或节点故障导致的无效 term 增长和集群稳定性破坏问题。
优先级选举是在标准 Raft 基础上引入的扩展机制。核心点在于并非所有节点都有平等的资格被选为 Leader。系统可以为节点配置一个优先级,优先级高的节点在选举中更具优势,更容易成为 Leader。通过优先级选举机制,将标准 Raft 的“民主”选举变成了一种“民主集中制”,在保留 Raft 核心正确性的前提下,赋予了系统运维人员引导和优化选举结果的能力。
3.1 捣乱节点干扰集群
当某个节点因为网络分区或者临时故障与集群其他节点失联时,它可能因为选举超时不断自增 term 值并发起选举。当其重新加入集群时,携带的高 term 会迫使当前 Leader 退位(Step down),从而触发不必要的重新选举,即使原来的 Leader 仍然正常运行。
举个例子。
假设一个三节点的 Raft 集群,此时节点 A 是 term=2 的 Leader,而节点 B 和 C 是 Follower,并且节点 B 和 C 正常接收 Leader 的心跳。
在某个时刻,节点 B 与集群的网络突然断开,但是节点 A 和节点 C 之间通信正常。
一段时间后,节点 B 在选举超时时间内没有收到 Leader 的心跳,那么节点 B 就会发起选举,自增 term,由于节点 B 出现网络分区,选举请求无法送达其他节点,它只能获得 1 票(来自于自己),所以选举失败。
如果网络分区的情况持续下去,那么节点 B 又会再次超时,然后发起新一轮选举,从而继续自增 term 值。
这样持续很长一段时间后,在某个时刻,节点 B 网络恢复,携带 term=100 重新加入集群,此时节点 A 在收到节点 B 的某个响应之后(AppendEntries 或者 Heartbeat RPC),会发现对方的 term 值更高(节点 B 的 term 值更高),所以节点 A 立即退位,转为 Follower 状态,此时整个 Raft 集群进入无 Leader 状态。
3.2 预投票机制
预投票机制是指节点在发起正式选举投票之前,需要先发生预投票请求,该请求不会增加 term 值,仅仅尝试将 term 加一去试探性的征集选票,只有当该请求获得多数派认可之后,才会真正自增 term 并发起正式选举。
而如果预投票未通过,节点则会放弃竞选,从而避免无效的 term 抬升。
3.3 优先级选举
优先级选举的机制相当于在标准 Raft 选举过程中增加了一个“权重”因子,让集群在多个符合条件的 Candidate 中,更倾向于选择那个“更好”的节点作为 Leader。
引入优先级选举的原因在于解决标准 Raft 在生产环境中遇到的一些关于稳定性、性能和运维效率的实际问题。
标准 Raft 是一个“民主”算法,只要日志够新,任何节点都有均等的机会成为 Leader。这理论上很公平,但在实践中可能并非最优:
1)节点异构性:在一个真实的分布式集群中,节点的硬件配置(CPU、内存、磁盘 IO、网络带宽)可能并不完全相同。我们可能希望让性能更强、网络条件更好的节点来承担 Leader 这个重任,因为它要处理所有客户端请求、复制日志,负载最重。让一个性能差的节点成为 Leader 会成为整个系统的瓶颈。
2)避免“劣质” Leader:在网络出现短暂分区或节点高负载时,某个节点的日志可能足够新从而被选为 Leader,但它的网络连接可能很不稳定或者负载极高。这会导致 Leader 频繁切换,整个集群持续处于选举和不安定状态,服务可用性差。
3)支持纯 Follower 角色:通过将优先级设置为 -1,可以创建永远不会参与选举的节点。这种节点可以作为只读副本,无限水平扩展系统的读能力,而不用担心它会被意外选为 Leader 影响写性能。同时,它也可以用于一些特殊的运维或备份场景。
在 JRaft 中,节点的优先级有三种情况:
- Disabled(-1):如果节点的选举优先级 ≤ -1 表示该节点采用标准的 Raft 选举机制,不考虑优先级因素,所有节点在选举中具有相同的机会成为 Leader。
- NotElected(0):优先级 0 是一个特殊值,表示该节点永远不会参与选举,设置为此优先级的节点将不会发起选举,也不会成为 Leader 候选者,但是它们仍然会接受和应用日志条目,保持与 Leader 的数据同步。
- MinValue(1):优先级 1 是参与优先级选举的最小值,所有参与优先级选举的节点,其优先级值必须 ≥ 1,同时优先级值越大,在选举中成为 Leader 的机会就越大。
默认情况下,节点的选举优先级是 -1,表示采用标准的 Raft 选举机制。
那这种优先级选举是怎么实现的?核心的逻辑在 com.alipay.sofa.jraft.core.NodeImpl#allowLaunchElection 中
private boolean allowLaunchElection() {
// 优先级为0是特殊值,表示该节点永远不参与选举
if (this.serverId.isPriorityNotElected()) { // priority == 0
LOG.warn("Node {} will never participate in election, because it's priority={}.", getNodeId(),
this.serverId.getPriority());
return false;
}
// 如果该节点禁用了优先级选举,则总是可以发起选举
if (this.serverId.isPriorityDisabled()) { // priority <= -1
return true;
}
// 如果当前节点的优先级小于目标优先级,则不发起选举,等待下一个选举超时
if (this.serverId.getPriority() < this.targetPriority) {
this.electionTimeoutCounter++;
// 如果下一轮选举超时期间仍未选出Leader,则按指数衰减本地 targetPriority
if (this.electionTimeoutCounter > 1) {
decayTargetPriority();
this.electionTimeoutCounter = 0;
}
// 第一次等待,返回false,不发起选举
if (this.electionTimeoutCounter == 1) {
LOG.debug("Node {} does not initiate leader election and waits for the next election timeout.", getNodeId());
return false;
}
}
// 只有当节点优先级大于等于目标优先级时,才允许发起选举
return this.serverId.getPriority() >= this.targetPriority;
}
这里的 targetPriority 是整个 Raft 集群所有节点的 priority 的最大值,所以,当 Raft 集群刚启动的时候,如果 Leader 选举成功,那么必然是优先级最大的那个节点(如果所有节点都参与优先级选举)。
同时为了防止高优先级节点故障宕机或不可用,导致剩余节点因期望过高而无法选出新的 Leader,就引入了 “decayTargetPriority()” 目标优先级衰减降级函数,如果在上一轮由随机超时时间触发的选举周期内没有投票选出 Leader 角色,那么 Raft 集群中其他各个节点会对本地全局变量 targetPriority 的值按照每次减少 20% 进行衰减,直至衰减值优先级的最小值 “1”。
private void decayTargetPriority() {
// 默认衰减间隔值应该大于10
final int decayPriorityGap = Math.max(this.options.getDecayPriorityGap(), 10);
// 实际衰减值为配置值和目标优先级1/5的较大值,确保衰减效果明显
final int gap = Math.max(decayPriorityGap, (this.targetPriority / 5));
final int prevTargetPriority = this.targetPriority;
// 衰减目标优先级,但不能低于最小优先级值
this.targetPriority = Math.max(ElectionPriority.MinValue, (this.targetPriority - gap));
LOG.info("Node {} priority decay, from: {}, to: {}.", getNodeId(), prevTargetPriority, this.targetPriority);
}
当其他节点在对自身维护的本地全局变量 targetPriority 进行衰减后,如果节点自身的 priority 值 ≥ targetPriority 值,则该节点能够参与到由随机超时时间触发的下一轮 Leader 选举流程中。在一般的情况下,次优先级值的节点能够抢占到下一轮 Leader 选举的机会(需要配置好节点的 priority 值)。
4 核心源码
4.1 预选举
在一个 Raft 节点初始化时(com.alipay.sofa.jraft.core.NodeImpl#init),如果集群节点配置不为空,则会调用 stepDown 进行一次退位。
if (!this.conf.isEmpty()) {
// 进行一次退位
stepDown(this.currTerm, false, new Status());
}
这个 stepDown 方法在很多情况下都会被调用,而在 Raft 节点启动的过程中,最重要的逻辑就是启动选举超时计时器 electionTimer。
初始的时候,所有的 Raft 节点都是 Follower,在退位的过程中,如果节点不是 Learner 节点,就会启动选举超时的定时器。
if (!isLearner()) {
LOG.info("Node {} start electionTimer, term={}.", getNodeId(), this.currTerm);
// 启动选举超时计时器
this.electionTimer.restart();
} else {
LOG.info("Node {} is a learner, election timer is not started.", this.nodeId);
}
在 JRaft 中,Learner 节点也叫只读节点,其类似于 Follower 节点,它可以从 Leader 复制日志并应用到本地状态机中,但是不参与选举,不会被认为是多数派的一员。除了复制日志以外,只读成员不参与其他任何 Raft 算法过程。一般应用在为某个服务创建一个只读服务时,实现类似读写分离的效果,或者数据冷备等场景。
这个 electionTimer 是 JRaft 内部基于 Netty 时间轮封装的一个带有生命周期的定时器,当超时时间到达,就会回调 onTrigger 方法,同时为了避免多个 Follower 同时发起预选举请求,electionTimer 重写了 adjustTimeout 实现在 timeoutMs 的基础上随机化一小段超时时间(0 ~ 1s)。
this.electionTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs(),
TIMER_FACTORY.getElectionTimer(this.options.isSharedElectionTimer(), name)) {
@Override
protected void onTrigger() {
handleElectionTimeout();
}
@Override
protected int adjustTimeout(final int timeoutMs) {
return randomTimeout(timeoutMs);
}
};
所以,当 Follower 在一段时间还没有收到 Leader 的心跳,那么就会触发选举超时,handleElectionTimeout 方法由于处理选举超时:
/**
* 处理选举超时事件。
* 当节点作为Follower时,如果在选举超时时间内没有收到Leader的心跳(AppendEntries RPC),就会触发此方法来发起新的选举。
*/
private void handleElectionTimeout() {
// 只有Follower才能发起预选举
if (this.state != State.STATE_FOLLOWER) {
return;
}
// 如果当前Leader仍然有效,则不需要发起选举
// 通过比较最后一次收到Leader心跳的时间与当前时间的差值是否小于选举超时时间来判断
if (isCurrentLeaderValid()) {
return;
}
boolean doUnlock = true;
this.writeLock.lock();
try {
// 在锁内再次检查状态,防止并发情况下状态已发生变化
if (this.state != State.STATE_FOLLOWER) {
return;
}
if (isCurrentLeaderValid()) {
return;
}
// 重置LeaderID为空,表示与Leader失去连接
resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT, "Lost connection from leader %s.",
this.leaderId));
// 基于节点优先级机制判断是否允许发起选举
if (!allowLaunchElection()) {
return;
}
doUnlock = false;
// 预投票
preVote();
} finally {
if (doUnlock) {
this.writeLock.unlock();
}
}
}
其中 resetLeaderId 会清空本地记录的 LeaderId,如果当前节点不是 Leader 并且正在跟随某个 Leader,则回调 FSMCaller#onStopFollowing 方法通知状态机停止跟随当前 Leader。
/**
* 重置LeaderID,并通知状态机Leader变更事件。
* 这是Raft协议中重要的状态维护操作,确保节点对Leader的认知保持正确。
*
* @param newLeaderId 新的Leader节点ID,如果为空则表示清除当前Leader
* @param status 状态变更的原因和详细信息
*/
private void resetLeaderId(final PeerId newLeaderId, final Status status) {
if (newLeaderId.isEmpty()) {
// 失去Leader或者开始选举
if (!this.leaderId.isEmpty() && this.state.compareTo(State.STATE_TRANSFERRING) > 0) {
// 当前节点正在跟随某个Leader && 当前节点不是Leader
// 通知状态机停止跟随当前Leader,LeaderChangeContext包含变更前Leader的信息:LeaderID、term、变更原因
this.fsmCaller.onStopFollowing(new LeaderChangeContext(this.leaderId.copy(), this.currTerm, status));
}
this.leaderId = PeerId.emptyPeer();
} else {
// 跟随新的Leader && 之前没有Leader
if (this.leaderId == null || this.leaderId.isEmpty()) {
// 通知状态机开始跟随新Leader,LeaderChangeContext包含新Leader的信息:LeaderID、term、变更原因
this.fsmCaller.onStartFollowing(new LeaderChangeContext(newLeaderId, this.currTerm, status));
}
this.leaderId = newLeaderId.copy();
}
}
这里的 FSMCaller 是一个有限状态机调用器,它主要负责将 Raft 协议层的事件通过 Disruptor 异步通知到用户需要实现的状态机(StateMachine),这样就可以让业务层感知 Leader 变更、日志提交等关键事件,同时将 Raft 协议层与业务状态机解耦,避免业务阻塞 Raft 核心流程。
如果当前节点能够发起预选举,则会调用 preVote 方法发起预选举。
/**
* 发起预投票阶段。
* 预投票是SOFA-JRaft的优化机制,在正式选举前先进行一轮投票,可以有效避免网络分区等异常情况下的频繁无效选举。
* 预投票不会增加实际的term,只是探测是否有机会成为Leader
*/
private void preVote() {
long oldTerm;
try {
LOG.info("Node {} term {} start preVote.", getNodeId(), this.currTerm);
// 检查是否正在安装快照,此时配置可能过期,不适合发起预投票
if (this.snapshotExecutor != null && this.snapshotExecutor.isInstallingSnapshot()) {
LOG.warn(
"Node {} term {} doesn't do preVote when installing snapshot as the configuration may be out of date.",
getNodeId(), this.currTerm);
return;
}
// 检查节点是否在当前配置中,只有配置中的节点才能发起选举
if (!this.conf.contains(this.serverId)) {
LOG.warn("Node {} can't do preVote as it is not in conf <{}>.", getNodeId(), this.conf);
return;
}
oldTerm = this.currTerm;
} finally {
// 释放锁以获取 lastLogId,lastLogId 可能需要从磁盘中获取,是一个阻塞操作,所以释放锁避免长时间占有锁影响其他流程
this.writeLock.unlock();
}
final LogId lastLogId = this.logManager.getLastLogId(true);
boolean doUnlock = true;
this.writeLock.lock();
try {
// 预投票需要防范ABA问题:在释放锁期间term可能已经变化
if (oldTerm != this.currTerm) {
LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm);
return;
}
// 初始化预投票上下文
this.prevVoteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
for (final PeerId peer : this.conf.listPeers()) {
if (peer.equals(this.serverId)) {
continue; // 跳过自己
}
// 检查与目标节点的连接
if (!this.rpcService.connect(peer.getEndpoint())) {
LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
continue;
}
// 构建预投票请求
final OnPreVoteRpcDone done = new OnPreVoteRpcDone(peer, this.currTerm);
done.request = RequestVoteRequest.newBuilder()
.setPreVote(true) // 标记为预投票请求
.setGroupId(this.groupId)
.setServerId(this.serverId.toString())
.setPeerId(peer.toString())
.setTerm(this.currTerm + 1) // 使用下一个term进行预投票,不会真正自增term值
.setLastLogIndex(lastLogId.getIndex())
.setLastLogTerm(lastLogId.getTerm())
.build();
// 发起预投票RPC请求,请求是异步的,响应在OnPreVoteRpcDone回调中处理
this.rpcService.preVote(peer.getEndpoint(), done.request, done);
}
// 给自己投预投票
this.prevVoteCtx.grant(this.serverId);
// 如果预投票获得多数派认可,则发起正式选举
if (this.prevVoteCtx.isGranted()) {
doUnlock = false;
electSelf(); // 正式选举
}
} finally {
if (doUnlock) {
this.writeLock.unlock();
}
}
}
整体流程比较直观,就是进行一些 check,然后向其他所有节点发起 preVote 请求,这里需要注意的是 JRaft 的编程范式,在 JRaft 中,大量使用了异步 Callback的回调机制。
比如上面的 preVote RPC 是异步的,对端节点的响应会在 OnPreVoteRpcDone#run 中进行处理。
在 RPC 的调用中,如果你想要看请求是如何处理的,不妨可以全局搜索一下 registerProcessor,一般来说,在应用启动的时候会注册一些业务处理器,处理请求的逻辑就在这些处理器里面,JRaft、Seata、RocketMQ 都是如此。
那么对于 preVote RPC 的处理逻辑就在 com.alipay.sofa.jraft.rpc.impl.core.RequestVoteRequestProcessor。
public Message processRequest0(final RaftServerService service, final RequestVoteRequest request,
final RpcRequestClosure done) {
if (request.getPreVote()) {
return service.handlePreVoteRequest(request); // 处理预投票
} else {
return service.handleRequestVoteRequest(request); // 处理正式投票
}
}
我们看先处理预投票的逻辑:
public Message handlePreVoteRequest(final RequestVoteRequest request) {
boolean doUnlock = true;
this.writeLock.lock();
try {
// 检测节点状态,必须是活节点
if (!this.state.isActive()) {
LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
return RpcFactoryHelper
.responseFactory()
.newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
"Node %s is not in active state, state %s.", getNodeId(), this.state.name());
}
// 解析候选人Id
final PeerId candidateId = new PeerId();
if (!candidateId.parse(request.getServerId())) {
LOG.warn("Node {} received PreVoteRequest from {} serverId bad format.", getNodeId(),
request.getServerId());
return RpcFactoryHelper
.responseFactory()
.newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
"Parse candidateId failed: %s.", request.getServerId());
}
boolean granted = false;
// 使用do-while(false)模式进行多层条件判断,便于使用break跳出
do {
// 候选人必须在当前配置中
if (!this.conf.contains(candidateId)) {
LOG.warn("Node {} ignore PreVoteRequest from {} as it is not in conf <{}>.", getNodeId(),
request.getServerId(), this.conf);
break;
}
// 如果当前有有效的Leader,则拒绝预投票
// 通过比较当前节点最后一次收到Leader心跳的时间与当前时间的差值是否小于选举超时时间来判断
if (this.leaderId != null && !this.leaderId.isEmpty() && isCurrentLeaderValid()) {
LOG.info(
"Node {} ignore PreVoteRequest from {}, term={}, currTerm={}, because the leader {}'s lease is still valid.",
getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, this.leaderId);
break;
}
// 请求term不能低于当前term
if (request.getTerm() < this.currTerm) {
LOG.info("Node {} ignore PreVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
request.getServerId(), request.getTerm(), this.currTerm);
// 如果当前节点是Leader,但是仍有节点发起预选举,说明当前节点与目标节点之间的复制关系存在问题,需要重新建立复制关系
checkReplicator(candidateId);
break;
}
// 如果当前节点是Leader,但是仍有节点发起预选举,说明当前节点与目标节点之间的复制关系存在问题,需要重新建立复制关系
checkReplicator(candidateId);
doUnlock = false;
this.writeLock.unlock(); // 解锁以获取lastLogId
final LogId lastLogId = this.logManager.getLastLogId(true);
doUnlock = true;
this.writeLock.lock();
// 候选人的日志必须足够新(term大的更新,term相同index大的更新)
final LogId requestLastLogId = new LogId(request.getLastLogIndex(), request.getLastLogTerm());
granted = requestLastLogId.compareTo(lastLogId) >= 0;
LOG.info(
"Node {} received PreVoteRequest from {}, term={}, currTerm={}, granted={}, requestLastLogId={}, lastLogId={}.",
getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, granted, requestLastLogId,
lastLogId);
} while (false);
// 构建预投票响应,响应的term是当前节点的term,不是请求的term
return RequestVoteResponse.newBuilder()
.setTerm(this.currTerm)
.setGranted(granted)
.build();
} finally {
if (doUnlock) {
this.writeLock.unlock();
}
}
}
这里的 do-while 其实并没有循环起来,这样做的目的是可以更好地实现多个条件的判断,granted 初始是 false 的,一旦有校验没通过,则会 break 出循环,那么自然 granted 还是 false,就不会对节点进行预投票。
发起预选举节点收到其他节点的响应之后,就会在 OnPreVoteRpcDone 回调中进行处理。
/**
* 处理预投票响应。当节点发起预投票后,会收到其他节点的预投票响应
*/
public void handlePreVoteResponse(final PeerId peerId, final long term, final RequestVoteResponse response) {
boolean doUnlock = true;
this.writeLock.lock();
try {
// 节点可能已经预选举成功,进入Candidate状态,忽略该响应
if (this.state != State.STATE_FOLLOWER) {
LOG.warn("Node {} received invalid PreVoteResponse from {}, state not in STATE_FOLLOWER but {}.",
getNodeId(), peerId, this.state);
return;
}
// 响应的term必须与发起预投票时的term一致
if (term != this.currTerm) {
LOG.warn("Node {} received invalid PreVoteResponse from {}, term={}, currTerm={}.", getNodeId(),
peerId, term, this.currTerm);
return;
}
// 如果响应term更高,需要退位
if (response.getTerm() > this.currTerm) {
LOG.warn("Node {} received invalid PreVoteResponse from {}, term {}, expect={}.", getNodeId(), peerId,
response.getTerm(), this.currTerm);
stepDown(response.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
"Raft node receives higher term pre_vote_response."));
return;
}
LOG.info("Node {} received PreVoteResponse from {}, term={}, granted={}.", getNodeId(), peerId,
response.getTerm(), response.getGranted());
// 统计预投票结果
if (response.getGranted()) {
// 记录该节点的预投票支持
this.prevVoteCtx.grant(peerId);
// 检查是否已获得过半数预投票支持
if (this.prevVoteCtx.isGranted()) {
doUnlock = false;
electSelf(); // 预投票通过,发起正式选举
}
}
} finally {
if (doUnlock) {
this.writeLock.unlock();
}
}
}
如果响应的 term 更高,那么当前节点需要退位,此时节点状态依然是 Follower,除了重置本地状态、再次启动 electionTimer 定时器之外,一个更重要的操作是更新当前节点的 term 值,保证向它目前视野中的最大 term 值看齐。
4.2 正式选举
当预选举获取多数派认可之后,节点就会进入正式选举流程 electSelf。
private void electSelf() {
long oldTerm;
try {
LOG.info("Node {} start vote and grant vote self, term={}.", getNodeId(), this.currTerm);
// 非法节点不参与选举
if (!this.conf.contains(this.serverId)) {
LOG.warn("Node {} can't do electSelf as it is not in {}.", getNodeId(), this.conf);
return;
}
// 开始选举了,停止选举超时定时器,避免选举期间再次触发预选举
if (this.state == State.STATE_FOLLOWER) {
LOG.debug("Node {} stop election timer, term={}.", getNodeId(), this.currTerm);
this.electionTimer.stop();
}
// 清除当前节点的Leader,表示开始新的选举
resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT,
"A follower's leader_id is reset to NULL as it begins to request_vote."));
// 开始选举之后,转换为候选人
this.state = State.STATE_CANDIDATE;
// 正式选举阶段,自增term值
this.currTerm++;
// 给自己投票,设置votedId
this.votedId = this.serverId.copy();
LOG.debug("Node {} start vote timer, term={} .", getNodeId(), this.currTerm);
// 启动投票定时器,防止选举无限期等待
// 如果正式选举超时,那么根据配置有两种选择:1)退位为Follower,然后重新发起预选举;2)Candidate重新发起正式选举
this.voteTimer.start();
// 初始化投票上下文
this.voteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
oldTerm = this.currTerm;
} finally {
// 释放锁,因为要获取lastLogId了(耗时操作)
this.writeLock.unlock();
}
final LogId lastLogId = this.logManager.getLastLogId(true);
this.writeLock.lock();
try {
if (oldTerm != this.currTerm) {
LOG.warn("Node {} raise term {} when getLastLogId.", getNodeId(), this.currTerm);
return;
}
// 向所有其他节点发送正式投票请求
for (final PeerId peer : this.conf.listPeers()) {
if (peer.equals(this.serverId)) {
continue; // 跳过自己
}
if (!this.rpcService.connect(peer.getEndpoint())) {
LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
continue;
}
// 构建正式投票请求
final OnRequestVoteRpcDone done = new OnRequestVoteRpcDone(peer, this.currTerm, this);
done.request = RequestVoteRequest.newBuilder()
.setPreVote(false) // 不是预投票,而是正式投票
.setGroupId(this.groupId)
.setServerId(this.serverId.toString())
.setPeerId(peer.toString())
.setTerm(this.currTerm) // 使用当前term(已增加)
.setLastLogIndex(lastLogId.getIndex())
.setLastLogTerm(lastLogId.getTerm())
.build();
// 发起正式投票RPC请求,请求是异步的,响应会通过OnRequestVoteRpcDone回调处理
this.rpcService.requestVote(peer.getEndpoint(), done.request, done);
}
// 更新本地元数据信息
this.metaStorage.setTermAndVotedFor(this.currTerm, this.serverId);
// 给自己投票
this.voteCtx.grant(this.serverId);
// 检查是否获得多数派支持
if (this.voteCtx.isGranted()) {
becomeLeader(); // 获得多数派认可,成为Leader
}
} finally {
this.writeLock.unlock();
}
}
正式选举和预选举流程大致类似,只是在正式选举阶段会真正到自增 term 值。
对于 requestVote RPC 的处理逻辑也是在 com.alipay.sofa.jraft.rpc.impl.core.RequestVoteRequestProcessor。
/**
* 处理来自候选人的投票请求
*/
public Message handleRequestVoteRequest(final RequestVoteRequest request) {
boolean doUnlock = true;
this.writeLock.lock();
try {
// 检测节点状态,必须是活节点
if (!this.state.isActive()) {
LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
return RpcFactoryHelper
.responseFactory()
.newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
"Node %s is not in active state, state %s.", getNodeId(), this.state.name());
}
// 解析候选人Id
final PeerId candidateId = new PeerId();
if (!candidateId.parse(request.getServerId())) {
LOG.warn("Node {} received RequestVoteRequest from {} serverId bad format.", getNodeId(),
request.getServerId());
return RpcFactoryHelper
.responseFactory()
.newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
"Parse candidateId failed: %s.", request.getServerId());
}
// 使用do-while(false)模式进行多层条件判断,便于使用break跳出
do {
if (request.getTerm() >= this.currTerm) {
LOG.info("Node {} received RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
request.getServerId(), request.getTerm(), this.currTerm);
// 如果候选人的term更大,则退位
if (request.getTerm() > this.currTerm) {
stepDown(request.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
"Raft node receives higher term RequestVoteRequest."));
}
} else {
// 如果候选人的term较小,不投票
LOG.info("Node {} ignore RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
request.getServerId(), request.getTerm(), this.currTerm);
break;
}
doUnlock = false;
this.writeLock.unlock(); // 解锁以获取lastLogId
final LogId lastLogId = this.logManager.getLastLogId(true);
doUnlock = true;
this.writeLock.lock();
// vote need ABA check after unlock&writeLock
if (request.getTerm() != this.currTerm) {
LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm);
break;
}
// 检查候选人的日志是否至少与当前节点的日志一样新
// 根据Raft协议,只有日志更新或相等的候选人才能获得投票
final boolean logIsOk = new LogId(request.getLastLogIndex(), request.getLastLogTerm())
.compareTo(lastLogId) >= 0;
// 候选人日志更新 && 当前节点没有尚未投票
// 这里也体现了先来先服务的语义
if (logIsOk && (this.votedId == null || this.votedId.isEmpty())) {
// 退位
stepDown(request.getTerm(), false, new Status(RaftError.EVOTEFORCANDIDATE,
"Raft node votes for some candidate, step down to restart election_timer."));
// 记录投票给的候选人Id
this.votedId = candidateId.copy();
// 持久化投票信息
this.metaStorage.setVotedFor(candidateId);
}
} while (false);
return RequestVoteResponse.newBuilder()
.setTerm(this.currTerm)
.setGranted(request.getTerm() == this.currTerm && candidateId.equals(this.votedId))
.build();
} finally {
if (doUnlock) {
this.writeLock.unlock();
}
}
}
发起选举节点收到其他节点的响应之后,就会在 OnRequestVoteRpcDone 回调中进行处理。
/**
* 处理正式投票响应。当候选人收到其他节点的投票响应时,调用此方法进行统计和处理。
*/
public void handleRequestVoteResponse(final PeerId peerId, final long term, final RequestVoteResponse response) {
this.writeLock.lock();
try {
// 可能已经竞选成功为Leader或者退位为Follower,忽略请求
if (this.state != State.STATE_CANDIDATE) {
LOG.warn("Node {} received invalid RequestVoteResponse from {}, state not in STATE_CANDIDATE but {}.",
getNodeId(), peerId, this.state);
return;
}
// 响应的term必须与发起投票时的term一致
if (term != this.currTerm) {
LOG.warn("Node {} received stale RequestVoteResponse from {}, term={}, currTerm={}.", getNodeId(),
peerId, term, this.currTerm);
return;
}
// 有更高term值,需要退位
if (response.getTerm() > this.currTerm) {
LOG.warn("Node {} received invalid RequestVoteResponse from {}, term={}, expect={}.", getNodeId(),
peerId, response.getTerm(), this.currTerm);
stepDown(response.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
"Raft node receives higher term request_vote_response."));
return;
}
// 统计投票结果
if (response.getGranted()) {
// 记录该节点的投票支持
this.voteCtx.grant(peerId);
// 检查是否已获得过半数预投票支持
if (this.voteCtx.isGranted()) {
becomeLeader(); // 获得多数派支持,成为Leader
}
}
} finally {
this.writeLock.unlock();
}
}
对于正式投票请求的响应很简单,就是一些 check 之后统计选举票数,如果候选人获得多数派支持,则 becomeLeader。
private void becomeLeader() {
// 只有Candidate才能成为Leader
Requires.requireTrue(this.state == State.STATE_CANDIDATE, "Illegal state: " + this.state);
LOG.info("Node {} become leader of group, term={}, conf={}, oldConf={}.", getNodeId(), this.currTerm,
this.conf.getConf(), this.conf.getOldConf());
// 停止投票定时器,不再需要选举了
stopVoteTimer();
// 当前节点是Leader了
this.state = State.STATE_LEADER;
this.leaderId = this.serverId.copy();
// 重置复制组的term,确保使用Leader最新的term进行日志复制
this.replicatorGroup.resetTerm(this.currTerm);
// 为所有Follower启动复制器,用于日志复制和心跳
for (final PeerId peer : this.conf.listPeers()) {
if (peer.equals(this.serverId)) {
continue; // 跳过自己
}
LOG.debug("Node {} add a replicator, term={}, peer={}.", getNodeId(), this.currTerm, peer);
if (!this.replicatorGroup.addReplicator(peer)) {
LOG.error("Fail to add a replicator, peer={}.", peer);
}
}
// 为所有Learner节点启动复制器,Learner只接收日志,并不参与投票
for (final PeerId peer : this.conf.listLearners()) {
LOG.debug("Node {} add a learner replicator, term={}, peer={}.", getNodeId(), this.currTerm, peer);
if (!this.replicatorGroup.addReplicator(peer, ReplicatorType.Learner)) {
LOG.error("Fail to add a learner replicator, peer={}.", peer);
}
}
// 重置选票箱
this.ballotBox.resetPendingIndex(this.logManager.getLastLogIndex() + 1);
if (this.confCtx.isBusy()) {
throw new IllegalStateException();
}
// 将当前集群配置信息写入日志
this.confCtx.flush(this.conf.getConf(), this.conf.getOldConf());
// 启动stepDownTimer定时器
this.stepDownTimer.start();
}
当一个节点竞选成功成为 Leader 之后,其他所有节点都将成为该 Leader 的 Follower 节点,所以 Leader 节点需要建立到这些节点的复制关系,也包括 Learner,建立复制关系由 addReplicator 实现。
在 becomeLeader 中,还需要重置选票箱,在 JRaft 中,选票箱(BallotBox)负责跟踪每个日志条目在集群中的复制状态,根据多数派原则判断何时可以安全地提交日志到状态机,这部分逻辑会放在其他部分介绍。
通过 confCtx.flush
会将当前集群的节点配置信息作为当前 Leader 的第一条日志同步给集群中的所有 Follower 节点,这部分逻辑会放在其他部分介绍,但是在日志同步之前会设置一个 ConfigurationChangeDone 回调,当日志条目被 commit 之后会触发。
public void run(final Status status) {
if (status.isOk()) {
// 当配置变更日志被提交后,调用此方法
onConfigurationChangeDone(this.term);
if (this.leaderStart) {
// 回调状态机 StateMachine#onLeaderStart
getOptions().getFsm().onLeaderStart(this.term);
}
} else {
LOG.error("Fail to run ConfigurationChangeDone, status: {}.", status);
}
}
onConfigurationChangeDone 方法涉及到了 Raft 集群配置变更的部分,这部分逻辑会放在其他部分介绍。
将集群节点配置信息同步给其他 Follower 还有另外一个作用:即确保 Leader 可以知晓最新的提交信息。
在 Raft 中,由于提交限制的存在,所以当新 Leader 上任后,会立即向自己的日志追加一个空白条目(no-op entry),一旦这个空白条目被复制到多数派节点,根据 Raft 的提交规则,这个空白条目就被提交了。提交这个空白条目有一个非常重要的副作用:它使得所有在该空白条目之前的日志条目(包括之前任期的)都被间接提交了(因为 Raft 的提交规则保证,如果一个条目被提交,那么它之前的所有条目也都被提交)。
而这里的集群节点配置信息就充当了 no-op entry。
4.3 定期检查
Leader 需要定期检查自己的权威是否持续有效,即集群中的多数派节点都能响应自己的心跳请求,如果不是则当前 Leader 需要主动退位以避免可能出现的脑裂。
这一过程由 stepDownTimer 定时器负责,当节点成为 Leader 之后会启动 stepDownTimer 定时器,默认每半个选举超时时间触发一次,触发处理逻辑如下:
private void handleStepDownTimeout() {
// 使用读锁进行快速检查,避免不必要的写锁竞争
do {
this.readLock.lock();
try {
// Leader才能够退位
if (this.state.compareTo(State.STATE_TRANSFERRING) > 0) {
LOG.debug("Node {} stop step-down timer, term={}, state={}.", getNodeId(), this.currTerm,
this.state);
return;
}
// 获取当前单调时间,用于计算节点最后通信时间
final long monotonicNowMs = Utils.monotonicMs();
// 检查当前配置中的节点存活状态(不触发退位,只是检查)
if (!checkDeadNodes(this.conf.getConf(), monotonicNowMs, false)) {
// 如果当前配置中的节点无法维持多数派,跳出到写锁阶段
break;
}
// 如果存在旧配置(配置变更期间的联合配置),也需要检查
if (!this.conf.getOldConf().isEmpty()) {
if (!checkDeadNodes(this.conf.getOldConf(), monotonicNowMs, false)) {
// 如果旧配置中的节点无法维持多数派,跳出到写锁阶段
break;
}
}
// 如果所有配置中的节点都能维持多数派,直接返回,无需退位
return;
} finally {
this.readLock.unlock();
}
} while (false); // 使用do-while(false)模式实现类似goto的效果
// 使用写锁进行最终检查和可能的退位操作
this.writeLock.lock();
try {
// 再次检查节点状态(可能在等待写锁期间,状态已改变)
if (this.state.compareTo(State.STATE_TRANSFERRING) > 0) {
LOG.debug("Node {} stop step-down timer, term={}, state={}.", getNodeId(), this.currTerm, this.state);
return;
}
final long monotonicNowMs = Utils.monotonicMs();
// 检查当前配置中的节点,如果无法维持多数派则退位
checkDeadNodes(this.conf.getConf(), monotonicNowMs, true);
// 如果存在旧配置,也进行退位检查
if (!this.conf.getOldConf().isEmpty()) {
checkDeadNodes(this.conf.getOldConf(), monotonicNowMs, true);
}
} finally {
this.writeLock.unlock();
}
}
而检查节点是否存活(其他节点是否响应当前 Leader 的号召)并按需退位,由 checkDeadNodes 完成。
private boolean checkDeadNodes(final Configuration conf, final long monotonicNowMs,
final boolean stepDownOnCheckFail) {
// 首先检查Learner的复制器状态
// Learner不参与投票,但需要保持复制器的活跃状态
for (final PeerId peer : conf.getLearners()) {
checkReplicator(peer);
}
final List<PeerId> peers = conf.listPeers();
final Configuration deadNodes = new Configuration(); // 用于收集死亡节点信息
// 核心检查方法,检查投票节点的存活状态
if (checkDeadNodes0(peers, monotonicNowMs, true, deadNodes)) {
return true; // 能够维持多数派
}
// 如果无法维持多数派且允许退位,则执行退位操作
if (stepDownOnCheckFail) {
LOG.warn("Node {} steps down when alive nodes don't satisfy quorum, term={}, deadNodes={}, conf={}.",
getNodeId(), this.currTerm, deadNodes, conf);
final Status status = new Status();
status.setError(RaftError.ERAFTTIMEDOUT, "Majority of the group dies: %d/%d", deadNodes.size(),
peers.size());
// 执行退位操作,不是因为更高任期而下台
stepDown(this.currTerm, false, status);
}
return false; // 无法维持多数派
}
/**
* 通过检查每个节点的最后RPC发送时间,来判断节点是否存活,并计算是否能够维持多数派。
* 节点存活判断标准:当前时间 - 最后RPC发送时间 <= LeaderLeaseTimeout
*/
private boolean checkDeadNodes0(final List<PeerId> peers, final long monotonicNowMs, final boolean checkReplicator,
final Configuration deadNodes) {
// 获取Leader租约超时时间,用于判断节点是否存活
final int leaderLeaseTimeoutMs = this.options.getLeaderLeaseTimeoutMs();
int aliveCount = 0; // 存活节点计数
long startLease = Long.MAX_VALUE; // 记录最早的租约开始时间
// 遍历所有节点,检查存活状态
for (final PeerId peer : peers) {
// Leader自己总是被认为是存活的
if (peer.equals(this.serverId)) {
aliveCount++;
continue;
}
// 如果需要,检查并可能重启复制器
if (checkReplicator) {
checkReplicator(peer);
}
// 获取向该节点最后一次发送RPC的时间戳
final long lastRpcSendTimestamp = this.replicatorGroup.getLastRpcSendTimestamp(peer);
// 判断节点是否在租约期内(即是否存活)
if (monotonicNowMs - lastRpcSendTimestamp <= leaderLeaseTimeoutMs) {
// 节点存活
aliveCount++;
// 更新最早的租约开始时间,用于Leader租约管理
if (startLease > lastRpcSendTimestamp) {
startLease = lastRpcSendTimestamp;
}
continue;
}
// 节点被认为已死亡,记录到死亡节点列表中
if (deadNodes != null) {
deadNodes.addPeer(peer);
}
}
// 检查是否能够维持多数派 (存活节点数 >= 总节点数/2 + 1)
if (aliveCount >= peers.size() / 2 + 1) {
// 能够维持多数派,更新Leader的最后有效时间戳
updateLastLeaderTimestamp(startLease);
return true;
}
// 无法维持多数派
return false;
}
如果检查出存活的节点已不足多数派,那么当前 Leader 节点就需要退位,以避免可能出现的集群脑裂问题。
4.4 节点退位
节点退位由 stepDown 方法实现,负责将节点从 Leader 或 Candidate 状态安全地转换为 Follower,这是 Raft 中保证集群一致性和避免脑裂的关键机制。该方法在很多情况下都会被调用,比如:
1)发现更高的任期
// 收到更高任期节点的 RequestVote 请求
stepDown(request.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
"Raft node receives higher term RequestVoteRequest."));
// 收到更高任期节点的响应
stepDown(response.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
"Raft node receives higher term request_vote_response."));
2)Leader 定期维权失败
final Status status = new Status();
status.setError(RaftError.ERAFTTIMEDOUT, "Majority of the group dies: %d/%d", deadNodes.size(),
peers.size());
// 执行退位操作,不是因为更高任期而下台
stepDown(this.currTerm, false, status);
3)投票相关退位
// 投票给候选者后退位
stepDown(request.getTerm(), false, new Status(RaftError.EVOTEFORCANDIDATE,
"Raft node votes for some candidate, step down to restart election_timer."));
// 选举投票超时
stepDown(this.currTerm, false, new Status(RaftError.ETIMEDOUT,
"Vote timeout: fail to get quorum vote-granted."));
4)配置变更导致的退位
// 节点被从集群中移除
this.node.stepDown(this.node.currTerm, true, new Status(RaftError.ELEADERREMOVED,
"This node was removed."));
// setPeers操作
stepDown(this.currTerm + 1, false, new Status(RaftError.ESETPEER, "Raft node set peer normally"));
5)领导权冲突,集群脑裂
// 发现同任期内有多个Leader
stepDown(request.getTerm() + 1, false, new Status(RaftError.ELEADERCONFLICT,
"More than one leader in the same term."));
6)系统正常关闭
stepDown(this.currTerm, this.state == State.STATE_LEADER,
new Status(RaftError.ESHUTDOWN, "Raft node is going to quit."));
等 ……
节点 stepDown 的逻辑如下:
private void stepDown(final long term, final boolean wakeupCandidate, final Status status) {
LOG.debug("Node {} stepDown, term={}, newTerm={}, wakeupCandidate={}.", getNodeId(), this.currTerm, term,
wakeupCandidate);
// 如果节点已经不活跃(如已关闭),直接返回
if (!this.state.isActive()) {
return;
}
if (this.state == State.STATE_CANDIDATE) {
// 候选者退位:停止投票定时器
stopVoteTimer();
} else if (this.state.compareTo(State.STATE_TRANSFERRING) <= 0) {
// Leader或正在转移领导权的节点退位
stopStepDownTimer(); // 停止退位检查定时器
this.ballotBox.clearPendingTasks(); // 清空投票箱中的待处理任务
// 如果是Leader退位,通知状态机停止Leader行为
if (this.state == State.STATE_LEADER) {
onLeaderStop(status);
}
}
// 重置Leader信息,清空当前已知的LeaderId
resetLeaderId(PeerId.emptyPeer(), status);
// 更新内存中的软状态
this.state = State.STATE_FOLLOWER; // 转换为Follower状态
this.confCtx.reset(); // 重置配置变更上下文
updateLastLeaderTimestamp(Utils.monotonicMs()); // 更新最后Leader时间戳
// 中断正在进行的快照下载
if (this.snapshotExecutor != null) {
this.snapshotExecutor.interruptDownloadingSnapshots(term);
}
// 更新持久化的元数据状态
if (term > this.currTerm) {
// 如果新任期更高,更新当前任期和投票记录
this.currTerm = term;
this.votedId = PeerId.emptyPeer(); // 清空投票记录
// 持久化新的任期和投票信息
this.metaStorage.setTermAndVotedFor(term, this.votedId);
}
// 处理复制组的停止和候选者唤醒
if (wakeupCandidate) {
// 停止所有复制器并找到下一个候选者(用于领导权转移)
this.wakingCandidate = this.replicatorGroup.stopAllAndFindTheNextCandidate(this.conf);
if (this.wakingCandidate != null) {
// 向选定的候选者发送TimeoutNow消息,促使其立即开始选举
Replicator.sendTimeoutNowAndStop(this.wakingCandidate, this.options.getElectionTimeoutMs());
}
} else {
// 普通退位:直接停止所有复制器
this.replicatorGroup.stopAll();
}
// 清理领导权转移相关的状态
if (this.stopTransferArg != null) {
if (this.transferTimer != null) {
this.transferTimer.cancel(true); // 取消转移定时器
}
// 在同一任期内最多只有一个StopTransferTimer,安全地清空stopTransferArg
this.stopTransferArg = null;
}
// 重启选举定时器(Learner节点除外)
if (!isLearner()) {
// Follower重启选举定时器,准备参与下一轮选举
this.electionTimer.restart();
} else {
// Learner节点不参与选举,不启动选举定时器
LOG.info("Node {} is a learner, election timer is not started.", this.nodeId);
}
}
stepDown 方法有一个重要的参数:wakeupCandidate,它用于控制当前 Leader 在退位时是否要主动唤醒一个最佳候选者来快速接替领导权,而不是让集群进行自然的选举过程。
而选择这个最佳候选者的逻辑如下:
public PeerId findTheNextCandidate(final ConfigurationEntry conf) {
PeerId peerId = null;
int priority = Integer.MIN_VALUE;
long maxIndex = -1L;
for (final Map.Entry<PeerId, ThreadId> entry : this.replicatorMap.entrySet()) {
// 必须在当前配置中
if (!conf.contains(entry.getKey())) {
continue;
}
// 不能是禁止选举的节点
final int nextPriority = entry.getKey().getPriority();
if (nextPriority == ElectionPriority.NotElected) {
continue;
}
// 选择日志最新的节点
final long nextIndex = Replicator.getNextIndex(entry.getValue());
if (nextIndex > maxIndex) {
maxIndex = nextIndex;
peerId = entry.getKey();
priority = peerId.getPriority();
}
// 日志相同时选择优先级更高的节点
else if (nextIndex == maxIndex && nextPriority > priority) {
peerId = entry.getKey();
priority = peerId.getPriority();
}
}
if (maxIndex == -1L) {
return null;
} else {
return peerId;
}
}
为什么需要 wakeupCandidate?
wakeupCandidate 可以主动选择最佳候选者,避免多个节点同时发起选举,提高选举成功率;对于在计划内的退位(如节点关闭、配置变更)时,需要快速选出新 Leader,以减少集群无 Leader 的时间窗口。
而在一些场景下,自然选举是更合适的。
比如网络分区(Leader 定期维权失败)、发现更高任期、领导权冲突(集群脑裂产生),这些情况下,不进行额外干预是更好地选择,因为通过上面选择候选人的逻辑不一定能选出最优的 Leader 节点,或者说,在这些情况下,自然选举更安全。
5 文末言
这篇文章只是大概讲述了一下 Raft Leader 选举的流程,对于源码的剖析也只是在流程的视野内,对于像复制器、ballotBox、日志复制、配置变更(联合共识)等都没有进一步解释,这些内容会放到其他文章中。