消息队列之事务消息,RocketMQ 和 Kafka 是如何做的?(下)

简介: 消息队列之事务消息,RocketMQ 和 Kafka 是如何做的?(下)

就是来了波狸猫换太子,其实延时消息也是这么实现的,最终将换了皮的消息入盘。

Broker 处理提交或者回滚消息的处理方法是 EndTransactionProcessor#processRequest,我们来看一看它做了什么操作。


image.png

可以看到,如果是提交事务就是把皮再换回来写入真正的topic所属的队列中,供消费者消费,如果是回滚则是将半消息记录到一个 half_op 主题下,到时候后台服务扫描半消息的时候就依据其来判断这个消息已经处理过了。

那个后台服务就是 TransactionalMessageCheckService 服务,它会定时的扫描半消息队列,去请求反查接口看看事务成功了没,具体执行的就是TransactionalMessageServiceImpl#check 方法。

我大致说一下流程,这一步骤其实涉及到的代码很多,我就不贴代码了,有兴趣的同学自行了解。不过我相信用语言也是能说清楚的。

首先取半消息 topic 即RMQ_SYS_TRANS_HALF_TOPIC下的所有队列,如果还记得上面内容的话,就知道半消息写入的队列是 id 是 0 的这个队列,然后取出这个队列对应的 half_op 主题下的队列,即 RMQ_SYS_TRANS_OP_HALF_TOPIC 主题下的队列。

这个 half_op 主要是为了记录这个事务消息已经被处理过,也就是说已经得知此事务消息是提交的还是回滚的消息会被记录在 half_op 中。

然后调用 fillOpRemoveMap 方法,从 half_op 取一批已经处理过的消息来去重,将那些没有记录在 half_op 里面的半消息调用 putBackHalfMsgQueue 又写入了 commitlog 中,然后发送事务反查请求,这个反查请求也是 oneWay,即不会等待响应。当然此时的半消息队列的消费 offset 也会推进。


image.png

然后producer中的 ClientRemotingProcessor#processRequest 会处理这个请求,会把任务扔到 TransactionMQProducer 的线程池中进行,最终会调用上面我们发消息时候定义的 checkLocalTransactionState 方法,然后将事务状态发送给 Broker,也是用 oneWay 的方式。

看到这里相信大家会有一些疑问,比如为什么要有个 half_op ,为什么半消息处理了还要再写入 commitlog 中别急听我一一道来。

首先 RocketMQ 的设计就是顺序追加写入,所以说不会更改已经入盘的消息,那事务消息又需要更新反查的次数,超过一定反查失败就判定事务回滚。

因此每一次要反查的时候就将以前的半消息再入盘一次,并且往前推进消费进度。而 half_op 又会记录每一次反查的结果,不论是提交还是回滚都会记录,因此下一次还循环到处理此半消息的时候,可以从 half_op 得知此事务已经结束了,因此就被过滤掉不需要处理了。

如果得到的反查的结果是 UNKNOW,那 half_op 中也不会记录此结果,因此还能再次反查,并且更新反查次数。

到现在整个流程已经清晰了,我再画个图总结一下 Broker 的事务处理流程。


image.png


Kafka 事务消息


Kafka 的事务消息和 RocketMQ 的事务消息又不一样了,RocketMQ 解决的是本地事务的执行和发消息这两个动作满足事务的约束。

而 Kafka 事务消息则是用在一次事务中需要发送多个消息的情况,保证多个消息之间的事务约束,即多条消息要么都发送成功,要么都发送失败,就像下面代码所演示的。

image.png

Kafka 的事务基本上是配合其幂等机制来实现 Exactly Once 语义的,所以说 Kafka 的事务消息不是我们想的那种事务消息,RocketMQ 的才是。

讲到这我就想扯一下了,说到这个 Exactly Once 其实不太清楚的同学很容易会误解。

我们知道消息可靠性有三种,分别是最多一次、恰好一次、最少一次,之前在消息队列连环问的文章我已经提到了基本上我们都是用最少一次然后配合消费者端的幂等来实现恰好一次。

消息恰好被消费一次当然我们所有人追求的,但是之前文章我已经从各方面已经分析过了,基本上难以达到。

