五分钟带你玩转rocketMQ(十)实战分布式事务

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 什么是事务性消息?它可以看作是两阶段提交消息实现,以确保分布式系统中的最终一致性。事务消息确保本地事务的执行和消息的发送可以原子化地执行


什么是事务性消息?

它可以看作是两阶段提交消息实现,以确保分布式系统中的最终一致性。事务消息确保本地事务的执行和消息的发送可以原子化地执行。

使用限制

(1)事务性消息没有调度和批处理支持。

(2)为了避免单个消息被检查过多而导致半队列消息累积,我们将单个消息的检查次数默认限制为15次,但是如果一条消息被检查过,用户可以通过更改代理配置中的“transactionCheckMax”参数来更改此限制“transactionCheckMax”次,默认情况下,broker将丢弃此消息并同时打印错误日志。用户可以通过重写“AbstractTransactionCheckListener”类来更改此行为。

(3)事务消息将在代理配置中的参数“transactionTimeout”确定的一段时间后进行检查。用户也可以在发送事务性消息时通过设置用户属性“CHECK_IMMUNITY_TIME_IN_SECONDS”来更改此限制,此参数优先于“transactionMsgTimeout”参数。

(4)事务性消息可能被多次检查或使用。

(5)提交给用户目标主题的不可信消息可能失败。目前,这取决于日志记录。RocketMQ本身的高可用机制保证了高可用性。如果要确保事务消息不会丢失,并且保证事务完整性,建议使用同步双写。机制。

(6)事务性消息的生产者ID不能与其他类型消息的生产者ID共享。与其他类型的消息不同,事务性消息允许向后查询。MQ服务器按其生产者id查询客户机。

应用程序

1、交易状态

事务性消息有三种状态:

(1)TransactionStatus.CommitTransaction:commit transaction,表示允许消费者使用此消息。

(2)TransactionStatus.rollback transaction:回滚事务,表示消息将被删除,不允许使用。

(3)TransactionStatus.Unknown:中间状态,表示需要MQ进行回查以确定状态。

原理图

image.png

假设有A,B两服务,A服务为优惠券服务,B服务为结算服务。

1.A服务发给B服务一个半消息(B服务消费不到)

2.当B服务返回接收成功后 A服务开展优惠券逻辑

3.优惠券数据落到数据库 此时有3个结果

   1.A服务成功 发送B服务commit 修改B服务的消息状态 B服务可以执行结算

   2.A服务失败发送rollback 修改B服务状态为失败 删除消息

   3.网络原因 需要进行事务回查

4.B服务执行逻辑(如果B服务失败 需要手动判断 rocketMq的官方文档提出不提出解决方案)

代码处理

(1)创建事务生产者

使用TransactionMQProducer类创建producer客户机,并指定唯一的producerGroup,然后可以设置自定义线程池来处理检查请求。在执行本地事务后,需要根据执行结果回复MQ,回复状态在上一节描述。

1. @SpringBootConfiguration
2. public class MQProducerConfiguration {
3. 
4. public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfiguration.class);
5. /**
6.      * 发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
7.      */
8. @Value("${rocketmq.producer.groupName}")
9. private String groupName;
10. @Value("${rocketmq.producer.namesrvAddr}")
11. private String namesrvAddr;
12. /**
13.      * 消息最大大小,默认4M
14.      */
15. @Value("${rocketmq.producer.maxMessageSize}")
16. private Integer maxMessageSize;
17. /**
18.      * 消息发送超时时间,默认3秒
19.      */
20. @Value("${rocketmq.producer.sendMsgTimeout}")
21. private Integer sendMsgTimeout;
22. /**
23.      * 消息发送失败重试次数,默认2次
24.      */
25. @Value("${rocketmq.producer.retryTimesWhenSendFailed}")
26. private Integer retryTimesWhenSendFailed;
27. 
28. @Bean(name = "customRocketMQProducer")
29. public DefaultMQProducer getRocketMQProducer() throws Exception {
30. if (StringUtils.isEmpty(this.groupName)) {
31. throw new Exception("groupName is blank");
32.         }
33. if (StringUtils.isEmpty(this.namesrvAddr)) {
34. throw new Exception("nameServerAddr is blank");
35.         }
36. 
37. //事务消息需要
38.         TransactionListener transactionListener = new TransactionListenerImpl();
39.         TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
40.         ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
41. @Override
42. public Thread newThread(Runnable r) {
43.                 Thread thread = new Thread(r);
44.                 thread.setName("client-transaction-msg-check-thread");
45. return thread;
46.             }
47.         });
48. //事务消息需要
49.         producer.setExecutorService(executorService);
50.         producer.setTransactionListener(transactionListener);
51. 
52.         producer.setNamesrvAddr(this.namesrvAddr);
53. //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
54. //producer.setInstanceName(instanceName);
55. if (this.maxMessageSize != null) {
56.             producer.setMaxMessageSize(this.maxMessageSize);
57.         }
58. if (this.sendMsgTimeout != null) {
59.             producer.setSendMsgTimeout(this.sendMsgTimeout);
60.         }
61. //如果发送消息失败,设置重试次数,默认为2次
62. if (this.retryTimesWhenSendFailed != null) {
63.             producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
64.         }
65. try {
66.             producer.start();
67.             LOGGER.info(String.format("producer is start ! groupName:[%s],namesrvAddr:[%s]"
68.                     , this.groupName, this.namesrvAddr));
69.         } catch (MQClientException e) {
70.             LOGGER.error(String.format("producer is error {}"
71.                     , e.getMessage(), e));
72. throw new Exception(e);
73.         }
74. return producer;
75.     }
76. }

