史上最细最强大的RocketMQ实现分布式事务解决方案教程|Java 开发实战(下)

简介: 史上最细最强大的RocketMQ实现分布式事务解决方案教程|Java 开发实战

微服务应用集成MQ

  • 引入依赖
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>
  • 配置文件配置
rocketmq:
  name-server: xxxx:9876
  producer:
    group: base_group_syncMsg
    send-message-timeout: 5000
    retry-times-when-send-failed: 2
    max-message-size: 4194304

bank1 应用实现

提供请求api

@GetMapping(value = "/rocketmq")
    public String transfer(@RequestParam("accountNo")String accountNo, @RequestParam("amount") Double amount){
        //创建事务id,作为消息内容发到mq
        String tx_no = UUID.randomUUID().toString();
        //封装事件实体
        AccountChangeEvent accountChangeEvent = new AccountChangeEvent(accountNo,amount,tx_no);
        //发送消息
        accountInfoService.sendUpdateAccountBalance(accountChangeEvent);
        return "处理成功-账号:{"+accountNo+"}扣减:{"+amount+"}";
    }

扣款请求

image.png

发送消息到MQ

/**
     * 向mq发送转账消息
     * @param accountChangeEvent 事件实体
     */
    @Override
    public void sendUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {
        //将accountChangeEvent转成json
        JSONObject jsonObject =new JSONObject();
        jsonObject.put("accountChange",accountChangeEvent);
        String jsonString = jsonObject.toJSONString();
        //生成message类型
        Message<String> message = MessageBuilder.withPayload(jsonString).build();
        //发送一条事务消息
        /**
         * String txProducerGroup 生产组
         * String destination 主题,
         * Message<?> message, 消息内容
         * Object arg 参数
         */
         rocketMQTemplate.sendMessageInTransaction("producer_group_bank1","bank",message,null);
    }

监听MQ返回

/**
 * @author 小隐乐乐
 * @date 2021/06/3
 * @description 消费者监听
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.producer.topic}", consumerGroup = "${rocketmq.producer.group}")
public class ConsumerListener implements RocketMQListener<String> {
    /**
     * 注入业务实现
     */
    @Autowired
    AccountInfoService accountInfoService;
    /**
     * 接收消息
     */
    @Override
    public void onMessage(String message) {
        log.info("获取到的消费消息:{}",message);
        //解析
        JSONObject jsonObject = JSONObject.parseObject(message);
        String accountChangeString = jsonObject.getString("accountChange");
        //转成AccountChangeEvent对象
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
        //设置账号
        accountChangeEvent.setAccountNo("2");
        //执行业务操作---增加金额
        accountInfoService.addAccountInfoBalance(accountChangeEvent);
    }
}

实现本地业务逻辑

/**
 * @author 小隐乐乐
 * @date 2021/06/3
 * @description 账户业务实现
 */
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
    @Autowired
    AccountInfoDao accountInfoDao;
    //更新账户--增加金额
    @Override
    @Transactional
    public void addAccountInfoBalance(AccountChangeEvent accountChangeEvent) {
        log.info("bank2更新本地账号,账号:{},金额:{}",accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());
        //本地读取事务  防止重复消费
        if(accountInfoDao.isExistTx(accountChangeEvent.getTxNo())>0){
            return ;
        }
        //插入数据--增加金额
        accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());
        //添加事务记录,用于幂等
        accountInfoDao.addTx(accountChangeEvent.getTxNo());
        //预留错误演示
        if(accountChangeEvent.getAmount() == 250){
            throw new RuntimeException("消息处理异常");
        }
    }
}

bank1 事务回调监听

/**
 * @author 小隐乐乐
 * @date 2021/06/3
 * @description 生产者事务回调监听器
 */
@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "producer_group_bank1")
public class ProducerCallbackListener implements RocketMQLocalTransactionListener {
    @Autowired
    AccountInfoService accountInfoService;
    @Autowired
    AccountInfoDao accountInfoDao;
    /**
     * 事务消息发送后的回调方法,当消息发送给mq成功,此方法被回调
     * @param message 消息
     * @return
     */
    @Override
    @Transactional
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        try {
            //解析消息
            String messageString = new String((byte[]) message.getPayload());
            JSONObject jsonObject = JSONObject.parseObject(messageString);
            //转成AccountChangeEvent实体
            String accountChangeString = jsonObject.getString("accountChange");
            //将accountChange(json)转成AccountChangeEvent
            AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
            //执行本地事务,扣减金额
            accountInfoService.doUpdateAccountBalance(accountChangeEvent);
            //当返回RocketMQLocalTransactionState.COMMIT,自动向mq发送commit消息,mq将消息的状态改为可消费
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            e.printStackTrace();
            //向mq发送ROLLBACK,mq将消息的状态依旧无法消费
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    /**
     * 事务状态回查,查询是否扣减金额
     * @param message 消息
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        //解析message,转成AccountChangeEvent
        String messageString = new String((byte[]) message.getPayload());
        JSONObject jsonObject = JSONObject.parseObject(messageString);
        String accountChangeString = jsonObject.getString("accountChange");
        //将accountChange(json)转成AccountChangeEvent
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
        //事务id
        String txNo = accountChangeEvent.getTxNo();
        int existTx = accountInfoDao.isExistTx(txNo);
        if(existTx>0){
            return RocketMQLocalTransactionState.COMMIT;
        }else{
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
}

bank2 应用实现

bank2 监听MQ

/**
 * @author 小隐乐乐
 * @date 2021/06/3
 * @description 消费者监听
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "bank", consumerGroup = "rocketmq.consumer.group")
public class ConsumerListener implements RocketMQListener<String> {
    /**
     * 注入业务实现
     */
    @Autowired
    AccountInfoService accountInfoService;
    /**
     * 接收消息
     */
    @Override
    public void onMessage(String message) {
        log.info("获取到的消费消息:{}",message);
        //解析
        JSONObject jsonObject = JSONObject.parseObject(message);
        String accountChangeString = jsonObject.getString("accountChange");
        //转成AccountChangeEvent对象
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
        //设置账号
        accountChangeEvent.setAccountNo("2");
        //执行业务操作---增加金额
        accountInfoService.addAccountInfoBalance(accountChangeEvent);
    }
}

消息消费

log.info("获取到的消费消息:{}",message);
        //解析
        JSONObject jsonObject = JSONObject.parseObject(message);
        String accountChangeString = jsonObject.getString("accountChange");
        //转成AccountChangeEvent对象
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);

