【工作中问题解决实践 二】分布式消息并发同步处理方案

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 【工作中问题解决实践 二】分布式消息并发同步处理方案

最近遇到一个问题,就是如何批量的进行数据迁移和计算,既要保证快,又要保证准,那么怎么保证呢?分布式及并发是必然要使用的,怎么保证呢,并发的同步(在共享内存并发模型里,同步是显式进行的。程序员必须显式指定某个方法或某段代码需要在线程之间互斥执行。执行操作有序性)是必然要使用的。

单机的实现方式

单机实现主要目的是用来判断批量运行时会报什么错,所以实现起来比较简单,直接开启一个线程池进行处理:

线程池设置

线程池的设置如下,核心线程只使用一个,因为准确性对于同步计算逻辑更重要,更多关于线程池的介绍:【Java并发编程 十二】JUC并发包下线程池

/**
     * 线程池设置
     */
    ThreadFactory threadFactory = new ThreadFactoryBuilder()
        .setNameFormat("syncStoreInfo-pool-%d")
        .setUncaughtExceptionHandler((t, e) -> LOGGER.warn("syncStoreInfo failed", e))
        .get();
    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 1,
        TimeUnit.MINUTES, new LinkedBlockingQueue<>(1000), threadFactory,
        new ThreadPoolExecutor.CallerRunsPolicy());

核心逻辑调用

核心逻辑调用如下,可以看到是通过线程池并发的对全部coopNos进行了一个消费。

/**
     * 同步全部合作下的全部门店
     *
     * @param coopNos the coop nos
     */
    public void syncBatchCooperation(List<String> coopNos) {
        if (CollectionUtils.isEmpty(coopNos)) {
            return;
        }
        for (String coopNo : coopNos) {
           executor.submit(() -> {
                try {
                    LOGGER.info("开始同步单个合作下的门店数据,coopNo = " + coopNo);
                    // 核心业务逻辑
                    this.syncOneCooperation(coopNo);
                } catch (Exception e) {
                    LOGGER.warn(MsgBuilder.build("合作下门店同步错误,coopNo = {}, errorMsg = {} ", coopNo, ExceptionUtils.getStackTrace(e)), e);
                }
            });
        }
    }

分布式实现方式

分布式的实现方式是为了让多台机器同时消费coopNos,增加速度,同时使用RedissonClient来保证分布式的一致性处理

核心逻辑调用

请求到来时先将代处理的消息都塞到分布式的redis队列中,具体的消费由分布式集群分组处理

/**
     * 同步全部合作下的全部门店
     *
     * @param coopNos the coop nos
     */
    public void syncBatchCooperation(List<String> coopNos) {
        if (CollectionUtils.isEmpty(coopNos)) {
            return;
        }
        for (String coopNo : coopNos) {
            // 线上运行时用redis队列+多线程,多台机器同时消费
            addCoopToAsyncWork(coopNo);
        }
    }

RedissonClient

引入RedissonClient,并且设置一个sortedSet队列,按照加入时间打分。将所有待处理的数据放到一个分布式消息队列中,然后由下游从队列中取数据进行处理。RedissonClient可以保证对多个计算节点保证同步。之前用C#做的时候搞的过于复杂了,参见这篇Blog:【Redis原理机制 六】Redis分布式锁深入、改进策略及RedLock算法

/**
     * 引入RedissonClient 
     */
    @Resource
    private RedissonClient redissonClient;
   /**
     * 队列设置
     */
    private RScoredSortedSet<String> coopNoQueue;
    /**
     * 是否运行
     */
    private boolean isRunning = false;
    /**
     * 队列初始化设置
     */
    @PostConstruct
    public void init() {
        coopNoQueue = redissonClient.getScoredSortedSet("COOP:STORE:SYNC:", StringCodec.INSTANCE);
    }
    /**
     * 队列增加元素
     */
    private void addCoopToAsyncWork(String data) {
        coopNoQueue.add(Double.longBitsToDouble(System.currentTimeMillis()), JsonUtils.obj2Str(data));
    }

SmartLifecycle

