五分钟带你玩转rocketMQ(十)实战分布式事务

简介: 什么是事务性消息?它可以看作是两阶段提交消息实现,以确保分布式系统中的最终一致性。事务消息确保本地事务的执行和消息的发送可以原子化地执行


什么是事务性消息?

它可以看作是两阶段提交消息实现,以确保分布式系统中的最终一致性。事务消息确保本地事务的执行和消息的发送可以原子化地执行。

使用限制

(1)事务性消息没有调度和批处理支持。

(2)为了避免单个消息被检查过多而导致半队列消息累积,我们将单个消息的检查次数默认限制为15次,但是如果一条消息被检查过,用户可以通过更改代理配置中的“transactionCheckMax”参数来更改此限制“transactionCheckMax”次,默认情况下,broker将丢弃此消息并同时打印错误日志。用户可以通过重写“AbstractTransactionCheckListener”类来更改此行为。

(3)事务消息将在代理配置中的参数“transactionTimeout”确定的一段时间后进行检查。用户也可以在发送事务性消息时通过设置用户属性“CHECK_IMMUNITY_TIME_IN_SECONDS”来更改此限制,此参数优先于“transactionMsgTimeout”参数。

(4)事务性消息可能被多次检查或使用。

(5)提交给用户目标主题的不可信消息可能失败。目前,这取决于日志记录。RocketMQ本身的高可用机制保证了高可用性。如果要确保事务消息不会丢失,并且保证事务完整性,建议使用同步双写。机制。

(6)事务性消息的生产者ID不能与其他类型消息的生产者ID共享。与其他类型的消息不同,事务性消息允许向后查询。MQ服务器按其生产者id查询客户机。

应用程序

1、交易状态

事务性消息有三种状态:

(1)TransactionStatus.CommitTransaction:commit transaction,表示允许消费者使用此消息。

(2)TransactionStatus.rollback transaction:回滚事务,表示消息将被删除,不允许使用。

(3)TransactionStatus.Unknown:中间状态,表示需要MQ进行回查以确定状态。

原理图

image.png

假设有A,B两服务,A服务为优惠券服务,B服务为结算服务。

1.A服务发给B服务一个半消息(B服务消费不到)

2.当B服务返回接收成功后 A服务开展优惠券逻辑

3.优惠券数据落到数据库 此时有3个结果

   1.A服务成功 发送B服务commit 修改B服务的消息状态 B服务可以执行结算

   2.A服务失败发送rollback 修改B服务状态为失败 删除消息

   3.网络原因 需要进行事务回查

4.B服务执行逻辑(如果B服务失败 需要手动判断 rocketMq的官方文档提出不提出解决方案)

代码处理

(1)创建事务生产者

使用TransactionMQProducer类创建producer客户机,并指定唯一的producerGroup,然后可以设置自定义线程池来处理检查请求。在执行本地事务后,需要根据执行结果回复MQ,回复状态在上一节描述。

1. @SpringBootConfiguration
2. public class MQProducerConfiguration {
3. 
4. public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfiguration.class);
5. /**
6.      * 发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
7.      */
8. @Value("${rocketmq.producer.groupName}")
9. private String groupName;
10. @Value("${rocketmq.producer.namesrvAddr}")
11. private String namesrvAddr;
12. /**
13.      * 消息最大大小,默认4M
14.      */
15. @Value("${rocketmq.producer.maxMessageSize}")
16. private Integer maxMessageSize;
17. /**
18.      * 消息发送超时时间,默认3秒
19.      */
20. @Value("${rocketmq.producer.sendMsgTimeout}")
21. private Integer sendMsgTimeout;
22. /**
23.      * 消息发送失败重试次数,默认2次
24.      */
25. @Value("${rocketmq.producer.retryTimesWhenSendFailed}")
26. private Integer retryTimesWhenSendFailed;
27. 
28. @Bean(name = "customRocketMQProducer")
29. public DefaultMQProducer getRocketMQProducer() throws Exception {
30. if (StringUtils.isEmpty(this.groupName)) {
31. throw new Exception("groupName is blank");
32.         }
33. if (StringUtils.isEmpty(this.namesrvAddr)) {
34. throw new Exception("nameServerAddr is blank");
35.         }
36. 
37. //事务消息需要
38.         TransactionListener transactionListener = new TransactionListenerImpl();
39.         TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
40.         ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
41. @Override
42. public Thread newThread(Runnable r) {
43.                 Thread thread = new Thread(r);
44.                 thread.setName("client-transaction-msg-check-thread");
45. return thread;
46.             }
47.         });
48. //事务消息需要
49.         producer.setExecutorService(executorService);
50.         producer.setTransactionListener(transactionListener);
51. 
52.         producer.setNamesrvAddr(this.namesrvAddr);
53. //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
54. //producer.setInstanceName(instanceName);
55. if (this.maxMessageSize != null) {
56.             producer.setMaxMessageSize(this.maxMessageSize);
57.         }
58. if (this.sendMsgTimeout != null) {
59.             producer.setSendMsgTimeout(this.sendMsgTimeout);
60.         }
61. //如果发送消息失败,设置重试次数,默认为2次
62. if (this.retryTimesWhenSendFailed != null) {
63.             producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
64.         }
65. try {
66.             producer.start();
67.             LOGGER.info(String.format("producer is start ! groupName:[%s],namesrvAddr:[%s]"
68.                     , this.groupName, this.namesrvAddr));
69.         } catch (MQClientException e) {
70.             LOGGER.error(String.format("producer is error {}"
71.                     , e.getMessage(), e));
72. throw new Exception(e);
73.         }
74. return producer;
75.     }
76. }

