程序员必备的十大技能(进阶版)之分布式核心技术(二)

简介: 教程来源 http://xcfsr.cn/ Paxos、Raft、ZAB是三大分布式一致性协议:Paxos理论奠基但复杂;Raft分治设计(选举/日志/安全),易理解易实现;ZAB为ZooKeeper定制,基于ZXID实现原子广播与崩溃恢复。

三、分布式一致性协议

3.1 Paxos协议详解
Paxos是Leslie Lamport提出的分布式一致性协议,虽难理解但却是后续协议的基础。

// Paxos角色
public enum PaxosRole {
    PROPOSER,   // 提案者:提出value
    ACCEPTOR,   // 接受者:投票决定value
    LEARNER     // 学习者:学习最终决议
}

// Paxos两阶段流程
public class PaxosProtocol {

    // Phase 1: Prepare阶段
    // 1. Proposer生成提案编号n,向多数派Acceptor发送Prepare(n)
    // 2. Acceptor收到Prepare(n):若n > 已响应的最大编号,则承诺不再接受小于n的提案,并返回已接受的最高编号提案

    // Phase 2: Accept阶段
    // 1. Proposer收到多数派响应后,选择编号最大的提案值v,发送Accept(n, v)
    // 2. Acceptor收到Accept(n, v):若未响应过大于n的Prepare,则接受提案

    private static class Proposal {
        long number;
        Object value;
    }

    private static class Acceptor {
        private long lastPreparedNumber = 0;      // 已响应的最大Prepare编号
        private Proposal acceptedProposal = null; // 已接受的提案(编号和值)

        // 处理Prepare请求
        public synchronized PrepareResponse prepare(long proposalNumber) {
            if (proposalNumber > lastPreparedNumber) {
                lastPreparedNumber = proposalNumber;
                return new PrepareResponse(true, acceptedProposal);
            }
            return new PrepareResponse(false, acceptedProposal);
        }

        // 处理Accept请求
        public synchronized boolean accept(long proposalNumber, Object value) {
            if (proposalNumber >= lastPreparedNumber) {
                lastPreparedNumber = proposalNumber;
                acceptedProposal = new Proposal();
                acceptedProposal.number = proposalNumber;
                acceptedProposal.value = value;
                return true;
            }
            return false;
        }
    }

    private static class Proposer {
        private long proposalNumber;
        private Acceptor[] acceptors;

        public Object propose(Object value) {
            // Phase 1: Prepare
            proposalNumber = generateNextNumber();
            List<PrepareResponse> prepareResponses = new ArrayList<>();

            for (Acceptor acceptor : acceptors) {
                PrepareResponse response = acceptor.prepare(proposalNumber);
                if (response.isPromised) {
                    prepareResponses.add(response);
                }
            }

            // 检查是否获得多数派承诺
            if (prepareResponses.size() <= acceptors.length / 2) {
                throw new RuntimeException("无法获得多数派承诺");
            }

            // 选择编号最大的已接受提案的值
            Proposal maxProposal = null;
            for (PrepareResponse response : prepareResponses) {
                if (response.acceptedProposal != null) {
                    if (maxProposal == null || 
                        response.acceptedProposal.number > maxProposal.number) {
                        maxProposal = response.acceptedProposal;
                    }
                }
            }

            // Phase 2: Accept
            Object acceptValue = (maxProposal != null) ? maxProposal.value : value;
            int acceptCount = 0;

            for (Acceptor acceptor : acceptors) {
                if (acceptor.accept(proposalNumber, acceptValue)) {
                    acceptCount++;
                }
            }

            if (acceptCount > acceptors.length / 2) {
                return acceptValue;
            }

            throw new RuntimeException("无法获得多数派接受");
        }
    }
}

3.2 Raft协议详解
Raft是更易于理解的分布式一致性协议,将问题分解为:领导者选举、日志复制、安全性。

public class RaftNode {

    // 节点状态
    private enum NodeState {
        FOLLOWER, CANDIDATE, LEADER
    }

    private NodeState state = NodeState.FOLLOWER;

    // 持久化状态(所有节点)
    private long currentTerm = 0;              // 当前任期号
    private String votedFor = null;            // 当前任期投票给谁
    private List<LogEntry> log = new ArrayList<>(); // 日志条目