执行本地扣款事务

accountInfoService.addAccountInfoBalance(accountChangeEvent);

总结

终于搞完了,写demo还是很费事的。

分布式事务解决方案很多,到底需不需要分布式事务,也是需要我们技术人员去考量的。那么如果需要,我相信,本篇文章作为RocketMQ实现消息队列分布式事务的快速上手文章,相信你不容错过。如果觉得写的不错,我准备出专栏,哈哈哈。

躺平,在追求梦想的人身上不是一个好选择,技术的脚步,是一直向前的,努力吧,少年们!!!


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
17天前
|
设计模式 安全 Java
Java并发编程实战:使用synchronized关键字实现线程安全
【4月更文挑战第6天】Java中的`synchronized`关键字用于处理多线程并发,确保共享资源的线程安全。它可以修饰方法或代码块,实现互斥访问。当用于方法时,锁定对象实例或类对象;用于代码块时,锁定指定对象。过度使用可能导致性能问题,应注意避免锁持有时间过长、死锁,并考虑使用`java.util.concurrent`包中的高级工具。正确理解和使用`synchronized`是编写线程安全程序的关键。
|
4天前
|
IDE Java 数据库连接
使用 Java 进行桌面应用开发
【4月更文挑战第19天】Java 是一款广泛应用于企业级、网络和桌面应用开发的编程语言。其跨平台特性使Java程序能在不同操作系统上运行,而JDK提供了开发所需工具和库。使用Swing等GUI库构建用户界面,结合JDBC进行数据库操作,Socket实现网络通信。虽然面临性能和用户体验的挑战,但通过优化和选用合适的IDE,Java仍能开发出高效稳定的桌面应用。
|
4天前
|
存储 Java 数据库连接
java DDD 领域驱动设计思想的概念与实战
【4月更文挑战第19天】在Java开发中,领域驱动设计(Domain-Driven Design, DDD) 是一种软件设计方法论,强调以领域模型为中心的软件开发。这种方法通过丰富的领域模型来捕捉业务领域的复杂性,并通过软件满足核心业务需求。领域驱动设计不仅是一种技术策略,而且还是一种与业务专家紧密合作的思维方式
21 2
|
5天前
|
前端开发 Java Go
开发语言详解(python、java、Go(Golong)。。。。)
开发语言详解(python、java、Go(Golong)。。。。)
|
5天前
|
人工智能 前端开发 Java
Java语言开发的AI智慧导诊系统源码springboot+redis 3D互联网智导诊系统源码
智慧导诊解决盲目就诊问题,减轻分诊工作压力。降低挂错号比例,优化就诊流程,有效提高线上线下医疗机构接诊效率。可通过人体画像选择症状部位,了解对应病症信息和推荐就医科室。
38 10
|
5天前
|
Java 关系型数据库 MySQL
一套java+ spring boot与vue+ mysql技术开发的UWB高精度工厂人员定位全套系统源码有应用案例
UWB (ULTRA WIDE BAND, UWB) 技术是一种无线载波通讯技术,它不采用正弦载波,而是利用纳秒级的非正弦波窄脉冲传输数据,因此其所占的频谱范围很宽。一套UWB精确定位系统,最高定位精度可达10cm,具有高精度,高动态,高容量,低功耗的应用。
一套java+ spring boot与vue+ mysql技术开发的UWB高精度工厂人员定位全套系统源码有应用案例
|
12天前
|
运维 NoSQL 算法
Java开发-深入理解Redis Cluster的工作原理
综上所述,Redis Cluster通过数据分片、节点发现、主从复制、数据迁移、故障检测和客户端路由等机制,实现了一个分布式的、高可用的Redis解决方案。它允许数据分布在多个节点上,提供了自动故障转移和读写分离的功能,适用于需要大规模、高性能、高可用性的应用场景。
16 0
|
14天前
|
Java API 开发者
Java 8新特性之函数式编程实战
【4月更文挑战第9天】本文将深入探讨Java 8的新特性之一——函数式编程,通过实例演示如何运用Lambda表达式、Stream API等技术,提高代码的简洁性和执行效率。
|
14天前
|
人工智能 小程序 Java
JAVA开发智慧学校系统源码+人脸电子班牌布局
智慧校园是通过利用物联网,大数据技术来改变师生和校园资源相互交互的方式,以便提高交互的明确性、灵活性和响应速度,从而实现智慧化服务和管理的校园模式。
|
17天前
|
安全 前端开发 Java
Java Web开发知识点学习总结
Java Web开发涉及Java基础、Servlet、JSP、数据库操作(SQL+JDBC)、MVC设计模式、Spring框架、Hibernate ORM、Web服务(SOAP&RESTful)、安全认证(HTTP Basic/Digest/OAuth)及性能优化(缓存、异步、负载均衡)。
17 3