(2)实现TransactionListener接口

“executeLocalTransaction”方法用于在发送半消息成功时执行本地事务。它返回上一节中提到的三种事务状态之一。

“checkLocalTransaction”方法用于检查本地事务状态并响应MQ检查请求。它还返回上一节中提到的三种事务状态之一。

1. package cn.baocl.rocketmq.controllor;
2. 
3. import org.apache.rocketmq.client.producer.LocalTransactionState;
4. import org.apache.rocketmq.client.producer.TransactionListener;
5. import org.apache.rocketmq.common.message.Message;
6. import org.apache.rocketmq.common.message.MessageExt;
7. 
8. import java.util.concurrent.ConcurrentHashMap;
9. import java.util.concurrent.atomic.AtomicInteger;
10. 
11. public class TransactionListenerImpl implements TransactionListener {
12. 
13. private AtomicInteger transactionIndex = new AtomicInteger(0);
14. 
15. private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
16. 
17. //执行本地事务(一般都是写一张日志表,放入本次交易的TransactionId,判断本次交易是否成功)
18. @Override
19. public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
20. //执行业务
21. //do thing
22. if(true){
23. //业务执行成功
24. //插入日志表
25. //返回成功
26. return LocalTransactionState.COMMIT_MESSAGE;
27.         }else{
28. return LocalTransactionState.ROLLBACK_MESSAGE;
29.         }
30.     }
31. 
32. //回查事务 当发送给broken 本地事务的状态状态丢失了 broken会再次根据id验证本地事务(查询日志表是否有日志)
33. @Override
34. public LocalTransactionState checkLocalTransaction(MessageExt msg) {
35. String transactionId = msg.getTransactionId();
36. //根据transactionId查询日志表条数 如果条数大于0 就表示已经执行过以上逻辑了
37. Integer countStatus = Integer.valueOf(transactionId);
38. if (countStatus > 0) {
39. return LocalTransactionState.COMMIT_MESSAGE;
40.         } else {
41. return LocalTransactionState.ROLLBACK_MESSAGE;
42.         }
43.     }
44. }

(3)调用

1. package cn.baocl.rocketmq.controllor;
2. 
3. import org.apache.rocketmq.client.exception.MQBrokerException;
4. import org.apache.rocketmq.client.exception.MQClientException;
5. import org.apache.rocketmq.client.producer.DefaultMQProducer;
6. import org.apache.rocketmq.client.producer.SendResult;
7. import org.apache.rocketmq.common.message.Message;
8. import org.apache.rocketmq.remoting.common.RemotingHelper;
9. import org.apache.rocketmq.remoting.exception.RemotingException;
10. import org.slf4j.Logger;
11. import org.slf4j.LoggerFactory;
12. import org.springframework.web.bind.annotation.RequestMapping;
13. import org.springframework.web.bind.annotation.RestController;
14. 
15. import javax.annotation.Resource;
16. import java.io.UnsupportedEncodingException;
17. 
18. 
19. @RestController
20. @RequestMapping("/test")
21. public class TestControllor {
22. private static final Logger logger = LoggerFactory.getLogger(TestControllor.class);
23. 
24. /**
25.      * 使用RocketMq的生产者
26.      */
27. @Resource(name = "customRocketMQProducer")
28. private DefaultMQProducer defaultMQProducer;
29. 
30. @RequestMapping("/send")
31. public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
32.         String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
33. 
34. for (int i = 0; i < 10; i++) {
35. try {
36. Message msg =
37. new Message("DemoTopic", tags[i % tags.length], "KEY" + i,
38.                                 ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
39. //sendMessageInTransaction :1).检查transactionListener是否存在
40. //                          2).调用父类执行事务消息发送
41. SendResult sendResult = defaultMQProducer.sendMessageInTransaction(msg, null);
42.                 System.out.println("发送消息为");
43.                 System.out.printf("%s%n", sendResult);
44.                 Thread.sleep(10);
45.             } catch (MQClientException | UnsupportedEncodingException e) {
46.                 e.printStackTrace();
47.             }
48.         }
49. 
50. for (int i = 0; i < 100000; i++) {
51.             Thread.sleep(1000);
52.         }
53.     }
54. }

