SpringBoot整合RocketMQ发送事务消息

简介: RocketMQ提供了类似X/Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA是一种分布式事务解决方案,一种分布式事务处理模式

事务消息
1. 概念
分布式事务:一次操作由若干分支操作组成,这些分支操作分属不同应用,分布在不同服务器上。分布式事务需要保证这些分支操作要么全部成功,要么全部失败,分布式事务于普通事务一样,就是为了保证操作结果的一致性
事务消息:RocketMQ提供了类似X/Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA是一种分布式事务解决方案,一种分布式事务处理模式
半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了Broker,但是Broker未收到最终确认指令,此时该消息被标记成“暂不能投递”状态,即不能被消费者看到。处于该种状态下的消息即半事务消息
本地事务状态:Producer 回调操作执行的结果为本地事务状态,其会发送给TC,而TC会再发送给TM。TM会根据TC发送来的本地事务状态来决定全局事务确认指令
消息回查:重新查询本地事务的执行状态
关于消息回查,有三个常见的属性设置。它们都在broker加载的配置文件中设置
transactionTimeout=20,指定TM在20秒内应将最终确认状态发送给TC,否则引发消息回查。默认为60秒
transactionCheckMax=5,指定最多回查5次,超过后将丢弃消息并记录错误日志。默认15次。
transactionCheckInterval=10,指定设置的多次消息回查的时间间隔为10秒。默认为60秒。
XA协议
XA(Unix Transaction)是一种分布式事务解决方案,一种分布式事务处理模式,是基于XA协议的。XA协议由Tuxedo(Transaction for Unix has been Extended for Distributed Operation,分布式操作扩展之后的Unix事务系统)首先提出的,并交给X/Open组织,作为资源管理器与事务管理器的接口标准。
XA模式中有三个重要组件:TC、TM、RM。
TC(Transaction Coordinator):事务协调者。维护全局和分支事务的状态,驱动全局事务提交或回滚。RocketMQ中Broker充当着TC
TM(Transaction Manager):事务管理器。定义全局事务的范围:开始全局事务、提交或回滚全局事务。它实际是全局事务的发起者。RocketMQ中事务消息的Producer充当着TM
RM(Resource Manager):资源管理器。管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。RocketMQ中事务消息的Producer及Broker均是RM
XA模式是一个典型的2PC,其执行原理如下:

TM向TC发起指令,开启一个全局事务。
根据业务要求,各个RM会逐个向TC注册分支事务,然后TC会逐个向RM发出预执行指令。
各个RM在接收到指令后会在进行本地事务预执行。
RM将预执行结果Report给TC。当然,这个结果可能是成功,也可能是失败。
TC在接收到各个RM的Report后会将汇总结果上报给TM,根据汇总结果TM会向TC发出确认指 令。 若所有结果都是成功响应,则向TC发送Global Commit指令。 只要有结果是失败响应,则向TC发送Global Rollback指令。
TC在接收到指令后再次向RM发送确认指令。

注意:
事务消息不支持延时消息
对于事务消息要做好幂等性检查,因为事务消息可能不止一次被消费(因为存在回滚后再提交的 情况)
2. 生产者业务接口

public interface TransactionMessageService {

    /**
     * 发送事务消息
     * @param id
     * @param message
     */
    void sendTransactionMessage(String id, String message);
}

3. 生产者业务接口实现类

@Service
public class TransactionMessageServiceImpl implements TransactionMessageService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    private static final Logger logger = LoggerFactory.getLogger(TransactionMessageServiceImpl.class);

    @Override
    public void sendTransactionMessage(String id, String message) {
        Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("transaction-message-topic:transaction-tags", strMessage, id);
        if (result.getSendStatus() == SendStatus.SEND_OK) {
            logger.info("发送事务消息成功!消息ID为:{}", result.getMsgId());
        }
    }
}

4. 生产事务监听器类

@RocketMQTransactionListener
public class TransactionListener implements RocketMQLocalTransactionListener {

    private static final Logger logger = LoggerFactory.getLogger(TransactionListener.class);

    @Autowired
    private SysUserInfoService userInfoService;
    @Autowired
    private SysLogInfoService logInfoService;