    // 易失性状态(所有节点)
    private long commitIndex = 0;              // 已知已提交的最高日志索引
    private long lastApplied = 0;              // 已应用到状态机的最高日志索引

    // 易失性状态(仅Leader)
    private Map<String, Long> nextIndex = new ConcurrentHashMap<>();   // 每个节点下一个日志索引
    private Map<String, Long> matchIndex = new ConcurrentHashMap<>();  // 每个节点已复制的最高索引

    private final List<RaftNode> peers;         // 集群中其他节点
    private final Timer electionTimer;          // 选举超时定时器
    private final Timer heartbeatTimer;         // 心跳定时器(Leader用)

    static class LogEntry {
        long term;          // 任期号
        int index;          // 日志索引
        String command;     // 命令(如SET key value)

        LogEntry(long term, int index, String command) {
            this.term = term;
            this.index = index;
            this.command = command;
        }
    }

    // 请求投票RPC
    private class RequestVoteRPC {
        long term;
        String candidateId;
        long lastLogIndex;
        long lastLogTerm;

        RequestVoteRPC(long term, String candidateId, long lastLogIndex, long lastLogTerm) {
            this.term = term;
            this.candidateId = candidateId;
            this.lastLogIndex = lastLogIndex;
            this.lastLogTerm = lastLogTerm;
        }
    }

    private class RequestVoteResponse {
        long term;
        boolean voteGranted;

        RequestVoteResponse(long term, boolean voteGranted) {
            this.term = term;
            this.voteGranted = voteGranted;
        }
    }

    // 领导者选举
    private void startElection() {
        state = NodeState.CANDIDATE;
        currentTerm++;
        votedFor = getNodeId();

        RequestVoteRPC request = new RequestVoteRPC(
            currentTerm, 
            getNodeId(), 
            getLastLogIndex(), 
            getLastLogTerm()
        );

        int votesReceived = 1;  // 投票给自己
        for (RaftNode peer : peers) {
            new Thread(() -> {
                RequestVoteResponse response = peer.requestVote(request);
                synchronized (this) {
                    if (response.term > currentTerm) {
                        // 发现更高任期,退化为Follower
                        becomeFollower(response.term);
                        return;
                    }
                    if (response.voteGranted) {
                        votesReceived++;
                        if (votesReceived > peers.size() / 2 && state == NodeState.CANDIDATE) {
                            becomeLeader();
                        }
                    }
                }
            }).start();
        }

        // 重置选举计时器
        resetElectionTimer();
    }

    // 处理投票请求
    private RequestVoteResponse requestVote(RequestVoteRPC request) {
        synchronized (this) {
            // 如果请求的任期小于当前任期,拒绝
            if (request.term < currentTerm) {
                return new RequestVoteResponse(currentTerm, false);
            }

            // 如果请求的任期大于当前任期,成为Follower
            if (request.term > currentTerm) {
                becomeFollower(request.term);
            }

            // 检查日志是否至少与本地一样新
            boolean logUpToDate = isLogUpToDate(request.lastLogTerm, request.lastLogIndex);

            // 如果尚未投票给其他人,或者投票给了同一个候选人,且日志足够新
            if ((votedFor == null || votedFor.equals(request.candidateId)) && logUpToDate) {
                votedFor = request.candidateId;
                resetElectionTimer();
                return new RequestVoteResponse(currentTerm, true);
            }

            return new RequestVoteResponse(currentTerm, false);
        }
    }

    // 日志复制
    private void replicateLog() {
        for (RaftNode peer : peers) {
            long nextIdx = nextIndex.getOrDefault(peer.getNodeId(), 1L);
            List<LogEntry> entries = new ArrayList<>();
            for (long i = nextIdx; i <= getLastLogIndex(); i++) {
                entries.add(log.get((int) i - 1));  // 索引从1开始
            }

            AppendEntriesRPC request = new AppendEntriesRPC(
                currentTerm,
                getNodeId(),
                nextIdx - 1,
                getTermAtIndex(nextIdx - 1),
                entries,
                commitIndex
            );

            peer.appendEntries(request, response -> {
                synchronized (this) {
                    if (response.success) {
                        // 更新matchIndex和nextIndex
                        long newMatchIndex = nextIdx + entries.size() - 1;
                        matchIndex.put(peer.getNodeId(), newMatchIndex);
                        nextIndex.put(peer.getNodeId(), newMatchIndex + 1);

                        // 提交日志
                        updateCommitIndex();
                    } else {
                        // 复制失败,减小nextIndex重试
                        if (response.term > currentTerm) {
                            becomeFollower(response.term);
                        } else {
                            nextIndex.put(peer.getNodeId(), Math.max(1, nextIdx - 1));
                        }
                    }
                }
            });
        }
    }

