SpringBoot整合RocketMQ发送事务消息

简介: RocketMQ提供了类似X/Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA是一种分布式事务解决方案,一种分布式事务处理模式

事务消息
1. 概念
分布式事务:一次操作由若干分支操作组成,这些分支操作分属不同应用,分布在不同服务器上。分布式事务需要保证这些分支操作要么全部成功,要么全部失败,分布式事务于普通事务一样,就是为了保证操作结果的一致性
事务消息:RocketMQ提供了类似X/Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA是一种分布式事务解决方案,一种分布式事务处理模式
半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了Broker,但是Broker未收到最终确认指令,此时该消息被标记成“暂不能投递”状态,即不能被消费者看到。处于该种状态下的消息即半事务消息
本地事务状态:Producer 回调操作执行的结果为本地事务状态,其会发送给TC,而TC会再发送给TM。TM会根据TC发送来的本地事务状态来决定全局事务确认指令
消息回查:重新查询本地事务的执行状态
关于消息回查,有三个常见的属性设置。它们都在broker加载的配置文件中设置
transactionTimeout=20,指定TM在20秒内应将最终确认状态发送给TC,否则引发消息回查。默认为60秒
transactionCheckMax=5,指定最多回查5次,超过后将丢弃消息并记录错误日志。默认15次。
transactionCheckInterval=10,指定设置的多次消息回查的时间间隔为10秒。默认为60秒。
XA协议
XA(Unix Transaction)是一种分布式事务解决方案,一种分布式事务处理模式,是基于XA协议的。XA协议由Tuxedo(Transaction for Unix has been Extended for Distributed Operation,分布式操作扩展之后的Unix事务系统)首先提出的,并交给X/Open组织,作为资源管理器与事务管理器的接口标准。
XA模式中有三个重要组件:TC、TM、RM。
TC(Transaction Coordinator):事务协调者。维护全局和分支事务的状态,驱动全局事务提交或回滚。RocketMQ中Broker充当着TC
TM(Transaction Manager):事务管理器。定义全局事务的范围:开始全局事务、提交或回滚全局事务。它实际是全局事务的发起者。RocketMQ中事务消息的Producer充当着TM
RM(Resource Manager):资源管理器。管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。RocketMQ中事务消息的Producer及Broker均是RM
XA模式是一个典型的2PC,其执行原理如下:

TM向TC发起指令,开启一个全局事务。
根据业务要求,各个RM会逐个向TC注册分支事务,然后TC会逐个向RM发出预执行指令。
各个RM在接收到指令后会在进行本地事务预执行。
RM将预执行结果Report给TC。当然,这个结果可能是成功,也可能是失败。
TC在接收到各个RM的Report后会将汇总结果上报给TM,根据汇总结果TM会向TC发出确认指 令。 若所有结果都是成功响应,则向TC发送Global Commit指令。 只要有结果是失败响应,则向TC发送Global Rollback指令。
TC在接收到指令后再次向RM发送确认指令。

注意:
事务消息不支持延时消息
对于事务消息要做好幂等性检查,因为事务消息可能不止一次被消费(因为存在回滚后再提交的 情况)
2. 生产者业务接口

public interface TransactionMessageService {

    /**
     * 发送事务消息
     * @param id
     * @param message
     */
    void sendTransactionMessage(String id, String message);
}

3. 生产者业务接口实现类

@Service
public class TransactionMessageServiceImpl implements TransactionMessageService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    private static final Logger logger = LoggerFactory.getLogger(TransactionMessageServiceImpl.class);

    @Override
    public void sendTransactionMessage(String id, String message) {
        Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("transaction-message-topic:transaction-tags", strMessage, id);
        if (result.getSendStatus() == SendStatus.SEND_OK) {
            logger.info("发送事务消息成功!消息ID为:{}", result.getMsgId());
        }
    }
}

4. 生产事务监听器类

@RocketMQTransactionListener
public class TransactionListener implements RocketMQLocalTransactionListener {

    private static final Logger logger = LoggerFactory.getLogger(TransactionListener.class);

    @Autowired
    private SysUserInfoService userInfoService;
    @Autowired
    private SysLogInfoService logInfoService;

