RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理

简介: RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
}
}

代码@1:如果请求为提交事务,进入事务消息提交处理流程。


代码@2:提交消息,别被这名字误导了,该方法主要是根据commitLogOffset从commitlog文件中查找消息返回OperationResult实例。


74789c44d6e3d35fe04b9d752c2a83a9_70.png


● private MessageExt prepareMessage :消息对象。


● private int responseCode:查找结果。


● private String responseRemark :错误提示。


代码@3:如果成功查找到消息,则继续处理,否则返回给客户端,消息未找到错误信息。


代码@4:验证消息必要字段。


● 验证消息的生产组与请求信息中的生产者组是否一致。


● 验证消息的队列偏移量(queueOffset)与请求信息中的偏移量是否一致。


● 验证消息的commitLogOffset与请求信息中的CommitLogOffset是否一致。


代码@5:调用endMessageTransaction方法,该方法主要的目的就是恢复事务消息的真实的主题、队列,并设置事务ID。


代码@6:设置消息的相关属性,这一步应该直接在endMessageTransaction中实现就好,统一恢复原消息的数量,特别关注的是取消了事务相关的系统标记。


代码@7:发送最终消息,其实现原理非常简单,调用MessageStore将消息存储在commitlog文件中,此时的消息,会被转发到原消息主题对应的消费队列,被消费者消费。


代码@8:删除预处理消息(prepare),其实是将消息存储在主题为:RMQ_SYS_TRANS_OP_HALF_TOPIC的主题中,代表这些消息已经被处理(提交或回滚)。


上述就是事务消息提交的流程,事务回滚类似,接下来大概分析一下事务消息回滚的流程。


EndTransactionProcessor#processRequest
else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader); // @1
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); // @2
}
return res;
}
}
相关实践学习
消息队列+Serverless+Tablestore:实现高弹性的电商订单系统
基于消息队列以及函数计算,快速部署一个高弹性的商品订单系统,能够应对抢购场景下的高并发情况。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
打赏
0
0
0
0
80
分享
相关文章
手把手教你在 Windows 环境中搭建 MQTT 服务器
手把手教你在 Windows 环境中搭建 MQTT 服务器
515 0
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
深度剖析 RocketMQ 事务消息!
本文深入探讨了 RocketMQ 的事务消息原理及其应用场景。通过详细的源码分析,阐述了事务消息的基本流程,包括准备阶段、提交阶段及补偿机制。文章还提供了示例代码,帮助读者更好地理解整个过程。此外,还讨论了事务消息的优缺点、适用场景及注意事项,如确保本地事务的幂等性、合理设置超时时间等。尽管事务消息增加了系统复杂性,但在需要保证消息一致性的场景中,它仍是一种高效的解决方案。
279 2
Tomcat源码分析 (一)----- 手撕Java Web服务器需要准备哪些工作
本文探讨了后端开发中Web服务器的重要性,特别是Tomcat框架的地位与作用。通过解析Tomcat的内部机制,文章引导读者理解其复杂性,并提出了一种实践方式——手工构建简易Web服务器,以此加深对Web服务器运作原理的认识。文章还详细介绍了HTTP协议的工作流程,包括请求与响应的具体格式,并通过Socket编程在Java中的应用实例,展示了客户端与服务器间的数据交换过程。最后,通过一个简单的Java Web服务器实现案例,说明了如何处理HTTP请求及响应,强调虽然构建基本的Web服务器相对直接,但诸如Tomcat这样的成熟框架提供了更为丰富和必要的功能。
RocketMQ发送消息原理(含事务消息)
本文深入探讨了RocketMQ发送消息的原理,包括生产者端的发送流程、Broker端接收和处理消息的流程,以及事务消息的特殊处理机制,提供了对RocketMQ消息发送机制全面的理解。
RocketMQ发送消息原理(含事务消息)
云服务器 ECS产品使用问题之变成回滚服务实例,该如何解决
云服务器ECS(Elastic Compute Service)是各大云服务商阿里云提供的一种基础云计算服务,它允许用户租用云端计算资源来部署和运行各种应用程序。以下是一个关于如何使用ECS产品的综合指南。
104 13
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
564 1
【RocketMQ系列六】RocketMQ事务消息
【RocketMQ系列六】RocketMQ事务消息
1227 1
消息队列 MQ产品使用合集之客户端和服务器之间的保活心跳检测间隔是怎么设置的
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
阿里云特惠云服务器99元与199元配置与性能和适用场景解析:高性价比之选
2025年,阿里云长效特惠活动继续推出两款极具吸引力的特惠云服务器套餐:99元1年的经济型e实例2核2G云服务器和199元1年的通用算力型u1实例2核4G云服务器。这两款云服务器不仅价格亲民,而且性能稳定可靠,为入门级用户和普通企业级用户提供了理想的选择。本文将对这两款云服务器进行深度剖析,包括配置介绍、实例规格、使用场景、性能表现以及购买策略等方面,帮助用户更好地了解这两款云服务器,以供参考和选择。