五分钟带你玩转rocketMQ(十)实战分布式事务

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 什么是事务性消息?它可以看作是两阶段提交消息实现,以确保分布式系统中的最终一致性。事务消息确保本地事务的执行和消息的发送可以原子化地执行


什么是事务性消息?

它可以看作是两阶段提交消息实现,以确保分布式系统中的最终一致性。事务消息确保本地事务的执行和消息的发送可以原子化地执行。

使用限制

(1)事务性消息没有调度和批处理支持。

(2)为了避免单个消息被检查过多而导致半队列消息累积,我们将单个消息的检查次数默认限制为15次,但是如果一条消息被检查过,用户可以通过更改代理配置中的“transactionCheckMax”参数来更改此限制“transactionCheckMax”次,默认情况下,broker将丢弃此消息并同时打印错误日志。用户可以通过重写“AbstractTransactionCheckListener”类来更改此行为。

(3)事务消息将在代理配置中的参数“transactionTimeout”确定的一段时间后进行检查。用户也可以在发送事务性消息时通过设置用户属性“CHECK_IMMUNITY_TIME_IN_SECONDS”来更改此限制,此参数优先于“transactionMsgTimeout”参数。

(4)事务性消息可能被多次检查或使用。

(5)提交给用户目标主题的不可信消息可能失败。目前,这取决于日志记录。RocketMQ本身的高可用机制保证了高可用性。如果要确保事务消息不会丢失,并且保证事务完整性,建议使用同步双写。机制。

(6)事务性消息的生产者ID不能与其他类型消息的生产者ID共享。与其他类型的消息不同,事务性消息允许向后查询。MQ服务器按其生产者id查询客户机。

应用程序

1、交易状态

事务性消息有三种状态:

(1)TransactionStatus.CommitTransaction:commit transaction,表示允许消费者使用此消息。

(2)TransactionStatus.rollback transaction:回滚事务,表示消息将被删除,不允许使用。

(3)TransactionStatus.Unknown:中间状态,表示需要MQ进行回查以确定状态。

原理图

image.png

假设有A,B两服务,A服务为优惠券服务,B服务为结算服务。

1.A服务发给B服务一个半消息(B服务消费不到)

2.当B服务返回接收成功后 A服务开展优惠券逻辑

3.优惠券数据落到数据库 此时有3个结果

   1.A服务成功 发送B服务commit 修改B服务的消息状态 B服务可以执行结算

   2.A服务失败发送rollback 修改B服务状态为失败 删除消息

   3.网络原因 需要进行事务回查

4.B服务执行逻辑(如果B服务失败 需要手动判断 rocketMq的官方文档提出不提出解决方案)

代码处理

(1)创建事务生产者

使用TransactionMQProducer类创建producer客户机,并指定唯一的producerGroup,然后可以设置自定义线程池来处理检查请求。在执行本地事务后,需要根据执行结果回复MQ,回复状态在上一节描述。

1. @SpringBootConfiguration
2. public class MQProducerConfiguration {
3. 
4. public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfiguration.class);
5. /**
6.      * 发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
7.      */
8. @Value("${rocketmq.producer.groupName}")
9. private String groupName;
10. @Value("${rocketmq.producer.namesrvAddr}")
11. private String namesrvAddr;
12. /**
13.      * 消息最大大小,默认4M
14.      */
15. @Value("${rocketmq.producer.maxMessageSize}")
16. private Integer maxMessageSize;
17. /**
18.      * 消息发送超时时间,默认3秒
19.      */
20. @Value("${rocketmq.producer.sendMsgTimeout}")
21. private Integer sendMsgTimeout;
22. /**
23.      * 消息发送失败重试次数,默认2次
24.      */
25. @Value("${rocketmq.producer.retryTimesWhenSendFailed}")
26. private Integer retryTimesWhenSendFailed;
27. 
28. @Bean(name = "customRocketMQProducer")
29. public DefaultMQProducer getRocketMQProducer() throws Exception {
30. if (StringUtils.isEmpty(this.groupName)) {
31. throw new Exception("groupName is blank");
32.         }
33. if (StringUtils.isEmpty(this.namesrvAddr)) {
34. throw new Exception("nameServerAddr is blank");
35.         }
36. 
37. //事务消息需要
38.         TransactionListener transactionListener = new TransactionListenerImpl();
39.         TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
40.         ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
41. @Override
42. public Thread newThread(Runnable r) {
43.                 Thread thread = new Thread(r);
44.                 thread.setName("client-transaction-msg-check-thread");
45. return thread;
46.             }
47.         });
48. //事务消息需要
49.         producer.setExecutorService(executorService);
50.         producer.setTransactionListener(transactionListener);
51. 
52.         producer.setNamesrvAddr(this.namesrvAddr);
53. //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
54. //producer.setInstanceName(instanceName);
55. if (this.maxMessageSize != null) {
56.             producer.setMaxMessageSize(this.maxMessageSize);
57.         }
58. if (this.sendMsgTimeout != null) {
59.             producer.setSendMsgTimeout(this.sendMsgTimeout);
60.         }
61. //如果发送消息失败,设置重试次数,默认为2次
62. if (this.retryTimesWhenSendFailed != null) {
63.             producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
64.         }
65. try {
66.             producer.start();
67.             LOGGER.info(String.format("producer is start ! groupName:[%s],namesrvAddr:[%s]"
68.                     , this.groupName, this.namesrvAddr));
69.         } catch (MQClientException e) {
70.             LOGGER.error(String.format("producer is error {}"
71.                     , e.getMessage(), e));
72. throw new Exception(e);
73.         }
74. return producer;
75.     }
76. }