而 Kafka 竟说它能实现 Exactly Once?这么牛啤吗?这其实是 Kafka 的一个噱头,你要说他错,他还真没错,你要说他对但是他实现的 Exactly Once 不是你心中想的那个 Exactly Once。

它的恰好一次只能存在一种场景,就是从 Kafka 作为消息源,然后做了一番操作之后,再写入 Kafka 中


image.png

那他是如何实现恰好一次的?就是通过幂等,和我们在业务上实现的一样通过一个唯一 Id, 然后记录下来,如果已经记录过了就不写入,这样来保证恰好一次。

所以说 Kafka 实现的是在特定场景下的恰好一次,不是我们所想的利用 Kafka 来发送消息,那么这条消息只会恰巧被消费一次

这其实和 Redis 说他实现事务了一样,也不是我们心想的事务。

所以开源软件说啥啥特性开发出来了,我们一味的相信,因此其往往都是残血的或者在特殊的场景下才能满足,不要被误导了,不能相信表面上的描述,还得详细的看看文档或者源码。

不过从另一个角度看也无可厚非,作为一个开源软件肯定是想更多的人用,我也没说谎呀,我文档上写的很清楚的,这标题也没骗人吧?

确实,比如你点进震惊xxxx标题的文章,人家也没骗你啥,他自己确实震惊的呢。


image.png

再回来谈 Kafka 的事务消息,所以说这个事务消息不是我们想要的那个事务消息,其实不是今天的主题了,不过我还是简单的说一下。

Kafka 的事务有事务协调者角色,事务协调者其实就是 Broker 的一部分。

在开始事务的时候,生产者会向事务协调者发起请求表示事务开启,事务协调者会将这个消息记录到特殊的日志-事务日志中,然后生产者再发送真正想要发送的消息,这里 Kafka 和 RocketMQ 处理不一样,Kafka 会像对待正常消息一样处理这些事务消息,由消费端来过滤这个消息

然后发送完毕之后生产者会向事务协调者发送提交或者回滚请求,由事务协调者来进行两阶段提交,如果是提交那么会先执行预提交,即把事务的状态置为预提交然后写入事务日志,然后再向所有事务有关的分区写入一条类似事务结束的消息,这样消费端消费到这个消息的时候就知道事务好了,可以把消息放出来了。

最后协调者会向事务日志中再记一条事务结束信息,至此 Kafka 事务就完成了,我拿 confluent.io 上的图来总结一下这个流程。


image.png


最后


至此我们已经知道了 RocketMQ 和 Kakfa 的事务消息全流程,可以看到 RocketMQ 的事务消息才是我们想要的,当然你要是用的流式计算那么 Kakfa 的事务消息也是你想要的。

需要贴代码的文章其实很难受,这贴的多不好,贴的少又怕不清晰,真的难,如果觉得文章不错记得点个在看哟。


image.png