    /**
     * 执行本地事务
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            String rocketmqKeys = msg.getHeaders().get("rocketmq_KEYS").toString();
            logger.info("事务消息key为:{}", rocketmqKeys);
            String payload = new String((byte[]) msg.getPayload());
            logger.info("事务消息为:{}", payload);
            SysUserInfo userInfo = JSONObject.parseObject(payload, SysUserInfo.class);
            userInfoService.saveUserInfoAndLogInfo(userInfo, rocketmqKeys);
        } catch (Exception e) {
            logger.error("发送事务消息异常!异常信息为:{}", e.getMessage());
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }

    /**
     * 校验本地事务
     * @param msg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String rocketmqKeys = msg.getHeaders().get("rocketmq_KEYS").toString();
        logger.info("事务消息key为:{}", rocketmqKeys);
        SysLogInfo logInfo = logInfoService.selectLogInfoByKey(rocketmqKeys);
        logger.info("查询日志信息为:{}", JSON.toJSONString(logInfo));
        if (null == logInfo) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }
}

5. 消费者类

@Component
@RocketMQMessageListener(topic = "transaction-message-topic", consumerGroup = "transaction-consumer-group")
public class TransactionMessageListener implements RocketMQListener<String> {

    private static final Logger logger = LoggerFactory.getLogger(TransactionMessageListener.class);

    @Override
    public void onMessage(String message) {
        logger.info("接收到事务消息:{}", message);
    }
}

6. 测试

@Test
void transactionMessage() {
    String uuid = UUID.randomUUID().toString().replace("-", "");
    SysUserInfo userInfo = new SysUserInfo();
    userInfo.setUserId(1001L);
    userInfo.setAddr("重庆渝北");
    userInfo.setAge(18);
    userInfo.setUserName("xiaofeng");
    userInfo.setPhone("13509877890");
    userInfo.setSex(1);
    userInfo.setStatus(1);
    transactionMessageService.sendTransactionMessage(uuid, JSON.toJSONString(userInfo));
}
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 数据可视化 Java
Linxu下RocketMq及可视化界面的搭建
Linxu下RocketMq配置信息及可视化界面的搭建
2585 0
|
消息中间件 Java 程序员
SpringBoot整合RocketMQ,尝尝几大高级特性!
作为一名程序员,您一定熟悉RocketMQ的功能,包括支持事务、顺序和延迟消息等。在程序员界有一句名言,“Talk is cheap. Show me the code” 。本文将通过实际案例来引出解决方案,并通过代码实现,让您在学习本节的过程中能够确切地掌握实际编码技能
567 0
SpringBoot整合RocketMQ,尝尝几大高级特性!
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
1644 1
|
消息中间件 Java RocketMQ
教程:Spring Boot整合RocketMQ的配置与优化
教程:Spring Boot整合RocketMQ的配置与优化
|
消息中间件 调度 RocketMQ
【RocketMQ系列六】RocketMQ事务消息
【RocketMQ系列六】RocketMQ事务消息
3692 1
|
消息中间件 中间件 程序员
分布式事务大揭秘:使用MQ实现最终一致性
本文由小米分享,介绍分布式事务中的MQ最终一致性实现,以RocketMQ为例。RocketMQ的事务消息机制包括准备消息、本地事务执行、确认/回滚消息及事务状态检查四个步骤。这种机制通过消息队列协调多系统操作,确保数据最终一致。MQ最终一致性具有系统解耦、提高可用性和灵活事务管理等优点,广泛应用于分布式系统中。文章还讨论了RocketMQ的事务消息处理流程和失败情况下的处理策略,帮助读者理解如何在实际应用中解决分布式事务问题。
1991 6
|
消息中间件 Java RocketMQ
Spring Boot与RocketMQ的集成
Spring Boot与RocketMQ的集成
|
消息中间件 数据库 RocketMQ
Springboot+RocketMQ通过事务消息优雅的实现订单支付功能
RocketMQ的事务消息,是指发送消息事件和其他事件需要同时成功或同时失败。比如银行转账, A银行的某账户要转一万元到B银行的某账户。A银行发送“B银行账户增加一万元”这个消息,要和“从A银 行账户扣除一万元”这个操作同时成功或者同时失败。RocketMQ采用两阶段提交的方式实现事务消息。
832 0
|
存储 JSON Java
spring boot3登录开发-1(整合jwt)
spring boot3登录开发-1(整合jwt)
1009 1
|
NoSQL Java Redis
SpringBoot整合Redis及StringRedisTemplate的使用
SpringBoot整合Redis及StringRedisTemplate的使用
520 0