程序员必备的十大技能(进阶版)之分布式核心技术(二)

简介: 教程来源 http://xcfsr.cn/ Paxos、Raft、ZAB是三大分布式一致性协议:Paxos理论奠基但复杂;Raft分治设计(选举/日志/安全),易理解易实现;ZAB为ZooKeeper定制,基于ZXID实现原子广播与崩溃恢复。

三、分布式一致性协议

3.1 Paxos协议详解
Paxos是Leslie Lamport提出的分布式一致性协议,虽难理解但却是后续协议的基础。

// Paxos角色
public enum PaxosRole {
    PROPOSER,   // 提案者:提出value
    ACCEPTOR,   // 接受者:投票决定value
    LEARNER     // 学习者:学习最终决议
}

// Paxos两阶段流程
public class PaxosProtocol {

    // Phase 1: Prepare阶段
    // 1. Proposer生成提案编号n,向多数派Acceptor发送Prepare(n)
    // 2. Acceptor收到Prepare(n):若n > 已响应的最大编号,则承诺不再接受小于n的提案,并返回已接受的最高编号提案

    // Phase 2: Accept阶段
    // 1. Proposer收到多数派响应后,选择编号最大的提案值v,发送Accept(n, v)
    // 2. Acceptor收到Accept(n, v):若未响应过大于n的Prepare,则接受提案

    private static class Proposal {
        long number;
        Object value;
    }

    private static class Acceptor {
        private long lastPreparedNumber = 0;      // 已响应的最大Prepare编号
        private Proposal acceptedProposal = null; // 已接受的提案(编号和值)

        // 处理Prepare请求
        public synchronized PrepareResponse prepare(long proposalNumber) {
            if (proposalNumber > lastPreparedNumber) {
                lastPreparedNumber = proposalNumber;
                return new PrepareResponse(true, acceptedProposal);
            }
            return new PrepareResponse(false, acceptedProposal);
        }

        // 处理Accept请求
        public synchronized boolean accept(long proposalNumber, Object value) {
            if (proposalNumber >= lastPreparedNumber) {
                lastPreparedNumber = proposalNumber;
                acceptedProposal = new Proposal();
                acceptedProposal.number = proposalNumber;
                acceptedProposal.value = value;
                return true;
            }
            return false;
        }
    }

    private static class Proposer {
        private long proposalNumber;
        private Acceptor[] acceptors;

        public Object propose(Object value) {
            // Phase 1: Prepare
            proposalNumber = generateNextNumber();
            List<PrepareResponse> prepareResponses = new ArrayList<>();

            for (Acceptor acceptor : acceptors) {
                PrepareResponse response = acceptor.prepare(proposalNumber);
                if (response.isPromised) {
                    prepareResponses.add(response);
                }
            }

            // 检查是否获得多数派承诺
            if (prepareResponses.size() <= acceptors.length / 2) {
                throw new RuntimeException("无法获得多数派承诺");
            }

            // 选择编号最大的已接受提案的值
            Proposal maxProposal = null;
            for (PrepareResponse response : prepareResponses) {
                if (response.acceptedProposal != null) {
                    if (maxProposal == null || 
                        response.acceptedProposal.number > maxProposal.number) {
                        maxProposal = response.acceptedProposal;
                    }
                }
            }

            // Phase 2: Accept
            Object acceptValue = (maxProposal != null) ? maxProposal.value : value;
            int acceptCount = 0;

            for (Acceptor acceptor : acceptors) {
                if (acceptor.accept(proposalNumber, acceptValue)) {
                    acceptCount++;
                }
            }

            if (acceptCount > acceptors.length / 2) {
                return acceptValue;
            }

            throw new RuntimeException("无法获得多数派接受");
        }
    }
}

3.2 Raft协议详解
Raft是更易于理解的分布式一致性协议,将问题分解为:领导者选举、日志复制、安全性。

public class RaftNode {

    // 节点状态
    private enum NodeState {
        FOLLOWER, CANDIDATE, LEADER
    }

    private NodeState state = NodeState.FOLLOWER;

    // 持久化状态(所有节点)
    private long currentTerm = 0;              // 当前任期号
    private String votedFor = null;            // 当前任期投票给谁
    private List<LogEntry> log = new ArrayList<>(); // 日志条目

    // 易失性状态(所有节点)
    private long commitIndex = 0;              // 已知已提交的最高日志索引
    private long lastApplied = 0;              // 已应用到状态机的最高日志索引