    /**
     * 执行本地事务
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            String rocketmqKeys = msg.getHeaders().get("rocketmq_KEYS").toString();
            logger.info("事务消息key为:{}", rocketmqKeys);
            String payload = new String((byte[]) msg.getPayload());
            logger.info("事务消息为:{}", payload);
            SysUserInfo userInfo = JSONObject.parseObject(payload, SysUserInfo.class);
            userInfoService.saveUserInfoAndLogInfo(userInfo, rocketmqKeys);
        } catch (Exception e) {
            logger.error("发送事务消息异常!异常信息为:{}", e.getMessage());
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }

    /**
     * 校验本地事务
     * @param msg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String rocketmqKeys = msg.getHeaders().get("rocketmq_KEYS").toString();
        logger.info("事务消息key为:{}", rocketmqKeys);
        SysLogInfo logInfo = logInfoService.selectLogInfoByKey(rocketmqKeys);
        logger.info("查询日志信息为:{}", JSON.toJSONString(logInfo));
        if (null == logInfo) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }
}

5. 消费者类

@Component
@RocketMQMessageListener(topic = "transaction-message-topic", consumerGroup = "transaction-consumer-group")
public class TransactionMessageListener implements RocketMQListener<String> {

    private static final Logger logger = LoggerFactory.getLogger(TransactionMessageListener.class);

    @Override
    public void onMessage(String message) {
        logger.info("接收到事务消息:{}", message);
    }
}

6. 测试

@Test
void transactionMessage() {
    String uuid = UUID.randomUUID().toString().replace("-", "");
    SysUserInfo userInfo = new SysUserInfo();
    userInfo.setUserId(1001L);
    userInfo.setAddr("重庆渝北");
    userInfo.setAge(18);
    userInfo.setUserName("xiaofeng");
    userInfo.setPhone("13509877890");
    userInfo.setSex(1);
    userInfo.setStatus(1);
    transactionMessageService.sendTransactionMessage(uuid, JSON.toJSONString(userInfo));
}
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3天前
|
消息中间件 Java Spring
SpringBoot实现RabbitMQ的简单队列(SpringAMQP 实现简单队列)
SpringBoot实现RabbitMQ的简单队列(SpringAMQP 实现简单队列)
24 1
|
3天前
|
消息中间件 Java Spring
SpringBoot实现RabbitMQ的广播交换机(SpringAMQP 实现Fanout广播交换机)
SpringBoot实现RabbitMQ的广播交换机(SpringAMQP 实现Fanout广播交换机)
19 2
|
3天前
|
消息中间件 JSON Java
RabbitMQ的springboot项目集成使用-01
RabbitMQ的springboot项目集成使用-01
|
3天前
|
消息中间件 Java Spring
Springboot 集成Rabbitmq之延时队列
Springboot 集成Rabbitmq之延时队列
7 0
|
3天前
|
消息中间件 Java
SpringBoot基于RabbitMQ实现死信队列 (SpringBoot整合RabbitMQ实战篇)
SpringBoot基于RabbitMQ实现死信队列 (SpringBoot整合RabbitMQ实战篇)
35 1
|
3天前
|
消息中间件 安全 Java
SpringBoot基于RabbitMQ实现消息可靠性
SpringBoot基于RabbitMQ实现消息可靠性
38 0
|
3天前
|
消息中间件 Java
SpringBoot实现RabbitMQ的通配符交换机(SpringAMQP 实现Topic交换机)
SpringBoot实现RabbitMQ的通配符交换机(SpringAMQP 实现Topic交换机)
11 1
|
3天前
|
消息中间件 Java Spring
SpringBoot实现RabbitMQ的定向交换机(SpringAMQP 实现Direct定向交换机)
SpringBoot实现RabbitMQ的定向交换机(SpringAMQP 实现Direct定向交换机)
21 1
|
3天前
|
消息中间件 Java Spring
SpringBoot实现RabbitMQ的WorkQueue(SpringAMQP 实现WorkQueue)
SpringBoot实现RabbitMQ的WorkQueue(SpringAMQP 实现WorkQueue)
21 2
QGS
|
3天前
|
NoSQL 关系型数据库 MySQL
手拉手Springboot+RocketMQ+Redis抢单实现10W级QPS
手拉手Springboot+RocketMQ+Redis抢单实现10W级QPS
QGS
34 3