【工作中问题解决实践 二】分布式消息并发同步处理方案

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 【工作中问题解决实践 二】分布式消息并发同步处理方案

最近遇到一个问题,就是如何批量的进行数据迁移和计算,既要保证快,又要保证准,那么怎么保证呢?分布式及并发是必然要使用的,怎么保证呢,并发的同步(在共享内存并发模型里,同步是显式进行的。程序员必须显式指定某个方法或某段代码需要在线程之间互斥执行。执行操作有序性)是必然要使用的。

单机的实现方式

单机实现主要目的是用来判断批量运行时会报什么错,所以实现起来比较简单,直接开启一个线程池进行处理:

线程池设置

线程池的设置如下,核心线程只使用一个,因为准确性对于同步计算逻辑更重要,更多关于线程池的介绍:【Java并发编程 十二】JUC并发包下线程池

/**
     * 线程池设置
     */
    ThreadFactory threadFactory = new ThreadFactoryBuilder()
        .setNameFormat("syncStoreInfo-pool-%d")
        .setUncaughtExceptionHandler((t, e) -> LOGGER.warn("syncStoreInfo failed", e))
        .get();
    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 1,
        TimeUnit.MINUTES, new LinkedBlockingQueue<>(1000), threadFactory,
        new ThreadPoolExecutor.CallerRunsPolicy());

核心逻辑调用

核心逻辑调用如下,可以看到是通过线程池并发的对全部coopNos进行了一个消费。

/**
     * 同步全部合作下的全部门店
     *
     * @param coopNos the coop nos
     */
    public void syncBatchCooperation(List<String> coopNos) {
        if (CollectionUtils.isEmpty(coopNos)) {
            return;
        }
        for (String coopNo : coopNos) {
           executor.submit(() -> {
                try {
                    LOGGER.info("开始同步单个合作下的门店数据,coopNo = " + coopNo);
                    // 核心业务逻辑
                    this.syncOneCooperation(coopNo);
                } catch (Exception e) {
                    LOGGER.warn(MsgBuilder.build("合作下门店同步错误,coopNo = {}, errorMsg = {} ", coopNo, ExceptionUtils.getStackTrace(e)), e);
                }
            });
        }
    }

分布式实现方式

分布式的实现方式是为了让多台机器同时消费coopNos,增加速度,同时使用RedissonClient来保证分布式的一致性处理

核心逻辑调用

请求到来时先将代处理的消息都塞到分布式的redis队列中,具体的消费由分布式集群分组处理

/**
     * 同步全部合作下的全部门店
     *
     * @param coopNos the coop nos
     */
    public void syncBatchCooperation(List<String> coopNos) {
        if (CollectionUtils.isEmpty(coopNos)) {
            return;
        }
        for (String coopNo : coopNos) {
            // 线上运行时用redis队列+多线程,多台机器同时消费
            addCoopToAsyncWork(coopNo);
        }
    }

RedissonClient

引入RedissonClient,并且设置一个sortedSet队列,按照加入时间打分。将所有待处理的数据放到一个分布式消息队列中,然后由下游从队列中取数据进行处理。RedissonClient可以保证对多个计算节点保证同步。之前用C#做的时候搞的过于复杂了,参见这篇Blog:【Redis原理机制 六】Redis分布式锁深入、改进策略及RedLock算法

/**
     * 引入RedissonClient 
     */
    @Resource
    private RedissonClient redissonClient;
   /**
     * 队列设置
     */
    private RScoredSortedSet<String> coopNoQueue;
    /**
     * 是否运行
     */
    private boolean isRunning = false;
    /**
     * 队列初始化设置
     */
    @PostConstruct
    public void init() {
        coopNoQueue = redissonClient.getScoredSortedSet("COOP:STORE:SYNC:", StringCodec.INSTANCE);
    }
    /**
     * 队列增加元素
     */
    private void addCoopToAsyncWork(String data) {
        coopNoQueue.add(Double.longBitsToDouble(System.currentTimeMillis()), JsonUtils.obj2Str(data));
    }

SmartLifecycle

有些场景我们需要在Spring 所有的bean 完成初始化后紧接着执行一些任务或者启动需要的异步服务,首先实现接口SmartLifecycle并重写实现方法,通过SmartLifecycle进行容器级的生命周期管理,常见有几种解决方案:

  • j2ee 注解 启动前@PostConstruct 销毁前@PreDestroy 基于j2ee 规范
  • springboot 的 org.springframework.boot.CommandLineRunner
  • spring org.springframework.context.SmartLifecycle

