三、分布式一致性协议
3.1 Paxos协议详解
Paxos是Leslie Lamport提出的分布式一致性协议。它虽然较难理解,却是Raft、ZAB等后续协议的理论基础。
// Roles that a participant can take in the Paxos protocol.
public enum PaxosRole {
PROPOSER, // Proposer: puts a value forward
ACCEPTOR, // Acceptor: votes to decide the value
LEARNER // Learner: learns the final decision
}
/**
 * In-process illustration of the two-phase (single-decree) Paxos flow.
 *
 * <p>Phase 1 (Prepare):
 * <ol>
 *   <li>The proposer generates a proposal number n and sends Prepare(n) to the acceptors.</li>
 *   <li>An acceptor that receives Prepare(n) with n greater than every number it has already
 *       responded to promises not to accept proposals numbered below n, and returns the
 *       highest-numbered proposal it has accepted so far (if any).</li>
 * </ol>
 *
 * <p>Phase 2 (Accept):
 * <ol>
 *   <li>After a majority has promised, the proposer picks the value of the highest-numbered
 *       accepted proposal reported to it (or its own value if none) and sends Accept(n, v).</li>
 *   <li>An acceptor accepts Accept(n, v) unless it has responded to a Prepare with a number
 *       greater than n.</li>
 * </ol>
 */
public class PaxosProtocol {

    /** A numbered proposal. Immutable, so it can be handed out in responses as-is. */
    private static final class Proposal {
        final long number;
        final Object value;

        Proposal(long number, Object value) {
            this.number = number;
            this.value = value;
        }
    }

    /**
     * An acceptor's reply to Prepare(n): whether the promise was given, plus the proposal the
     * acceptor had already accepted ({@code null} when it has accepted nothing yet).
     */
    private static final class PrepareResponse {
        final boolean isPromised;
        final Proposal acceptedProposal;

        PrepareResponse(boolean isPromised, Proposal acceptedProposal) {
            this.isPromised = isPromised;
            this.acceptedProposal = acceptedProposal;
        }
    }

    /** Acceptor state machine; both handlers are synchronized on the acceptor instance. */
    private static class Acceptor {
        private long lastPreparedNumber = 0;      // highest Prepare number responded to
        private Proposal acceptedProposal = null; // last accepted proposal (number and value)

        /**
         * Handles Prepare(n).
         *
         * @param proposalNumber the proposal number n
         * @return a response whose {@code isPromised} is true only when n is strictly greater
         *         than every number seen so far; the previously accepted proposal is attached
         *         in either case
         */
        public synchronized PrepareResponse prepare(long proposalNumber) {
            boolean promised = proposalNumber > lastPreparedNumber;
            if (promised) {
                lastPreparedNumber = proposalNumber;
            }
            return new PrepareResponse(promised, acceptedProposal);
        }

        /**
         * Handles Accept(n, v).
         *
         * @param proposalNumber the proposal number n
         * @param value          the proposed value v
         * @return true if the proposal was accepted, false if a higher-numbered Prepare has
         *         already been answered
         */
        public synchronized boolean accept(long proposalNumber, Object value) {
            if (proposalNumber < lastPreparedNumber) {
                return false;
            }
            lastPreparedNumber = proposalNumber;
            acceptedProposal = new Proposal(proposalNumber, value);
            return true;
        }
    }

    /** Drives one Prepare/Accept round against a fixed set of acceptors. */
    private static class Proposer {
        private final long proposerId;
        private final long proposerCount;
        private final Acceptor[] acceptors;
        private long round = 0;
        private long proposalNumber;

        /**
         * @param proposerId    this proposer's index, in {@code [0, proposerCount)}
         * @param proposerCount total number of proposers sharing the acceptors
         * @param acceptors     the acceptors to contact; copied defensively
         * @throws IllegalArgumentException if the id/count pair is out of range
         * @throws NullPointerException     if {@code acceptors} is null
         */
        Proposer(long proposerId, long proposerCount, Acceptor[] acceptors) {
            if (proposerCount <= 0 || proposerId < 0 || proposerId >= proposerCount) {
                throw new IllegalArgumentException(
                        "proposerId " + proposerId + " not in [0, " + proposerCount + ")");
            }
            if (acceptors == null) {
                throw new NullPointerException("acceptors");
            }
            this.proposerId = proposerId;
            this.proposerCount = proposerCount;
            this.acceptors = acceptors.clone();
        }