(2)实现TransactionListener接口

“executeLocalTransaction”方法用于在发送半消息成功时执行本地事务。它返回上一节中提到的三种事务状态之一。

“checkLocalTransaction”方法用于检查本地事务状态并响应MQ检查请求。它还返回上一节中提到的三种事务状态之一。

1. package cn.baocl.rocketmq.controllor;
2. 
3. import org.apache.rocketmq.client.producer.LocalTransactionState;
4. import org.apache.rocketmq.client.producer.TransactionListener;
5. import org.apache.rocketmq.common.message.Message;
6. import org.apache.rocketmq.common.message.MessageExt;
7. 
8. import java.util.concurrent.ConcurrentHashMap;
9. import java.util.concurrent.atomic.AtomicInteger;
10. 
11. public class TransactionListenerImpl implements TransactionListener {
12. 
13. private AtomicInteger transactionIndex = new AtomicInteger(0);
14. 
15. private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
16. 
17. //执行本地事务(一般都是写一张日志表,放入本次交易的TransactionId,判断本次交易是否成功)
18. @Override
19. public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
20. //执行业务
21. //do thing
22. if(true){
23. //业务执行成功
24. //插入日志表
25. //返回成功
26. return LocalTransactionState.COMMIT_MESSAGE;
27.         }else{
28. return LocalTransactionState.ROLLBACK_MESSAGE;
29.         }
30.     }
31. 
32. //回查事务 当发送给broken 本地事务的状态状态丢失了 broken会再次根据id验证本地事务(查询日志表是否有日志)
33. @Override
34. public LocalTransactionState checkLocalTransaction(MessageExt msg) {
35. String transactionId = msg.getTransactionId();
36. //根据transactionId查询日志表条数 如果条数大于0 就表示已经执行过以上逻辑了
37. Integer countStatus = Integer.valueOf(transactionId);
38. if (countStatus > 0) {
39. return LocalTransactionState.COMMIT_MESSAGE;
40.         } else {
41. return LocalTransactionState.ROLLBACK_MESSAGE;
42.         }
43.     }
44. }

(3)调用

1. package cn.baocl.rocketmq.controllor;
2. 
3. import org.apache.rocketmq.client.exception.MQBrokerException;
4. import org.apache.rocketmq.client.exception.MQClientException;
5. import org.apache.rocketmq.client.producer.DefaultMQProducer;
6. import org.apache.rocketmq.client.producer.SendResult;
7. import org.apache.rocketmq.common.message.Message;
8. import org.apache.rocketmq.remoting.common.RemotingHelper;
9. import org.apache.rocketmq.remoting.exception.RemotingException;
10. import org.slf4j.Logger;
11. import org.slf4j.LoggerFactory;
12. import org.springframework.web.bind.annotation.RequestMapping;
13. import org.springframework.web.bind.annotation.RestController;
14. 
15. import javax.annotation.Resource;
16. import java.io.UnsupportedEncodingException;
17. 
18. 
19. @RestController
20. @RequestMapping("/test")
21. public class TestControllor {
22. private static final Logger logger = LoggerFactory.getLogger(TestControllor.class);
23. 
24. /**
25.      * 使用RocketMq的生产者
26.      */
27. @Resource(name = "customRocketMQProducer")
28. private DefaultMQProducer defaultMQProducer;
29. 
30. @RequestMapping("/send")
31. public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
32.         String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
33. 
34. for (int i = 0; i < 10; i++) {
35. try {
36. Message msg =
37. new Message("DemoTopic", tags[i % tags.length], "KEY" + i,
38.                                 ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
39. //sendMessageInTransaction :1).检查transactionListener是否存在
40. //                          2).调用父类执行事务消息发送
41. SendResult sendResult = defaultMQProducer.sendMessageInTransaction(msg, null);
42.                 System.out.println("发送消息为");
43.                 System.out.printf("%s%n", sendResult);
44.                 Thread.sleep(10);
45.             } catch (MQClientException | UnsupportedEncodingException e) {
46.                 e.printStackTrace();
47.             }
48.         }
49. 
50. for (int i = 0; i < 100000; i++) {
51.             Thread.sleep(1000);
52.         }
53.     }
54. }