SmartLifecycle 不仅仅能在初始化后执行一个逻辑,还能在关闭前执行一个逻辑,这里我们就在开始时执行并发消费逻辑,结束时结束消费逻辑、关闭线程池

/**
    * 是否运行
    */
    private boolean isRunning = false;
    /**
    * 容器启动时调用:运行中
    */
    @Override
    public void start() {
        isRunning = true;
        executor.execute(() -> {
            while (isRunning) {
                try {
                    Collection<String> coopNos = coopNoQueue.pollFirst(10);
                    if (CollectionUtils.isEmpty(coopNos)) {
                        TimeUnit.SECONDS.sleep(1);
                        continue;
                    }
                    LOGGER.info("开始同步一组合作下门店,合作数量: {}, coopNos : {}", coopNos.size(), JsonUtils.obj2Str(coopNos));
                    CountDownLatch count = new CountDownLatch(coopNos.size());
                    coopNos.forEach(coopNo -> {
                        try {
                            LOGGER.info("开始同步单个合作下门店,coopNo = " + coopNo);
                            syncOneCooperation(coopNo);
                        } catch (Exception e) {
                            LOGGER.warn(MsgBuilder.build("同步单个合作门店错误,coopNo = {}, errorMsg = {} ", coopNo, ExceptionUtils.getStackTrace(e)), e);
                        } finally {
                            count.countDown();
                        }
                    });
                    count.await();
                    LOGGER.info("同步本组合作下门店结束,数量:{}", coopNos.size());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    LOGGER.warn("合作下门店同步结束", e);
                }
            }
        });
    }
   /**
    * 容器停止时调用:结束方法调用、关闭线程池
    */
    @Override
    @SneakyThrows
    public void stop() {
        isRunning = false;
        executor.shutdown();
        executor.awaitTermination(30L, TimeUnit.SECONDS);
    }
   /**
    * 检测bean容器是否运行
    */
    @Override
    public boolean isRunning() {
        return isRunning;
    }

CountDownLatch

我们使用CountDownLatch来进行线程同步,保证每组10个coopNos执行完成后再进行下一组执行,一定程度上保证分组数据的同步处理【Java并发编程 十】JUC并发包下的工具类,防止本地的coopNos数量不停变化,导致foreach里的执行逻辑有误。

CountDownLatch count = new CountDownLatch(coopNos.size());
        coopNos.forEach(coopNo -> {
            try {
                 LOGGER.info("开始同步单个合作下门店,coopNo = " + coopNo);
                 syncOneCooperation(coopNo);
                } catch (Exception e) {
            LOGGER.warn(MsgBuilder.build("同步单个合作门店错误,coopNo = {}, errorMsg = {} ", coopNo, ExceptionUtils.getStackTrace(e)), e);
                } finally {
                   count.countDown();
               }
           });
    count.await();


