【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式竞选

简介:

前言

上文【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式锁-升级版中,我们通过利用ZooKeeper的临时节点和Watcher特性,实现了一个分布式锁。
本文我们结合实际场景,完成一个分布式竞争选举。

设计

这里我们实现一个公平的选举方式,即先参加选举的优先被选为leader。
具体的实现思路 参考了ZooKeeper提供的官方示例:zookeeper-recipes-election

  • START:服务器开始竞选
  • OFFER:创建临时顺序结点
  • DETERMINE:开始决策,将临时节点按末尾序号从小到大排序,如果当前节点的序号最小,则竞选成功,否则,则Watch前一个节点,当前一个节点被删除时,再次进行决策
  • ELECTED:当前节点是序号最小的节点,竞选成功
  • READY:当前节点不是序号最小的节点,竞选不成功,Watch前一个节点,进入READY态
  • FAILED:当出现异常情况时,为失败状态
  • STOP:结束竞选

LeaderElectionSupport

public class LeaderElectionSupport implements LeaderElection{
    private static Logger logger = LoggerFactory.getLogger(LeaderElectionSupport.class);

    //ZooKeeper客户端,进行ZooKeeper操作
    private ZooKeeper zooKeeper;

    //根节点名称
    private String dir;

    //节点前缀
    private String node;

    //ZooKeeper鉴权信息
    private List<ACL> acls;

    //要加锁节点
    private String fullPath;

    //选举状态
    private State state;

    //监听器
    private Set<LeaderElectionListener> listeners;

    //存当前节点的信息
    private volatile LeaderNode leaderNode;

    //监察器
    private Watcher watcher;


