消息队列之事务消息,RocketMQ 和 Kafka 是如何做的?(下)

简介: 消息队列之事务消息,RocketMQ 和 Kafka 是如何做的?(下)

就是来了波狸猫换太子,其实延时消息也是这么实现的,最终将换了皮的消息入盘。

Broker 处理提交或者回滚消息的处理方法是 EndTransactionProcessor#processRequest,我们来看一看它做了什么操作。


image.png

可以看到,如果是提交事务就是把皮再换回来写入真正的topic所属的队列中,供消费者消费,如果是回滚则是将半消息记录到一个 half_op 主题下,到时候后台服务扫描半消息的时候就依据其来判断这个消息已经处理过了。

那个后台服务就是 TransactionalMessageCheckService 服务,它会定时的扫描半消息队列,去请求反查接口看看事务成功了没,具体执行的就是TransactionalMessageServiceImpl#check 方法。

我大致说一下流程,这一步骤其实涉及到的代码很多,我就不贴代码了,有兴趣的同学自行了解。不过我相信用语言也是能说清楚的。

首先取半消息 topic 即RMQ_SYS_TRANS_HALF_TOPIC下的所有队列,如果还记得上面内容的话,就知道半消息写入的队列是 id 是 0 的这个队列,然后取出这个队列对应的 half_op 主题下的队列,即 RMQ_SYS_TRANS_OP_HALF_TOPIC 主题下的队列。

这个 half_op 主要是为了记录这个事务消息已经被处理过,也就是说已经得知此事务消息是提交的还是回滚的消息会被记录在 half_op 中。

然后调用 fillOpRemoveMap 方法,从 half_op 取一批已经处理过的消息来去重,将那些没有记录在 half_op 里面的半消息调用 putBackHalfMsgQueue 又写入了 commitlog 中,然后发送事务反查请求,这个反查请求也是 oneWay,即不会等待响应。当然此时的半消息队列的消费 offset 也会推进。


image.png

然后producer中的 ClientRemotingProcessor#processRequest 会处理这个请求,会把任务扔到 TransactionMQProducer 的线程池中进行,最终会调用上面我们发消息时候定义的 checkLocalTransactionState 方法,然后将事务状态发送给 Broker,也是用 oneWay 的方式。

看到这里相信大家会有一些疑问,比如为什么要有个 half_op ,为什么半消息处理了还要再写入 commitlog 中别急听我一一道来。

首先 RocketMQ 的设计就是顺序追加写入,所以说不会更改已经入盘的消息,那事务消息又需要更新反查的次数,超过一定反查失败就判定事务回滚。

因此每一次要反查的时候就将以前的半消息再入盘一次,并且往前推进消费进度。而 half_op 又会记录每一次反查的结果,不论是提交还是回滚都会记录,因此下一次还循环到处理此半消息的时候,可以从 half_op 得知此事务已经结束了,因此就被过滤掉不需要处理了。

如果得到的反查的结果是 UNKNOW,那 half_op 中也不会记录此结果,因此还能再次反查,并且更新反查次数。

到现在整个流程已经清晰了,我再画个图总结一下 Broker 的事务处理流程。


image.png


Kafka 事务消息


Kafka 的事务消息和 RocketMQ 的事务消息又不一样了,RocketMQ 解决的是本地事务的执行和发消息这两个动作满足事务的约束。

而 Kafka 事务消息则是用在一次事务中需要发送多个消息的情况,保证多个消息之间的事务约束,即多条消息要么都发送成功,要么都发送失败,就像下面代码所演示的。

image.png

Kafka 的事务基本上是配合其幂等机制来实现 Exactly Once 语义的,所以说 Kafka 的事务消息不是我们想的那种事务消息,RocketMQ 的才是。

讲到这我就想扯一下了,说到这个 Exactly Once 其实不太清楚的同学很容易会误解。

我们知道消息可靠性有三种,分别是最多一次、恰好一次、最少一次,之前在消息队列连环问的文章我已经提到了基本上我们都是用最少一次然后配合消费者端的幂等来实现恰好一次。

消息恰好被消费一次当然我们所有人追求的,但是之前文章我已经从各方面已经分析过了,基本上难以达到。

而 Kafka 竟说它能实现 Exactly Once?这么牛啤吗?这其实是 Kafka 的一个噱头,你要说他错,他还真没错,你要说他对但是他实现的 Exactly Once 不是你心中想的那个 Exactly Once。

它的恰好一次只能存在一种场景,就是从 Kafka 作为消息源,然后做了一番操作之后,再写入 Kafka 中


image.png

那他是如何实现恰好一次的?就是通过幂等,和我们在业务上实现的一样通过一个唯一 Id, 然后记录下来,如果已经记录过了就不写入,这样来保证恰好一次。

所以说 Kafka 实现的是在特定场景下的恰好一次,不是我们所想的利用 Kafka 来发送消息,那么这条消息只会恰巧被消费一次

这其实和 Redis 说他实现事务了一样,也不是我们心想的事务。

所以开源软件说啥啥特性开发出来了,我们一味的相信,因此其往往都是残血的或者在特殊的场景下才能满足,不要被误导了,不能相信表面上的描述,还得详细的看看文档或者源码。

不过从另一个角度看也无可厚非,作为一个开源软件肯定是想更多的人用,我也没说谎呀,我文档上写的很清楚的,这标题也没骗人吧?

