Apache RocketMQ 5.0 在Stream场景的存储增强

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
可观测可视化 Grafana 版,10个用户账号 1个月
容器镜像服务 ACR,镜像仓库100个 不限时长
简介: RocketMQ的诞生是为了解决微服务解耦的问题。微服务解耦指将传统的巨大服务拆分为分布式的微服务。

1.jpeg

本文作者:刘振东,Apache RocketMQ PMC Member


RocketMQ基础介绍


2.png


RocketMQ的诞生是为了解决微服务解耦的问题。微服务解耦指将传统的巨大服务拆分为分布式的微服务。


拆分之后,产生了一个新的问题:服务之间需要进行通信才能对外形成完整的服务。通信方式分为两种:其一为RPC方式,也称为同步通信;其二为异步通信方式,比如RocketMQ。


RocketMQ的广泛使用证明异步通信方式存在极大优势。最显著的特点即异步解耦,所谓解耦指一个服务不需要知道另外一个服务的存在。比如开发A服务,即使其他服务需要A服务的数据,A服务也并不需要知道它们的存在,不需要依赖其他服务的发布,其他服务的新增也不会对A服务造成影响,从而实现了团队的解耦,指一个微服务由一个特定的团队去完成,而其他团队并不需要知道该团队的存在,只需要根据事先约定的数据格式,通过RocketMQ实现异步通信。这种组织方式大大促进了生产力的发展,自然也促使RocketMQ得到广泛应用。


3.png


在异步解耦过程中,有的组件生产消息,有的组件消费消息。RocketMQ的API model是其异步解耦过程的抽象概念。API model 的两端是RocketMQ领域最典型的两个概念:其一为producer,指消息的生产方或者数据生产方;其二为consumer,指消息的消费方。


除此之外,topic也是API model的一个重要概念。因为异步解耦的需要,一条数据从producer发出到最终被consumer消费的过程并不是直接连接,中间有一个抽象层,这个抽象概念称为topic,相当于一个逻辑地址。topic就像一个仓库,当一条数据被发送到一个topic时,它会负责将消息暂存,其他组件需要使用时可以拿取。


RocketMQ是一个分布式的消息中间件,因此topic实质上是一个逻辑概念,真正的物理概念是分布在每一个broker上的队列,即message queue。一个topic可以具有很多message queue,可以分布在很多broker上,从而具有了无限扩展的能力,这是topic的一个基本特性。


此外,topic接收消息还有一个非常重要的特性,即消息不可变。消息的不可变特性使其可以被重复地读取。通过引入consumer group 的概念,可以看到不同组的消费者读取消息的行为相互之间不会造成影响。Topic里的数据不会因为有consumer去读取而消失,可实现一处发送多处消费的能力。比如在一个组织内,订单团队发了一条消息到订单topic,该组织内的其他所有团队都可以直接进行读取,且一个团队的读取并不会影响其他团队的读取,实现了读取的相互独立。


4.png

MQ的一个重要特性是异步解耦,在互联网的超大流量场景下,异步解耦之后往往会跟随着削峰填谷问题。为了实现削峰填谷,需要持久化的能力。MQ是一个存储引擎,它可以暂存发送者的数据。如果消费者暂时无法处理,数据可以先堆积在MQ ,等到有足够的能力消费时再读取数据。


持久化也更好地支持了异步解耦的特性,即使consumer 全部不在线也并不影响producer的发送。持久化是MQ的一项重要能力。在持久化能力里,为了配合顺序的特性,MQ的引擎是一个顺序存储的引擎。


RocketMQ设计时略微有所不同,它将所有消息集中式地存储,再根据不同的topic、不同的队列分别建立索引。这种设计是RocketMQ针对微服务场景特别优化的,它具有能够很好地支持同步刷盘的能力,在海量Topic的场景下写入延迟依然能够保持平稳,这也是RocketMQ可与其他消息引擎竞争的重要特性。


5.png

流场景最初应用于用户行为分析。用户行为分析指根据用户的行为日志去猜测用户的喜好。比如推荐系统的搜索推荐广告等业务就是流场景最典型的场景。

流场景的第一步为将各个系统里产生的用户行为,包括日志、数据库记录等,集中导入到某些分析引擎。过程中数据来源多,数据的分析引擎也很多,包括离线引擎、实时引擎等。


