史上最细最强大的RocketMQ实现分布式事务解决方案教程|Java 开发实战(下)

简介: 史上最细最强大的RocketMQ实现分布式事务解决方案教程|Java 开发实战

微服务应用集成MQ

  • 引入依赖
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>
  • 配置文件配置
rocketmq:
  name-server: xxxx:9876
  producer:
    group: base_group_syncMsg
    send-message-timeout: 5000
    retry-times-when-send-failed: 2
    max-message-size: 4194304

bank1 应用实现

提供请求api

@GetMapping(value = "/rocketmq")
    public String transfer(@RequestParam("accountNo")String accountNo, @RequestParam("amount") Double amount){
        //创建事务id,作为消息内容发到mq
        String tx_no = UUID.randomUUID().toString();
        //封装事件实体
        AccountChangeEvent accountChangeEvent = new AccountChangeEvent(accountNo,amount,tx_no);
        //发送消息
        accountInfoService.sendUpdateAccountBalance(accountChangeEvent);
        return "处理成功-账号:{"+accountNo+"}扣减:{"+amount+"}";
    }

扣款请求

image.png

发送消息到MQ

/**
     * 向mq发送转账消息
     * @param accountChangeEvent 事件实体
     */
    @Override
    public void sendUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {
        //将accountChangeEvent转成json
        JSONObject jsonObject =new JSONObject();
        jsonObject.put("accountChange",accountChangeEvent);
        String jsonString = jsonObject.toJSONString();
        //生成message类型
        Message<String> message = MessageBuilder.withPayload(jsonString).build();
        //发送一条事务消息
        /**
         * String txProducerGroup 生产组
         * String destination 主题,
         * Message<?> message, 消息内容
         * Object arg 参数
         */
         rocketMQTemplate.sendMessageInTransaction("producer_group_bank1","bank",message,null);
    }

监听MQ返回

/**
 * @author 小隐乐乐
 * @date 2021/06/3
 * @description 消费者监听
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.producer.topic}", consumerGroup = "${rocketmq.producer.group}")
public class ConsumerListener implements RocketMQListener<String> {
    /**
     * 注入业务实现
     */
    @Autowired
    AccountInfoService accountInfoService;
    /**
     * 接收消息
     */
    @Override
    public void onMessage(String message) {
        log.info("获取到的消费消息:{}",message);
        //解析
        JSONObject jsonObject = JSONObject.parseObject(message);
        String accountChangeString = jsonObject.getString("accountChange");
        //转成AccountChangeEvent对象
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
        //设置账号
        accountChangeEvent.setAccountNo("2");
        //执行业务操作---增加金额
        accountInfoService.addAccountInfoBalance(accountChangeEvent);
    }
}

实现本地业务逻辑

/**
 * @author 小隐乐乐
 * @date 2021/06/3
 * @description 账户业务实现
 */
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
    @Autowired
    AccountInfoDao accountInfoDao;
    //更新账户--增加金额
    @Override
    @Transactional
    public void addAccountInfoBalance(AccountChangeEvent accountChangeEvent) {
        log.info("bank2更新本地账号,账号:{},金额:{}",accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());
        //本地读取事务  防止重复消费
        if(accountInfoDao.isExistTx(accountChangeEvent.getTxNo())>0){
            return ;
        }
        //插入数据--增加金额
        accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());
        //添加事务记录,用于幂等
        accountInfoDao.addTx(accountChangeEvent.getTxNo());
        //预留错误演示
        if(accountChangeEvent.getAmount() == 250){
            throw new RuntimeException("消息处理异常");
        }
    }
}

bank1 事务回调监听

/**
 * @author 小隐乐乐
 * @date 2021/06/3
 * @description 生产者事务回调监听器
 */