(2)实现TransactionListener接口

“executeLocalTransaction”方法用于在发送半消息成功时执行本地事务。它返回上一节中提到的三种事务状态之一。

“checkLocalTransaction”方法用于检查本地事务状态并响应MQ检查请求。它还返回上一节中提到的三种事务状态之一。

1. package cn.baocl.rocketmq.controllor;
2. 
3. import org.apache.rocketmq.client.producer.LocalTransactionState;
4. import org.apache.rocketmq.client.producer.TransactionListener;
5. import org.apache.rocketmq.common.message.Message;
6. import org.apache.rocketmq.common.message.MessageExt;
7. 
8. import java.util.concurrent.ConcurrentHashMap;
9. import java.util.concurrent.atomic.AtomicInteger;
10. 
11. public class TransactionListenerImpl implements TransactionListener {
12. 
13. private AtomicInteger transactionIndex = new AtomicInteger(0);
14. 
15. private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
16. 
17. //执行本地事务(一般都是写一张日志表,放入本次交易的TransactionId,判断本次交易是否成功)
18. @Override
19. public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
20. //执行业务
21. //do thing
22. if(true){
23. //业务执行成功
24. //插入日志表
25. //返回成功
26. return LocalTransactionState.COMMIT_MESSAGE;
27.         }else{
28. return LocalTransactionState.ROLLBACK_MESSAGE;
29.         }
30.     }
31. 
32. //回查事务 当发送给broken 本地事务的状态状态丢失了 broken会再次根据id验证本地事务(查询日志表是否有日志)
33. @Override
34. public LocalTransactionState checkLocalTransaction(MessageExt msg) {
35. String transactionId = msg.getTransactionId();
36. //根据transactionId查询日志表条数 如果条数大于0 就表示已经执行过以上逻辑了
37. Integer countStatus = Integer.valueOf(transactionId);
38. if (countStatus > 0) {
39. return LocalTransactionState.COMMIT_MESSAGE;
40.         } else {
41. return LocalTransactionState.ROLLBACK_MESSAGE;
42.         }
43.     }
44. }

(3)调用

1. package cn.baocl.rocketmq.controllor;
2. 
3. import org.apache.rocketmq.client.exception.MQBrokerException;
4. import org.apache.rocketmq.client.exception.MQClientException;
5. import org.apache.rocketmq.client.producer.DefaultMQProducer;
6. import org.apache.rocketmq.client.producer.SendResult;
7. import org.apache.rocketmq.common.message.Message;
8. import org.apache.rocketmq.remoting.common.RemotingHelper;
9. import org.apache.rocketmq.remoting.exception.RemotingException;
10. import org.slf4j.Logger;
11. import org.slf4j.LoggerFactory;
12. import org.springframework.web.bind.annotation.RequestMapping;
13. import org.springframework.web.bind.annotation.RestController;
14. 
15. import javax.annotation.Resource;
16. import java.io.UnsupportedEncodingException;
17. 
18. 
19. @RestController
20. @RequestMapping("/test")
21. public class TestControllor {
22. private static final Logger logger = LoggerFactory.getLogger(TestControllor.class);
23. 
24. /**
25.      * 使用RocketMq的生产者
26.      */
27. @Resource(name = "customRocketMQProducer")
28. private DefaultMQProducer defaultMQProducer;
29. 
30. @RequestMapping("/send")
31. public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
32.         String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
33. 
34. for (int i = 0; i < 10; i++) {
35. try {
36. Message msg =
37. new Message("DemoTopic", tags[i % tags.length], "KEY" + i,
38.                                 ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
39. //sendMessageInTransaction :1).检查transactionListener是否存在
40. //                          2).调用父类执行事务消息发送
41. SendResult sendResult = defaultMQProducer.sendMessageInTransaction(msg, null);
42.                 System.out.println("发送消息为");
43.                 System.out.printf("%s%n", sendResult);
44.                 Thread.sleep(10);
45.             } catch (MQClientException | UnsupportedEncodingException e) {
46.                 e.printStackTrace();
47.             }
48.         }
49. 
50. for (int i = 0; i < 100000; i++) {
51.             Thread.sleep(1000);
52.         }
53.     }
54. }