为了降低复杂度,我们引入了类似MQ的工具,使得数据源和数据使用者不直接交互,而是先将数据发送到MQ里,整个系统的连接复杂度会从O(N2)变为O(N),复杂度大大降低。


从用户行为或者流处理的角度分析MQ扮演的角色以及它最终所期望的IT架构,可以发现其与微服务解耦的架构非常相似,两者之间的所有概念比如consumer、producer、topic、message queue、分片等,都可以一一对应。因此,如果只考虑RocketMQ的功能,它本身就能支持流处理场景。


目前有很多公司在使用RocketMQ进行流处理,但RocketMQ在解决流处理问题时仍然存在可优化的空间。


Stream场景特征分析

6.png


流处理的场景具有三个特点:


(1)单条消息size很小:微服务解耦中,一条消息一般就是一条订单,包含的数据非常多,买家、卖家等各种信息糅杂在一起发给下游,一条消息通常会达到至少1KB甚至几KB。但在流场景里,数据类似于用户的行为日志,比如某个用户登录、某个用户下线、某个用户浏览某个页面等描述。用户行为的表达很可能只占几个字节到100个字节。


(2)消息数量很多:用户的浏览行为数量远远大于操作行为数量,整体消息数量急剧增多。通常在微服务解耦场景中,单机不会超过10万TPS。但是在流场景或者日志搜集等场景当中,单机百万TPS很常见。


(3)Catch Up读常态化:在流场景中,经常有任务的replay,即读取历史数据再计算历史结果,也称为cache up read。相对于微服务解耦场景,catch up read在流场景中会更常见。


总而言之,在整个流场景里,吞吐变得更加重要。


存储增强三步曲——批、分、合


7.png


RocketMQ起初为微服务解耦设计时,是面向单条记录,因此吞吐并不高。RocketMQ 5.0针对吞吐引入了一个新特性batch。


在传统RocketMQ里,一条消息一条记录,一条消息一条索引。这种传统设计的优点是能够保证延迟更加稳定,但也意味着吞吐不高。因为通信链路层的RPC次数太多,对CPU的消耗太大。因此,RocketMQ 5.0针对该问题,推出了batch功能。


Batch的基本逻辑是:在客户端自动组装,将多条消息按照topic和队列合并,作为一个请求发送到服务端;服务端收到消息一般不解压,而是直接存储;消费端一次拿下一批,将多个byte的消息拿到本地,再进行解压。如果每个 batch包含10条消息,TPS可以很轻松地上升10倍。原本一条消息要发一次远程请求,而加入batch后10条消息发一次远程请求即可。


因为服务端不进行解压,所以对服务端的CPU增加非常小,将解压和合并的功能下放到各个客户端,从而使服务端资源不容易形成瓶颈,TPS可以很轻松地得到提高。


8.png

流场景中另一个典型问题是扩容和数据重均衡。在微服务场景中,流量不大的情况下,扩容问题并不明显。但是在流场景中,单机流量本来就高,一旦扩容,扩容和数据重均衡问题就难以忽略。在扩容过程中,假如原先是一个node,需要扩容变成两个node,则会产生重均衡的问题。


为了解决该问题,通常有两个办法:


(1)直接增加队列的数量,即“Add a Shard”。这种方法会产生一个问题,队列的数量发生变化导致整个数据的分布也发生变化。比如做word count单词个数计算,原本A单词位于队列0,队列数发生变化之后,A单词位于队列2,计算的结果会出现问题。因此如果增加队列个数,流计算任务需要重新运行一遍来修正数据。另一问题为分片数,如果每动一次就增加分片数,则会导致分片数量膨胀而且很难减少,这也会产生问题。


(2)不增加队列,但是复制队列。比如原本队列1在node 0上,增加一个node 1,将队列1从node 0转移到 node 1,过程中队列数量没有发生变化,数据分布也没发生变化,因此客户端、发送端、消费端等流接收任务都不需要重跑。此方法对用户很友好,但也会带来一个新的问题:复制过程会导致额外带宽消耗。在流场景中需要扩容的本质原因是机器的流量过高,但是为了将流量引走还需要新增一个复制任务,在还未完成引流之前就给系统带来额外的性能消耗,可能会导致扩容的过程直接产生网络风暴,系统崩溃。另外,复制分区时,因为流计算任务的每个分片数据量很大,复制过程耗时会很长。