有些场景我们需要在Spring 所有的bean 完成初始化后紧接着执行一些任务或者启动需要的异步服务,首先实现接口SmartLifecycle并重写实现方法,通过SmartLifecycle进行容器级的生命周期管理,常见有几种解决方案:

  • j2ee 注解 启动前@PostConstruct 销毁前@PreDestroy 基于j2ee 规范
  • springboot 的 org.springframework.boot.CommandLineRunner
  • spring org.springframework.context.SmartLifecycle

SmartLifecycle 不仅仅能在初始化后执行一个逻辑,还能在关闭前执行一个逻辑,这里我们就在开始时执行并发消费逻辑,结束时结束消费逻辑、关闭线程池

/**
    * 是否运行
    */
    private boolean isRunning = false;
    /**
    * 容器启动时调用:运行中
    */
    @Override
    public void start() {
        isRunning = true;
        executor.execute(() -> {
            while (isRunning) {
                try {
                    Collection<String> coopNos = coopNoQueue.pollFirst(10);
                    if (CollectionUtils.isEmpty(coopNos)) {
                        TimeUnit.SECONDS.sleep(1);
                        continue;
                    }
                    LOGGER.info("开始同步一组合作下门店,合作数量: {}, coopNos : {}", coopNos.size(), JsonUtils.obj2Str(coopNos));
                    CountDownLatch count = new CountDownLatch(coopNos.size());
                    coopNos.forEach(coopNo -> {
                        try {
                            LOGGER.info("开始同步单个合作下门店,coopNo = " + coopNo);
                            syncOneCooperation(coopNo);
                        } catch (Exception e) {
                            LOGGER.warn(MsgBuilder.build("同步单个合作门店错误,coopNo = {}, errorMsg = {} ", coopNo, ExceptionUtils.getStackTrace(e)), e);
                        } finally {
                            count.countDown();
                        }
                    });
                    count.await();
                    LOGGER.info("同步本组合作下门店结束,数量:{}", coopNos.size());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    LOGGER.warn("合作下门店同步结束", e);
                }
            }
        });
    }
   /**
    * 容器停止时调用:结束方法调用、关闭线程池
    */
    @Override
    @SneakyThrows
    public void stop() {
        isRunning = false;
        executor.shutdown();
        executor.awaitTermination(30L, TimeUnit.SECONDS);
    }
   /**
    * 检测bean容器是否运行
    */
    @Override
    public boolean isRunning() {
        return isRunning;
    }

CountDownLatch

我们使用CountDownLatch来进行线程同步,保证每组10个coopNos执行完成后再进行下一组执行,一定程度上保证分组数据的同步处理【Java并发编程 十】JUC并发包下的工具类,防止本地的coopNos数量不停变化,导致foreach里的执行逻辑有误。

