【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式队列

本文涉及的产品
云原生网关 MSE Higress,422元/月
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 前言上文【从入门到放弃-ZooKeeper】ZooKeeper入门中,我们学习了ZooKeeper的简单安装和cli使用。接下来我们开始基于java API的实战编程。本文先来写一个分布式队列的代码实现。

前言

上文【从入门到放弃-ZooKeeper】ZooKeeper入门中,我们学习了ZooKeeper的简单安装和cli使用。
接下来我们开始基于java API的实战编程。本文先来写一个分布式队列的代码实现。

设计

我们来写一个先进先出的分布式无界公平队列。参考我们之前介绍的【从入门到放弃-Java】并发编程-JUC-ConcurrentLinkedQueue【从入门到放弃-Java】并发编程-JUC-LinkedBlockingQueue。我们直接继承AbstractQueue类,并实现Queue接口。
主要重写offer、poll、peek、size方法。
我们使用ZooKeeper的持久化顺序节点来实现分布式队列。
offer是入队,入队时新创建一个持久化顺序节点,节点后缀会根据ZooKeeper的特性自动累加。
poll的出队,获取根节点下的所有节点,根据后缀数字排序,数组最小的是最先入队的,因此要最先出队。
peek,获取到最下入队的数据,和poll的区别是,peek只获取数据,不出队,不删除已经消费的节点。
size获取队列长度,实现方式是,获取根节点下的节点数量即可。这个方法在并发时可能会有问题。慎用。

DistributedQueue

//继承AbstractQueue类并实现Queue接口
public class DistributedQueue<E> extends AbstractQueue<E> implements Queue<E> {
    private static Logger logger = LoggerFactory.getLogger(DistributedQueue.class);
    
    //ZooKeeper客户端,进行ZooKeeper操作
    private ZooKeeper zooKeeper;

    //根节点名称
    private String dir;

    //数据节点名称,顺序节点在插入口会变为 node{00000000xx} 格式
    private String node;

    //ZooKeeper鉴权信息
    private List<ACL> acls;

