RocketMQ第九章:手把手教老婆代码实现-事务消息生产者和事务消息消费者 及深入源码探索事务的消息回查

简介: RocketMQ第九章:手把手教老婆代码实现-事务消息生产者和事务消息消费者 及深入源码探索事务的消息回查

RocketMQ使用教程相关系列 目录image.png目录


第一节:介绍


RocketMQ事务消息介绍


事务消息流程介绍


使用限制


第二节:使用场景


第三节:代码实战


事务消息生产者


事务监听器


事务消息消费者


效果:


第四节:checkLocalTransaction不会触发


调整后的生产者


效果:


第二个问题,回查的次数和定时时间是多少?


科学验证:


第一节:介绍

RocketMQ事务消息介绍

在4.3.0版本后,有了事务消息这一个特性,对于分布式事务来说,最常见的还是二阶段提交协议。


事务消息流程介绍image.png上图说明了事务消息的大致方案,其中分为两个流程:


正常事务消息的发送及提交(黑线走的流程)

事务消息的补偿流程(黑线+红线走的流程)

1)正常事务消息的发送及提交


(1) 发送消息(half消息)。


(2) 服务端响应消息写入结果。


(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。


(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)


2)事务消息的补偿流程


补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况,具体流程如下:


(1) 发送消息(half消息)。


(2) 服务端响应消息写入结果。


(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。


(4)此时执行本地事务后,并没有执行Commit或Rollback操作


(5) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”(最多重试15次(由配置参数决定),超过了默认丢弃此消息)


(6) Producer收到回查消息,检查回查消息对应的本地事务的状态


(7) 根据本地事务状态,重新Commit或者Rollback


其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。


3)事务消息状态


事务消息共有三种状态,提交状态、回滚状态、中间状态:


TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。

TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。

TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。

使用限制

1. 事务消息不支持延时消息和批量消息。


2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 `transactionCheckMax`参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = `transactionCheckMax` ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 `AbstractTransactionCheckListener` 类来修改这个行为。


3. 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 `transactionMsgTimeout` 参数。


4. 事务性消息可能不止一次被检查或消费。


5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。


6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。


第二节:使用场景

同步消息解决的问题是:消息一定投递成功(Broker 响应Send_OK状态码时才代表消息发送成功)


事务消息解决的问题是:本地事务+消息投递一起做


例子:李四要给张三转钱1万元。


同步消息


1、 银行发一个同步消息给MQ,给张三加钱1万元

2、MQ ack反馈发送成功了

3、银行给李四扣1万元

可能的问题,ack Send_OK之后,系统抛出异常,没有给李四扣钱,但是消息已经发送出去了,张三加钱成功了。

事务消息


image.png

1、银行发一个事务消息给MQ,给张三加钱1万元
2、Broker precommit成功,回调excuteCommit,真正执行李四扣款1万元
3、扣款成功ACK Commit给MQ
4、MQ收到Commit ACK时,commit消息,系统可以消费这个消息
如果系统扣款异常,则消息虽然prepareCommit在MQ中,但是对系统不可见。另外如果ACK网络丢失或者延时,MQ如果超时未接收到ACK,会发起重试确认到系统。

第三节:代码实战

事务消息生产者

/**
 * 发送事务消息
 */
public class Producer {
   public static void main(String[] args) throws Exception {
      // 1.创建消息生产者producer,并制定生产者组名
      TransactionMQProducer producer = new TransactionMQProducer("demo_transaction_group");
      // 2.指定Nameserver地址
      producer.setNamesrvAddr("192.168.88.131:9876");
      // 添加事务监听器
      TransactionListenerImpl transactionListener = new TransactionListenerImpl();
      producer.setTransactionListener(transactionListener);
      ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
               @Override
               public Thread newThread(Runnable r) {
                  Thread thread = new Thread(r);
                  thread.setName("client-transaction-msg-check-thread");
                  return thread;
               }
            });
      // 设置线程池
      producer.setExecutorService(executorService);
      // 3.启动producer
      producer.start();
      System.out.println("生产者启动");
      String[] tags = { "TAGA", "TAGB", "TAGC" };
      for (int i = 0; i < 3; i++) {
         // 4.创建消息对象,指定主题Topic、Tag和消息体
         /**
          * 参数一:消息主题Topic
          * 参数二:消息Tag
          * 参数三:消息内容
          */
         Message msg = new Message("TransactionTopic", tags[i], ("Hello xuzhu" + i).getBytes());
         // 5.发送消息
         SendResult result = producer.sendMessageInTransaction(msg, "hello-xuzhu_transaction");
         // 发送状态
         SendStatus status = result.getSendStatus();
         System.out.println("发送结果:" + result);
         System.out.println("发送结果状态:" + status);
         // 线程睡2秒
         TimeUnit.SECONDS.sleep(2);
      }
      // 6.关闭生产者producer
      producer.shutdown();
      System.out.println("生产者结束");
   }
}

image.png

事务消息消费者

public class Consumer {
   public static void main(String[] args) throws Exception {
      // 1.创建消费者Consumer,制定消费者组名
      DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_transaction_group");
      // 2.指定Nameserver地址
      consumer.setNamesrvAddr("192.168.88.131:9876");
      // 3.订阅主题Topic和Tag
      consumer.subscribe("TransactionTopic", "*");
      // 4.设置回调函数,处理消息
      consumer.registerMessageListener(new MessageListenerConcurrently() {
         // 接受消息内容
         @Override
         public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
               System.out.println(
                     "consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
         }
      });
      // 5.启动消费者consumer
      consumer.start();
      System.out.println("消费者启动");
   }
}