    /**
     * Constructor.
     *
     * @param zooKeeper the zoo keeper
     * @param dir       the dir
     * @param node      the node
     * @param acls      the acls
     */
    public LeaderElectionSupport(ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {
        this.zooKeeper = zooKeeper;
        this.dir = dir;
        this.node = node;
        this.acls = acls;
        this.fullPath = dir.concat("/").concat(this.node);
        init();

        state = State.STOP;
        listeners = Collections.synchronizedSet(new HashSet<>());
    }

    /**
     * 初始化根节点、检查器等
     * */
    private void init() {
        try {
            watcher = new LeaderWatcher();
            Stat stat = zooKeeper.exists(dir, false);
            if (stat == null) {
                zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            logger.error("[LeaderElectionSupport#init] error : " + e.toString(), e);
        }
    }
}

start

/**
 * Start.
 * 开始竞选
 */
@Override
public void start() {
    synchronized (this) {
        state = State.START;
        dispatchEvent(EventType.START);
        offerElection();
        determineElection();
    }

}

offerElection

/**
 * 创建临时节点,参加竞选,并将主机信息保存在node中
 * */
private void offerElection() {
    dispatchEvent(EventType.OFFER_START);
    state = State.OFFER;
    if (leaderNode == null) {
        synchronized (this) {
            try {
                if (leaderNode == null) {
                    InetAddress ia = InetAddress.getLocalHost();
                    LeaderNode tmpNode = new LeaderNode();
                    tmpNode.setHostName(ia.getHostName());
                    String path = zooKeeper.create(fullPath, ConversionUtil.objectToBytes(ia.getHostName()), acls, CreateMode.EPHEMERAL_SEQUENTIAL);

                    tmpNode.setNodePath(path);
                    tmpNode.setId(NodeUtil.getNodeId(path));

                    leaderNode = tmpNode;
                }
            } catch (Exception e) {
                becomeFailed(e);
            }
        }
    }
    dispatchEvent(EventType.OFFER_COMPLETE);
}

determineElection

/**
 * 决定竞选结果
 * 1、竞选节点序号最低的赢取选举
 * 2、未赢得选举的节点,监听上一个节点,直到上一个节点被删除,则尝试重新竞选
 * */
private void determineElection() {
    dispatchEvent(EventType.DETERMINE_START);
    state = State.DETERMINE;
    synchronized (this) {
        TreeSet<String> nodePathSet = getNodePathSet();
        if (nodePathSet.isEmpty()) {
            becomeFailed(new Exception("no node"));
            return;
        }
        String leaderPath = nodePathSet.first();
        if (leaderNode.getNodePath().equalsIgnoreCase(leaderPath)) {
            becomeLeader();
        } else {
            becomeReady(nodePathSet.headSet(leaderNode.getNodePath()).last());
        }
    }
    dispatchEvent(EventType.DETERMINE_COMPLETE);
}

becomeLeader

/**
 * 竞选成功
 * */
private void becomeLeader() {
    dispatchEvent(EventType.ELECTED_START);
    state = State.ELECTED;
    dispatchEvent(EventType.ELECTED_COMPLETE);
}

becomeReady

/**
 * 竞选失败进入就绪态
 * */
private void becomeReady(String path) {
    try {
        Stat stat = zooKeeper.exists(path, watcher);

        if (stat == null) {
            determineElection();
        } else {
            dispatchEvent(EventType.READY_START);
            state = State.READY;
            dispatchEvent(EventType.READY_COMPLETE);
        }
    } catch (KeeperException e) {
        becomeFailed(e);
    } catch (InterruptedException e) {
        becomeFailed(e);
    }
}

becomeFailed

/**
 * 当发生异常时,更新为FAILED状态
 * */
private void becomeFailed(Exception e) {
    state = State.FAILED;
    dispatchEvent(EventType.FAILED);
    logger.error("[LeaderElectionSupport#becomeFailed] error : " + e.toString(), e);
}

getNodePathSet

/**
 * 获取参加竞选的节点信息
 * */
private TreeSet<String> getNodePathSet() {
    TreeSet<String> nodeSet = new TreeSet<>();
    try {
        List<String> nodes = zooKeeper.getChildren(dir, false);

        for (String node : nodes) {
            nodeSet.add(dir.concat("/").concat(node));
        }

    } catch (KeeperException e) {
        becomeFailed(e);
    } catch (InterruptedException e) {
        becomeFailed(e);
    }

    return nodeSet;
}

stop

/**
 * Stop.
 * 停止竞选
 */
@Override
public void stop() {
    synchronized (this) {
        dispatchEvent(EventType.STOP_START);
        deleteNode();
        state = State.STOP;
        dispatchEvent(EventType.STOP_COMPLETE);
    }
}

deleteNode

/**
 * 停止时,删除节点,退出竞选
 * */
private void deleteNode() {
    try {
        if (leaderNode != null) {
            synchronized (this) {
                zooKeeper.delete(leaderNode.getNodePath(), -1);
                leaderNode = null;
            }
        }
    } catch (InterruptedException e) {
        becomeFailed(e);
    } catch (KeeperException e) {
        becomeFailed(e);
    }
}

getLeaderHostName

/**
 * Gets get leader host name.
 *
 * @return the get leader host name
 */
@Override
public String getLeaderHostName() {
    synchronized (this) {
        TreeSet<String> nodePathSet = getNodePathSet();

        if (!nodePathSet.isEmpty()) {
            try {
                String leaderPath = nodePathSet.first();
                return (String) ConversionUtil.bytesToObject(zooKeeper.getData(leaderPath, false, null));
            } catch (KeeperException e) {
                logger.error("[LeaderWatcher#getLeaderHostName] error : " + e.toString(), e);
            } catch (InterruptedException e) {
                logger.error("[LeaderWatcher#getLeaderHostName] error : " + e.toString(), e);
            } catch (IOException e) {
                logger.error("[LeaderWatcher#getLeaderHostName] error : " + e.toString(), e);
            } catch (ClassNotFoundException e) {
                logger.error("[LeaderWatcher#getLeaderHostName] error : " + e.toString(), e);
            }
        }

        return null;
    }
}

getLeaderNodePath

/**
 * Gets get leader node path.
 *
 * @return the get leader node path
 */
@Override
public String getLeaderNodePath() {
    synchronized (this) {
        TreeSet<String> nodePathSet = getNodePathSet();

        return nodePathSet.isEmpty() ? null : nodePathSet.first();
    }

}

LeaderWatcher

/**
 * 内部watcher类,当竞选失败时,watch前一个节点,当前一个节点别移除时,再次发起决策
 * */
private class LeaderWatcher implements Watcher {
    /**
     * Process.
     *
     * @param watchedEvent the watched event
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        try {
            if (Event.EventType.NodeDeleted.equals(watchedEvent.getType()) && !State.STOP.equals(state)) {
                determineElection();
            }
        } catch (Exception e) {
            logger.error("[LeaderWatcher#process] error : " + e.toString(), e);
        }

    }
}

总结

以上就是我们利用ZooKeeper的临时节点和Watcher特性实现的公平模式分布式竞选。

可以进行简单的选主操作,适用于如执行单机定时任务、心跳检测等场景。实际上是实现的Master-Slave模型。

源代码可见:aloofJr

而对高可用要求较多的复杂选举场景,如分布式存储、同步等,则需要考虑集群一致性、脑裂等各种情况,则需要实现如Paxos、raft、Zab等一致性算法协议。如ZooKeeper集群的选举模式就是使用的Zab算法。
我们后续会进行深入的探讨。

更多文章

见我的博客:https://nc2era.com

written by AloofJr,转载请注明出处

目录
相关文章
|
9月前
|
人工智能 Kubernetes 数据可视化
Kubernetes下的分布式采集系统设计与实战:趋势监测失效引发的架构进化
本文回顾了一次关键词监测任务在容器集群中失效的全过程,分析了中转IP复用、调度节奏和异常处理等隐性风险,并提出通过解耦架构、动态IP分发和行为模拟优化采集策略,最终实现稳定高效的数据抓取与分析。
160 2
Kubernetes下的分布式采集系统设计与实战:趋势监测失效引发的架构进化
|
6月前
|
消息中间件 分布式计算 资源调度
《聊聊分布式》ZooKeeper与ZAB协议:分布式协调的核心引擎
ZooKeeper是一个开源的分布式协调服务,基于ZAB协议实现数据一致性,提供分布式锁、配置管理、领导者选举等核心功能,具有高可用、强一致和简单易用的特点,广泛应用于Kafka、Hadoop等大型分布式系统中。
|
7月前
|
存储 算法 安全
“卧槽,系统又崩了!”——别慌,这也许是你看过最通俗易懂的分布式入门
本文深入解析分布式系统核心机制:数据分片与冗余副本实现扩展与高可用,租约、多数派及Gossip协议保障一致性与容错。探讨节点故障、网络延迟等挑战,揭示CFT/BFT容错原理,剖析规模与性能关系,为构建可靠分布式系统提供理论支撑。
335 2
|
9月前
|
数据采集 缓存 NoSQL
分布式新闻数据采集系统的同步效率优化实战
本文介绍了一个针对高频新闻站点的分布式爬虫系统优化方案。通过引入异步任务机制、本地缓存池、Redis pipeline 批量写入及身份池策略,系统采集效率提升近两倍,数据同步延迟显著降低,实现了分钟级热点追踪能力,为实时舆情监控与分析提供了高效、稳定的数据支持。
365 1
分布式新闻数据采集系统的同步效率优化实战
|
10月前
|
数据采集 机器学习/深度学习 数据可视化
让回归模型不再被异常值"带跑偏",MSE和Cauchy损失函数在噪声数据环境下的实战对比
本文探讨了MSE与Cauchy损失函数在线性回归中的表现,特别是在含噪声数据环境下的差异。研究发现,MSE虽具良好数学性质,但对异常值敏感;而Cauchy通过其对数惩罚机制降低异常值影响,展现出更强稳定性。实验结果表明,Cauchy损失函数在处理含噪声数据时参数估计更接近真实值,为实际应用提供了更鲁棒的选择。
371 1
让回归模型不再被异常值"带跑偏",MSE和Cauchy损失函数在噪声数据环境下的实战对比
|
10月前
|
缓存 NoSQL 算法
高并发秒杀系统实战(Redis+Lua分布式锁防超卖与库存扣减优化)
秒杀系统面临瞬时高并发、资源竞争和数据一致性挑战。传统方案如数据库锁或应用层锁存在性能瓶颈或分布式问题,而基于Redis的分布式锁与Lua脚本原子操作成为高效解决方案。通过Redis的`SETNX`实现分布式锁,结合Lua脚本完成库存扣减,确保操作原子性并大幅提升性能(QPS从120提升至8,200)。此外,分段库存策略、多级限流及服务降级机制进一步优化系统稳定性。最佳实践包括分层防控、黄金扣减法则与容灾设计,强调根据业务特性灵活组合技术手段以应对高并发场景。
2696 7
|
11月前
|
监控 Java 调度
SpringBoot中@Scheduled和Quartz的区别是什么?分布式定时任务框架选型实战
本文对比分析了SpringBoot中的`@Scheduled`与Quartz定时任务框架。`@Scheduled`轻量易用,适合单机简单场景,但存在多实例重复执行、无持久化等缺陷;Quartz功能强大,支持分布式调度、任务持久化、动态调整和失败重试,适用于复杂企业级需求。文章通过特性对比、代码示例及常见问题解答,帮助开发者理解两者差异,合理选择方案。记住口诀:单机简单用注解,多节点上Quartz;若是任务要可靠,持久化配置不能少。
962 4
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1