    // 更新提交索引
    private void updateCommitIndex() {
        for (int i = (int) commitIndex + 1; i <= getLastLogIndex(); i++) {
            // 检查日志是否在当前任期
            if (log.get(i - 1).term != currentTerm) {
                continue;
            }

            // 统计有多少节点复制了这条日志
            int replicatedCount = 1;  // 自己
            for (Long matchIdx : matchIndex.values()) {
                if (matchIdx >= i) {
                    replicatedCount++;
                }
            }

            // 超过半数节点复制,提交
            if (replicatedCount > peers.size() / 2) {
                commitIndex = i;
                applyLogToStateMachine();
            }
        }
    }

    // 成为Leader后的初始化
    private void becomeLeader() {
        state = NodeState.LEADER;
        log.info("节点 {} 成为Leader,任期 {}", getNodeId(), currentTerm);

        // 初始化nextIndex和matchIndex
        long lastLogIndex = getLastLogIndex();
        for (RaftNode peer : peers) {
            nextIndex.put(peer.getNodeId(), lastLogIndex + 1);
            matchIndex.put(peer.getNodeId(), 0L);
        }

        // 启动心跳
        startHeartbeat();
    }

    // 成为Follower
    private void becomeFollower(long term) {
        state = NodeState.FOLLOWER;
        currentTerm = term;
        votedFor = null;
        resetElectionTimer();
        log.info("节点 {} 成为Follower,任期 {}", getNodeId(), term);
    }
}

3.3 ZooKeeper的ZAB协议
ZAB协议是ZooKeeper使用的原子广播协议,类似Raft但有其特色。

// ZooKeeper节点状态
public class ZKNode {

    // ZXID结构:高32位为epoch(纪元),低32位为counter
    public static long createZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xFFFFFFFFL);
    }

    // 启动模式
    public enum ServerState {
        LOOKING,   // 寻找Leader
        FOLLOWING, // 跟随者
        LEADING,   // 领导者
        OBSERVING  // 观察者(不参与投票)
    }

    // 事务处理
    public class ZKDatabase {
        private final TreeMap<Long, TxnLogEntry> txnLog = new TreeMap<>();
        private final DataTree dataTree = new DataTree();

        public long processTxn(TxnHeader header, Record txn) {
            long zxid = header.getZxid();
            txnLog.put(zxid, new TxnLogEntry(header, txn, dataTree));
            return zxid;
        }

        // 数据同步
        public void sync(long peerZxid) {
            // 获取peerZxid之后的所有事务
            SortedMap<Long, TxnLogEntry> proposals = txnLog.tailMap(peerZxid + 1);
            for (Map.Entry<Long, TxnLogEntry> entry : proposals.entrySet()) {
                // 发送给Follower
                sendProposal(entry.getValue());
            }
        }
    }

    // 领导者选举(FastLeaderElection)
    public class FastLeaderElection {

        private static class Vote {
            long zxid;
            long sid;  // Server ID

            boolean isBetter(Vote other) {
                if (zxid != other.zxid) {
                    return zxid > other.zxid;
                }
                return sid > other.sid;
            }
        }

        public Vote lookForLeader() {
            // 投票给自己
            Vote selfVote = new Vote();
            selfVote.zxid = getLastZxid();
            selfVote.sid = getServerId();

            sendNotifications(selfVote);

            // 收集其他节点的投票
            Map<Long, Vote> votes = new ConcurrentHashMap<>();
            votes.put(selfVote.sid, selfVote);

            while (true) {
                Notification n = recvQueue.poll(500, TimeUnit.MILLISECONDS);
                if (n == null) {
                    // 超时,重新发送通知
                    sendNotifications(selfVote);
                    continue;
                }

                Vote receivedVote = new Vote();
                receivedVote.zxid = n.zxid;
                receivedVote.sid = n.sid;

                // 更新逻辑时钟
                if (n.state == ServerState.LOOKING && n.logicalClock > logicalClock) {
                    logicalClock = n.logicalClock;
                    receivedVote = selfVote;
                }

                if (isBetterVote(receivedVote, selfVote)) {
                    selfVote = receivedVote;
                    sendNotifications(selfVote);
                }

                votes.put(n.sid, receivedVote);

                // 检查是否获得多数票
                int count = 0;
                for (Vote v : votes.values()) {
                    if (v.sid == selfVote.sid && v.zxid == selfVote.zxid) {
                        count++;
                    }
                }

                if (count > getQuorumSize()) {
                    // 确认投票
                    if (n.state == ServerState.LEADING) {
                        return selfVote;
                    }
                }
            }
        }
    }
}