@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "producer_group_bank1")
public class ProducerCallbackListener implements RocketMQLocalTransactionListener {
    @Autowired
    AccountInfoService accountInfoService;
    @Autowired
    AccountInfoDao accountInfoDao;
    /**
     * 事务消息发送后的回调方法,当消息发送给mq成功,此方法被回调
     * @param message 消息
     * @return
     */
    @Override
    @Transactional
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        try {
            //解析消息
            String messageString = new String((byte[]) message.getPayload());
            JSONObject jsonObject = JSONObject.parseObject(messageString);
            //转成AccountChangeEvent实体
            String accountChangeString = jsonObject.getString("accountChange");
            //将accountChange(json)转成AccountChangeEvent
            AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
            //执行本地事务,扣减金额
            accountInfoService.doUpdateAccountBalance(accountChangeEvent);
            //当返回RocketMQLocalTransactionState.COMMIT,自动向mq发送commit消息,mq将消息的状态改为可消费
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            e.printStackTrace();
            //向mq发送ROLLBACK,mq将消息的状态依旧无法消费
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    /**
     * 事务状态回查,查询是否扣减金额
     * @param message 消息
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        //解析message,转成AccountChangeEvent
        String messageString = new String((byte[]) message.getPayload());
        JSONObject jsonObject = JSONObject.parseObject(messageString);
        String accountChangeString = jsonObject.getString("accountChange");
        //将accountChange(json)转成AccountChangeEvent
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
        //事务id
        String txNo = accountChangeEvent.getTxNo();
        int existTx = accountInfoDao.isExistTx(txNo);
        if(existTx>0){
            return RocketMQLocalTransactionState.COMMIT;
        }else{
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
}

bank2 应用实现

bank2 监听MQ

/**
 * @author 小隐乐乐
 * @date 2021/06/3
 * @description 消费者监听
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "bank", consumerGroup = "rocketmq.consumer.group")
public class ConsumerListener implements RocketMQListener<String> {
    /**
     * 注入业务实现
     */
    @Autowired
    AccountInfoService accountInfoService;
    /**
     * 接收消息
     */
    @Override
    public void onMessage(String message) {
        log.info("获取到的消费消息:{}",message);
        //解析
        JSONObject jsonObject = JSONObject.parseObject(message);
        String accountChangeString = jsonObject.getString("accountChange");
        //转成AccountChangeEvent对象
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
        //设置账号
        accountChangeEvent.setAccountNo("2");
        //执行业务操作---增加金额
        accountInfoService.addAccountInfoBalance(accountChangeEvent);
    }
}

消息消费

log.info("获取到的消费消息:{}",message);
        //解析
        JSONObject jsonObject = JSONObject.parseObject(message);
        String accountChangeString = jsonObject.getString("accountChange");
        //转成AccountChangeEvent对象
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);

执行本地扣款事务

accountInfoService.addAccountInfoBalance(accountChangeEvent);

总结

终于搞完了,写demo还是很费事的。

分布式事务解决方案很多,到底需不需要分布式事务,也是需要我们技术人员去考量的。那么如果需要,我相信,本篇文章作为RocketMQ实现消息队列分布式事务的快速上手文章,相信你不容错过。如果觉得写的不错,我准备出专栏,哈哈哈。

躺平,在追求梦想的人身上不是一个好选择,技术的脚步,是一直向前的,努力吧,少年们!!!