确实,比如你点进震惊xxxx标题的文章,人家也没骗你啥,他自己确实震惊的呢。


image.png

再回来谈 Kafka 的事务消息,所以说这个事务消息不是我们想要的那个事务消息,其实不是今天的主题了,不过我还是简单的说一下。

Kafka 的事务有事务协调者角色,事务协调者其实就是 Broker 的一部分。

在开始事务的时候,生产者会向事务协调者发起请求表示事务开启,事务协调者会将这个消息记录到特殊的日志-事务日志中,然后生产者再发送真正想要发送的消息,这里 Kafka 和 RocketMQ 处理不一样,Kafka 会像对待正常消息一样处理这些事务消息,由消费端来过滤这个消息

然后发送完毕之后生产者会向事务协调者发送提交或者回滚请求,由事务协调者来进行两阶段提交,如果是提交那么会先执行预提交,即把事务的状态置为预提交然后写入事务日志,然后再向所有事务有关的分区写入一条类似事务结束的消息,这样消费端消费到这个消息的时候就知道事务好了,可以把消息放出来了。

最后协调者会向事务日志中再记一条事务结束信息,至此 Kafka 事务就完成了,我拿 confluent.io 上的图来总结一下这个流程。


image.png


最后


至此我们已经知道了 RocketMQ 和 Kakfa 的事务消息全流程,可以看到 RocketMQ 的事务消息才是我们想要的,当然你要是用的流式计算那么 Kakfa 的事务消息也是你想要的。

需要贴代码的文章其实很难受,这贴的多不好,贴的少又怕不清晰,真的难,如果觉得文章不错记得点个在看哟。


image.png


相关文章
|
16天前
|
消息中间件 存储 中间件
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
43 0
|
11天前
|
消息中间件 Java 双11
RocketMQ:揭秘电商巨头背后的消息队列秘密
**RocketMQ概览:**高性能分布式消息队列,适用于有序消息、事务处理、流计算、消息推送、日志处理及Binlog分发。在双11等高流量场景下证明了其性能、稳定性和低延迟。Java开发,利于扩展,性能超RabbitMQ,支持死信队列,但可能有集成兼容性问题。适合Java开发者,为电商等场景优化,每秒处理大量消息。
32 3
RocketMQ:揭秘电商巨头背后的消息队列秘密
|
2天前
|
消息中间件 Java Kafka
你了解RabbitMQ、RocketMQ 和 Kafka吗?
【6月更文挑战第26天】比较了RabbitMQ、RocketMQ和Kafka三种消息队列:RabbitMQ灵活,支持多种协议,适合中小型应用;RocketMQ高性能,适用于大规模消息处理;Kafka则以高吞吐量和流处理见长。RabbitMQ和Kafka生态丰富,而RocketMQ运维相对复杂。选择时考虑性能、灵活性、生态系统和易用性,以及特定场景如大数据流处理或分布式系统组件通信。
9 1
|
10天前
|
消息中间件 Java Kafka
SpringBoot实用开发篇第六章(整合第三方技术,ActiveMQ,RabbitMQ,RocketMQ,Kafka)
SpringBoot实用开发篇第六章(整合第三方技术,ActiveMQ,RabbitMQ,RocketMQ,Kafka)
|
1天前
|
消息中间件 Java Kafka
使用Spring Boot和Kafka实现高效消息队列
使用Spring Boot和Kafka实现高效消息队列
|
2天前
|
消息中间件 存储 运维
RocketMQ与Kafka深度对比:特性与适用场景解析
RocketMQ与Kafka深度对比:特性与适用场景解析
|
2天前
|
消息中间件 存储 SQL
RocketMQ与Kafka架构深度对比
RocketMQ与Kafka架构深度对比
|
2天前
|
消息中间件 存储 中间件
【主流技术】聊一聊消息队列 RocketMQ 的基本结构与概念
2.6Broker 代理服务器(Broker)是消息中转角色,负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。 2.7Pull Consumer 拉取式消费(Pull Consumer)是 Consumer 消费的一种类型,也是默认的类型。下游应用系统通常主动调用 Consumer 的拉消息方法从 Broke r服务器拉消息,即主动权由下游应用控制。一旦获取了批量消息,应用就会启动消费过程。
|
25天前
|
消息中间件 存储 缓存
消息队列之 MetaQ 和 Kafka 哪个更香!
本篇文章首先介绍了MetaQ消息队列,然后介绍了作者对MetaQ和Kafka这两个消息队列的理解。
|
1月前
|
消息中间件 数据采集 Serverless
云消息队列 RocketMQ 版-消息集成-概述
消息集成是助力企业数字化转型的全栈式消息与数据集成平台,简化流程,支持云上云下、跨区域集成。它提供低代码的事件流服务,具备数据源集成、数据清洗、Serverless自定义处理等功能,支持丰富的数据源和跨端连接。然而,使用时存在如单个任务数据限制、任务名称长度等约束。消息流入(Source)负责从各种数据源获取数据,消息流出(Sink)将数据分发到目标,数据处理(Transform)允许数据转换和分析,而任务(Task)则结合这些组件执行实际的集成操作。
203 3

热门文章

最新文章

相关产品

  • 云消息队列 Kafka 版
  • 云消息队列 MQ