【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式队列

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
简介: 前言上文【从入门到放弃-ZooKeeper】ZooKeeper入门中,我们学习了ZooKeeper的简单安装和cli使用。接下来我们开始基于java API的实战编程。本文先来写一个分布式队列的代码实现。

前言

上文【从入门到放弃-ZooKeeper】ZooKeeper入门中,我们学习了ZooKeeper的简单安装和cli使用。
接下来我们开始基于java API的实战编程。本文先来写一个分布式队列的代码实现。

设计

我们来写一个先进先出的分布式无界公平队列。参考我们之前介绍的【从入门到放弃-Java】并发编程-JUC-ConcurrentLinkedQueue【从入门到放弃-Java】并发编程-JUC-LinkedBlockingQueue。我们直接继承AbstractQueue类,并实现Queue接口。
主要重写offer、poll、peek、size方法。
我们使用ZooKeeper的持久化顺序节点来实现分布式队列。
offer是入队,入队时新创建一个持久化顺序节点,节点后缀会根据ZooKeeper的特性自动累加。
poll的出队,获取根节点下的所有节点,根据后缀数字排序,数组最小的是最先入队的,因此要最先出队。
peek,获取到最下入队的数据,和poll的区别是,peek只获取数据,不出队,不删除已经消费的节点。
size获取队列长度,实现方式是,获取根节点下的节点数量即可。这个方法在并发时可能会有问题。慎用。

DistributedQueue

//继承AbstractQueue类并实现Queue接口
public class DistributedQueue<E> extends AbstractQueue<E> implements Queue<E> {
    private static Logger logger = LoggerFactory.getLogger(DistributedQueue.class);
    
    //ZooKeeper客户端,进行ZooKeeper操作
    private ZooKeeper zooKeeper;

    //根节点名称
    private String dir;

    //数据节点名称,顺序节点在插入口会变为 node{00000000xx} 格式
    private String node;

    //ZooKeeper鉴权信息
    private List<ACL> acls;

