【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式竞选

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介:

前言

上文【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式锁-升级版中,我们通过利用ZooKeeper的临时节点和Watcher特性,实现了一个分布式锁。
本文我们结合实际场景,完成一个分布式竞争选举。

设计

这里我们实现一个公平的选举方式,即先参加选举的优先被选为leader。
具体的实现思路 参考了ZooKeeper提供的官方示例:zookeeper-recipes-election

  • START:服务器开始竞选
  • OFFER:创建临时顺序结点
  • DETERMINE:开始决策,将临时节点按末尾序号从小到大排序,如果当前节点的序号最小,则竞选成功,否则,则Watch前一个节点,当前一个节点被删除时,再次进行决策
  • ELECTED:当前节点是序号最小的节点,竞选成功
  • READY:当前节点不是序号最小的节点,竞选不成功,Watch前一个节点,进入READY态
  • FAILED:当出现异常情况时,为失败状态
  • STOP:结束竞选

LeaderElectionSupport

public class LeaderElectionSupport implements LeaderElection{
    private static Logger logger = LoggerFactory.getLogger(LeaderElectionSupport.class);

    //ZooKeeper客户端,进行ZooKeeper操作
    private ZooKeeper zooKeeper;

    //根节点名称
    private String dir;

    //节点前缀
    private String node;

    //ZooKeeper鉴权信息
    private List<ACL> acls;

    //要加锁节点
    private String fullPath;

    //选举状态
    private State state;

    //监听器
    private Set<LeaderElectionListener> listeners;

    //存当前节点的信息
    private volatile LeaderNode leaderNode;

    //监察器
    private Watcher watcher;