    // 易失性状态(仅Leader)
    private Map<String, Long> nextIndex = new ConcurrentHashMap<>();   // 每个节点下一个日志索引
    private Map<String, Long> matchIndex = new ConcurrentHashMap<>();  // 每个节点已复制的最高索引

    private final List<RaftNode> peers;         // 集群中其他节点
    private final Timer electionTimer;          // 选举超时定时器
    private final Timer heartbeatTimer;         // 心跳定时器(Leader用)

    static class LogEntry {
        long term;          // 任期号
        int index;          // 日志索引
        String command;     // 命令(如SET key value)

        LogEntry(long term, int index, String command) {
            this.term = term;
            this.index = index;
            this.command = command;
        }
    }

    // 请求投票RPC
    private class RequestVoteRPC {
        long term;
        String candidateId;
        long lastLogIndex;
        long lastLogTerm;

        RequestVoteRPC(long term, String candidateId, long lastLogIndex, long lastLogTerm) {
            this.term = term;
            this.candidateId = candidateId;
            this.lastLogIndex = lastLogIndex;
            this.lastLogTerm = lastLogTerm;
        }
    }

    private class RequestVoteResponse {
        long term;
        boolean voteGranted;

        RequestVoteResponse(long term, boolean voteGranted) {
            this.term = term;
            this.voteGranted = voteGranted;
        }
    }

    // 领导者选举
    private void startElection() {
        state = NodeState.CANDIDATE;
        currentTerm++;
        votedFor = getNodeId();

        RequestVoteRPC request = new RequestVoteRPC(
            currentTerm, 
            getNodeId(), 
            getLastLogIndex(), 
            getLastLogTerm()
        );

        int votesReceived = 1;  // 投票给自己
        for (RaftNode peer : peers) {
            new Thread(() -> {
                RequestVoteResponse response = peer.requestVote(request);
                synchronized (this) {
                    if (response.term > currentTerm) {
                        // 发现更高任期,退化为Follower
                        becomeFollower(response.term);
                        return;
                    }
                    if (response.voteGranted) {
                        votesReceived++;
                        if (votesReceived > peers.size() / 2 && state == NodeState.CANDIDATE) {
                            becomeLeader();
                        }
                    }
                }
            }).start();
        }

        // 重置选举计时器
        resetElectionTimer();
    }

    // 处理投票请求
    private RequestVoteResponse requestVote(RequestVoteRPC request) {
        synchronized (this) {
            // 如果请求的任期小于当前任期,拒绝
            if (request.term < currentTerm) {
                return new RequestVoteResponse(currentTerm, false);
            }

            // 如果请求的任期大于当前任期,成为Follower
            if (request.term > currentTerm) {
                becomeFollower(request.term);
            }

            // 检查日志是否至少与本地一样新
            boolean logUpToDate = isLogUpToDate(request.lastLogTerm, request.lastLogIndex);

            // 如果尚未投票给其他人,或者投票给了同一个候选人,且日志足够新
            if ((votedFor == null || votedFor.equals(request.candidateId)) && logUpToDate) {
                votedFor = request.candidateId;
                resetElectionTimer();
                return new RequestVoteResponse(currentTerm, true);
            }

            return new RequestVoteResponse(currentTerm, false);
        }
    }

    // 日志复制
    private void replicateLog() {
        for (RaftNode peer : peers) {
            long nextIdx = nextIndex.getOrDefault(peer.getNodeId(), 1L);
            List<LogEntry> entries = new ArrayList<>();
            for (long i = nextIdx; i <= getLastLogIndex(); i++) {
                entries.add(log.get((int) i - 1));  // 索引从1开始
            }

            AppendEntriesRPC request = new AppendEntriesRPC(
                currentTerm,
                getNodeId(),
                nextIdx - 1,
                getTermAtIndex(nextIdx - 1),
                entries,
                commitIndex
            );

            peer.appendEntries(request, response -> {
                synchronized (this) {
                    if (response.success) {
                        // 更新matchIndex和nextIndex
                        long newMatchIndex = nextIdx + entries.size() - 1;
                        matchIndex.put(peer.getNodeId(), newMatchIndex);
                        nextIndex.put(peer.getNodeId(), newMatchIndex + 1);

                        // 提交日志
                        updateCommitIndex();
                    } else {
                        // 复制失败,减小nextIndex重试
                        if (response.term > currentTerm) {
                            becomeFollower(response.term);
                        } else {
                            nextIndex.put(peer.getNodeId(), Math.max(1, nextIdx - 1));
                        }
                    }
                }
            });
        }
    }