    /**
     * Constructor.
     *
     * @param zooKeeper the zoo keeper
     * @param dir       the dir
     * @param node      the node
     * @param acls      the acls
     */
   public DistributedQueue (ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {
        this.zooKeeper = zooKeeper;
        this.dir = dir;
        this.node = node;
        this.acls = acls;
        init();
    }


    private void init() {
        //需要先判断根节点是否存在,不存在的话,创建子节点时会出错。
        try {
            Stat stat = zooKeeper.exists(dir, false);
            if (stat == null) {
                zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            logger.error("[DistributedQueue#init] error : " + e.toString(), e);
        }
    }
}

offer

/**
 * Offer boolean.
 *
 * @param o the o
 * @return the boolean
 */
@Override
public boolean offer(E o) {
    //构建要插入的节点名称
    String fullPath = dir.concat("/").concat(node);
    try {
        //创建子节点成功则返回入队成功
      zooKeeper.create(fullPath, objectToBytes(o), acls, CreateMode.PERSISTENT_SEQUENTIAL);
        return true;
    } catch (Exception e) {
        logger.error("[DistributedQueue#offer] error : " + e.toString(), e);
    }
    return false;
}

poll

/**
 * Poll e.
 *
 * @return the e
 */
@Override
public E poll() {
    try {
        //获取根节点所有子节点信息。
        List<String> children = zooKeeper.getChildren(dir, null);
        //如果队列是空的则返回null
        if (children == null || children.isEmpty()) {
            return null;
        }

        //将子节点名称排序
        Collections.sort(children);
        for (String child : children) {
            //拼接子节点的具体名称
            String fullPath = dir.concat("/").concat(child);
            try {
                //如果获取数据成功,则类型转换后,返回,并删除改队列中该节点
                byte[] bytes = zooKeeper.getData(fullPath, false, null);
                E data = (E) bytesToObject(bytes);
                zooKeeper.delete(fullPath, -1);
                return data;
            } catch (Exception e) {
                logger.warn("[DistributedQueue#poll] warn : " + e.toString(), e);
            }
        }

    } catch (Exception e) {
        logger.error("[DistributedQueue#peek] poll : " + e.toString(), e);
    }

    return null;
}

peek

/**
 * Peek e.
 *
 * @return the e
 */
@Override
public E peek() {
   
    try {
        //获取根节点所有子节点信息。
        List<String> children = zooKeeper.getChildren(dir, null);
        //如果队列是空的则返回null
        if (children == null || children.isEmpty()) {
            return null;
        }

        //将子节点名称排序
        Collections.sort(children);
        
        for (String child : children) {
            //拼接子节点的具体名称
            String fullPath = dir.concat("/").concat(child);
            try {
                //如果获取数据成功,则类型转换后,返回,不会删除改队列中该节点
                byte[] bytes = zooKeeper.getData(fullPath, false, null);
                E data = (E) bytesToObject(bytes);
                return data;
            } catch (Exception e) {
                logger.warn("[DistributedQueue#peek] warn : " + e.toString(), e);
            }
        }

    } catch (Exception e) {
        logger.error("[DistributedQueue#peek] warn : " + e.toString(), e);
    }

    return null;
}

size

/**
 * Size int.
 *
 * @return the int
 */
@Override
public int size() {
    try {
        //获取根节点的子节点名称
        List<String> children = zooKeeper.getChildren(dir, null);
        //返回子结点信息数量
        return children.size();
    } catch (Exception e) {
        logger.error("[DistributedQueue#offer] size : " + e.toString(), e);
    }

    return 0;
}

总结

上面我们一起学习了如何利用持久性顺序节点,创建一个分布式先进先出队列。源代码可见:aloofJr
如果有好的优化建议,欢迎一起讨论。

更多文章

见我的博客:https://nc2era.com

written by AloofJr,转载请注明出处

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
3月前
|
数据管理 API 调度
鸿蒙HarmonyOS应用开发 | 探索 HarmonyOS Next-从开发到实战掌握 HarmonyOS Next 的分布式能力
HarmonyOS Next 是华为新一代操作系统,专注于分布式技术的深度应用与生态融合。本文通过技术特点、应用场景及实战案例,全面解析其核心技术架构与开发流程。重点介绍分布式软总线2.0、数据管理、任务调度等升级特性,并提供基于 ArkTS 的原生开发支持。通过开发跨设备协同音乐播放应用,展示分布式能力的实际应用,涵盖项目配置、主界面设计、分布式服务实现及部署调试步骤。此外,深入分析分布式数据同步原理、任务调度优化及常见问题解决方案,帮助开发者掌握 HarmonyOS Next 的核心技术和实战技巧。
327 76
鸿蒙HarmonyOS应用开发 | 探索 HarmonyOS Next-从开发到实战掌握 HarmonyOS Next 的分布式能力
|
10天前
|
机器学习/深度学习 分布式计算 API
Python 高级编程与实战:深入理解并发编程与分布式系统
在前几篇文章中,我们探讨了 Python 的基础语法、面向对象编程、函数式编程、元编程、性能优化、调试技巧、数据科学、机器学习、Web 开发、API 设计、网络编程和异步IO。本文将深入探讨 Python 在并发编程和分布式系统中的应用,并通过实战项目帮助你掌握这些技术。
|
8天前
|
消息中间件 分布式计算 并行计算
Python 高级编程与实战:构建分布式系统
本文深入探讨了 Python 中的分布式系统,介绍了 ZeroMQ、Celery 和 Dask 等工具的使用方法,并通过实战项目帮助读者掌握这些技术。ZeroMQ 是高性能异步消息库,支持多种通信模式;Celery 是分布式任务队列,支持异步任务执行;Dask 是并行计算库,适用于大规模数据处理。文章结合具体代码示例,帮助读者理解如何使用这些工具构建分布式系统。
|
12天前
|
人工智能 Kubernetes 异构计算
大道至简-基于ACK的Deepseek满血版分布式推理部署实战
大道至简-基于ACK的Deepseek满血版分布式推理部署实战
|
1月前
|
人工智能 Kubernetes 异构计算
大道至简-基于ACK的Deepseek满血版分布式推理部署实战
本教程演示如何在ACK中多机分布式部署DeepSeek R1满血版。
|
2月前
|
存储 缓存 Java
Java中的分布式缓存与Memcached集成实战
通过在Java项目中集成Memcached,可以显著提升系统的性能和响应速度。合理的缓存策略、分布式架构设计和异常处理机制是实现高效缓存的关键。希望本文提供的实战示例和优化建议能够帮助开发者更好地应用Memcached,实现高性能的分布式缓存解决方案。
50 9
|
3月前
|
物联网 调度 vr&ar
鸿蒙HarmonyOS应用开发 |鸿蒙技术分享HarmonyOS Next 深度解析:分布式能力与跨设备协作实战
鸿蒙技术分享:HarmonyOS Next 深度解析 随着万物互联时代的到来,华为发布的 HarmonyOS Next 在技术架构和生态体验上实现了重大升级。本文从技术架构、生态优势和开发实践三方面深入探讨其特点,并通过跨设备笔记应用实战案例,展示其强大的分布式能力和多设备协作功能。核心亮点包括新一代微内核架构、统一开发语言 ArkTS 和多模态交互支持。开发者可借助 DevEco Studio 4.0 快速上手,体验高效、灵活的开发过程。 239个字符
266 13
鸿蒙HarmonyOS应用开发 |鸿蒙技术分享HarmonyOS Next 深度解析:分布式能力与跨设备协作实战
|
3月前
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
|
3月前
|
NoSQL Java Redis
秒杀抢购场景下实战JVM级别锁与分布式锁
在电商系统中,秒杀抢购活动是一种常见的营销手段。它通过设定极低的价格和有限的商品数量,吸引大量用户在特定时间点抢购,从而迅速增加销量、提升品牌曝光度和用户活跃度。然而,这种活动也对系统的性能和稳定性提出了极高的要求。特别是在秒杀开始的瞬间,系统需要处理海量的并发请求,同时确保数据的准确性和一致性。 为了解决这些问题,系统开发者们引入了锁机制。锁机制是一种用于控制对共享资源的并发访问的技术,它能够确保在同一时间只有一个进程或线程能够操作某个资源,从而避免数据不一致或冲突。在秒杀抢购场景下,锁机制显得尤为重要,它能够保证商品库存的扣减操作是原子性的,避免出现超卖或数据不一致的情况。
112 10
|
4月前
|
存储 运维 NoSQL
分布式读写锁的奥义:上古世代 ZooKeeper 的进击
本文作者将介绍女娲对社区 ZooKeeper 在分布式读写锁实践细节上的思考,希望帮助大家理解分布式读写锁背后的原理。
126 11

热门文章

最新文章