如何使用消息队列的事务消息(下)

简介: 如何使用消息队列的事务消息

若MQ不支持半消息,是否有其他的解决方案




利用数据库的事务消息表。

把消息信息的快照和对业务数据的操作作为数据库事务操作数据库,操作成功后从数据库读取消息信息发送给broker,收到发送成功的回执后删除数据库中的消息快照。我个人觉得这种方案在不支持半消息的队列方案里也是一种选择,不知道您觉得这种实现方案有没有什么问题。


如果有个生产者和消费者都可访问,并且性能还不错的数据库,肯定使用这个数据库实现事务较好。

然而大部分事务消息使用的场景是


  • 没有这样的数据库
  • 或由于设计、安全或者网络原因,生产者消费者不能共享数据库
  • 或数据库的性能达不到要求


如果先创建订单,当前服务由于不可抗拒因素不能正常工作,没给购物车系统发送消息,这种情况加就会出现:订单已创建且购物车没有清空。

而发送半消息,可通过定期查询事务状态然后根据然后具体的业务回滚操作或者重新发送消息(保持业务的幂等性)。



消费端做幂等处理来保障消息不会重复消费

  1. 可以采用状态机的方式
  2. 消息数据唯一键+redis setnx来保障
  3. 本地消息表,要确保插入本地消息表和执行消息消费业务在同一事务里



RocketMQ分布式事务


RocketMQ事务实现增加了事务反查机制来解决事务消息提交失败的问题。

如果Producer(即订单模块),在提交或回滚事务消息时发生网络异常,Broker没有收到提交或回滚请求,Broker会定期去Producer反查该事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚该事务。



要支持事务反查机制,业务代码需实现一个反查本地事务状态的接口,告知RocketMQ本地事务是成功还是失败。


如果反查的服务器数据不一致,它是认为本地事务失败还是继续多次反查呢?

反查接口的定义,它检查的是本地事务(在我们这个例子里面就是数据库事务)有没有执行成功,并不比较数据是否一致。


该例中反查本地事务逻辑简单,只要根据消息中订单ID,在订单库中查询该订单是否存在,若订单存在则返回成功,否则返回失败。

RocketMQ会自动根据事务反查的结果提交或者回滚事务消息。


反查本地事务的实现并不依赖消息的发送方,即订单服务的某节点的任何数据。

这种情况下,即使发送事务消息的订单服务节点宕机,RocketMQ依然可通过其他订单服务节点执行反查,确保事务完整性。



RocketMQ事务消息流程图

image.png


如果本地事务提交失败,已发出去的消息是无法撤回的,会导致数据不一致。



若插入消息表成功后,消费方崩溃导致消费失败

因为消费失败,会自动重试,所以不会丢消息,但可能重复消费。


回查时间和次数怎么设置

如果发布者本地事务执行太久还没执行完,消息中心就来回查是不是有问题,所以应可以把发消息放本地事务的后面吧,另外次数定义也是经验值吧


反查一般是定一个事务超时时间,超时之前会不定期回查。


事务回查感觉需要特定业务自己实现,消息体里需要带回查的参数来判断本地事务结果

  • 有无通用做法,比如发消息前产生唯一id写入数据库,后面回查用这个id来查事务结果就好了,而无需具体逻辑自己做
    目前还没有通用的做法,需要业务方自己实现。

RocketMQ事务消息代码实现本案例

代码实现订单下单:


首先通过producer.sendMessageInTransaction()方法发半消息给MQ



image.png


此时会在TransactionListener中的executeLocalTransaction()方法阻塞,然后在这个方法里面进行订单创建并提交本地事务

如果commit成功,则返回COMMIT状态

否则是ROLLBACK状态,如果正常返回COMMIT或者ROLLBACK的话,不会存在第3步的反查情况。


image.png


如果上面的本地事务提交成功以后,此节点突然断电,那么checkLocalTransaction()反查方法就会在某个时候被MQ调用,此方法会根据消息中的订单号去数据库确认订单是否存在,存在就返回COMMIT状态,否则是ROLLBACK状态。


image.png


购物车在另一模块,只要收到MQ消息就将本次订单的商品从购物车中删除即可。


RocketMQ事务消息完整实现ACID了吗


A:本地事务的操作1,与往MQ中生产消息的操作2,是两个分离操作,不符合原子性


C:由于操作MQ属异步,在数据一致性上,只能保证最终一致性。

对时效性要求很高系统,事务消息并非数据一致

但对时效性要求不高系统,就是数据一致的

需要结合业务需要看问题


I:由于事务消息分两步操作,本地事务提交后,别的事务消息就已经可看到提交的消息。所以,不符合隔离性