因此采用复制方式来解决大数据存储引擎的扩容其实很困难,可用性与可靠性难以权衡。


9.png

RocketMQ针对该问题提出了logic queue解决方案。logic queue是暴露给客户的队列,一个物理queue分布在一个node上,用于实际存储数据的队列。一个logic queue由多个物理queue通过位点映射组合而成。


位点映射的原理如下:假设LogicQueue-1由Queue-1和Queue-2组成,Queue-1包含0到100,Queue-2包含101到200,可映射成一个总位点是0到 200的LogicQueue-1。

上述情况下,只需修改映射关系,将逻辑队列修改到新node上的队列里,即可实现扩容。


写入时,新进的数据写入新节点,即实现了写入端的负载均衡。而读取过程有所不同,最新的数据会从本机读取,老数据会采用远程读取。


在流计算的整个生命周期中,数据在不断产生、不断地消费。因此在大多数情况下,如果没有产生堆积,远程读取的数量很小,几乎能够瞬间完成,写入和读取都在新节点上,以此完成扩容。这种扩容有两个明显优势:其一是不需要搬运数据;其二是分片数量不用发生变化。这也意味着上下游的客户端都无需重启,也不用发生变化,数据任务都是完整而正确的。


10.png

严格意义上来看,RocketMQ是一个流存储引擎。但RocketMQ 5.0推出了RocketMQ Streams——一个轻量级的流计算。


在RocketMQ Streams中,source 端是数据源头,中间有算子,最后数据会进入Sink端。通常它们都是有状态的,比如计算word count,每一条数据进来,一个新的单词出现,首先要拿取过去数据的count值,加1后生成新的count值,这个中间的数据为状态村塾,一般称为state store。

State store的特征包括:


(1)本地化locality:一个轻量级的流计算如果要遵循高效的计算性能,通常需要本地化。本地化指将state store的数据和计算节点放在一起,至少要缓存到计算节点里。从state store里面取数据,就相当于从本地取数据,提升性能。如果每次读取state store都是远程读取,可想而知性能会显著下降。


(2)持久化persistency:一旦灾难发生后,其他计算节点要能完整地读到state store。例如原本在A计算节点计算数据,A计算节点出问题后要到B计算节点计算,在B计算节点也需要恢复出state store。


(3)可搬运exchangeable:state store需要能便利地从一个系统搬到另外一个系统。比如算完word count,一个BI系统或者一个网站系统想尝试感知到该变化,则需要将数据从state store搬运到系统里。


11.png


针对以上三个特性,RocketMQ推出了一种新的topic,称为compacted topic。compacted topic的存储方式和使用方式和与正常topic 一样,唯一区别在于其服务端会将key相同的record删掉,进行规整。


比如图中原本offset=3的节点是K1V4,会覆盖之前的offset=0的K1V1和offset=2的K1V3,使最后只剩下一个K1V4。此设计的优点在于重新恢复时需要读取的数据量非常少。


State store是一个NoSQL型的table,比如word count就是一个KV 结构,key是单词,count是单词出现的个数,但是word count 在不断变化,需要将表转变成一个流,表发生的所有变化形成的数、记录的列表就形成了一个流,该过程称为流转,即将一个表转成一个流。RocketMQ可以通过以上方式轻松地将一张动态表存下来。


Compacted topic相当于一张动态表,且为流的形式,因此compacted topic是一种流表二象性的状态。这种特殊的topic可以充当state store的存储层——一个持久化层。


State store本身是表,且key和value不断变化。为了实现容灾的特性,需要将该表持久化,将该表的所有修改记录形成一个流,存到RocketMQ的一个compacted topic,state store即相当于被持久化。假如一个计算节点A崩溃,B计算节点接管任务时,可以将topic以普通API的方式读出,再在本地恢复state store,以此实现流计算任务的disaster recovery,即实现了容灾特性,可以帮助RocketMQ构建一个轻量级、没有任何外部依赖的流计算引擎。


12.png


Batch、logic queue、compacted topic三个存储的基本特性,分别用于解决增强吞吐的问题、弹性的问题以及state store的问题。将三个存储特性进行结合,再配合RocketMQ Streams,可以形成一个轻量级的流计算解决方案。只需要RocketMQ和RocketMQ Streams,即可实现一个通用的流计算存储引擎。


13.png