CountDownLatch count = new CountDownLatch(coopNos.size());
        coopNos.forEach(coopNo -> {
            try {
                 LOGGER.info("开始同步单个合作下门店,coopNo = " + coopNo);
                 syncOneCooperation(coopNo);
                } catch (Exception e) {
            LOGGER.warn(MsgBuilder.build("同步单个合作门店错误,coopNo = {}, errorMsg = {} ", coopNo, ExceptionUtils.getStackTrace(e)), e);
                } finally {
                   count.countDown();
               }
           });
    count.await();


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
29天前
|
消息中间件 架构师 数据库
本地消息表事务:10Wqps 高并发分布式事务的 终极方案,大厂架构师的 必备方案
45岁资深架构师尼恩分享了一篇关于分布式事务的文章,详细解析了如何在10Wqps高并发场景下实现分布式事务。文章从传统单体架构到微服务架构下分布式事务的需求背景出发,介绍了Seata这一开源分布式事务解决方案及其AT和TCC两种模式。随后,文章深入探讨了经典ebay本地消息表方案,以及如何使用RocketMQ消息队列替代数据库表来提高性能和可靠性。尼恩还分享了如何结合延迟消息进行事务数据的定时对账,确保最终一致性。最后,尼恩强调了高端面试中需要准备“高大上”的答案,并提供了多个技术领域的深度学习资料,帮助读者提升技术水平,顺利通过面试。
本地消息表事务:10Wqps 高并发分布式事务的 终极方案,大厂架构师的 必备方案
|
23天前
|
运维 Kubernetes 调度
阿里云容器服务 ACK One 分布式云容器企业落地实践
阿里云容器服务ACK提供强大的产品能力,支持弹性、调度、可观测、成本治理和安全合规。针对拥有IDC或三方资源的企业,ACK One分布式云容器平台能够有效解决资源管理、多云多集群管理及边缘计算等挑战,实现云上云下统一管理,提升业务效率与稳定性。
|
25天前
|
消息中间件 SQL 中间件
大厂都在用的分布式事务方案,Seata+RocketMQ带你打破10万QPS瓶颈
分布式事务涉及跨多个数据库或服务的操作,确保数据一致性。本地事务通过数据库直接支持ACID特性,而分布式事务则需解决跨服务协调难、高并发压力及性能与一致性权衡等问题。常见的解决方案包括两阶段提交(2PC)、Seata提供的AT和TCC模式、以及基于消息队列的最终一致性方案。这些方法各有优劣,适用于不同业务场景,选择合适的方案需综合考虑业务需求、系统规模和技术团队能力。
176 7
|
30天前
|
机器学习/深度学习 存储 运维
分布式机器学习系统:设计原理、优化策略与实践经验
本文详细探讨了分布式机器学习系统的发展现状与挑战,重点分析了数据并行、模型并行等核心训练范式,以及参数服务器、优化器等关键组件的设计与实现。文章还深入讨论了混合精度训练、梯度累积、ZeRO优化器等高级特性,旨在提供一套全面的技术解决方案,以应对超大规模模型训练中的计算、存储及通信挑战。
64 4
|
2月前
|
NoSQL Java 数据处理
基于Redis海量数据场景分布式ID架构实践
【11月更文挑战第30天】在现代分布式系统中,生成全局唯一的ID是一个常见且重要的需求。在微服务架构中,各个服务可能需要生成唯一标识符,如用户ID、订单ID等。传统的自增ID已经无法满足在集群环境下保持唯一性的要求,而分布式ID解决方案能够确保即使在多个实例间也能生成全局唯一的标识符。本文将深入探讨如何利用Redis实现分布式ID生成,并通过Java语言展示多个示例,同时分析每个实践方案的优缺点。
70 8
|
1月前
|
缓存 NoSQL Java
Spring Boot中的分布式缓存方案
Spring Boot提供了简便的方式来集成和使用分布式缓存。通过Redis和Memcached等缓存方案,可以显著提升应用的性能和扩展性。合理配置和优化缓存策略,可以有效避免常见的缓存问题,保证系统的稳定性和高效运行。
47 3
|
2月前
|
NoSQL 安全 PHP
hyperf-wise-locksmith,一个高效的PHP分布式锁方案
`hyperf-wise-locksmith` 是 Hyperf 框架下的互斥锁库,支持文件锁、分布式锁、红锁及协程锁,有效防止分布式环境下的竞争条件。本文介绍了其安装、特性和应用场景,如在线支付系统的余额扣减,确保操作的原子性。
33 4
|
2月前
|
NoSQL 算法 关系型数据库
分布式 ID 详解 ( 5大分布式 ID 生成方案 )
本文详解分布式全局唯一ID及其5种实现方案,关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
分布式 ID 详解 ( 5大分布式 ID 生成方案 )
|
3月前
|
JSON 分布式计算 前端开发
前端的全栈之路Meteor篇(七):轻量的NoSql分布式数据协议同步协议DDP深度剖析
本文深入探讨了DDP(Distributed Data Protocol)协议,这是一种在Meteor框架中广泛使用的发布/订阅协议,支持实时数据同步。文章详细介绍了DDP的主要特点、消息类型、协议流程及其在Meteor中的应用,包括实时数据同步、用户界面响应、分布式计算、多客户端协作和离线支持等。通过学习DDP,开发者可以构建响应迅速、适应性强的现代Web应用。
|
3月前
|
存储 缓存 NoSQL
分布式架构下 Session 共享的方案
【10月更文挑战第15天】在实际应用中,需要根据具体的业务需求、系统架构和性能要求等因素,选择合适的 Session 共享方案。同时,还需要不断地进行优化和调整,以确保系统的稳定性和可靠性。