    // 更新提交索引
    private void updateCommitIndex() {
        for (int i = (int) commitIndex + 1; i <= getLastLogIndex(); i++) {
            // 检查日志是否在当前任期
            if (log.get(i - 1).term != currentTerm) {
                continue;
            }

            // 统计有多少节点复制了这条日志
            int replicatedCount = 1;  // 自己
            for (Long matchIdx : matchIndex.values()) {
                if (matchIdx >= i) {
                    replicatedCount++;
                }
            }

            // 超过半数节点复制,提交
            if (replicatedCount > peers.size() / 2) {
                commitIndex = i;
                applyLogToStateMachine();
            }
        }
    }

    // 成为Leader后的初始化
    private void becomeLeader() {
        state = NodeState.LEADER;
        log.info("节点 {} 成为Leader,任期 {}", getNodeId(), currentTerm);

        // 初始化nextIndex和matchIndex
        long lastLogIndex = getLastLogIndex();
        for (RaftNode peer : peers) {
            nextIndex.put(peer.getNodeId(), lastLogIndex + 1);
            matchIndex.put(peer.getNodeId(), 0L);
        }

        // 启动心跳
        startHeartbeat();
    }

    // 成为Follower
    private void becomeFollower(long term) {
        state = NodeState.FOLLOWER;
        currentTerm = term;
        votedFor = null;
        resetElectionTimer();
        log.info("节点 {} 成为Follower,任期 {}", getNodeId(), term);
    }
}

3.3 ZooKeeper的ZAB协议
ZAB协议是ZooKeeper使用的原子广播协议,类似Raft但有其特色。

// ZooKeeper节点状态
public class ZKNode {

    // ZXID结构:高32位为epoch(纪元),低32位为counter
    public static long createZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xFFFFFFFFL);
    }

    // 启动模式
    public enum ServerState {
        LOOKING,   // 寻找Leader
        FOLLOWING, // 跟随者
        LEADING,   // 领导者
        OBSERVING  // 观察者(不参与投票)
    }

    // 事务处理
    public class ZKDatabase {
        private final TreeMap<Long, TxnLogEntry> txnLog = new TreeMap<>();
        private final DataTree dataTree = new DataTree();

        public long processTxn(TxnHeader header, Record txn) {
            long zxid = header.getZxid();
            txnLog.put(zxid, new TxnLogEntry(header, txn, dataTree));
            return zxid;
        }

        // 数据同步
        public void sync(long peerZxid) {
            // 获取peerZxid之后的所有事务
            SortedMap<Long, TxnLogEntry> proposals = txnLog.tailMap(peerZxid + 1);
            for (Map.Entry<Long, TxnLogEntry> entry : proposals.entrySet()) {
                // 发送给Follower
                sendProposal(entry.getValue());
            }
        }
    }

    // 领导者选举(FastLeaderElection)
    public class FastLeaderElection {

        private static class Vote {
            long zxid;
            long sid;  // Server ID

            boolean isBetter(Vote other) {
                if (zxid != other.zxid) {
                    return zxid > other.zxid;
                }
                return sid > other.sid;
            }
        }

        public Vote lookForLeader() {
            // 投票给自己
            Vote selfVote = new Vote();
            selfVote.zxid = getLastZxid();
            selfVote.sid = getServerId();

            sendNotifications(selfVote);

            // 收集其他节点的投票
            Map<Long, Vote> votes = new ConcurrentHashMap<>();
            votes.put(selfVote.sid, selfVote);

            while (true) {
                Notification n = recvQueue.poll(500, TimeUnit.MILLISECONDS);
                if (n == null) {
                    // 超时,重新发送通知
                    sendNotifications(selfVote);
                    continue;
                }

                Vote receivedVote = new Vote();
                receivedVote.zxid = n.zxid;
                receivedVote.sid = n.sid;

                // 更新逻辑时钟
                if (n.state == ServerState.LOOKING && n.logicalClock > logicalClock) {
                    logicalClock = n.logicalClock;
                    receivedVote = selfVote;
                }

                if (isBetterVote(receivedVote, selfVote)) {
                    selfVote = receivedVote;
                    sendNotifications(selfVote);
                }

                votes.put(n.sid, receivedVote);

                // 检查是否获得多数票
                int count = 0;
                for (Vote v : votes.values()) {
                    if (v.sid == selfVote.sid && v.zxid == selfVote.zxid) {
                        count++;
                    }
                }

                if (count > getQuorumSize()) {
                    // 确认投票
                    if (n.state == ServerState.LEADING) {
                        return selfVote;
                    }
                }
            }
        }
    }
}