相关文章
|
9天前
|
消息中间件 Java Kafka
掌握Kafka事务,看这篇就够了
先赞后看,南哥助你Java进阶一大半Kafka事务实际上引入了原子多分区写入的概念,播客画了以下流程图,展示了事务在分区级别如何工作。我是南哥,一个Java学习与进阶的领路人,相信对你通关面试、拿下Offer进入心心念念的公司有所帮助。
掌握Kafka事务,看这篇就够了
|
22天前
|
消息中间件 Java Kafka
消息传递新纪元:探索RabbitMQ、RocketMQ和Kafka的魅力所在
【8月更文挑战第29天】这段内容介绍了在分布式系统中起到异步通信与解耦作用的消息队列,并详细探讨了三种流行的消息队列产品:RabbitMQ、RocketMQ 和 Kafka。其中,RabbitMQ 是一个基于 AMQP 协议的开源消息队列系统,支持多种消息模型;RocketMQ 则是由阿里巴巴开源的具备高性能、高可用性和高可靠性的分布式消息队列,支持事务消息等多种特性;而 Kafka 作为一个由 LinkedIn 开源的分布式流处理平台,以高吞吐量和良好的可扩展性著称。此外,还提供了使用这三种消息队列发送和接收消息的代码示例。总之,这三种消息队列各有优势,适用于不同的业务场景。
38 3
|
1月前
|
消息中间件 存储 监控
RabbitMQ、Kafka对比(超详细),Kafka、RabbitMQ、RocketMQ的区别
RabbitMQ、Kafka对比(超详细),Kafka、RabbitMQ、RocketMQ的区别,设计目标、适用场景、吞吐量、消息存储和持久化、可靠性、集群负载均衡
RabbitMQ、Kafka对比(超详细),Kafka、RabbitMQ、RocketMQ的区别
|
27天前
|
消息中间件 传感器 缓存
为什么Kafka能秒杀众多消息队列?揭秘它背后的五大性能神器,让你秒懂Kafka的极速之道!
【8月更文挑战第24天】Apache Kafka作为分布式流处理平台的领先者,凭借其出色的性能和扩展能力广受好评。本文通过案例分析,深入探讨Kafka实现高性能的关键因素:分区与并行处理显著提升吞吐量;批量发送结合压缩算法减少网络I/O次数及数据量;顺序写盘与页缓存机制提高写入效率;Zero-Copy技术降低CPU消耗;集群扩展与负载均衡确保系统稳定性和可靠性。这些机制共同作用,使Kafka能够在处理大规模数据流时表现出色。
38 3
|
27天前
|
消息中间件 存储 Kafka
ZooKeeper助力Kafka:掌握这四大作用,让你的消息队列系统稳如老狗!
【8月更文挑战第24天】Kafka是一款高性能的分布式消息队列系统,其稳定运行很大程度上依赖于ZooKeeper提供的分布式协调服务。ZooKeeper在Kafka中承担了四大关键职责:集群管理(Broker的注册与选举)、主题与分区管理、领导者选举机制以及消费者组管理。通过具体的代码示例展示了这些功能的具体实现方式。
40 2
|
1月前
|
消息中间件 存储 Java
【揭秘】RocketMQ内部运作大揭秘:一探究竟,原来消息队列是这样工作的!
【8月更文挑战第19天】RocketMQ是一款高性能、高可用的消息中间件,在分布式系统中至关重要。它采用发布/订阅模式,支持高吞吐量的消息传递。核心组件包括管理元数据的NameServer、存储消息的Broker以及Producer和Consumer。RocketMQ支持发布/订阅与点对点两种模型,并具备复杂的消息持久化和路由机制。通过Java API示例,可轻松实现消息的发送与接收。RocketMQ凭借其出色的特性和可靠性,成为大型分布式系统首选的消息解决方案。
53 5
|
1月前
|
消息中间件 存储 缓存
RocketMQ发送消息原理(含事务消息)
本文深入探讨了RocketMQ发送消息的原理,包括生产者端的发送流程、Broker端接收和处理消息的流程,以及事务消息的特殊处理机制,提供了对RocketMQ消息发送机制全面的理解。
RocketMQ发送消息原理(含事务消息)
|
24天前
|
消息中间件 Kafka Apache
kafka vs rocketmq: 不要只顾着吞吐量而忘了延迟这个指标
这篇文章讨论了Apache RocketMQ和Kafka的对比,强调RocketMQ在低延迟、消息重试与追踪、海量Topic、多租户等方面进行了优化,特别是在小包非批量和大量分区场景下的吞吐量超越Kafka,适合电商和金融领域等高并发、高可靠和高可用场景。
47 0
|
27天前
|
消息中间件 监控 RocketMQ
分布式事务实现方案:一文详解RocketMQ事务消息
分布式事务实现方案:一文详解RocketMQ事务消息
|
28天前
|
消息中间件 监控 安全
大事务+MQ普通消息线上问题排查过程技术分享
【8月更文挑战第23天】在复杂的企业级系统中,大事务与消息队列(MQ)的结合使用是一种常见的架构设计,用于解耦系统、提升系统响应性和扩展性。然而,这种设计也带来了其特有的挑战,特别是在处理退款业务等涉及金融交易的高敏感场景时。本文将围绕“大事务+MQ普通消息线上问题排查过程”这一主题,分享一次实际工作中的技术排查经验,旨在为大家提供可借鉴的解决思路和方法。
41 0

相关产品

  • 云消息队列 Kafka 版
  • 云消息队列 MQ