SpringBoot整合RocketMQ发送事务消息

简介: RocketMQ提供了类似X/Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA是一种分布式事务解决方案,一种分布式事务处理模式

事务消息
1. 概念
分布式事务:一次操作由若干分支操作组成,这些分支操作分属不同应用,分布在不同服务器上。分布式事务需要保证这些分支操作要么全部成功,要么全部失败,分布式事务于普通事务一样,就是为了保证操作结果的一致性
事务消息:RocketMQ提供了类似X/Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA是一种分布式事务解决方案,一种分布式事务处理模式
半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了Broker,但是Broker未收到最终确认指令,此时该消息被标记成“暂不能投递”状态,即不能被消费者看到。处于该种状态下的消息即半事务消息
本地事务状态:Producer 回调操作执行的结果为本地事务状态,其会发送给TC,而TC会再发送给TM。TM会根据TC发送来的本地事务状态来决定全局事务确认指令
消息回查:重新查询本地事务的执行状态
关于消息回查,有三个常见的属性设置。它们都在broker加载的配置文件中设置
transactionTimeout=20,指定TM在20秒内应将最终确认状态发送给TC,否则引发消息回查。默认为60秒
transactionCheckMax=5,指定最多回查5次,超过后将丢弃消息并记录错误日志。默认15次。
transactionCheckInterval=10,指定设置的多次消息回查的时间间隔为10秒。默认为60秒。
XA协议
XA(Unix Transaction)是一种分布式事务解决方案,一种分布式事务处理模式,是基于XA协议的。XA协议由Tuxedo(Transaction for Unix has been Extended for Distributed Operation,分布式操作扩展之后的Unix事务系统)首先提出的,并交给X/Open组织,作为资源管理器与事务管理器的接口标准。
XA模式中有三个重要组件:TC、TM、RM。
TC(Transaction Coordinator):事务协调者。维护全局和分支事务的状态,驱动全局事务提交或回滚。RocketMQ中Broker充当着TC
TM(Transaction Manager):事务管理器。定义全局事务的范围:开始全局事务、提交或回滚全局事务。它实际是全局事务的发起者。RocketMQ中事务消息的Producer充当着TM
RM(Resource Manager):资源管理器。管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。RocketMQ中事务消息的Producer及Broker均是RM
XA模式是一个典型的2PC,其执行原理如下:

TM向TC发起指令,开启一个全局事务。
根据业务要求,各个RM会逐个向TC注册分支事务,然后TC会逐个向RM发出预执行指令。
各个RM在接收到指令后会在进行本地事务预执行。
RM将预执行结果Report给TC。当然,这个结果可能是成功,也可能是失败。
TC在接收到各个RM的Report后会将汇总结果上报给TM,根据汇总结果TM会向TC发出确认指 令。 若所有结果都是成功响应,则向TC发送Global Commit指令。 只要有结果是失败响应,则向TC发送Global Rollback指令。
TC在接收到指令后再次向RM发送确认指令。

注意:
事务消息不支持延时消息
对于事务消息要做好幂等性检查,因为事务消息可能不止一次被消费(因为存在回滚后再提交的 情况)
2. 生产者业务接口

public interface TransactionMessageService {

    /**
     * 发送事务消息
     * @param id
     * @param message
     */
    void sendTransactionMessage(String id, String message);
}

3. 生产者业务接口实现类

@Service
public class TransactionMessageServiceImpl implements TransactionMessageService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    private static final Logger logger = LoggerFactory.getLogger(TransactionMessageServiceImpl.class);

    @Override
    public void sendTransactionMessage(String id, String message) {
        Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("transaction-message-topic:transaction-tags", strMessage, id);
        if (result.getSendStatus() == SendStatus.SEND_OK) {
            logger.info("发送事务消息成功!消息ID为:{}", result.getMsgId());
        }
    }
}

4. 生产事务监听器类

@RocketMQTransactionListener
public class TransactionListener implements RocketMQLocalTransactionListener {

    private static final Logger logger = LoggerFactory.getLogger(TransactionListener.class);

