【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式竞选

本文涉及的产品
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
简介:

前言

上文【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式锁-升级版中,我们通过利用ZooKeeper的临时节点和Watcher特性,实现了一个分布式锁。
本文我们结合实际场景,完成一个分布式竞争选举。

设计

这里我们实现一个公平的选举方式,即先参加选举的优先被选为leader。
具体的实现思路 参考了ZooKeeper提供的官方示例:zookeeper-recipes-election

  • START:服务器开始竞选
  • OFFER:创建临时顺序结点
  • DETERMINE:开始决策,将临时节点按末尾序号从小到大排序,如果当前节点的序号最小,则竞选成功,否则,则Watch前一个节点,当前一个节点被删除时,再次进行决策
  • ELECTED:当前节点是序号最小的节点,竞选成功
  • READY:当前节点不是序号最小的节点,竞选不成功,Watch前一个节点,进入READY态
  • FAILED:当出现异常情况时,为失败状态
  • STOP:结束竞选

LeaderElectionSupport

public class LeaderElectionSupport implements LeaderElection{
    private static Logger logger = LoggerFactory.getLogger(LeaderElectionSupport.class);

    //ZooKeeper客户端,进行ZooKeeper操作
    private ZooKeeper zooKeeper;

    //根节点名称
    private String dir;

    //节点前缀
    private String node;

    //ZooKeeper鉴权信息
    private List<ACL> acls;

    //要加锁节点
    private String fullPath;

    //选举状态
    private State state;

    //监听器
    private Set<LeaderElectionListener> listeners;

    //存当前节点的信息
    private volatile LeaderNode leaderNode;

    //监察器
    private Watcher watcher;


