史上最细最强大的RocketMQ实现分布式事务解决方案教程|Java 开发实战(下)

简介: 史上最细最强大的RocketMQ实现分布式事务解决方案教程|Java 开发实战

微服务应用集成MQ

  • 引入依赖
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>
  • 配置文件配置
rocketmq:
  name-server: xxxx:9876
  producer:
    group: base_group_syncMsg
    send-message-timeout: 5000
    retry-times-when-send-failed: 2
    max-message-size: 4194304

bank1 应用实现

提供请求api

@GetMapping(value = "/rocketmq")
    public String transfer(@RequestParam("accountNo")String accountNo, @RequestParam("amount") Double amount){
        //创建事务id,作为消息内容发到mq
        String tx_no = UUID.randomUUID().toString();
        //封装事件实体
        AccountChangeEvent accountChangeEvent = new AccountChangeEvent(accountNo,amount,tx_no);
        //发送消息
        accountInfoService.sendUpdateAccountBalance(accountChangeEvent);
        return "处理成功-账号:{"+accountNo+"}扣减:{"+amount+"}";
    }

扣款请求

image.png

发送消息到MQ

/**
     * 向mq发送转账消息
     * @param accountChangeEvent 事件实体
     */
    @Override
    public void sendUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {
        //将accountChangeEvent转成json
        JSONObject jsonObject =new JSONObject();
        jsonObject.put("accountChange",accountChangeEvent);
        String jsonString = jsonObject.toJSONString();
        //生成message类型
        Message<String> message = MessageBuilder.withPayload(jsonString).build();
        //发送一条事务消息
        /**
         * String txProducerGroup 生产组
         * String destination 主题,
         * Message<?> message, 消息内容
         * Object arg 参数
         */
         rocketMQTemplate.sendMessageInTransaction("producer_group_bank1","bank",message,null);
    }

监听MQ返回

/**
 * @author 小隐乐乐
 * @date 2021/06/3
 * @description 消费者监听
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.producer.topic}", consumerGroup = "${rocketmq.producer.group}")
public class ConsumerListener implements RocketMQListener<String> {
    /**
     * 注入业务实现
     */
    @Autowired
    AccountInfoService accountInfoService;
    /**
     * 接收消息
     */
    @Override
    public void onMessage(String message) {
        log.info("获取到的消费消息:{}",message);
        //解析
        JSONObject jsonObject = JSONObject.parseObject(message);
        String accountChangeString = jsonObject.getString("accountChange");
        //转成AccountChangeEvent对象
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
        //设置账号
        accountChangeEvent.setAccountNo("2");
        //执行业务操作---增加金额
        accountInfoService.addAccountInfoBalance(accountChangeEvent);
    }
}

实现本地业务逻辑

/**
 * @author 小隐乐乐
 * @date 2021/06/3
 * @description 账户业务实现
 */
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
    @Autowired
    AccountInfoDao accountInfoDao;
    //更新账户--增加金额
    @Override
    @Transactional
    public void addAccountInfoBalance(AccountChangeEvent accountChangeEvent) {
        log.info("bank2更新本地账号,账号:{},金额:{}",accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());
        //本地读取事务  防止重复消费
        if(accountInfoDao.isExistTx(accountChangeEvent.getTxNo())>0){
            return ;
        }
        //插入数据--增加金额
        accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());
        //添加事务记录,用于幂等
        accountInfoDao.addTx(accountChangeEvent.getTxNo());
        //预留错误演示
        if(accountChangeEvent.getAmount() == 250){
            throw new RuntimeException("消息处理异常");
        }
    }
}

bank1 事务回调监听

/**
 * @author 小隐乐乐
 * @date 2021/06/3
 * @description 生产者事务回调监听器
 */