    @Autowired
    private SysUserInfoService userInfoService;
    @Autowired
    private SysLogInfoService logInfoService;

    /**
     * 执行本地事务
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            String rocketmqKeys = msg.getHeaders().get("rocketmq_KEYS").toString();
            logger.info("事务消息key为:{}", rocketmqKeys);
            String payload = new String((byte[]) msg.getPayload());
            logger.info("事务消息为:{}", payload);
            SysUserInfo userInfo = JSONObject.parseObject(payload, SysUserInfo.class);
            userInfoService.saveUserInfoAndLogInfo(userInfo, rocketmqKeys);
        } catch (Exception e) {
            logger.error("发送事务消息异常!异常信息为:{}", e.getMessage());
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }

    /**
     * 校验本地事务
     * @param msg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String rocketmqKeys = msg.getHeaders().get("rocketmq_KEYS").toString();
        logger.info("事务消息key为:{}", rocketmqKeys);
        SysLogInfo logInfo = logInfoService.selectLogInfoByKey(rocketmqKeys);
        logger.info("查询日志信息为:{}", JSON.toJSONString(logInfo));
        if (null == logInfo) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }
}

5. 消费者类

@Component
@RocketMQMessageListener(topic = "transaction-message-topic", consumerGroup = "transaction-consumer-group")
public class TransactionMessageListener implements RocketMQListener<String> {

    private static final Logger logger = LoggerFactory.getLogger(TransactionMessageListener.class);

    @Override
    public void onMessage(String message) {
        logger.info("接收到事务消息:{}", message);
    }
}

6. 测试

@Test
void transactionMessage() {
    String uuid = UUID.randomUUID().toString().replace("-", "");
    SysUserInfo userInfo = new SysUserInfo();
    userInfo.setUserId(1001L);
    userInfo.setAddr("重庆渝北");
    userInfo.setAge(18);
    userInfo.setUserName("xiaofeng");
    userInfo.setPhone("13509877890");
    userInfo.setSex(1);
    userInfo.setStatus(1);
    transactionMessageService.sendTransactionMessage(uuid, JSON.toJSONString(userInfo));
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
消息中间件 Java 网络架构
|
16天前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
29天前
|
消息中间件 监控 供应链
深度剖析 RocketMQ 事务消息!
本文深入探讨了 RocketMQ 的事务消息原理及其应用场景。通过详细的源码分析,阐述了事务消息的基本流程,包括准备阶段、提交阶段及补偿机制。文章还提供了示例代码,帮助读者更好地理解整个过程。此外,还讨论了事务消息的优缺点、适用场景及注意事项,如确保本地事务的幂等性、合理设置超时时间等。尽管事务消息增加了系统复杂性,但在需要保证消息一致性的场景中,它仍是一种高效的解决方案。
61 2
|
3月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
3月前
|
网络协议 Java 物联网
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
666 2
|
3月前
|
消息中间件 Java Maven
|
3月前
|
消息中间件 存储 缓存
RocketMQ发送消息原理(含事务消息)
本文深入探讨了RocketMQ发送消息的原理,包括生产者端的发送流程、Broker端接收和处理消息的流程,以及事务消息的特殊处理机制,提供了对RocketMQ消息发送机制全面的理解。
RocketMQ发送消息原理(含事务消息)
|
3月前
|
消息中间件 监控 RocketMQ
分布式事务实现方案:一文详解RocketMQ事务消息
分布式事务实现方案:一文详解RocketMQ事务消息
|
3月前
|
消息中间件 监控 安全
大事务+MQ普通消息线上问题排查过程技术分享
【8月更文挑战第23天】在复杂的企业级系统中,大事务与消息队列(MQ)的结合使用是一种常见的架构设计,用于解耦系统、提升系统响应性和扩展性。然而,这种设计也带来了其特有的挑战,特别是在处理退款业务等涉及金融交易的高敏感场景时。本文将围绕“大事务+MQ普通消息线上问题排查过程”这一主题,分享一次实际工作中的技术排查经验,旨在为大家提供可借鉴的解决思路和方法。
54 0
|
4月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
290 1