RocketMQ 5.0从原先的微服务解耦转变为流存储引擎,原先的异步解耦、削峰填补等特性依然可以在新场景中充分得以使用。


此外,RocketMQ 5.0针对流存储的场景实现了三个重大特性的增强:其一是batch,可提升性能,将吞吐能力提升10倍;其二是logic queue,可以实现秒级扩容,并且无需搬运数据,也无需改变分片数量;其三是针对流计算场景中所用state store实现了compacted topic。


经过增强,RocketMQ向流存储引擎发展的过程更进了一步。


加入 Apache RocketMQ 社区


十年铸剑,Apache RocketMQ 的成长离不开全球接近 500 位开发者的积极参与贡献,相信在下个版本你就是 Apache RocketMQ 的贡献者,在社区不仅可以结识社区大牛,提升技术水平,也可以提升个人影响力,促进自身成长。

社区 5.0 版本正在进行着如火如荼的开发,另外还有接近 30 个 SIG(兴趣小组)等你加入,欢迎立志打造世界级分布式系统的同学加入社区,添加社区开发者微信:rocketmq666 即可进群,参与贡献,打造下一代消息、事件、流融合处理平台。

14.jpeg

微信扫码添加小火箭进群

另外还可以加入钉钉群与 RocketMQ 爱好者一起广泛讨论:

15.png

钉钉扫码加群

关注「Apache RocketMQ」公众号,获取更多技术干货

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件 人工智能 Apache
Apache RocketMQ 中文社区全新升级!
RocketMQ 中文社区升级发布只是起点,我们将持续优化体验细节,推出更多功能和服务,更重要的是提供更多全面、深度、高质量的内容。
576 16
|
3月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
225 2
|
5月前
|
消息中间件 安全 API
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(1)
阿里云瑶池数据库云原生化和一体化产品能力升级,多款产品更新迭代
296 1
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(1)
|
17天前
|
监控 Cloud Native BI
8+ 典型分析场景,25+ 标杆案例,Apache Doris 和 SelectDB 精选案例集(2024版)电子版上线
飞轮科技正式推出 Apache Doris 和 SelectDB 精选案例集 ——《走向现代化的数据仓库(2024 版)》,汇聚了来自各行各业的成功案例与实践经验。该书以行业为划分标准,辅以使用场景标签,旨在为读者提供一个高度整合、全面涵盖、分类清晰且易于查阅的学习资源库。
|
5月前
|
消息中间件 安全 Apache
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(2)
阿里云瑶池数据库云原生化和一体化产品能力升级,多款产品更新迭代
243 0
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(2)
|
1月前
|
存储 分布式计算 druid
大数据-149 Apache Druid 基本介绍 技术特点 应用场景
大数据-149 Apache Druid 基本介绍 技术特点 应用场景
57 1
大数据-149 Apache Druid 基本介绍 技术特点 应用场景
|
1月前
|
SQL 存储 分布式计算
大数据-157 Apache Kylin 背景 历程 特点 场景 架构 组件 详解
大数据-157 Apache Kylin 背景 历程 特点 场景 架构 组件 详解
25 9
|
2月前
|
存储 JSON 物联网
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
本文我们将聚焦企业最普遍使用的 JSON 数据,分别介绍业界传统方案以及 Apache Doris 半结构化数据存储分析的三种方案,并通过图表直观展示这些方案的优势与不足。同时,结合具体应用场景,分享不同需求场景下的使用方式,帮助用户快速选择最合适的 JSON 数据存储及分析方案。
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
|
3月前
|
存储 消息中间件 运维
招联金融基于 Apache Doris 数仓升级:单集群 QPS 超 10w,存储成本降低 70%
招联内部已有 40+ 个项目使用 Apache Doris ,拥有超百台集群节点,个别集群峰值 QPS 可达 10w+ 。通过应用 Doris ,招联金融在多场景中均有显著的收益,比如标签关联计算效率相较之前有 6 倍的提升,同等规模数据存储成本节省超 2/3,真正实现了降本提效。
招联金融基于 Apache Doris 数仓升级:单集群 QPS 超 10w,存储成本降低 70%
|
1月前
|
消息中间件 前端开发 Java
java高并发场景RabbitMQ的使用
java高并发场景RabbitMQ的使用
89 0

相关产品

  • 云消息队列 MQ
  • 推荐镜像

    更多