        /**
         * Produces the next proposal number as {@code round * proposerCount + proposerId}:
         * numbers from one proposer strictly increase, and two proposers with different ids
         * never produce the same number.
         */
        private long generateNextNumber() {
            round++;
            return round * proposerCount + proposerId;
        }

        /**
         * Runs both Paxos phases for {@code value}.
         *
         * @param value the value this proposer would like to have chosen
         * @return the value accepted by a majority; this is a previously accepted value when
         *         any promising acceptor reported one, otherwise {@code value}
         * @throws IllegalStateException if a majority of promises or of accepts is not reached
         */
        public Object propose(Object value) {
            final int majority = acceptors.length / 2 + 1;

            // Phase 1: Prepare
            proposalNumber = generateNextNumber();
            List<PrepareResponse> promises = new ArrayList<>();
            for (Acceptor acceptor : acceptors) {
                PrepareResponse response = acceptor.prepare(proposalNumber);
                if (response.isPromised) {
                    promises.add(response);
                }
            }
            if (promises.size() < majority) {
                throw new IllegalStateException("无法获得多数派承诺");
            }

            // A value that may already have been chosen must be re-proposed, so the
            // highest-numbered accepted proposal among the promises wins over our own value.
            Proposal highestAccepted = null;
            for (PrepareResponse promise : promises) {
                Proposal candidate = promise.acceptedProposal;
                if (candidate != null
                        && (highestAccepted == null || candidate.number > highestAccepted.number)) {
                    highestAccepted = candidate;
                }
            }
            Object acceptValue = (highestAccepted != null) ? highestAccepted.value : value;

            // Phase 2: Accept
            int acceptCount = 0;
            for (Acceptor acceptor : acceptors) {
                if (acceptor.accept(proposalNumber, acceptValue)) {
                    acceptCount++;
                }
            }
            if (acceptCount < majority) {
                throw new IllegalStateException("无法获得多数派接受");
            }
            return acceptValue;
        }
    }
}
3.2 Raft协议详解
Raft是更易于理解的分布式一致性协议,将问题分解为:领导者选举、日志复制、安全性。
/**
 * Sketch of a Raft node covering leader election and log replication.
 *
 * <p>All state transitions in this class are performed while holding this node's monitor
 * ({@code synchronized (this)}), matching the locking used by the RPC callbacks.
 *
 * <p>NOTE(review): the constructor (which must initialise {@code peers} and the two timers),
 * {@code AppendEntriesRPC}, {@code appendEntries} and the helpers {@code getNodeId},
 * {@code getLastLogIndex}, {@code getLastLogTerm}, {@code getTermAtIndex},
 * {@code isLogUpToDate}, {@code resetElectionTimer}, {@code startHeartbeat} and
 * {@code applyLogToStateMachine} are not declared in this listing.
 */
public class RaftNode {
    // The replicated log lives in logEntries, so the logger needs its own name.
    private static final java.util.logging.Logger LOGGER =
            java.util.logging.Logger.getLogger(RaftNode.class.getName());

    /** Node role. */
    private enum NodeState {
        FOLLOWER, CANDIDATE, LEADER
    }

    private NodeState state = NodeState.FOLLOWER;

    // Persistent state (all nodes)
    private long currentTerm = 0;                                  // current term number
    private String votedFor = null;                                // who got our vote this term
    private final List<LogEntry> logEntries = new ArrayList<>();   // log entries, index 1 at position 0

    // Volatile state (all nodes)
    private long commitIndex = 0;  // highest log index known to be committed
    private long lastApplied = 0;  // highest log index applied to the state machine

    // Volatile state (leader only)
    private final Map<String, Long> nextIndex = new ConcurrentHashMap<>();  // next index to send, per peer
    private final Map<String, Long> matchIndex = new ConcurrentHashMap<>(); // highest replicated index, per peer

    private final List<RaftNode> peers;   // the OTHER nodes of the cluster (this node excluded)
    private final Timer electionTimer;    // election timeout timer
    private final Timer heartbeatTimer;   // heartbeat timer (used by the leader)

    /** One entry of the replicated log. */
    static class LogEntry {
        long term;      // term in which the entry was created
        int index;      // log index
        String command; // command, e.g. "SET key value"

        LogEntry(long term, int index, String command) {
            this.term = term;
            this.index = index;
            this.command = command;
        }
    }