阿里忽略了消费者报错了的情况,可以根据业务自定义

参考:https://my.oschina.net/u/3768341/blog/1616193

       https://segmentfault.com/a/1190000019755235?utm_source=tag-newest

       https://blog.csdn.net/hosaos/article/details/90050276

       https://www.jianshu.com/p/cc5c10221aa1


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
消息中间件 RocketMQ 微服务
RocketMQ 分布式事务消息实战指南
RocketMQ 分布式事务消息实战指南
414 1
|
1月前
|
消息中间件 负载均衡 Java
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
35 0
|
27天前
|
消息中间件 Java RocketMQ
RocketMQ实战教程之RocketMQ安装
这是一篇关于RocketMQ安装的实战教程,主要介绍了在CentOS系统上使用传统安装和Docker两种方式安装RocketMQ。首先,系统需要是64位,并且已经安装了JDK 1.8。传统安装包括下载安装包,解压并启动NameServer和Broker。Docker安装则涉及安装docker和docker-compose,然后通过docker-compose.yaml文件配置并启动服务。教程还提供了启动命令和解决问题的提示。
|
1月前
|
消息中间件 前端开发 数据库
RocketMQ实战教程之MQ简介与应用场景
RocketMQ实战教程介绍了MQ的基本概念和应用场景。MQ(消息队列)是生产者和消费者模型,用于异步传输数据,实现系统解耦。消息中间件在生产者发送消息和消费者接收消息之间起到邮箱作用,简化通信。主要应用场景包括:1)应用解耦,如订单系统与库存系统的非直接交互;2)异步处理,如用户注册后的邮件和短信发送延迟处理,提高响应速度;3)流量削峰,如秒杀活动限制并发流量,防止系统崩溃。
|
1月前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
43 0
|
13天前
|
消息中间件 中间件 程序员
分布式事务大揭秘:使用MQ实现最终一致性
本文由小米分享,介绍分布式事务中的MQ最终一致性实现,以RocketMQ为例。RocketMQ的事务消息机制包括准备消息、本地事务执行、确认/回滚消息及事务状态检查四个步骤。这种机制通过消息队列协调多系统操作,确保数据最终一致。MQ最终一致性具有系统解耦、提高可用性和灵活事务管理等优点,广泛应用于分布式系统中。文章还讨论了RocketMQ的事务消息处理流程和失败情况下的处理策略,帮助读者理解如何在实际应用中解决分布式事务问题。
21 6
|
10天前
|
消息中间件 监控 调度
构建Python中的分布式系统结合Celery与RabbitMQ
在当今的软件开发中,构建高效的分布式系统是至关重要的。Python作为一种流行的编程语言,提供了许多工具和库来帮助开发人员构建分布式系统。其中,Celery和RabbitMQ是两个强大的工具,它们结合在一起可以为你的Python应用程序提供可靠的异步任务队列和消息传递机制。
|
24天前
|
消息中间件 存储 Apache
RocketMQ实战教程之常见概念和模型
Apache RocketMQ 实战教程介绍了其核心概念和模型。消息是基本的数据传输单元,主题是消息的分类容器,支持字节、数字和短划线命名,最长64个字符。消息类型包括普通、顺序、事务和定时/延时消息。消息队列是实际存储和传输消息的容器,是主题的分区。消费者分组是一组行为一致的消费者的逻辑集合,也有命名限制。此外,文档还提到了一些使用约束和建议,如主题和消费者组名的命名规则,消息大小限制,请求超时时间等。RocketMQ 提供了多种消息模型,包括发布/订阅模型,有助于理解和优化消息处理。
|
25天前
|
消息中间件 存储 Java
RocketMQ实战教程之NameServer与BrokerServer
这是一个关于RocketMQ实战教程的概要,主要讨论NameServer和BrokerServer的角色。NameServer负责管理所有BrokerServer,而BrokerServer存储和传输消息。生产者和消费者通过NameServer找到合适的Broker进行交互,不需要直接知道Broker的具体信息。工作流程包括生产者向NameServer查询后发送消息到Broker,以及消费者同样通过NameServer获取消息进行消费。这种设计类似于服务注册中心的概念,便于系统扩展和集群管理。
|
30天前
|
消息中间件 Cloud Native 自动驾驶
RocketMQ实战教程之MQ简介
Apache RocketMQ 是一个云原生的消息流平台,支持消息、事件和流处理,适用于云边端一体化场景。官网提供详细文档和下载资源:[RocketMQ官网](https://rocketmq.apache.org/zh/)。示例中提到了RocketMQ在物联网(如小米台灯)和自动驾驶等领域的应用。要开始使用,可从[下载页面](https://rocketmq.apache.org/zh/download)获取软件。

热门文章

最新文章