消息队列之事务消息,RocketMQ 和 Kafka 是如何做的?(上)

简介: 消息队列之事务消息,RocketMQ 和 Kafka 是如何做的?(上)

每个时代,都不会亏待会学习的人。

大家好,我是 yes。

今天我们来谈一谈消息队列的事务消息,一说起事务相信大家都不陌生,脑海里蹦出来的就是 ACID。

通常我们理解的事务就是为了一些更新操作要么都成功,要么都失败,不会有中间状态的产生,而 ACID 是一个严格的事务实现的定义,不过在单体系统时候一般都不会严格的遵循 ACID 的约束来实现事务,更别说分布式系统了。

分布式系统往往只能妥协到最终一致性,保证数据最终的完整性和一致性,主要原因就是实力不允许...因为可用性为王。

而且要保证完全版的事务实现代价很大,你想想要维护这么多系统的数据,不允许有中间状态数据可以被读取,所有的操作必须不可分割,这意味着一个事务的执行是阻塞的,资源是被长时间锁定的。

在高并发情况下资源被长时间的占用,就是致命的伤害,举一个有味道的例子,如厕高峰期,好了懂得都懂。


image.png


对了, ACID 是什么还不太清楚的同学,赶紧去查一查,这里我就不展开说了。


分布式事务


那说到分布式事务,常见的有 2PC、TCC 和事务消息,这篇文章重点就是事务消息,不过 2PC 和 TCC 我稍微提一下。


2PC


2PC 就是二阶段提交,分别有协调者和参与者两个角色,二阶段分别是准备阶段和提交阶段。

准备阶段就是协调者向各参与者发送准备命令,这个阶段参与者除了事务的提交啥都做了,而提交阶段就是协调者看看各个参与者准备阶段都 o 不 ok,如果有 ok 那么就向各个参与者发送提交命令,如果有一个不 ok 那么就发送回滚命令。

这里的重点就是 2PC 只适用于数据库层面的事务,什么意思呢?就是你想在数据库里面写一条数据同时又要上传一张图片,这两个操作 2PC 无法保证两个操作满足事务的约束。

而且 2PC 是一种强一致性的分布式事务,它是同步阻塞的,即在接收到提交或回滚命令之前,所有参与者都是互相等待,特别是执行完准备阶段的时候,此时的资源都是锁定的状态,假如有一个参与者卡了很久,其他参与者都得等它,产生长时间资源锁定状态下的阻塞

总体而言效率低,并且存在单点故障问题,协调者是就是那个单点,并且在极端条件下存在数据不一致的风险,例如某个参与者未收到提交命令,此时宕机了,恢复之后数据是回滚的,而其他参与者其实都已经执行了提交事务的命令了。


TCC


TCC 能保证业务层面的事务,也就是说它不仅仅是数据库层面,上面的上传图片这种操作它也能做。

TCC 分为三个阶段 try - confirm - cancel,简单的说就是每个业务都需要有这三个方法,先都执行 try 方法,这一阶段不会做真正的业务操作,只是先占个坑,什么意思呢?比如打算加 10 个积分,那先在预添加字段加上这 10 积分,这个时候用户账上的积分其实是没有增加的。

然后如果都 try 成功了那么就执行 confirm 方法,大家都来做真正的业务操作,如果有一个 try 失败了那么大家都执行 cancel 操作,来撤回刚才的修改。

可以看到 TCC 其实对业务的耦合性很大,因为业务上需要做一定的改造才能完成这三个方法,这其实就是 TCC 的缺点,并且 confirm 和 cancel 操作要注意幂等,因为到执行这两步的时候没有退路,是务必要完成的,因此需要有重试机制,所以需要保证方法幂等。


事务消息


事务消息就是今天文章的主角了,它主要是适用于异步更新的场景,并且对数据实时性要求不高的地方

它的目的是为了解决消息生产者与消息消费者的数据一致性问题。

比如你点外卖,我们先选了炸鸡加入购物车,又选了瓶可乐,然后下单,付完款这个流程就结束了。

image.png


而购物车里面的数据就很适合用消息通知异步删除,因为一般而言我们下完单不会再去点开这个店家的菜单,而且就算点开了购物车里还有这些菜品也没有关系,影响不大。

我们希望的就是下单成功之后购物车的菜品最终会被删除,所以要点就是下单和发消息这两个步骤要么都成功要么都失败


RocketMQ 事务消息


我们先来看一下 RocketMQ 是如何实现事务消息的。

RocketMQ 的事务消息也可以被认为是一个两阶段提交,简单的说就是在事务开始的时候会先发送一个半消息给 Broker。

半消息的意思就是这个消息此时对 Consumer 是不可见的,而且也不是存在真正要发送的队列中,而是一个特殊队列。

发送完半消息之后再执行本地事务,再根据本地事务的执行结果来决定是向 Broker 发送提交消息,还是发送回滚消息。

此时有人说这一步发送提交或者回滚消息失败了怎么办?

影响不大,Broker 会定时的向 Producer 来反查这个事务是否成功,具体的就是 Producer 需要暴露一个接口,通过这个接口 Broker 可以得知事务到底有没有执行成功,没成功就返回未知,因为有可能事务还在执行,会进行多次查询。