    /** RequestVote RPC arguments. Static: a message must not drag its sender node along. */
    private static class RequestVoteRPC {
        final long term;
        final String candidateId;
        final long lastLogIndex;
        final long lastLogTerm;

        RequestVoteRPC(long term, String candidateId, long lastLogIndex, long lastLogTerm) {
            this.term = term;
            this.candidateId = candidateId;
            this.lastLogIndex = lastLogIndex;
            this.lastLogTerm = lastLogTerm;
        }
    }

    /** RequestVote RPC result. */
    private static class RequestVoteResponse {
        final long term;
        final boolean voteGranted;

        RequestVoteResponse(long term, boolean voteGranted) {
            this.term = term;
            this.voteGranted = voteGranted;
        }
    }

    /**
     * Starts a leader election: becomes candidate, bumps the term, votes for itself and asks
     * every peer for its vote on a separate thread.
     */
    private void startElection() {
        synchronized (this) {
            state = NodeState.CANDIDATE;
            currentTerm++;
            votedFor = getNodeId();

            final long electionTerm = currentTerm;
            final RequestVoteRPC request = new RequestVoteRPC(
                    electionTerm,
                    getNodeId(),
                    getLastLogIndex(),
                    getLastLogTerm());

            // peers excludes this node, so the cluster has peers.size() + 1 members.
            final int clusterSize = peers.size() + 1;
            // Vote tally shared with the callbacks (a captured local cannot be reassigned);
            // it is only touched while holding this node's monitor. Starts at 1: our own vote.
            final int[] votesReceived = {1};

            // Single-node cluster: our own vote already is a majority.
            if (votesReceived[0] > clusterSize / 2) {
                becomeLeader();
            }

            for (RaftNode peer : peers) {
                new Thread(() -> {
                    // Runs without holding our monitor; blocks on it only to record the result.
                    RequestVoteResponse response = peer.requestVote(request);
                    synchronized (this) {
                        if (response.term > currentTerm) {
                            // A higher term exists: step down.
                            becomeFollower(response.term);
                            return;
                        }
                        if (state != NodeState.CANDIDATE || currentTerm != electionTerm) {
                            // Reply belongs to an election that is already over.
                            return;
                        }
                        if (response.voteGranted) {
                            votesReceived[0]++;
                            if (votesReceived[0] > clusterSize / 2) {
                                becomeLeader();
                            }
                        }
                    }
                }).start();
            }

            // Restart the election timer
            resetElectionTimer();
        }
    }

    /**
     * Handles a vote request from a candidate.
     *
     * @param request the candidate's RequestVote arguments
     * @return this node's current term and whether the vote was granted
     */
    private RequestVoteResponse requestVote(RequestVoteRPC request) {
        synchronized (this) {
            // Reject candidates from an older term.
            if (request.term < currentTerm) {
                return new RequestVoteResponse(currentTerm, false);
            }
            // A newer term always demotes us to follower first.
            if (request.term > currentTerm) {
                becomeFollower(request.term);
            }
            boolean voteIsFree = votedFor == null || votedFor.equals(request.candidateId);
            // The candidate's log must be at least as up to date as ours.
            boolean logUpToDate = isLogUpToDate(request.lastLogTerm, request.lastLogIndex);
            if (voteIsFree && logUpToDate) {
                votedFor = request.candidateId;
                resetElectionTimer();
                return new RequestVoteResponse(currentTerm, true);
            }
            return new RequestVoteResponse(currentTerm, false);
        }
    }