来源:
http://htnus.cn/

相关文章
|
14天前
|
人工智能 API 开发者
阿里云发布为Agent而生的全新AI产品官网“千问云”,模型服务全面Skill、CLI化
5月20日,阿里云发布“千问云”(www.qianwenai.com)——专为Agent时代打造的AI模型服务平台,集成150+主流模型API,首创Skills与CLI工具链,支持模型选型、调用、用量管理等全链路自动化,助力开发者与Agent高效构建AI应用。
1542 32
|
14天前
|
前端开发 机器人 API
用两行代码将 AgentRun 集成到你的应用
AgentRun支持OpenAI协议,改两行代码即可将Agent无缝接入现有应用,兼容Python/Node.js/Java等;同时提供SDK、UI嵌入、IM机器人、云事件触发五种集成方式,开箱即用全链路能力。
|
2月前
|
人工智能 监控 Kubernetes
LoongCollector + ACS Agent Sandbox:构建 AI Agent 生产级运行平台
文章介绍了阿里云ACSAgentSandbox与LoongCollector协同构建的AIAgent生产级运行平台,通过沙箱隔离保障运行时安全,并以高性能、全链路可观测能力解决Agent行为不可预测和执行风险难题。
1222 48
|
14天前
|
安全 Java C++
【Java基础】集合框架: ConcurrentHashMap核心原理:JDK1.7 vs 1.8+ 区别、线程安全实现、分段锁 vs CAS+synchronized、扩容机制
ConcurrentHashMap是Java高并发场景下线程安全的哈希表实现,JDK1.7采用Segment分段锁(16段独立加锁),JDK1.8升级为CAS+synchronized细粒度桶锁,并引入红黑树与多线程协助扩容,显著提升性能与扩展性。
|
14天前
|
存储 安全 Java
【Java基础】集合框架: HashMap核心原理:JDK1.7 vs 1.8+ 区别、数据结构、哈希函数、扩容机制、put/get全流程、红黑树转换阈值(附《思维导图》+《面试高频考点清单》)
本文系统对比JDK1.7与1.8+中HashMap的底层原理,涵盖数据结构(数组+链表→+红黑树)、哈希函数、扩容机制、插入方式及并发问题等核心差异,助你深入理解性能优化逻辑与面试高频考点。
|
14天前
|
消息中间件 负载均衡 算法
程序员必备的十大技能(进阶版)之分布式核心技术(一)
教程来源 http://unbgv.cn/ 本文系统剖析分布式核心技术,涵盖CAP/BASE理论、服务治理、一致性协议、分布式事务、锁、消息中间件、负载均衡、存储及可观测性九大维度,直击微服务演进中的核心挑战与落地实践。
|
14天前
|
人工智能 安全 测试技术
基于Harness + Langgraph + A2A 写一个 Agent Team,实现一支硅基团队自己 写代码
基于Harness + Langgraph + A2A 写一个 Agent Team,实现一支硅基团队自己 写代码
基于Harness + Langgraph + A2A 写一个 Agent Team,实现一支硅基团队自己 写代码
|
14天前
|
Ubuntu Linux KVM
虚拟机搭建教程(二)
教程来源 https://zlpow.cn/ 本文详解Windows、Linux三大平台虚拟化实战:Windows下用VMware安装Ubuntu 24.04(含Tools与快照),VirtualBox部署CentOS Stream 9;Linux主机通过KVM命令行及virt-manager搭建高性能虚拟机,覆盖配置、联网、增强工具与管理全流程。
|
14天前
|
存储 缓存 负载均衡
程序员必备的十大技能(进阶版)之分布式核心技术(五)
教程来源 http://yvyus.cn/ 本节系统讲解分布式核心能力:涵盖加权随机、最少活跃连接等负载均衡算法;Failover、Forking等容错策略;多级缓存与一致性保障;虚拟桶分片与平滑迁移;OpenTelemetry链路追踪及全链路日志透传,助力构建高可用、可观测的分布式系统。
|
14天前
|
消息中间件 程序员 RocketMQ
程序员必备的十大技能(进阶版)之分布式核心技术(三)
教程来源 http://vbzcj.cn/ 分布式事务三大主流方案:2PC(强一致、同步阻塞)、TCC(业务侵入、高灵活)、RocketMQ事务消息(最终一致、异步解耦),分别适用于金融核心、高并发微服务及积分订单等场景。