如果成功那么就将半消息恢复到正常要发送的队列中,这样消费者就可以消费这条消息了。

我们再来简单的看下如何使用,我根据官网示例代码简化了下。


微信图片_20220511213512.png


可以看到使用起来还是很简便直观的,无非就是多加个反查事务结果的方法,然后把本地事务执行的过程写在 TransationListener 里面。

至此 RocketMQ 事务消息大致的流程已经清晰了,我们画一张整体的流程图来过一遍,其实到第四步这个消息要么就是正常的消息,要么就是抛弃什么都不存在,此时这个事务消息已经结束它的生命周期了。


image.png

image.png



RocketMQ 事务消息源码分析


然后我们再从源码的角度来看看到底是怎么做的,首先我们看下sendMessageInTransaction 方法,方法有点长,不过没有关系结构还是很清晰的。

image.png


流程也就是我们上面分析的,将消息塞入一些属性,标明此时这个消息还是半消息,然后发送至 Broker,然后执行本地事务,然后将本地事务的执行状态发送给 Broker ,我们现在再来看下 Broker 到底是怎么处理这个消息的

在 Broker 的 SendMessageProcessor#sendMessage 中会处理这个半消息请求,因为今天主要分析的是事务消息,所以其他流程不做分析,我大致的说一下原理。

简单的说就是 sendMessage 中查到接受来的消息的属性里面MessageConst.PROPERTY_TRANSACTION_PREPARED 是 true ,那么可以得知这个消息是事务消息,然后再判断一下这条消息是否超过最大消费次数,是否要延迟,Broker 是否接受事务消息等操作后,将这条消息真正的 topic 和队列存入属性中,然后重置消息的 topic 为RMQ_SYS_TRANS_HALF_TOPIC ,并且队列是 0 的队列中,使得消费者无法读取这个消息。

以上就是整体处理半消息的流程,我们来看一下源码。

image.png

相关文章
|
1月前
|
消息中间件 存储 大数据
Apache Kafka: 强大消息队列系统的介绍与使用
Apache Kafka: 强大消息队列系统的介绍与使用
|
1月前
|
消息中间件 存储 缓存
Kafka【基础知识 01】消息队列介绍+Kafka架构及核心概念(图片来源于网络)
【2月更文挑战第20天】Kafka【基础知识 01】消息队列介绍+Kafka架构及核心概念(图片来源于网络)
96 2
|
1月前
|
消息中间件 存储 Kafka
RabbitMQ、RocketMQ和Kafka全面对决,谁是最佳选择?
1、应用场景 1.RabbitMQ: 适用于易用性和灵活性要求较高的场景 异步任务处理:RabbitMQ提供可靠的消息传递机制,适用于处理异步任务,例如将耗时的任务放入消息队列中,然后由消费者异步处理,提高系统的响应速度和可伸缩性。 解耦系统组件:通过使用RabbitMQ作为消息中间件,不同的系统组件可以通过消息进行解耦,实现松耦合的架构,提高系统的可维护性和灵活性。 事件驱动架构:RabbitMQ的发布-订阅模式可以用于构建事件驱动架构,将系统中的事件作为消息发布到相应的主题,不同的消费者可以订阅感兴趣的主题进行相应的处理。
177 2
|
11天前
|
消息中间件 存储 Java
深度探索:使用Apache Kafka构建高效Java消息队列处理系统
【4月更文挑战第17天】本文介绍了在Java环境下使用Apache Kafka进行消息队列处理的方法。Kafka是一个分布式流处理平台,采用发布/订阅模型,支持高效的消息生产和消费。文章详细讲解了Kafka的核心概念,包括主题、生产者和消费者,以及消息的存储和消费流程。此外,还展示了Java代码示例,说明如何创建生产者和消费者。最后,讨论了在高并发场景下的优化策略,如分区、消息压缩和批处理。通过理解和应用这些策略,可以构建高性能的消息系统。
|
25天前
|
消息中间件 存储 负载均衡
消息队列学习之kafka
【4月更文挑战第2天】消息队列学习之kafka,一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台。
17 2
|
30天前
|
消息中间件 存储 Kafka
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
48 1
|
2月前
|
消息中间件 存储 监控
美团面试:Kafka如何处理百万级消息队列?
美团面试:Kafka如何处理百万级消息队列?
135 1
|
3月前
|
消息中间件 监控 负载均衡
Kafka高级应用:如何配置处理MQ百万级消息队列?
在大数据时代,Apache Kafka作为一款高性能的分布式消息队列系统,广泛应用于处理大规模数据流。本文将深入探讨在Kafka环境中处理百万级消息队列的高级应用技巧。
178 0
|
3月前
|
消息中间件 存储 Kafka
云消息队列 Kafka 版生态谈第一期:无代码转储能力介绍
云消息队列 Kafka 版生态谈第一期:无代码转储能力介绍
|
3月前
|
消息中间件 存储 Kafka
Kafka - 消息队列的两种模式
Kafka - 消息队列的两种模式
88 0

相关产品

  • 云消息队列 Kafka 版
  • 云消息队列 MQ