相关文章
|
1月前
|
人工智能 安全 Java
分布式 Multi Agent 安全高可用探索与实践
在人工智能加速发展的今天,AI Agent 正在成为推动“人工智能+”战略落地的核心引擎。无论是技术趋势还是政策导向,都预示着一场深刻的变革正在发生。如果你也在探索 Agent 的应用场景,欢迎关注 AgentScope 项目,或尝试使用阿里云 MSE + Higress + Nacos 构建属于你的 AI 原生应用。一起,走进智能体的新世界。
435 37
|
1月前
|
关系型数据库 Apache 微服务
《聊聊分布式》分布式系统基石:深入理解CAP理论及其工程实践
CAP理论指出分布式系统中一致性、可用性、分区容错性三者不可兼得,必须根据业务需求进行权衡。实际应用中,不同场景选择不同策略:金融系统重一致(CP),社交应用重可用(AP),内网系统可选CA。现代架构更趋向动态调整与混合策略,灵活应对复杂需求。
|
3月前
|
数据采集 消息中间件 监控
单机与分布式:社交媒体热点采集的实践经验
在舆情监控与数据分析中,单机脚本适合小规模采集如微博热榜,而小红书等大规模、高时效性需求则需分布式架构。通过Redis队列、代理IP与多节点协作,可提升采集效率与稳定性,适应数据规模与变化速度。架构选择应根据实际需求,兼顾扩展性与维护成本。
104 2
|
6月前
|
人工智能 安全 应用服务中间件
阿里巴巴 MCP 分布式落地实践:快速转换 HSF 到 MCP server
本文分享了阿里巴巴内部将大规模HSF服务快速转换为MCP Server的实践经验,通过Higress网关实现MCP协议卸载,无需修改代码即可接入MCP生态。文章分析了MCP生态面临的挑战,如协议快速迭代和SDK不稳定性,并详细介绍了操作步骤及组件功能。强调MCP虽非终极解决方案,但作为AI业务工程化的起点具有重要意义。最后总结指出,MCP只是AI原生应用发展的第一步,未来还有更多可能性值得探索。
1146 48
|
2月前
|
消息中间件 缓存 监控
中间件架构设计与实践:构建高性能分布式系统的核心基石
摘要 本文系统探讨了中间件技术及其在分布式系统中的核心价值。作者首先定义了中间件作为连接系统组件的&quot;神经网络&quot;,强调其在数据传输、系统稳定性和扩展性中的关键作用。随后详细分类了中间件体系,包括通信中间件(如RabbitMQ/Kafka)、数据中间件(如Redis/MyCAT)等类型。文章重点剖析了消息中间件的实现机制,通过Spring Boot代码示例展示了消息生产者的完整实现,涵盖消息ID生成、持久化、批量发送及重试机制等关键技术点。最后,作者指出中间件架构设计对系统性能的决定性影响,
|
4月前
|
数据采集 缓存 NoSQL
分布式新闻数据采集系统的同步效率优化实战
本文介绍了一个针对高频新闻站点的分布式爬虫系统优化方案。通过引入异步任务机制、本地缓存池、Redis pipeline 批量写入及身份池策略,系统采集效率提升近两倍,数据同步延迟显著降低,实现了分钟级热点追踪能力,为实时舆情监控与分析提供了高效、稳定的数据支持。
151 1
分布式新闻数据采集系统的同步效率优化实战
|
6月前
|
监控 Linux 应用服务中间件
Linux多节点多硬盘部署MinIO:分布式MinIO集群部署指南搭建高可用架构实践
通过以上步骤,已成功基于已有的 MinIO 服务,扩展为一个 MinIO 集群。该集群具有高可用性和容错性,适合生产环境使用。如果有任何问题,请检查日志或参考MinIO 官方文档。作者联系方式vx:2743642415。
2142 57
|
6月前
|
安全 JavaScript 前端开发
HarmonyOS NEXT~HarmonyOS 语言仓颉:下一代分布式开发语言的技术解析与应用实践
HarmonyOS语言仓颉是华为专为HarmonyOS生态系统设计的新型编程语言,旨在解决分布式环境下的开发挑战。它以“编码创造”为理念,具备分布式原生、高性能与高效率、安全可靠三大核心特性。仓颉语言通过内置分布式能力简化跨设备开发,提供统一的编程模型和开发体验。文章从语言基础、关键特性、开发实践及未来展望四个方面剖析其技术优势,助力开发者掌握这一新兴工具,构建全场景分布式应用。
661 35
|
6月前
|
人工智能 负载均衡 Java
Spring AI Alibaba 发布企业级 MCP 分布式部署方案
本文介绍了Spring AI Alibaba MCP的开发与应用,旨在解决企业级AI Agent在分布式环境下的部署和动态更新问题。通过集成Nacos,Spring AI Alibaba实现了流量负载均衡及节点变更动态感知等功能。开发者可方便地将企业内部业务系统发布为MCP服务或开发自己的AI Agent。文章详细描述了如何通过代理应用接入存量业务系统,以及全新MCP服务的开发流程,并提供了完整的配置示例和源码链接。未来,Spring AI Alibaba计划结合Nacos3的mcp-registry与mcp-router能力,进一步优化Agent开发体验。
2420 14
|
6月前
|
NoSQL 算法 安全
redis分布式锁在高并发场景下的方案设计与性能提升
本文探讨了Redis分布式锁在主从架构下失效的问题及其解决方案。首先通过CAP理论分析,Redis遵循AP原则,导致锁可能失效。针对此问题,提出两种解决方案:Zookeeper分布式锁(追求CP一致性)和Redlock算法(基于多个Redis实例提升可靠性)。文章还讨论了可能遇到的“坑”,如加从节点引发超卖问题、建议Redis节点数为奇数以及持久化策略对锁的影响。最后,从性能优化角度出发,介绍了减少锁粒度和分段锁的策略,并结合实际场景(如下单重复提交、支付与取消订单冲突)展示了分布式锁的应用方法。
493 3