    /**
     * Constructor.
     *
     * @param zooKeeper the zoo keeper
     * @param dir       the dir
     * @param node      the node
     * @param acls      the acls
     */
    public LeaderElectionSupport(ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {
        this.zooKeeper = zooKeeper;
        this.dir = dir;
        this.node = node;
        this.acls = acls;
        this.fullPath = dir.concat("/").concat(this.node);
        init();

        state = State.STOP;
        listeners = Collections.synchronizedSet(new HashSet<>());
    }

    /**
     * 初始化根节点、检查器等
     * */
    private void init() {
        try {
            watcher = new LeaderWatcher();
            Stat stat = zooKeeper.exists(dir, false);
            if (stat == null) {
                zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            logger.error("[LeaderElectionSupport#init] error : " + e.toString(), e);
        }
    }
}

start

/**
 * Start.
 * 开始竞选
 */
@Override
public void start() {
    synchronized (this) {
        state = State.START;
        dispatchEvent(EventType.START);
        offerElection();
        determineElection();
    }

}

offerElection

/**
 * 创建临时节点,参加竞选,并将主机信息保存在node中
 * */
private void offerElection() {
    dispatchEvent(EventType.OFFER_START);
    state = State.OFFER;
    if (leaderNode == null) {
        synchronized (this) {
            try {
                if (leaderNode == null) {
                    InetAddress ia = InetAddress.getLocalHost();
                    LeaderNode tmpNode = new LeaderNode();
                    tmpNode.setHostName(ia.getHostName());
                    String path = zooKeeper.create(fullPath, ConversionUtil.objectToBytes(ia.getHostName()), acls, CreateMode.EPHEMERAL_SEQUENTIAL);

                    tmpNode.setNodePath(path);
                    tmpNode.setId(NodeUtil.getNodeId(path));

                    leaderNode = tmpNode;
                }
            } catch (Exception e) {
                becomeFailed(e);
            }
        }
    }
    dispatchEvent(EventType.OFFER_COMPLETE);
}

determineElection

/**
 * 决定竞选结果
 * 1、竞选节点序号最低的赢取选举
 * 2、未赢得选举的节点,监听上一个节点,直到上一个节点被删除,则尝试重新竞选
 * */
private void determineElection() {
    dispatchEvent(EventType.DETERMINE_START);
    state = State.DETERMINE;
    synchronized (this) {
        TreeSet<String> nodePathSet = getNodePathSet();
        if (nodePathSet.isEmpty()) {
            becomeFailed(new Exception("no node"));
            return;
        }
        String leaderPath = nodePathSet.first();
        if (leaderNode.getNodePath().equalsIgnoreCase(leaderPath)) {
            becomeLeader();
        } else {
            becomeReady(nodePathSet.headSet(leaderNode.getNodePath()).last());
        }
    }
    dispatchEvent(EventType.DETERMINE_COMPLETE);
}

becomeLeader

/**
 * 竞选成功
 * */
private void becomeLeader() {
    dispatchEvent(EventType.ELECTED_START);
    state = State.ELECTED;
    dispatchEvent(EventType.ELECTED_COMPLETE);
}

becomeReady

/**
 * 竞选失败进入就绪态
 * */
private void becomeReady(String path) {
    try {
        Stat stat = zooKeeper.exists(path, watcher);

        if (stat == null) {
            determineElection();
        } else {
            dispatchEvent(EventType.READY_START);
            state = State.READY;
            dispatchEvent(EventType.READY_COMPLETE);
        }
    } catch (KeeperException e) {
        becomeFailed(e);
    } catch (InterruptedException e) {
        becomeFailed(e);
    }
}

becomeFailed

/**
 * 当发生异常时,更新为FAILED状态
 * */
private void becomeFailed(Exception e) {
    state = State.FAILED;
    dispatchEvent(EventType.FAILED);
    logger.error("[LeaderElectionSupport#becomeFailed] error : " + e.toString(), e);
}

getNodePathSet

/**
 * 获取参加竞选的节点信息
 * */
private TreeSet<String> getNodePathSet() {
    TreeSet<String> nodeSet = new TreeSet<>();
    try {
        List<String> nodes = zooKeeper.getChildren(dir, false);

        for (String node : nodes) {
            nodeSet.add(dir.concat("/").concat(node));
        }

    } catch (KeeperException e) {
        becomeFailed(e);
    } catch (InterruptedException e) {
        becomeFailed(e);
    }

    return nodeSet;
}

stop

/**
 * Stop.
 * 停止竞选
 */
@Override
public void stop() {
    synchronized (this) {
        dispatchEvent(EventType.STOP_START);
        deleteNode();
        state = State.STOP;
        dispatchEvent(EventType.STOP_COMPLETE);
    }
}

deleteNode

/**
 * 停止时,删除节点,退出竞选
 * */
private void deleteNode() {
    try {
        if (leaderNode != null) {
            synchronized (this) {
                zooKeeper.delete(leaderNode.getNodePath(), -1);
                leaderNode = null;
            }
        }
    } catch (InterruptedException e) {
        becomeFailed(e);
    } catch (KeeperException e) {
        becomeFailed(e);
    }
}

getLeaderHostName

/**
 * Gets get leader host name.
 *
 * @return the get leader host name
 */
@Override
public String getLeaderHostName() {
    synchronized (this) {
        TreeSet<String> nodePathSet = getNodePathSet();

        if (!nodePathSet.isEmpty()) {
            try {
                String leaderPath = nodePathSet.first();
                return (String) ConversionUtil.bytesToObject(zooKeeper.getData(leaderPath, false, null));
            } catch (KeeperException e) {
                logger.error("[LeaderWatcher#getLeaderHostName] error : " + e.toString(), e);
            } catch (InterruptedException e) {
                logger.error("[LeaderWatcher#getLeaderHostName] error : " + e.toString(), e);
            } catch (IOException e) {
                logger.error("[LeaderWatcher#getLeaderHostName] error : " + e.toString(), e);
            } catch (ClassNotFoundException e) {
                logger.error("[LeaderWatcher#getLeaderHostName] error : " + e.toString(), e);
            }
        }

        return null;
    }
}

getLeaderNodePath

/**
 * Gets get leader node path.
 *
 * @return the get leader node path
 */
@Override
public String getLeaderNodePath() {
    synchronized (this) {
        TreeSet<String> nodePathSet = getNodePathSet();

        return nodePathSet.isEmpty() ? null : nodePathSet.first();
    }

}

LeaderWatcher

/**
 * 内部watcher类,当竞选失败时,watch前一个节点,当前一个节点别移除时,再次发起决策
 * */
private class LeaderWatcher implements Watcher {
    /**
     * Process.
     *
     * @param watchedEvent the watched event
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        try {
            if (Event.EventType.NodeDeleted.equals(watchedEvent.getType()) && !State.STOP.equals(state)) {
                determineElection();
            }
        } catch (Exception e) {
            logger.error("[LeaderWatcher#process] error : " + e.toString(), e);
        }

    }
}

总结

以上就是我们利用ZooKeeper的临时节点和Watcher特性实现的公平模式分布式竞选。

可以进行简单的选主操作,适用于如执行单机定时任务、心跳检测等场景。实际上是实现的Master-Slave模型。

源代码可见:aloofJr

而对高可用要求较多的复杂选举场景,如分布式存储、同步等,则需要考虑集群一致性、脑裂等各种情况,则需要实现如Paxos、raft、Zab等一致性算法协议。如ZooKeeper集群的选举模式就是使用的Zab算法。
我们后续会进行深入的探讨。

更多文章

见我的博客:https://nc2era.com

written by AloofJr,转载请注明出处

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
5天前
|
监控 负载均衡 Cloud Native
ZooKeeper分布式协调服务详解:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入剖析ZooKeeper分布式协调服务原理,涵盖核心概念如Server、Client、ZNode、ACL、Watcher,以及ZAB协议在一致性、会话管理、Leader选举中的作用。讨论ZooKeeper数据模型、操作、会话管理、集群部署与管理、性能调优和监控。同时,文章探讨了ZooKeeper在分布式锁、队列、服务注册与发现等场景的应用,并在面试方面分析了与其它服务的区别、实战挑战及解决方案。附带Java客户端实现分布式锁的代码示例,助力提升面试表现。
135 2
|
5天前
|
存储 分布式计算 大数据
HBase分布式数据库关键技术与实战:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入剖析了HBase的核心技术,包括数据模型、分布式架构、访问模式和一致性保证,并探讨了其实战应用,如大规模数据存储、实时数据分析及与Hadoop、Spark集成。同时,分享了面试经验,对比了HBase与其他数据库的差异,提出了应对挑战的解决方案,展望了HBase的未来趋势。通过Java API代码示例,帮助读者巩固理解。全面了解和掌握HBase,能为面试和实际工作中的大数据处理提供坚实基础。
55 3
|
5天前
|
Docker 容器 关系型数据库
【PolarDB-X从入门到精通】 第四讲:PolarDB分布式版安装部署(源码编译部署)
本期课程将于4月11日19:00开始直播,内容包括源码编译基础知识和实践操作,课程目标是使学员掌握源码编译部署技能,为未来发展奠定基础,期待大家在课程中取得丰富的学习成果!
【PolarDB-X从入门到精通】 第四讲:PolarDB分布式版安装部署(源码编译部署)
|
3天前
|
前端开发 JavaScript 算法
分布式系统的一致性级别划分及Zookeeper一致性级别分析
分布式系统的一致性级别划分及Zookeeper一致性级别分析
|
5天前
|
存储 大数据 Apache
深入理解ZooKeeper:分布式协调服务的核心与实践
【5月更文挑战第7天】ZooKeeper是Apache的分布式协调服务,确保大规模分布式系统中的数据一致性与高可用性。其特点包括强一致性、高可用性、可靠性、顺序性和实时性。使用ZooKeeper涉及安装配置、启动服务、客户端连接及执行操作。实际应用中,面临性能瓶颈、不可伸缩性和单点故障等问题,可通过水平扩展、集成其他服务和多集群备份来解决。理解ZooKeeper原理和实践,有助于构建高效分布式系统。
|
5天前
|
监控 NoSQL 算法
探秘Redis分布式锁:实战与注意事项
本文介绍了Redis分区容错中的分布式锁概念,包括利用Watch实现乐观锁和使用setnx防止库存超卖。乐观锁通过Watch命令监控键值变化,在事务中执行修改,若键值被改变则事务失败。Java代码示例展示了具体实现。setnx命令用于库存操作,确保无超卖,通过设置锁并检查库存来更新。文章还讨论了分布式锁存在的问题,如客户端阻塞、时钟漂移和单点故障,并提出了RedLock算法来提高可靠性。Redisson作为生产环境的分布式锁实现,提供了可重入锁、读写锁等高级功能。最后,文章对比了Redis、Zookeeper和etcd的分布式锁特性。
136 16
探秘Redis分布式锁:实战与注意事项
|
5天前
|
Dubbo Java 应用服务中间件
Java从入门到精通:3.2.2分布式与并发编程——了解分布式系统的基本概念,学习使用Dubbo、Spring Cloud等分布式框架
Java从入门到精通:3.2.2分布式与并发编程——了解分布式系统的基本概念,学习使用Dubbo、Spring Cloud等分布式框架
|
5天前
|
Java 网络安全 Apache
搭建Zookeeper集群:三台服务器,一场分布式之舞
搭建Zookeeper集群:三台服务器,一场分布式之舞
53 0
|
5天前
|
缓存 应用服务中间件 数据库
【分布式技术专题】「缓存解决方案」一文带领你好好认识一下企业级别的缓存技术解决方案的运作原理和开发实战(多级缓存设计分析)
【分布式技术专题】「缓存解决方案」一文带领你好好认识一下企业级别的缓存技术解决方案的运作原理和开发实战(多级缓存设计分析)
53 1
|
5天前
|
监控 Dubbo 前端开发
快速入门分布式系统与Dubbo+zookeeper Demo
快速入门分布式系统与Dubbo+zookeeper Demo
44 0