效果:

从断点跟进,启动生产者时,会进入本地事务方法里

从本地事务方法里可知道,只有第一条数据有成功,一会看下,能消费几条数据image.pngimage.png第四节:checkLocalTransaction不会触发

细心的同学,可能会发现,事务监听器里有这么一个方法checkLocalTransaction,如果你断点,会发现怎么都执行不到这个方法。


原因是:事务一直没有rollback或者commit的时候才会触发回查


基于这个理论,我们调整下代码


调整后的生产者

/**
 * 发送事务消息
 */
public class Producer {
   public static void main(String[] args) throws Exception {
      // 1.创建消息生产者producer,并制定生产者组名
      TransactionMQProducer producer = new TransactionMQProducer("demo_transaction_group");
      // 2.指定Nameserver地址
      producer.setNamesrvAddr("192.168.88.131:9876");
      // 添加事务监听器
      TransactionListenerImpl transactionListener = new TransactionListenerImpl();
      producer.setTransactionListener(transactionListener);
      ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
               @Override
               public Thread newThread(Runnable r) {
                  Thread thread = new Thread(r);
                  thread.setName("client-transaction-msg-check-thread");
                  return thread;
               }
            });
      // 设置线程池
      producer.setExecutorService(executorService);
      // 3.启动producer
      producer.start();
      System.out.println("生产者启动");
      String[] tags = { "TAGC", "TAGA", "TAGB" };
      for (int i = 0; i < 3; i++) {
         // 4.创建消息对象,指定主题Topic、Tag和消息体
         /**
          * 参数一:消息主题Topic
          * 参数二:消息Tag
          * 参数三:消息内容
          */
         Message msg = new Message("TransactionTopic", tags[i], ("Hello xuzhu" + i).getBytes());
         // 5.发送消息
         SendResult result = producer.sendMessageInTransaction(msg, "hello-xuzhu_transaction");
         // 发送状态
         SendStatus status = result.getSendStatus();
         System.out.println("发送结果:" + result);
         System.out.println("发送结果状态:" + status);
         // 线程睡120秒
         TimeUnit.SECONDS.sleep(120);
      }
      // 6.关闭生产者producer
      producer.shutdown();
      System.out.println("生产者结束");
   }
}

image.png

第二个问题,回查的次数和定时时间是多少?

网上找的资料都说是默认15次,但定时时间没有资料有说明

不信这个邪了,我下载了rocketmq的源码,最后在BrokerConfig类中找到了答案image.pngimage.pngimage.png同学,你坚持到这里,说明有所收获了吧,给自己加个鸡腿,犒劳下自己。顺便给博主点关注+一键三连。感谢感谢


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
13天前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
5月前
|
消息中间件 网络安全 开发工具
消息队列 MQ产品使用合集之使用grpc proxy,生产者心跳并没有发送至Default中,如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
25天前
|
消息中间件 监控 供应链
深度剖析 RocketMQ 事务消息!
本文深入探讨了 RocketMQ 的事务消息原理及其应用场景。通过详细的源码分析,阐述了事务消息的基本流程,包括准备阶段、提交阶段及补偿机制。文章还提供了示例代码,帮助读者更好地理解整个过程。此外,还讨论了事务消息的优缺点、适用场景及注意事项,如确保本地事务的幂等性、合理设置超时时间等。尽管事务消息增加了系统复杂性,但在需要保证消息一致性的场景中,它仍是一种高效的解决方案。
56 2
|
20天前
|
传感器 数据可视化 网络协议
DIY可视化整合MQTT生成UniApp源码
DIY可视化整合MQTT生成UniApp源码
36 0
|
3月前
|
消息中间件 负载均衡 API
RocketMQ生产者负载均衡(轮询机制)核心原理
文章深入分析了RocketMQ生产者的负载均衡机制,特别是轮询机制的实现原理,揭示了如何通过`ThreadLocal`技术和消息队列的选播策略来确保消息在多个队列之间均衡发送,以及如何通过灵活的API支持自定义负载均衡策略。
|
3月前
|
消息中间件 存储 缓存
RocketMQ发送消息原理(含事务消息)
本文深入探讨了RocketMQ发送消息的原理,包括生产者端的发送流程、Broker端接收和处理消息的流程,以及事务消息的特殊处理机制,提供了对RocketMQ消息发送机制全面的理解。
RocketMQ发送消息原理(含事务消息)
|
3月前
|
物联网 C# Windows
看看如何使用 C# 代码让 MQTT 进行完美通信
看看如何使用 C# 代码让 MQTT 进行完美通信
499 0
|
3月前
|
Java
MQTT(EMQX) - Java 调用 MQTT Demo 代码
MQTT(EMQX) - Java 调用 MQTT Demo 代码
127 0
MQTT(EMQX) - Java 调用 MQTT Demo 代码
|
3月前
|
消息中间件 开发工具
【Azure Event Hub】原生应用中使用RabbitMQ,是否可以不改动代码的情况下直接转换为使用Event Hub呢?
【Azure Event Hub】原生应用中使用RabbitMQ,是否可以不改动代码的情况下直接转换为使用Event Hub呢?
|
3月前
|
消息中间件 监控 RocketMQ
分布式事务实现方案:一文详解RocketMQ事务消息
分布式事务实现方案:一文详解RocketMQ事务消息

热门文章

最新文章