@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "producer_group_bank1")
public class ProducerCallbackListener implements RocketMQLocalTransactionListener {
    @Autowired
    AccountInfoService accountInfoService;
    @Autowired
    AccountInfoDao accountInfoDao;
    /**
     * 事务消息发送后的回调方法,当消息发送给mq成功,此方法被回调
     * @param message 消息
     * @return
     */
    @Override
    @Transactional
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        try {
            //解析消息
            String messageString = new String((byte[]) message.getPayload());
            JSONObject jsonObject = JSONObject.parseObject(messageString);
            //转成AccountChangeEvent实体
            String accountChangeString = jsonObject.getString("accountChange");
            //将accountChange(json)转成AccountChangeEvent
            AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
            //执行本地事务,扣减金额
            accountInfoService.doUpdateAccountBalance(accountChangeEvent);
            //当返回RocketMQLocalTransactionState.COMMIT,自动向mq发送commit消息,mq将消息的状态改为可消费
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            e.printStackTrace();
            //向mq发送ROLLBACK,mq将消息的状态依旧无法消费
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    /**
     * 事务状态回查,查询是否扣减金额
     * @param message 消息
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        //解析message,转成AccountChangeEvent
        String messageString = new String((byte[]) message.getPayload());
        JSONObject jsonObject = JSONObject.parseObject(messageString);
        String accountChangeString = jsonObject.getString("accountChange");
        //将accountChange(json)转成AccountChangeEvent
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
        //事务id
        String txNo = accountChangeEvent.getTxNo();
        int existTx = accountInfoDao.isExistTx(txNo);
        if(existTx>0){
            return RocketMQLocalTransactionState.COMMIT;
        }else{
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
}

bank2 应用实现

bank2 监听MQ

/**
 * @author 小隐乐乐
 * @date 2021/06/3
 * @description 消费者监听
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "bank", consumerGroup = "rocketmq.consumer.group")
public class ConsumerListener implements RocketMQListener<String> {
    /**
     * 注入业务实现
     */
    @Autowired
    AccountInfoService accountInfoService;
    /**
     * 接收消息
     */
    @Override
    public void onMessage(String message) {
        log.info("获取到的消费消息:{}",message);
        //解析
        JSONObject jsonObject = JSONObject.parseObject(message);
        String accountChangeString = jsonObject.getString("accountChange");
        //转成AccountChangeEvent对象
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
        //设置账号
        accountChangeEvent.setAccountNo("2");
        //执行业务操作---增加金额
        accountInfoService.addAccountInfoBalance(accountChangeEvent);
    }
}

消息消费

log.info("获取到的消费消息:{}",message);
        //解析
        JSONObject jsonObject = JSONObject.parseObject(message);
        String accountChangeString = jsonObject.getString("accountChange");
        //转成AccountChangeEvent对象
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);

执行本地扣款事务

accountInfoService.addAccountInfoBalance(accountChangeEvent);

总结

终于搞完了,写demo还是很费事的。

分布式事务解决方案很多,到底需不需要分布式事务,也是需要我们技术人员去考量的。那么如果需要,我相信,本篇文章作为RocketMQ实现消息队列分布式事务的快速上手文章,相信你不容错过。如果觉得写的不错,我准备出专栏,哈哈哈。

躺平,在追求梦想的人身上不是一个好选择,技术的脚步,是一直向前的,努力吧,少年们!!!


相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
6月前
|
安全 Java 开发者
告别NullPointerException:Java Optional实战指南
告别NullPointerException:Java Optional实战指南
336 119
|
7月前
|
Java 开发者
Java并发编程:CountDownLatch实战解析
Java并发编程:CountDownLatch实战解析
563 100
|
7月前
|
人工智能 Java API
Java AI智能体实战:使用LangChain4j构建能使用工具的AI助手
随着AI技术的发展,AI智能体(Agent)能够通过使用工具来执行复杂任务,从而大幅扩展其能力边界。本文介绍如何在Java中使用LangChain4j框架构建一个能够使用外部工具的AI智能体。我们将通过一个具体示例——一个能获取天气信息和执行数学计算的AI助手,详细讲解如何定义工具、创建智能体并处理执行流程。本文包含完整的代码示例和架构说明,帮助Java开发者快速上手AI智能体的开发。
2850 8
|
7月前
|
人工智能 Java API
Java与大模型集成实战:构建智能Java应用的新范式
随着大型语言模型(LLM)的API化,将其强大的自然语言处理能力集成到现有Java应用中已成为提升应用智能水平的关键路径。本文旨在为Java开发者提供一份实用的集成指南。我们将深入探讨如何使用Spring Boot 3框架,通过HTTP客户端与OpenAI GPT(或兼容API)进行高效、安全的交互。内容涵盖项目依赖配置、异步非阻塞的API调用、请求与响应的结构化处理、异常管理以及一些面向生产环境的最佳实践,并附带完整的代码示例,助您快速将AI能力融入Java生态。
1219 12
|
6月前
|
存储 人工智能 算法
从零掌握贪心算法Java版:LeetCode 10题实战解析(上)
在算法世界里,有一种思想如同生活中的"见好就收"——每次做出当前看来最优的选择,寄希望于通过局部最优达成全局最优。这种思想就是贪心算法,它以其简洁高效的特点,成为解决最优问题的利器。今天我们就来系统学习贪心算法的核心思想,并通过10道LeetCode经典题目实战演练,带你掌握这种"步步为营"的解题思维。
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
1305 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
消息中间件 uml RocketMQ
3 张图带你彻底理解 RocketMQ 事务消息
3 张图带你彻底理解 RocketMQ 事务消息
68165 2
3 张图带你彻底理解 RocketMQ 事务消息
|
消息中间件 NoSQL 关系型数据库
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
实战:如何防止mq消费方消息重复消费 如果因为网络延迟等原因,mq无法及时接收到消费方的应答,导致mq重试。(计算机网络)。在重试过程中造成重复消费的问题
3286 1
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
|
消息中间件 Java uml
5张图带你理解 RocketMQ 顺序消息实现机制
5张图带你理解 RocketMQ 顺序消息实现机制
1216 1
5张图带你理解 RocketMQ 顺序消息实现机制
|
消息中间件 缓存 算法
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
568 0
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?