来源:
http://htnus.cn/

相关文章
|
9天前
|
Shell API 开发工具
Claude Code 快速上手指南(新手友好版)
AI编程工具卷疯啦!Claude Code凭借任务驱动+终端原生的特性,成了开发者的效率搭子。本文从安装、登录、切换国产模型到常用命令,手把手带新手快速上手,全程避坑,30分钟独立用起来。
2794 16
|
6天前
|
人工智能 开发工具 iOS开发
Claude Code 新手完全上手指南:安装、国产模型配置与常用命令全解
Claude Code 是一款运行在终端环境中的 AI 编程助手,能够直接在命令行中完成代码生成、项目分析、文件修改、命令执行、Git 管理等开发全流程工作。它最大的特点是**任务驱动、终端原生、轻量高效、多模型兼容**,无需图形界面、不依赖 IDE 插件,能够深度融入开发者日常工作流。
2383 5
|
21天前
|
人工智能 JSON 供应链
畅用7个月无影 JVS Claw |手把手教你把JVS改造成「科研与产业地理情报可视化大师」
LucianaiB分享零成本畅用JVS Claw教程(学生认证享7个月使用权),并开源GeoMind项目——将JVS改造为科研与产业地理情报可视化AI助手,支持飞书文档解析、地理编码与腾讯地图可视化,助力产业关系图谱构建。
23554 14
畅用7个月无影 JVS Claw |手把手教你把JVS改造成「科研与产业地理情报可视化大师」
|
8天前
|
人工智能 JSON BI
DeepSeek V4-Pro 接入 Claude Code 完全实战:体验、测试与关键避坑指南
Claude Code 作为当前主流的 AI 编程辅助工具,凭借强大的代码理解、工程执行与自动化能力深受开发者喜爱,但原生模型的使用成本相对较高。为了在保持能力的同时进一步降低开销,不少开发者开始寻找兼容度高、价格更友好的替代模型。DeepSeek V4 系列的发布带来了新的选择,该系列包含 V4-Pro 与 V4-Flash 两款模型,并提供了与 Anthropic 完全兼容的 API 接口,理论上只需简单修改配置,即可让 Claude Code 无缝切换为 DeepSeek 引擎。
2086 2
|
2天前
|
人工智能 Linux BI
国内用 Claude Code 终于不用翻墙了:一行命令搞定,自动接 DeepSeek
JeecgBoot AI专题研究 一键脚本:Claude Code + JeecgBoot Skills + DeepSeek 全平台接入 一行命令装好 Claude Code + JeecgBoot Skills + DeepSeek 接入,无需翻墙使用 Claude Code,支持 Wind
1362 1
国内用 Claude Code 终于不用翻墙了:一行命令搞定,自动接 DeepSeek
|
15天前
|
人工智能 缓存 Shell
Claude Code 全攻略:命令大全 + 实战工作流(完整版)
Claude Code 是一款运行在终端环境下的 AI 编码助手,能够直接在项目目录中理解代码结构、编辑文件、执行命令、执行开发计划,并支持持久化记忆、上下文压缩、后台任务、多模型切换等专业能力。对于日常开发、项目维护、快速重构、代码审查等场景,它可以大幅减少手动操作、提升编码效率。本文从常用命令、界面模式、核心指令、记忆机制、图片处理、进阶工作流等维度完整说明,帮助开发者快速上手并稳定使用。
3483 6
|
7天前
|
人工智能 安全 开发工具
Claude Code 官方工作原理与使用指南
Claude Code 不是传统代码补全工具,而是 Anthropic 推出的终端 AI 代理,具备代理循环、双驱动架构(模型+工具)、全局项目感知、6 种权限模式等核心能力,本文基于官方文档系统解析其工作原理与高效使用技巧。
1113 0