    /**
     * Sends every peer the log entries it is missing (from its nextIndex onwards) and updates
     * the replication bookkeeping from the replies. Does nothing unless this node is leader.
     */
    private void replicateLog() {
        for (RaftNode peer : peers) {
            final String peerId = peer.getNodeId();
            final long sentTerm;
            final long nextIdx;
            final List<LogEntry> entries = new ArrayList<>();
            final AppendEntriesRPC request;

            // Build the request from a consistent snapshot; send it outside the lock.
            synchronized (this) {
                if (state != NodeState.LEADER) {
                    return;
                }
                sentTerm = currentTerm;
                nextIdx = nextIndex.getOrDefault(peerId, 1L);
                for (long i = nextIdx; i <= getLastLogIndex(); i++) {
                    entries.add(logEntries.get((int) i - 1)); // log indices start at 1
                }
                request = new AppendEntriesRPC(
                        sentTerm,
                        getNodeId(),
                        nextIdx - 1,
                        getTermAtIndex(nextIdx - 1),
                        entries,
                        commitIndex);
            }

            peer.appendEntries(request, response -> {
                synchronized (this) {
                    if (response.term > currentTerm) {
                        becomeFollower(response.term);
                        return;
                    }
                    if (state != NodeState.LEADER || currentTerm != sentTerm) {
                        // Reply to a request from an earlier leadership; ignore it.
                        return;
                    }
                    if (response.success) {
                        long replicatedUpTo = nextIdx + entries.size() - 1;
                        // Replies may arrive out of order; matchIndex must never move backwards.
                        long newMatchIndex = matchIndex.merge(peerId, replicatedUpTo, Long::max);
                        nextIndex.put(peerId, newMatchIndex + 1);
                        // Try to advance the commit index
                        updateCommitIndex();
                    } else {
                        // Log mismatch: back off by one entry and retry on the next round.
                        nextIndex.put(peerId, Math.max(1, nextIdx - 1));
                    }
                }
            });
        }
    }

    /**
     * Advances commitIndex to every index that is stored on a majority of the cluster.
     * Only entries of the current term are counted; because commitIndex covers everything
     * below it, earlier-term entries are committed together with the first such entry.
     */
    private void updateCommitIndex() {
        // peers excludes this node, so the cluster has peers.size() + 1 members.
        final int clusterSize = peers.size() + 1;
        for (int i = (int) commitIndex + 1; i <= getLastLogIndex(); i++) {
            if (logEntries.get(i - 1).term != currentTerm) {
                continue;
            }
            // Count the nodes holding this entry.
            int replicatedCount = 1; // ourselves
            for (long matchIdx : matchIndex.values()) {
                if (matchIdx >= i) {
                    replicatedCount++;
                }
            }
            // Stored on a strict majority: commit.
            if (replicatedCount > clusterSize / 2) {
                commitIndex = i;
                applyLogToStateMachine();
            }
        }
    }

    /** Switches to leader and initialises the per-peer replication state. */
    private void becomeLeader() {
        state = NodeState.LEADER;
        LOGGER.info(String.format("节点 %s 成为Leader,任期 %d", getNodeId(), currentTerm));
        // Optimistically assume every peer is fully caught up; replicateLog backs off on mismatch.
        long lastLogIndex = getLastLogIndex();
        for (RaftNode peer : peers) {
            nextIndex.put(peer.getNodeId(), lastLogIndex + 1);
            matchIndex.put(peer.getNodeId(), 0L);
        }
        // Start heartbeats
        startHeartbeat();
    }

    /**
     * Switches to follower in the given term.
     *
     * @param term the term to adopt
     */
    private void becomeFollower(long term) {
        state = NodeState.FOLLOWER;
        // The vote belongs to a term; forget it only when the term actually advances,
        // otherwise this node could vote twice in the same term.
        if (term > currentTerm) {
            votedFor = null;
        }
        currentTerm = term;
        resetElectionTimer();
        LOGGER.info(String.format("节点 %s 成为Follower,任期 %d", getNodeId(), term));
    }
}
3.3 ZooKeeper的ZAB协议
ZAB(ZooKeeper Atomic Broadcast)协议是ZooKeeper使用的原子广播协议。它与Raft类似,都由单一Leader对写请求排序,并在多数派确认后提交;不同之处在于ZAB使用由epoch和计数器组成的ZXID为事务编号。
/**
 * Simplified sketch of a ZooKeeper server: ZXID layout, transaction log and leader election.
 *
 * <p>NOTE(review): {@code TxnLogEntry}, {@code DataTree}, {@code TxnHeader}, {@code Record},
 * {@code Notification}, {@code recvQueue}, {@code logicalClock} and the helpers
 * {@code sendProposal}, {@code sendNotifications}, {@code getLastZxid}, {@code getServerId}
 * and {@code getQuorumSize} are not declared in this listing.
 */
public class ZKNode {

