若MQ不支持半消息,是否有其他的解决方案
利用数据库的事务消息表。
把消息信息的快照和对业务数据的操作作为数据库事务操作数据库,操作成功后从数据库读取消息信息发送给broker,收到发送成功的回执后删除数据库中的消息快照。我个人觉得这种方案在不支持半消息的队列方案里也是一种选择,不知道您觉得这种实现方案有没有什么问题。
如果有个生产者和消费者都可访问,并且性能还不错的数据库,肯定使用这个数据库实现事务较好。
然而大部分事务消息使用的场景是
- 没有这样的数据库
- 或由于设计、安全或者网络原因,生产者消费者不能共享数据库
- 或数据库的性能达不到要求
如果先创建订单,当前服务由于不可抗拒因素不能正常工作,没给购物车系统发送消息,这种情况加就会出现:订单已创建且购物车没有清空。
而发送半消息,可通过定期查询事务状态然后根据然后具体的业务回滚操作或者重新发送消息(保持业务的幂等性)。
消费端做幂等处理来保障消息不会重复消费
- 可以采用状态机的方式
- 消息数据唯一键+redis setnx来保障
- 本地消息表,要确保插入本地消息表和执行消息消费业务在同一事务里
RocketMQ分布式事务
RocketMQ事务实现增加了事务反查机制来解决事务消息提交失败的问题。
如果Producer(即订单模块),在提交或回滚事务消息时发生网络异常,Broker没有收到提交或回滚请求,Broker会定期去Producer反查该事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚该事务。
要支持事务反查机制,业务代码需实现一个反查本地事务状态的接口,告知RocketMQ本地事务是成功还是失败。
如果反查的服务器数据不一致,它是认为本地事务失败还是继续多次反查呢?
反查接口的定义,它检查的是本地事务(在我们这个例子里面就是数据库事务)有没有执行成功,并不比较数据是否一致。
该例中反查本地事务逻辑简单,只要根据消息中订单ID,在订单库中查询该订单是否存在,若订单存在则返回成功,否则返回失败。
RocketMQ会自动根据事务反查的结果提交或者回滚事务消息。
反查本地事务的实现并不依赖消息的发送方,即订单服务的某节点的任何数据。
这种情况下,即使发送事务消息的订单服务节点宕机,RocketMQ依然可通过其他订单服务节点执行反查,确保事务完整性。
RocketMQ事务消息流程图
如果本地事务提交失败,已发出去的消息是无法撤回的,会导致数据不一致。
若插入消息表成功后,消费方崩溃导致消费失败
因为消费失败,会自动重试,所以不会丢消息,但可能重复消费。
回查时间和次数怎么设置
如果发布者本地事务执行太久还没执行完,消息中心就来回查是不是有问题,所以应可以把发消息放本地事务的后面吧,另外次数定义也是经验值吧
反查一般是定一个事务超时时间,超时之前会不定期回查。
事务回查感觉需要特定业务自己实现,消息体里需要带回查的参数来判断本地事务结果
- 有无通用做法,比如发消息前产生唯一id写入数据库,后面回查用这个id来查事务结果就好了,而无需具体逻辑自己做
目前还没有通用的做法,需要业务方自己实现。
RocketMQ事务消息代码实现本案例
代码实现订单下单:
首先通过producer.sendMessageInTransaction()方法发半消息给MQ
此时会在TransactionListener中的executeLocalTransaction()方法阻塞,然后在这个方法里面进行订单创建并提交本地事务
如果commit成功,则返回COMMIT状态
否则是ROLLBACK状态,如果正常返回COMMIT或者ROLLBACK的话,不会存在第3步的反查情况。
如果上面的本地事务提交成功以后,此节点突然断电,那么checkLocalTransaction()反查方法就会在某个时候被MQ调用,此方法会根据消息中的订单号去数据库确认订单是否存在,存在就返回COMMIT状态,否则是ROLLBACK状态。
购物车在另一模块,只要收到MQ消息就将本次订单的商品从购物车中删除即可。
RocketMQ事务消息完整实现ACID了吗
A:本地事务的操作1,与往MQ中生产消息的操作2,是两个分离操作,不符合原子性
C:由于操作MQ属异步,在数据一致性上,只能保证最终一致性。
对时效性要求很高系统,事务消息并非数据一致
但对时效性要求不高系统,就是数据一致的
需要结合业务需要看问题
I:由于事务消息分两步操作,本地事务提交后,别的事务消息就已经可看到提交的消息。所以,不符合隔离性
D:rocketMq上支持事物的反查机制,但“半消息”是存储在磁盘中,还是内存呢?
若存储在磁盘中,那就支持持久性,即使事物消息提交后,发生服务突然宕机也不受影响
若存储在内存,则无法保证持久性
rocketmq实现分布式事务,使用两阶段提交,和mysql写redo log和binlog日志的两阶段提交类似。以订单为例
提交订单消息到mq中,等待mq回复ack,消息提交成功,但是此时的消息对消费组不可见,即half消息
此阶段像mysql的引擎层写redo log的prepare阶段。
执行本地事务,执行本地事务成功
此阶段像mysql的service层写binlog的阶段,写binlog成功,最后提交或者回滚队列事务。
rocketmq为防止commit和rollback超时或者失败,采取回查的补偿机制,回查次数默认15次(感觉这个会不会导致服务超时了),超过会rollback,有点像mysql宕机重启根据redo log中的xid找binlog的xid事务,如果binlog日志也已经写成功,mysql这个事务也会提交,因为redo log和binlog这个事务都写完整。
消息对消费者不可见,将其消息的主题topic和队列id修改为half topic,原先的主题和队列id也做为消息的属性,如果事务提交或者回滚会将其消息的队列改为原先的队列。rocketMq开启任务,从half topic中获取消息,调用其中的生产者的监听进行回查是否提交回滚。
rocketmq采用commitlog存放消息,消费者使用consumeQueue二级索引从commitlog获取消息实体内容。
理解Index File:indexFile的作用就是给commitlog做的索引,提升读取消息时的查询效率。
回查借助OP topic进行获取到Half消息进行后续的回查操作。
总结
RocketMQ事务反查机制通过定期反查事务状态,来补偿提交事务消息可能出现的通信失败。
在Kafka的事务功能中,并没有类似的反查机制,需要用户自行去解决这个问题。
但不代表RocketMQ的事务功能比Kafka更好,只能说在该例场景,RocketMQ更适合。
Kafka对事务的定义、实现和适用场景,和RocketMQ有较大差异。
参考
https://rocketmq.apache.org/docs/transaction-example/