RocketMQ事务消息

简介: 事务消息

什么是事务消息?

事务消息可以认为是两阶段提交消息的实现,用来确保分布式系统中的最终一致性。事务消息保证执行本地事务的执行和消息发送的原子性。

使用约束

  1. 消息事务不支持定时和批量。
  2. 为了避免一个消息被多次检查,导致半数队列消息堆积,我们限制单个消息的默认检查次数为15次,但用户可以改变这个限制通过修改broker的配置文件中的 transactionCheckMax参数。如果一个消息检查次数超过transactionCheckMax,默认情况下,broker将会丢弃这个消息并同时打印错误日志。用户可以改变这种行为通过覆盖 AbstractTransactionCheckListener 类。
  3. 由broker的配置文件中参数 transactionTimeou t决定的特点时间段之后检查事务消息。当发送事务消息时,通过设置用户配置CHECK_IMMUNITY_TIME_IN_SECONDS,用户也可以改变这个限制。这个参数优先于 transactionMsgTimeout 参数。
  4. 一个事务消息可能被检查或消费多次。
  5. 提交过的消息重新放到用户目标主题可能会失败。目前,它依赖日志记录。通过RocketMQ自身高可用机制确保高可用。如果你想确保事务消息不丢失并且保证事务完整性,建议使用同步双写机制。
  6. 事务消息的生产者ID不能与其他类型消息的生产者ID共享。不像其他类型消息,事务消息允许回查。MQ server通过生产者ID查询客户端。

应用

事务状态

三种事务消息状态:

  • TransactionStatus.CommitTransaction:提交事务,允许消费者消费这个消息。
  • TransactionStatus.RollbackTransaction:回滚事务,消息将会被删除或不再允许消费。
  • TransactionStatus.Unknown:中间状态,MQ需要重新检查来确定状态。

发送事务消息

创建事务生产者

使用TransactionMQProducer类创建producer客户端,指定唯一producerGroup,你可以设置一个自定义线程池来处理检查请求。执行本地事务后,你需要根据执行结果恢复MQ,并回复上面描述的状态。

实现事务监听器接口

当发送半消息成功时,使用 executeLocalTransaction 方法执行本地事务。它返回三种事务状态的一种。
使用 checkLocalTransaction 方法检查本地事务状态和响应MQ检查请求。它同样返回三种事务状态的一种。

代码

@RequestMapping(value = "/sendTransaction")
    public String sendTransaction(HttpServletRequest request)throws Exception{
        TransactionMQProducer producer=new TransactionMQProducer("sendTransaction_producer_group");
        try{
            String mesage=request.getParameter("message");
            producer.setNamesrvAddr(NAMESERVER_ADDR);

            //设置线程池
            producer.setExecutorService(new ThreadPoolExecutor(2,5,100, TimeUnit.SECONDS,new ArrayBlockingQueue(2000),new ThreadFactory(){
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread=new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
            }));
            //事务监听器
            producer.setTransactionListener(new TransactionListener() {
                private AtomicInteger transactionIndex = new AtomicInteger(0);
                private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

                @Override
                public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                    int value=transactionIndex.getAndIncrement();
                    Integer status=value%3;
                    localTrans.put(msg.getTransactionId(),status);
                    return LocalTransactionState.UNKNOW;
                }

                @Override
                public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                    Integer  status =localTrans.get(msg.getTransactionId());
                    if (null != status) {
                        switch (status) {
                            //case 0:
                                //return LocalTransactionState.UNKNOW;
                            case 1:
                                return LocalTransactionState.COMMIT_MESSAGE;
                            case 2:
                                return LocalTransactionState.ROLLBACK_MESSAGE;
                            default:
                                return LocalTransactionState.UNKNOW;
                        }
                    }
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
            });

            producer.start();
            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for(int i=0;i<10;i++){
                Message msg=new Message("Test_filter",tags[i % tags.length],"KEY" + i,(mesage+i).getBytes());
                msg.putUserProperty("a",String.valueOf(i));
                SendResult sendResult = producer.sendMessageInTransaction(msg,null);
                logger.error("返回结果:"+sendResult);
                Thread.sleep(10);
            }
            return "发送成功";
        }catch (Exception e){
            logger.error(e.getMessage());
            return "发送失败";
        }finally {
            for (int i = 0; i < 100000; i++) {
                Thread.sleep(1000);
            }
            producer.shutdown();
        }
    }
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
消息中间件 Java API
RocketMQ事务消息, 图文、源码学习探究~
介绍 RocketMQ是阿里巴巴开源的分布式消息中间件,它是一个高性能、低延迟、可靠的消息队列系统,用于在分布式系统中进行异步通信。 从4.3.0版本开始正式支持分布式事务消息~ RocketMq事务消息支持最终一致性:在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 原理、流程 本质上RocketMq的事务能力是基于二阶段提交来实现的 在消息发送上,将二阶段提交与本地事务绑定 本地事务执行成功,则事务消息成功,可以交由Consumer消费 本地事务执行失败,则事务消息失败,Consumer无法消费 但是,RocketMq只能保证本地事务
|
5月前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
2月前
|
消息中间件 存储 缓存
RocketMQ发送消息原理(含事务消息)
本文深入探讨了RocketMQ发送消息的原理,包括生产者端的发送流程、Broker端接收和处理消息的流程,以及事务消息的特殊处理机制,提供了对RocketMQ消息发送机制全面的理解。
RocketMQ发送消息原理(含事务消息)
|
2月前
|
消息中间件 监控 RocketMQ
分布式事务实现方案:一文详解RocketMQ事务消息
分布式事务实现方案:一文详解RocketMQ事务消息
|
2月前
|
消息中间件 监控 安全
大事务+MQ普通消息线上问题排查过程技术分享
【8月更文挑战第23天】在复杂的企业级系统中,大事务与消息队列(MQ)的结合使用是一种常见的架构设计,用于解耦系统、提升系统响应性和扩展性。然而,这种设计也带来了其特有的挑战,特别是在处理退款业务等涉及金融交易的高敏感场景时。本文将围绕“大事务+MQ普通消息线上问题排查过程”这一主题,分享一次实际工作中的技术排查经验,旨在为大家提供可借鉴的解决思路和方法。
44 0
|
3月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
173 1
|
3月前
|
消息中间件 调度 RocketMQ
【RocketMQ系列六】RocketMQ事务消息
【RocketMQ系列六】RocketMQ事务消息
639 1
|
4月前
|
消息中间件 IDE 数据库
RocketMQ事务消息学习及刨坑过程
RocketMQ事务消息学习及刨坑过程
|
4月前
|
消息中间件 网络性能优化 RocketMQ
消息队列 MQ产品使用合集之本地事务还没有执行完就触发了回查是什么导致的
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
5月前
|
消息中间件 存储 Apache
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
613 2
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
下一篇
无影云桌面