阿里忽略了消费者报错了的情况,可以根据业务自定义

参考:https://my.oschina.net/u/3768341/blog/1616193

       https://segmentfault.com/a/1190000019755235?utm_source=tag-newest

       https://blog.csdn.net/hosaos/article/details/90050276

       https://www.jianshu.com/p/cc5c10221aa1


相关实践学习
5分钟轻松打造应对流量洪峰的稳定商城交易系统
本实验通过SAE极速部署一个微服务电商商城,同时结合RocketMQ异步解耦、削峰填谷的能力,带大家体验面对流量洪峰仍旧稳定可靠的商城交易系统!
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
打赏
0
0
0
0
27
分享
相关文章
Kubernetes下的分布式采集系统设计与实战:趋势监测失效引发的架构进化
本文回顾了一次关键词监测任务在容器集群中失效的全过程,分析了中转IP复用、调度节奏和异常处理等隐性风险,并提出通过解耦架构、动态IP分发和行为模拟优化采集策略,最终实现稳定高效的数据抓取与分析。
Kubernetes下的分布式采集系统设计与实战:趋势监测失效引发的架构进化
RocketMQ实战—3.基于RocketMQ升级订单系统架构
本文主要介绍了基于MQ实现订单系统核心流程的异步化改造、基于MQ实现订单系统和第三方系统的解耦、基于MQ实现将订单数据同步给大数据团队、秒杀系统的技术难点以及秒杀商详页的架构设计和基于MQ实现秒杀系统的异步化架构。
340 64
RocketMQ实战—3.基于RocketMQ升级订单系统架构
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
250 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
105 11
RocketMQ实战—9.营销系统代码初版
本文主要介绍了实现营销系统四大促销场景的代码初版:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。
RocketMQ实战—9.营销系统代码初版
RocketMQ实战—8.营销系统业务和方案介绍
本文详细介绍了电商营销系统的业务流程、技术架构及挑战解决方案。涵盖核心交易与支付后履约流程,优惠券和促销活动的发券、领券、用券、销券机制,以及会员与推送的数据库设计。技术架构基于Nacos服务注册中心、Dubbo RPC框架、RocketMQ消息中间件和XXLJob分布式调度工具,实现系统间高效通信与任务管理。针对千万级用户量下的推送和发券场景,提出异步化、分片处理与惰性发券等优化方案,解决高并发压力。同时,通过RocketMQ实现系统解耦,提升扩展性,并利用XXLJob完成爆款商品推荐的分布式调度推送。整体设计确保系统在大规模用户场景下的性能与稳定性。
RocketMQ实战—8.营销系统业务和方案介绍
RocketMQ实战—6.生产优化及运维方案
本文围绕RocketMQ集群的使用与优化,详细探讨了六个关键问题。首先,介绍了如何通过ACL配置实现RocketMQ集群的权限控制,防止不同团队间误用Topic。其次,讲解了消息轨迹功能的开启与追踪流程,帮助定位和排查问题。接着,分析了百万消息积压的处理方法,包括直接丢弃、扩容消费者或通过新Topic间接扩容等策略。此外,提出了针对RocketMQ集群崩溃的金融级高可用方案,确保消息不丢失。同时,讨论了为RocketMQ增加限流功能的重要性及实现方式,以提升系统稳定性。最后,分享了从Kafka迁移到RocketMQ的双写双读方案,确保数据一致性与平稳过渡。
分布式爬虫框架Scrapy-Redis实战指南
本文介绍如何使用Scrapy-Redis构建分布式爬虫系统,采集携程平台上热门城市的酒店价格与评价信息。通过代理IP、Cookie和User-Agent设置规避反爬策略,实现高效数据抓取。结合价格动态趋势分析,助力酒店业优化市场策略、提升服务质量。技术架构涵盖Scrapy-Redis核心调度、代理中间件及数据解析存储,提供完整的技术路线图与代码示例。
418 0
分布式爬虫框架Scrapy-Redis实战指南
分布式新闻数据采集系统的同步效率优化实战
本文介绍了一个针对高频新闻站点的分布式爬虫系统优化方案。通过引入异步任务机制、本地缓存池、Redis pipeline 批量写入及身份池策略,系统采集效率提升近两倍,数据同步延迟显著降低,实现了分钟级热点追踪能力,为实时舆情监控与分析提供了高效、稳定的数据支持。
分布式新闻数据采集系统的同步效率优化实战

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等