    /**
     * Builds a ZXID: the epoch occupies the high 32 bits, the counter the low 32 bits.
     *
     * @param epoch   leader epoch
     * @param counter per-epoch transaction counter; only its low 32 bits are used
     * @return the packed 64-bit ZXID
     */
    public static long createZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xFFFFFFFFL);
    }

    /** Server start-up / runtime mode. */
    public enum ServerState {
        LOOKING,   // looking for a leader
        FOLLOWING, // follower
        LEADING,   // leader
        OBSERVING  // observer (does not vote)
    }

    /** Transaction handling. */
    public class ZKDatabase {
        private final TreeMap<Long, TxnLogEntry> txnLog = new TreeMap<>(); // ordered by ZXID
        private final DataTree dataTree = new DataTree();

        /**
         * Records a transaction in the log under its ZXID.
         *
         * @param header transaction header carrying the ZXID
         * @param txn    transaction body
         * @return the transaction's ZXID
         */
        public long processTxn(TxnHeader header, Record txn) {
            long zxid = header.getZxid();
            txnLog.put(zxid, new TxnLogEntry(header, txn, dataTree));
            return zxid;
        }

        /**
         * Data synchronisation: sends every logged transaction newer than {@code peerZxid},
         * in ZXID order.
         *
         * @param peerZxid last ZXID the peer already has
         */
        public void sync(long peerZxid) {
            // Exclusive lower bound; avoids the overflow of peerZxid + 1 at Long.MAX_VALUE.
            for (TxnLogEntry missing : txnLog.tailMap(peerZxid, false).values()) {
                sendProposal(missing);
            }
        }
    }

    /** Leader election (simplified FastLeaderElection). */
    public class FastLeaderElection {

        // A static member inside an inner class needs Java 16 or newer.
        private static class Vote {
            long zxid;
            long sid; // server id

            /** Returns true when this vote wins: higher ZXID first, then higher server id. */
            boolean isBetter(Vote other) {
                if (zxid != other.zxid) {
                    return zxid > other.zxid;
                }
                return sid > other.sid;
            }
        }

        /**
         * Exchanges votes with the other servers until more than {@code getQuorumSize()}
         * recorded votes agree with this server's current vote.
         *
         * <p>NOTE(review): each notification is recorded as a vote for its own sender
         * ({@code n.sid}), as in the original listing. If {@code Notification} carries the
         * proposed leader separately from the sender, the vote should use that field instead.
         * NOTE(review): whether the quorum test should be {@code >} or {@code >=} depends on
         * what {@code getQuorumSize()} returns; confirm against its definition.
         *
         * @return the vote agreed on by a quorum
         * @throws IllegalStateException if the thread is interrupted while waiting
         */
        public Vote lookForLeader() {
            final long myId = getServerId();

            // Start by voting for ourselves.
            final Vote initialVote = new Vote();
            initialVote.zxid = getLastZxid();
            initialVote.sid = myId;
            Vote selfVote = initialVote;
            sendNotifications(selfVote);

            // Latest vote seen per server.
            Map<Long, Vote> votes = new ConcurrentHashMap<>();
            votes.put(myId, selfVote);

            while (true) {
                Notification n;
                try {
                    n = recvQueue.poll(500, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("leader election interrupted", e);
                }
                if (n == null) {
                    // Timed out: announce our vote again.
                    sendNotifications(selfVote);
                    continue;
                }

                Vote receivedVote = new Vote();
                receivedVote.zxid = n.zxid;
                receivedVote.sid = n.sid;

                boolean announce = false;
                if (n.state == ServerState.LOOKING) {
                    if (n.logicalClock < logicalClock) {
                        // Vote from an older election round.
                        continue;
                    }
                    if (n.logicalClock > logicalClock) {
                        // Newer round: adopt its clock, drop the stale tally and restart
                        // from our own initial vote.
                        logicalClock = n.logicalClock;
                        votes.clear();
                        selfVote = initialVote;
                        announce = true;
                    }
                }
                if (receivedVote.isBetter(selfVote)) {
                    selfVote = receivedVote;
                    announce = true;
                }
                if (announce) {
                    // Keep our own tally entry in step with what we announce.
                    votes.put(myId, selfVote);
                    sendNotifications(selfVote);
                }
                votes.put(n.sid, receivedVote);

                // Count the votes that agree with ours.
                int agreeing = 0;
                for (Vote v : votes.values()) {
                    if (v.sid == selfVote.sid && v.zxid == selfVote.zxid) {
                        agreeing++;
                    }
                }
                // Return as soon as a quorum agrees. Waiting for a LEADING peer would never
                // end in a fresh election, because no server can lead before one returns.
                if (agreeing > getQuorumSize()) {
                    return selfVote;
                }
            }
        }
    }
}