D:rocketMq上支持事物的反查机制,但“半消息”是存储在磁盘中,还是内存呢?

若存储在磁盘中,那就支持持久性,即使事物消息提交后,发生服务突然宕机也不受影响

若存储在内存,则无法保证持久性


rocketmq实现分布式事务,使用两阶段提交,和mysql写redo log和binlog日志的两阶段提交类似。以订单为例


提交订单消息到mq中,等待mq回复ack,消息提交成功,但是此时的消息对消费组不可见,即half消息

此阶段像mysql的引擎层写redo log的prepare阶段。


执行本地事务,执行本地事务成功

此阶段像mysql的service层写binlog的阶段,写binlog成功,最后提交或者回滚队列事务。

rocketmq为防止commit和rollback超时或者失败,采取回查的补偿机制,回查次数默认15次(感觉这个会不会导致服务超时了),超过会rollback,有点像mysql宕机重启根据redo log中的xid找binlog的xid事务,如果binlog日志也已经写成功,mysql这个事务也会提交,因为redo log和binlog这个事务都写完整。


消息对消费者不可见,将其消息的主题topic和队列id修改为half topic,原先的主题和队列id也做为消息的属性,如果事务提交或者回滚会将其消息的队列改为原先的队列。rocketMq开启任务,从half topic中获取消息,调用其中的生产者的监听进行回查是否提交回滚。


rocketmq采用commitlog存放消息,消费者使用consumeQueue二级索引从commitlog获取消息实体内容。

理解Index File:indexFile的作用就是给commitlog做的索引,提升读取消息时的查询效率。

回查借助OP topic进行获取到Half消息进行后续的回查操作。


总结


RocketMQ事务反查机制通过定期反查事务状态,来补偿提交事务消息可能出现的通信失败。

在Kafka的事务功能中,并没有类似的反查机制,需要用户自行去解决这个问题。


但不代表RocketMQ的事务功能比Kafka更好,只能说在该例场景,RocketMQ更适合。

Kafka对事务的定义、实现和适用场景,和RocketMQ有较大差异。


参考

https://rocketmq.apache.org/docs/transaction-example/


相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
7月前
|
消息中间件 网络性能优化 RocketMQ
消息队列 MQ产品使用合集之本地事务还没有执行完就触发了回查是什么导致的
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
8月前
|
消息中间件
[AIGC] 了解消息队列事务:保证数据一致性的关键
[AIGC] 了解消息队列事务:保证数据一致性的关键
107 1
|
消息中间件
|
消息中间件 存储 前端开发
同步异步调用,并谈谈消息队列mq;RocketMQ发送消息和消费消息测试类
同步调用优点: 时效性强,打电话、直播,很快可以得到结果 同步调用的问题:
650 1
|
消息中间件 关系型数据库 MySQL
5. 消息队列中,如何保证消息的顺序性?
5. 消息队列中,如何保证消息的顺序性?
624 0
5. 消息队列中,如何保证消息的顺序性?
|
消息中间件 存储 SQL
阿里云消息队列 Kafka-消息检索实践
本文章主要介绍消息队列使用过程中所遇到的消息丢失、重复消费等痛点问题的排查办法,以及消息队列 Kafka「检索组件」的场景实践,并对其关键技术进行解读。旨在帮助大家对消息队列 Kafka「检索组件」的特点和使用方式更加熟悉,以更有效地解决消息排查过程中所遇到的问题。
阿里云消息队列 Kafka-消息检索实践
|
消息中间件 物联网 Linux
消息队列中消息的格式|学习笔记
快速学习消息队列中消息的格式
|
消息中间件 NoSQL Kafka
消息队列之事务消息,RocketMQ 和 Kafka 是如何做的?(下)
消息队列之事务消息,RocketMQ 和 Kafka 是如何做的?(下)
消息队列之事务消息,RocketMQ 和 Kafka 是如何做的?(下)
|
消息中间件 Kafka 数据库
消息队列之事务消息,RocketMQ 和 Kafka 是如何做的?(上)
消息队列之事务消息,RocketMQ 和 Kafka 是如何做的?(上)
消息队列之事务消息,RocketMQ 和 Kafka 是如何做的?(上)
|
消息中间件 存储 SQL
消息队列面试连环问:如何保证消息不丢失?处理重复消息?消息有序性?消息堆积处理?(下)
消息队列面试连环问:如何保证消息不丢失?处理重复消息?消息有序性?消息堆积处理?(下)
消息队列面试连环问:如何保证消息不丢失?处理重复消息?消息有序性?消息堆积处理?(下)