    /**
     * Constructor.
     *
     * @param zooKeeper the zoo keeper
     * @param dir       the dir
     * @param node      the node
     * @param acls      the acls
     */
    public LeaderElectionSupport(ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {
        this.zooKeeper = zooKeeper;
        this.dir = dir;
        this.node = node;
        this.acls = acls;
        this.fullPath = dir.concat("/").concat(this.node);
        init();

        state = State.STOP;
        listeners = Collections.synchronizedSet(new HashSet<>());
    }

    /**
     * 初始化根节点、检查器等
     * */
    private void init() {
        try {
            watcher = new LeaderWatcher();
            Stat stat = zooKeeper.exists(dir, false);
            if (stat == null) {
                zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            logger.error("[LeaderElectionSupport#init] error : " + e.toString(), e);
        }
    }
}

start

/**
 * Start.
 * 开始竞选
 */
@Override
public void start() {
    synchronized (this) {
        state = State.START;
        dispatchEvent(EventType.START);
        offerElection();
        determineElection();
    }

}

offerElection

/**
 * 创建临时节点,参加竞选,并将主机信息保存在node中
 * */
private void offerElection() {
    dispatchEvent(EventType.OFFER_START);
    state = State.OFFER;
    if (leaderNode == null) {
        synchronized (this) {
            try {
                if (leaderNode == null) {
                    InetAddress ia = InetAddress.getLocalHost();
                    LeaderNode tmpNode = new LeaderNode();
                    tmpNode.setHostName(ia.getHostName());
                    String path = zooKeeper.create(fullPath, ConversionUtil.objectToBytes(ia.getHostName()), acls, CreateMode.EPHEMERAL_SEQUENTIAL);

                    tmpNode.setNodePath(path);
                    tmpNode.setId(NodeUtil.getNodeId(path));

                    leaderNode = tmpNode;
                }
            } catch (Exception e) {
                becomeFailed(e);
            }
        }
    }
    dispatchEvent(EventType.OFFER_COMPLETE);
}

determineElection

/**
 * 决定竞选结果
 * 1、竞选节点序号最低的赢取选举
 * 2、未赢得选举的节点,监听上一个节点,直到上一个节点被删除,则尝试重新竞选
 * */
private void determineElection() {
    dispatchEvent(EventType.DETERMINE_START);
    state = State.DETERMINE;
    synchronized (this) {
        TreeSet<String> nodePathSet = getNodePathSet();
        if (nodePathSet.isEmpty()) {
            becomeFailed(new Exception("no node"));
            return;
        }
        String leaderPath = nodePathSet.first();
        if (leaderNode.getNodePath().equalsIgnoreCase(leaderPath)) {
            becomeLeader();
        } else {
            becomeReady(nodePathSet.headSet(leaderNode.getNodePath()).last());
        }
    }
    dispatchEvent(EventType.DETERMINE_COMPLETE);
}

becomeLeader

/**
 * 竞选成功
 * */
private void becomeLeader() {
    dispatchEvent(EventType.ELECTED_START);
    state = State.ELECTED;
    dispatchEvent(EventType.ELECTED_COMPLETE);
}

becomeReady

/**
 * 竞选失败进入就绪态
 * */
private void becomeReady(String path) {
    try {
        Stat stat = zooKeeper.exists(path, watcher);

        if (stat == null) {
            determineElection();
        } else {
            dispatchEvent(EventType.READY_START);
            state = State.READY;
            dispatchEvent(EventType.READY_COMPLETE);
        }
    } catch (KeeperException e) {
        becomeFailed(e);
    } catch (InterruptedException e) {
        becomeFailed(e);
    }
}

becomeFailed

/**
 * 当发生异常时,更新为FAILED状态
 * */
private void becomeFailed(Exception e) {
    state = State.FAILED;
    dispatchEvent(EventType.FAILED);
    logger.error("[LeaderElectionSupport#becomeFailed] error : " + e.toString(), e);
}

getNodePathSet

/**
 * 获取参加竞选的节点信息
 * */
private TreeSet<String> getNodePathSet() {
    TreeSet<String> nodeSet = new TreeSet<>();
    try {
        List<String> nodes = zooKeeper.getChildren(dir, false);

        for (String node : nodes) {
            nodeSet.add(dir.concat("/").concat(node));
        }

    } catch (KeeperException e) {
        becomeFailed(e);
    } catch (InterruptedException e) {
        becomeFailed(e);
    }

    return nodeSet;
}

stop

/**
 * Stop.
 * 停止竞选
 */
@Override
public void stop() {
    synchronized (this) {
        dispatchEvent(EventType.STOP_START);
        deleteNode();
        state = State.STOP;
        dispatchEvent(EventType.STOP_COMPLETE);
    }
}

deleteNode

/**
 * 停止时,删除节点,退出竞选
 * */
private void deleteNode() {
    try {
        if (leaderNode != null) {
            synchronized (this) {
                zooKeeper.delete(leaderNode.getNodePath(), -1);
                leaderNode = null;
            }
        }
    } catch (InterruptedException e) {
        becomeFailed(e);
    } catch (KeeperException e) {
        becomeFailed(e);
    }
}

getLeaderHostName

/**
 * Gets get leader host name.
 *
 * @return the get leader host name
 */
@Override
public String getLeaderHostName() {
    synchronized (this) {
        TreeSet<String> nodePathSet = getNodePathSet();

        if (!nodePathSet.isEmpty()) {
            try {
                String leaderPath = nodePathSet.first();
                return (String) ConversionUtil.bytesToObject(zooKeeper.getData(leaderPath, false, null));
            } catch (KeeperException e) {
                logger.error("[LeaderWatcher#getLeaderHostName] error : " + e.toString(), e);
            } catch (InterruptedException e) {
                logger.error("[LeaderWatcher#getLeaderHostName] error : " + e.toString(), e);
            } catch (IOException e) {
                logger.error("[LeaderWatcher#getLeaderHostName] error : " + e.toString(), e);
            } catch (ClassNotFoundException e) {
                logger.error("[LeaderWatcher#getLeaderHostName] error : " + e.toString(), e);
            }
        }

        return null;
    }
}

getLeaderNodePath

/**
 * Gets get leader node path.
 *
 * @return the get leader node path
 */
@Override
public String getLeaderNodePath() {
    synchronized (this) {
        TreeSet<String> nodePathSet = getNodePathSet();

        return nodePathSet.isEmpty() ? null : nodePathSet.first();
    }

}

LeaderWatcher

/**
 * 内部watcher类,当竞选失败时,watch前一个节点,当前一个节点别移除时,再次发起决策
 * */
private class LeaderWatcher implements Watcher {
    /**
     * Process.
     *
     * @param watchedEvent the watched event
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        try {
            if (Event.EventType.NodeDeleted.equals(watchedEvent.getType()) && !State.STOP.equals(state)) {
                determineElection();
            }
        } catch (Exception e) {
            logger.error("[LeaderWatcher#process] error : " + e.toString(), e);
        }

    }
}

总结

以上就是我们利用ZooKeeper的临时节点和Watcher特性实现的公平模式分布式竞选。

可以进行简单的选主操作,适用于如执行单机定时任务、心跳检测等场景。实际上是实现的Master-Slave模型。

源代码可见:aloofJr

而对高可用要求较多的复杂选举场景,如分布式存储、同步等,则需要考虑集群一致性、脑裂等各种情况,则需要实现如Paxos、raft、Zab等一致性算法协议。如ZooKeeper集群的选举模式就是使用的Zab算法。
我们后续会进行深入的探讨。

更多文章

见我的博客:https://nc2era.com

written by AloofJr,转载请注明出处

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
13天前
|
数据管理 API 调度
鸿蒙HarmonyOS应用开发 | 探索 HarmonyOS Next-从开发到实战掌握 HarmonyOS Next 的分布式能力
HarmonyOS Next 是华为新一代操作系统,专注于分布式技术的深度应用与生态融合。本文通过技术特点、应用场景及实战案例,全面解析其核心技术架构与开发流程。重点介绍分布式软总线2.0、数据管理、任务调度等升级特性,并提供基于 ArkTS 的原生开发支持。通过开发跨设备协同音乐播放应用,展示分布式能力的实际应用,涵盖项目配置、主界面设计、分布式服务实现及部署调试步骤。此外,深入分析分布式数据同步原理、任务调度优化及常见问题解决方案,帮助开发者掌握 HarmonyOS Next 的核心技术和实战技巧。
141 76
鸿蒙HarmonyOS应用开发 | 探索 HarmonyOS Next-从开发到实战掌握 HarmonyOS Next 的分布式能力
|
13天前
|
物联网 调度 vr&ar
鸿蒙HarmonyOS应用开发 |鸿蒙技术分享HarmonyOS Next 深度解析:分布式能力与跨设备协作实战
鸿蒙技术分享:HarmonyOS Next 深度解析 随着万物互联时代的到来,华为发布的 HarmonyOS Next 在技术架构和生态体验上实现了重大升级。本文从技术架构、生态优势和开发实践三方面深入探讨其特点,并通过跨设备笔记应用实战案例,展示其强大的分布式能力和多设备协作功能。核心亮点包括新一代微内核架构、统一开发语言 ArkTS 和多模态交互支持。开发者可借助 DevEco Studio 4.0 快速上手,体验高效、灵活的开发过程。 239个字符
180 13
鸿蒙HarmonyOS应用开发 |鸿蒙技术分享HarmonyOS Next 深度解析:分布式能力与跨设备协作实战
|
10天前
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
|
20天前
|
NoSQL Java Redis
秒杀抢购场景下实战JVM级别锁与分布式锁
在电商系统中,秒杀抢购活动是一种常见的营销手段。它通过设定极低的价格和有限的商品数量,吸引大量用户在特定时间点抢购,从而迅速增加销量、提升品牌曝光度和用户活跃度。然而,这种活动也对系统的性能和稳定性提出了极高的要求。特别是在秒杀开始的瞬间,系统需要处理海量的并发请求,同时确保数据的准确性和一致性。 为了解决这些问题,系统开发者们引入了锁机制。锁机制是一种用于控制对共享资源的并发访问的技术,它能够确保在同一时间只有一个进程或线程能够操作某个资源,从而避免数据不一致或冲突。在秒杀抢购场景下,锁机制显得尤为重要,它能够保证商品库存的扣减操作是原子性的,避免出现超卖或数据不一致的情况。
50 10
|
2月前
|
存储 运维 NoSQL
分布式读写锁的奥义:上古世代 ZooKeeper 的进击
本文作者将介绍女娲对社区 ZooKeeper 在分布式读写锁实践细节上的思考,希望帮助大家理解分布式读写锁背后的原理。
|
3月前
|
消息中间件 关系型数据库 Java
‘分布式事务‘ 圣经:从入门到精通,架构师尼恩最新、最全详解 (50+图文4万字全面总结 )
本文 是 基于尼恩之前写的一篇 分布式事务的文章 升级而来 , 尼恩之前写的 分布式事务的文章, 在全网阅读量 100万次以上 , 被很多培训机构 作为 顶级教程。 此文修改了 老版本的 一个大bug , 大家不要再看老版本啦。
|
3月前
|
NoSQL Java Redis
开发实战:使用Redisson实现分布式延时消息,订单30分钟关闭的另外一种实现!
本文详细介绍了 Redisson 延迟队列(DelayedQueue)的实现原理,包括基本使用、内部数据结构、基本流程、发送和获取延时消息以及初始化延时队列等内容。文章通过代码示例和流程图,逐步解析了延迟消息的发送、接收及处理机制,帮助读者深入了解 Redisson 延迟队列的工作原理。
|
3月前
|
分布式计算 NoSQL Java
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
61 2
|
3月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
30天前
|
存储 NoSQL Java
使用lock4j-redis-template-spring-boot-starter实现redis分布式锁
通过使用 `lock4j-redis-template-spring-boot-starter`,我们可以轻松实现 Redis 分布式锁,从而解决分布式系统中多个实例并发访问共享资源的问题。合理配置和使用分布式锁,可以有效提高系统的稳定性和数据的一致性。希望本文对你在实际项目中使用 Redis 分布式锁有所帮助。
101 5