相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
人工智能 Kubernetes 数据可视化
Kubernetes下的分布式采集系统设计与实战:趋势监测失效引发的架构进化
本文回顾了一次关键词监测任务在容器集群中失效的全过程,分析了中转IP复用、调度节奏和异常处理等隐性风险,并提出通过解耦架构、动态IP分发和行为模拟优化采集策略,最终实现稳定高效的数据抓取与分析。
Kubernetes下的分布式采集系统设计与实战:趋势监测失效引发的架构进化
|
7月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
568 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
7月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
235 12
|
9月前
|
数据采集 存储 数据可视化
分布式爬虫框架Scrapy-Redis实战指南
本文介绍如何使用Scrapy-Redis构建分布式爬虫系统,采集携程平台上热门城市的酒店价格与评价信息。通过代理IP、Cookie和User-Agent设置规避反爬策略,实现高效数据抓取。结合价格动态趋势分析,助力酒店业优化市场策略、提升服务质量。技术架构涵盖Scrapy-Redis核心调度、代理中间件及数据解析存储,提供完整的技术路线图与代码示例。
966 0
分布式爬虫框架Scrapy-Redis实战指南
|
8月前
|
人工智能 安全 Java
智慧工地源码,Java语言开发,微服务架构,支持分布式和集群部署,多端覆盖
智慧工地是“互联网+建筑工地”的创新模式,基于物联网、移动互联网、BIM、大数据、人工智能等技术,实现对施工现场人员、设备、材料、安全等环节的智能化管理。其解决方案涵盖数据大屏、移动APP和PC管理端,采用高性能Java微服务架构,支持分布式与集群部署,结合Redis、消息队列等技术确保系统稳定高效。通过大数据驱动决策、物联网实时监测预警及AI智能视频监控,消除数据孤岛,提升项目可控性与安全性。智慧工地提供专家级远程管理服务,助力施工质量和安全管理升级,同时依托可扩展平台、多端应用和丰富设备接口,满足多样化需求,推动建筑行业数字化转型。
297 5
|
5月前
|
数据采集 缓存 NoSQL
分布式新闻数据采集系统的同步效率优化实战
本文介绍了一个针对高频新闻站点的分布式爬虫系统优化方案。通过引入异步任务机制、本地缓存池、Redis pipeline 批量写入及身份池策略,系统采集效率提升近两倍,数据同步延迟显著降低,实现了分钟级热点追踪能力,为实时舆情监控与分析提供了高效、稳定的数据支持。
201 1
分布式新闻数据采集系统的同步效率优化实战
|
7月前
|
安全 JavaScript 前端开发
HarmonyOS NEXT~HarmonyOS 语言仓颉:下一代分布式开发语言的技术解析与应用实践
HarmonyOS语言仓颉是华为专为HarmonyOS生态系统设计的新型编程语言,旨在解决分布式环境下的开发挑战。它以“编码创造”为理念,具备分布式原生、高性能与高效率、安全可靠三大核心特性。仓颉语言通过内置分布式能力简化跨设备开发,提供统一的编程模型和开发体验。文章从语言基础、关键特性、开发实践及未来展望四个方面剖析其技术优势,助力开发者掌握这一新兴工具,构建全场景分布式应用。
749 35
|
6月前
|
缓存 NoSQL 算法
高并发秒杀系统实战(Redis+Lua分布式锁防超卖与库存扣减优化)
秒杀系统面临瞬时高并发、资源竞争和数据一致性挑战。传统方案如数据库锁或应用层锁存在性能瓶颈或分布式问题,而基于Redis的分布式锁与Lua脚本原子操作成为高效解决方案。通过Redis的`SETNX`实现分布式锁,结合Lua脚本完成库存扣减,确保操作原子性并大幅提升性能(QPS从120提升至8,200)。此外,分段库存策略、多级限流及服务降级机制进一步优化系统稳定性。最佳实践包括分层防控、黄金扣减法则与容灾设计,强调根据业务特性灵活组合技术手段以应对高并发场景。
1797 7
|
7月前
|
监控 Java 调度
SpringBoot中@Scheduled和Quartz的区别是什么?分布式定时任务框架选型实战
本文对比分析了SpringBoot中的`@Scheduled`与Quartz定时任务框架。`@Scheduled`轻量易用,适合单机简单场景,但存在多实例重复执行、无持久化等缺陷;Quartz功能强大,支持分布式调度、任务持久化、动态调整和失败重试,适用于复杂企业级需求。文章通过特性对比、代码示例及常见问题解答,帮助开发者理解两者差异,合理选择方案。记住口诀:单机简单用注解,多节点上Quartz;若是任务要可靠,持久化配置不能少。
704 4