    /**
     * Constructor.
     *
     * @param zooKeeper the zoo keeper
     * @param dir       the dir
     * @param node      the node
     * @param acls      the acls
     */
   public DistributedQueue (ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {
        this.zooKeeper = zooKeeper;
        this.dir = dir;
        this.node = node;
        this.acls = acls;
        init();
    }


    private void init() {
        //需要先判断根节点是否存在,不存在的话,创建子节点时会出错。
        try {
            Stat stat = zooKeeper.exists(dir, false);
            if (stat == null) {
                zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            logger.error("[DistributedQueue#init] error : " + e.toString(), e);
        }
    }
}
AI 代码解读

offer

/**
 * Offer boolean.
 *
 * @param o the o
 * @return the boolean
 */
@Override
public boolean offer(E o) {
    //构建要插入的节点名称
    String fullPath = dir.concat("/").concat(node);
    try {
        //创建子节点成功则返回入队成功
      zooKeeper.create(fullPath, objectToBytes(o), acls, CreateMode.PERSISTENT_SEQUENTIAL);
        return true;
    } catch (Exception e) {
        logger.error("[DistributedQueue#offer] error : " + e.toString(), e);
    }
    return false;
}
AI 代码解读

poll

/**
 * Poll e.
 *
 * @return the e
 */
@Override
public E poll() {
    try {
        //获取根节点所有子节点信息。
        List<String> children = zooKeeper.getChildren(dir, null);
        //如果队列是空的则返回null
        if (children == null || children.isEmpty()) {
            return null;
        }

        //将子节点名称排序
        Collections.sort(children);
        for (String child : children) {
            //拼接子节点的具体名称
            String fullPath = dir.concat("/").concat(child);
            try {
                //如果获取数据成功,则类型转换后,返回,并删除改队列中该节点
                byte[] bytes = zooKeeper.getData(fullPath, false, null);
                E data = (E) bytesToObject(bytes);
                zooKeeper.delete(fullPath, -1);
                return data;
            } catch (Exception e) {
                logger.warn("[DistributedQueue#poll] warn : " + e.toString(), e);
            }
        }

    } catch (Exception e) {
        logger.error("[DistributedQueue#peek] poll : " + e.toString(), e);
    }

    return null;
}
AI 代码解读

peek

/**
 * Peek e.
 *
 * @return the e
 */
@Override
public E peek() {
   
    try {
        //获取根节点所有子节点信息。
        List<String> children = zooKeeper.getChildren(dir, null);
        //如果队列是空的则返回null
        if (children == null || children.isEmpty()) {
            return null;
        }

        //将子节点名称排序
        Collections.sort(children);
        
        for (String child : children) {
            //拼接子节点的具体名称
            String fullPath = dir.concat("/").concat(child);
            try {
                //如果获取数据成功,则类型转换后,返回,不会删除改队列中该节点
                byte[] bytes = zooKeeper.getData(fullPath, false, null);
                E data = (E) bytesToObject(bytes);
                return data;
            } catch (Exception e) {
                logger.warn("[DistributedQueue#peek] warn : " + e.toString(), e);
            }
        }

    } catch (Exception e) {
        logger.error("[DistributedQueue#peek] warn : " + e.toString(), e);
    }

    return null;
}
AI 代码解读

size

/**
 * Size int.
 *
 * @return the int
 */
@Override
public int size() {
    try {
        //获取根节点的子节点名称
        List<String> children = zooKeeper.getChildren(dir, null);
        //返回子结点信息数量
        return children.size();
    } catch (Exception e) {
        logger.error("[DistributedQueue#offer] size : " + e.toString(), e);
    }

    return 0;
}
AI 代码解读

总结

上面我们一起学习了如何利用持久性顺序节点,创建一个分布式先进先出队列。源代码可见:aloofJr
如果有好的优化建议,欢迎一起讨论。

更多文章

见我的博客:https://nc2era.com

written by AloofJr,转载请注明出处

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
aloof_
+关注
目录
打赏
0
1
1
0
417
分享
相关文章
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
236 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
分布式爬虫框架Scrapy-Redis实战指南
本文介绍如何使用Scrapy-Redis构建分布式爬虫系统,采集携程平台上热门城市的酒店价格与评价信息。通过代理IP、Cookie和User-Agent设置规避反爬策略,实现高效数据抓取。结合价格动态趋势分析,助力酒店业优化市场策略、提升服务质量。技术架构涵盖Scrapy-Redis核心调度、代理中间件及数据解析存储,提供完整的技术路线图与代码示例。
396 0
分布式爬虫框架Scrapy-Redis实战指南
高并发秒杀系统实战(Redis+Lua分布式锁防超卖与库存扣减优化)
秒杀系统面临瞬时高并发、资源竞争和数据一致性挑战。传统方案如数据库锁或应用层锁存在性能瓶颈或分布式问题,而基于Redis的分布式锁与Lua脚本原子操作成为高效解决方案。通过Redis的`SETNX`实现分布式锁,结合Lua脚本完成库存扣减,确保操作原子性并大幅提升性能(QPS从120提升至8,200)。此外,分段库存策略、多级限流及服务降级机制进一步优化系统稳定性。最佳实践包括分层防控、黄金扣减法则与容灾设计,强调根据业务特性灵活组合技术手段以应对高并发场景。
331 7
让回归模型不再被异常值"带跑偏",MSE和Cauchy损失函数在噪声数据环境下的实战对比
本文探讨了MSE与Cauchy损失函数在线性回归中的表现,特别是在含噪声数据环境下的差异。研究发现,MSE虽具良好数学性质,但对异常值敏感;而Cauchy通过其对数惩罚机制降低异常值影响,展现出更强稳定性。实验结果表明,Cauchy损失函数在处理含噪声数据时参数估计更接近真实值,为实际应用提供了更鲁棒的选择。
66 1
让回归模型不再被异常值"带跑偏",MSE和Cauchy损失函数在噪声数据环境下的实战对比
SpringBoot中@Scheduled和Quartz的区别是什么?分布式定时任务框架选型实战
本文对比分析了SpringBoot中的`@Scheduled`与Quartz定时任务框架。`@Scheduled`轻量易用,适合单机简单场景,但存在多实例重复执行、无持久化等缺陷;Quartz功能强大,支持分布式调度、任务持久化、动态调整和失败重试,适用于复杂企业级需求。文章通过特性对比、代码示例及常见问题解答,帮助开发者理解两者差异,合理选择方案。记住口诀:单机简单用注解,多节点上Quartz;若是任务要可靠,持久化配置不能少。
229 4
鸿蒙HarmonyOS应用开发 | 探索 HarmonyOS Next-从开发到实战掌握 HarmonyOS Next 的分布式能力
HarmonyOS Next 是华为新一代操作系统,专注于分布式技术的深度应用与生态融合。本文通过技术特点、应用场景及实战案例,全面解析其核心技术架构与开发流程。重点介绍分布式软总线2.0、数据管理、任务调度等升级特性,并提供基于 ArkTS 的原生开发支持。通过开发跨设备协同音乐播放应用,展示分布式能力的实际应用,涵盖项目配置、主界面设计、分布式服务实现及部署调试步骤。此外,深入分析分布式数据同步原理、任务调度优化及常见问题解决方案,帮助开发者掌握 HarmonyOS Next 的核心技术和实战技巧。
765 76
鸿蒙HarmonyOS应用开发 | 探索 HarmonyOS Next-从开发到实战掌握 HarmonyOS Next 的分布式能力
Python 高级编程与实战:深入理解并发编程与分布式系统
在前几篇文章中,我们探讨了 Python 的基础语法、面向对象编程、函数式编程、元编程、性能优化、调试技巧、数据科学、机器学习、Web 开发、API 设计、网络编程和异步IO。本文将深入探讨 Python 在并发编程和分布式系统中的应用,并通过实战项目帮助你掌握这些技术。
Python 高级编程与实战:构建分布式系统
本文深入探讨了 Python 中的分布式系统,介绍了 ZeroMQ、Celery 和 Dask 等工具的使用方法,并通过实战项目帮助读者掌握这些技术。ZeroMQ 是高性能异步消息库,支持多种通信模式;Celery 是分布式任务队列,支持异步任务执行;Dask 是并行计算库,适用于大规模数据处理。文章结合具体代码示例,帮助读者理解如何使用这些工具构建分布式系统。
大道至简-基于ACK的Deepseek满血版分布式推理部署实战
大道至简-基于ACK的Deepseek满血版分布式推理部署实战
247 5

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问