SpringBoot整合RocketMQ发送事务消息

简介: RocketMQ提供了类似X/Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA是一种分布式事务解决方案,一种分布式事务处理模式

事务消息
1. 概念
分布式事务:一次操作由若干分支操作组成,这些分支操作分属不同应用,分布在不同服务器上。分布式事务需要保证这些分支操作要么全部成功,要么全部失败,分布式事务于普通事务一样,就是为了保证操作结果的一致性
事务消息:RocketMQ提供了类似X/Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA是一种分布式事务解决方案,一种分布式事务处理模式
半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了Broker,但是Broker未收到最终确认指令,此时该消息被标记成“暂不能投递”状态,即不能被消费者看到。处于该种状态下的消息即半事务消息
本地事务状态:Producer 回调操作执行的结果为本地事务状态,其会发送给TC,而TC会再发送给TM。TM会根据TC发送来的本地事务状态来决定全局事务确认指令
消息回查:重新查询本地事务的执行状态
关于消息回查,有三个常见的属性设置。它们都在broker加载的配置文件中设置
transactionTimeout=20,指定TM在20秒内应将最终确认状态发送给TC,否则引发消息回查。默认为60秒
transactionCheckMax=5,指定最多回查5次,超过后将丢弃消息并记录错误日志。默认15次。
transactionCheckInterval=10,指定设置的多次消息回查的时间间隔为10秒。默认为60秒。
XA协议
XA(Unix Transaction)是一种分布式事务解决方案,一种分布式事务处理模式,是基于XA协议的。XA协议由Tuxedo(Transaction for Unix has been Extended for Distributed Operation,分布式操作扩展之后的Unix事务系统)首先提出的,并交给X/Open组织,作为资源管理器与事务管理器的接口标准。
XA模式中有三个重要组件:TC、TM、RM。
TC(Transaction Coordinator):事务协调者。维护全局和分支事务的状态,驱动全局事务提交或回滚。RocketMQ中Broker充当着TC
TM(Transaction Manager):事务管理器。定义全局事务的范围:开始全局事务、提交或回滚全局事务。它实际是全局事务的发起者。RocketMQ中事务消息的Producer充当着TM
RM(Resource Manager):资源管理器。管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。RocketMQ中事务消息的Producer及Broker均是RM
XA模式是一个典型的2PC,其执行原理如下:

TM向TC发起指令,开启一个全局事务。
根据业务要求,各个RM会逐个向TC注册分支事务,然后TC会逐个向RM发出预执行指令。
各个RM在接收到指令后会在进行本地事务预执行。
RM将预执行结果Report给TC。当然,这个结果可能是成功,也可能是失败。
TC在接收到各个RM的Report后会将汇总结果上报给TM,根据汇总结果TM会向TC发出确认指 令。 若所有结果都是成功响应,则向TC发送Global Commit指令。 只要有结果是失败响应,则向TC发送Global Rollback指令。
TC在接收到指令后再次向RM发送确认指令。

注意:
事务消息不支持延时消息
对于事务消息要做好幂等性检查,因为事务消息可能不止一次被消费(因为存在回滚后再提交的 情况)
2. 生产者业务接口

public interface TransactionMessageService {

    /**
     * 发送事务消息
     * @param id
     * @param message
     */
    void sendTransactionMessage(String id, String message);
}

3. 生产者业务接口实现类

@Service
public class TransactionMessageServiceImpl implements TransactionMessageService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    private static final Logger logger = LoggerFactory.getLogger(TransactionMessageServiceImpl.class);

    @Override
    public void sendTransactionMessage(String id, String message) {
        Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("transaction-message-topic:transaction-tags", strMessage, id);
        if (result.getSendStatus() == SendStatus.SEND_OK) {
            logger.info("发送事务消息成功!消息ID为:{}", result.getMsgId());
        }
    }
}

4. 生产事务监听器类

@RocketMQTransactionListener
public class TransactionListener implements RocketMQLocalTransactionListener {

    private static final Logger logger = LoggerFactory.getLogger(TransactionListener.class);

    @Autowired
    private SysUserInfoService userInfoService;
    @Autowired
    private SysLogInfoService logInfoService;

    /**
     * 执行本地事务
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            String rocketmqKeys = msg.getHeaders().get("rocketmq_KEYS").toString();
            logger.info("事务消息key为:{}", rocketmqKeys);
            String payload = new String((byte[]) msg.getPayload());
            logger.info("事务消息为:{}", payload);
            SysUserInfo userInfo = JSONObject.parseObject(payload, SysUserInfo.class);
            userInfoService.saveUserInfoAndLogInfo(userInfo, rocketmqKeys);
        } catch (Exception e) {
            logger.error("发送事务消息异常!异常信息为:{}", e.getMessage());
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }

    /**
     * 校验本地事务
     * @param msg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String rocketmqKeys = msg.getHeaders().get("rocketmq_KEYS").toString();
        logger.info("事务消息key为:{}", rocketmqKeys);
        SysLogInfo logInfo = logInfoService.selectLogInfoByKey(rocketmqKeys);
        logger.info("查询日志信息为:{}", JSON.toJSONString(logInfo));
        if (null == logInfo) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }
}

5. 消费者类

@Component
@RocketMQMessageListener(topic = "transaction-message-topic", consumerGroup = "transaction-consumer-group")
public class TransactionMessageListener implements RocketMQListener<String> {

    private static final Logger logger = LoggerFactory.getLogger(TransactionMessageListener.class);

    @Override
    public void onMessage(String message) {
        logger.info("接收到事务消息:{}", message);
    }
}

6. 测试

@Test
void transactionMessage() {
    String uuid = UUID.randomUUID().toString().replace("-", "");
    SysUserInfo userInfo = new SysUserInfo();
    userInfo.setUserId(1001L);
    userInfo.setAddr("重庆渝北");
    userInfo.setAge(18);
    userInfo.setUserName("xiaofeng");
    userInfo.setPhone("13509877890");
    userInfo.setSex(1);
    userInfo.setStatus(1);
    transactionMessageService.sendTransactionMessage(uuid, JSON.toJSONString(userInfo));
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
8月前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
5月前
|
消息中间件 Java Maven
|
6月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
419 1
|
6月前
|
消息中间件 调度 RocketMQ
【RocketMQ系列六】RocketMQ事务消息
【RocketMQ系列六】RocketMQ事务消息
1050 1
|
7月前
|
消息中间件 Java Kafka
SpringBoot实用开发篇第六章(整合第三方技术,ActiveMQ,RabbitMQ,RocketMQ,Kafka)
SpringBoot实用开发篇第六章(整合第三方技术,ActiveMQ,RabbitMQ,RocketMQ,Kafka)
|
6月前
|
消息中间件 Java RocketMQ
Spring Boot与RocketMQ的集成
Spring Boot与RocketMQ的集成
|
7月前
|
消息中间件 Java RocketMQ
教程:Spring Boot整合RocketMQ的配置与优化
教程:Spring Boot整合RocketMQ的配置与优化
|
7月前
|
消息中间件 IDE 数据库
RocketMQ事务消息学习及刨坑过程
RocketMQ事务消息学习及刨坑过程
|
Java 应用服务中间件 Maven
传统maven项目和现在spring boot项目的区别
Spring Boot:传统 Web 项目与采用 Spring Boot 项目区别
519 0
传统maven项目和现在spring boot项目的区别
|
XML Java 数据库连接
创建springboot项目的基本流程——以宠物类别为例
创建springboot项目的基本流程——以宠物类别为例
159 0
创建springboot项目的基本流程——以宠物类别为例