阿里忽略了消费者报错了的情况,可以根据业务自定义

参考:https://my.oschina.net/u/3768341/blog/1616193

       https://segmentfault.com/a/1190000019755235?utm_source=tag-newest

       https://blog.csdn.net/hosaos/article/details/90050276

       https://www.jianshu.com/p/cc5c10221aa1


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
28天前
|
消息中间件 网络协议 C#
C#使用Socket实现分布式事件总线,不依赖第三方MQ
`CodeWF.EventBus.Socket` 是一个轻量级的、基于Socket的分布式事件总线系统,旨在简化分布式架构中的事件通信。它允许进程之间通过发布/订阅模式进行通信,无需依赖外部消息队列服务。
C#使用Socket实现分布式事件总线,不依赖第三方MQ
|
1月前
|
NoSQL Java Redis
开发实战:使用Redisson实现分布式延时消息,订单30分钟关闭的另外一种实现!
本文详细介绍了 Redisson 延迟队列(DelayedQueue)的实现原理,包括基本使用、内部数据结构、基本流程、发送和获取延时消息以及初始化延时队列等内容。文章通过代码示例和流程图,逐步解析了延迟消息的发送、接收及处理机制,帮助读者深入了解 Redisson 延迟队列的工作原理。
|
5月前
|
消息中间件 NoSQL Java
Redis系列学习文章分享---第六篇(Redis实战篇--Redis分布式锁+实现思路+误删问题+原子性+lua脚本+Redisson功能介绍+可重入锁+WatchDog机制+multiLock)
Redis系列学习文章分享---第六篇(Redis实战篇--Redis分布式锁+实现思路+误删问题+原子性+lua脚本+Redisson功能介绍+可重入锁+WatchDog机制+multiLock)
228 0
|
6月前
|
消息中间件 Java RocketMQ
RocketMQ实战教程之RocketMQ安装
这是一篇关于RocketMQ安装的实战教程,主要介绍了在CentOS系统上使用传统安装和Docker两种方式安装RocketMQ。首先,系统需要是64位,并且已经安装了JDK 1.8。传统安装包括下载安装包,解压并启动NameServer和Broker。Docker安装则涉及安装docker和docker-compose,然后通过docker-compose.yaml文件配置并启动服务。教程还提供了启动命令和解决问题的提示。
|
1月前
|
消息中间件 数据采集 中间件
RabbitMQ的使用—实战
RabbitMQ的使用—实战
|
2月前
|
消息中间件 缓存 Java
RocketMQ的JAVA落地实战
RocketMQ作为一款高性能、高可靠、高实时、分布式特点的消息中间件,其核心作用主要体现在异步处理、削峰填谷以及系统解耦三个方面。
175 0
|
3月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
93 2
|
4月前
|
消息中间件 新零售 弹性计算
云消息队列 RabbitMQ 版入门训练营,解锁对比开源优势与零基础实战
欢迎加入「云消息队列 RabbitMQ 版入门训练营」。
170 17
|
3月前
|
消息中间件 Java Kafka
"Kafka快速上手:从环境搭建到Java Producer与Consumer实战,轻松掌握分布式流处理平台"
【8月更文挑战第10天】Apache Kafka作为分布式流处理平台的领头羊,凭借其高吞吐量、可扩展性和容错性,在大数据处理、实时日志收集及消息队列领域表现卓越。初学者需掌握Kafka基本概念与操作。Kafka的核心组件包括Producer(生产者)、Broker(服务器)和Consumer(消费者)。Producer发送消息到Topic,Broker负责存储与转发,Consumer则读取这些消息。首先确保已安装Java和Kafka,并启动服务。接着可通过命令行创建Topic,并使用提供的Java API实现Producer发送消息和Consumer读取消息的功能。
73 8