本文作者:李伟 - Apache RocketMQ Committer,RocketMQ Python客户端项目Owner ,Apache Doris Contributor,腾讯云消息队列资深开发工程师,著有《RocketMQ分布式消息中间件(核心原理与最佳实践)》。
RocketMqueue101
RocketMQ拥有诸多出色的特性:
比如多副本机制,RocketMQ支持存储层的多副本Dledger,它是基于 Raft 协议的一致性存储库,保证能够从存储层实现多副本;
比如ACL 鉴权机制,用于确定哪些 producer 能生产、哪些消费者组能消费,以及服务端的消息过滤;
比如事务消息,它是 RocketMQ 实现的生产者事务,生产者向 broker 发送一条事务消息,由生产者执行本地事务。如果执行成功,则向 broker 端发送 commit 事件,消费者才能消费;如果本地事务处理失败,则发送rollback事件,使消费者无法消费该消息。
比如Request-Reply ,它是类似于同步 RPC 调用的过程,用户相同的逻辑用消息来实现,能够实现同步 RPC 调用的过程,可以将调用API和发送消息两套逻辑进行统一。
广播消息指消息发出后,订阅它的所有消费者都能消费到所有实例。负载均衡消费指默认策略下,同一个消费者组的消费者都能平均地消费消息,具体策略可自行调整。RocketMQ支持Pull、Push和Pop三种消费模式,支持java、go、cpp、python、c#等多种语言。
搭建RocketMQ集群的流程如下:
第一步:安装NameServer集群。NameServer集群包含一个或多个NameServer节点。启动服务时,默认监听 9876 端口。NameServer集群搭建好之后,启动一套Broker集群。
第二步:搭建Broker集群,使用经典master-slave部署模式, master 提供读写,同时会将数据存储和元数据同步一份到 slave 。通过 10912 的 HA 端口做数据同步。
第三步:写生产者代码生产。生产者集群包含多个生产者实例,通过 broker 的 10911 和 10909 端口向 broker 发送数据。
第四步:消费者通过 10911 或10909端口向 broker 拉取数据。
生产者或消费者实例启动时,会先配置NameServer地址,由生产者或消费者从NameServer集群上拉取topic、Queue和Broker等路由信息,然后根据路由信息发送或拉取消息。
生产者和消费者均与broker之间存在 channel 连接。如果生产者或消费者长时间没有与 broker 联系,则 broker 会将连接剔除。
以下为RocketMQ101相关名词解析:
生产者包含生产者组和生产者实例。生产者组是若干个生产者实例的组合,且RocketMQ希望同一个生产者组内的实例行为一致。消费者组和消费者实例也同理。行为一致指生产者实例都生产同一种类型的消息,比如都生产订单消息,包括创建订单、订单发货、订单删除等步骤。行为一致的好处在于消息的生产和消费比较规整,不会出现混乱。
Topic 是消息的分类,为字符串形式,可以通过 topic 将某集群内的全部消息进行分类,所有 topic 的消息组成全量的消息。而Tag 又属于 topic 的子分类。
消费者在订阅消息时,必须先指定topic再指定 tag ,这样的一条记录被称为订阅关系。如果订阅关系不一致,则会导致订阅混乱,发生重复消费或不消费、消息堆积等情况。
Queue类似于分区,但它是逻辑上的概念,并不是物理存储上的概念。Property类似于 header,property 包含除了主要信息以外的扩展信息,比如消息属于哪个业务 ID、发送者IP 等。向某个 topic 发送消息时,能够指定 property 。
NameServer中包含 broker 与 cluster 的关系、Queue\ topic 与broker 的关系,即路由信息。
Broker中包含以下四部分:
① CommitLog——常规的文件存储。RocketMQ 发送的数据会append到 CommitLog。
② Consumer queue——消费者在消费 topic 时,topic 中包含多个queue,每一个queue都被称为 consumer queue,每个消费者对于每个 consumer queue 都存在消费进度。
③ index——在 dashboard 上能够根据 key 来查询消息。
④ Dledger commitlog——由Dledger 存储库来管理的 CommitLog,能够实现多副本。
RocketMQ的生产消费模型十分简单。如上图,Topic A 有四个queue,其中 queue1、queue2 在 Master Broker 1 上,queue3、queue4在 Master Broker 2 上。ProducerGroup A 下有两个生产者实例,分别向两个 broker 的 queue 发送消息。Consumer Group A 也有两个消费者 consume 1和consume 2。
从四个queue里取消息时,每个消费者默认的策略是依次向 queue1、queue2、queue3、queue4 循环发消息,以此最大程度地保证消息分布均匀。
消费者的消费模式有负载均衡和广播消费消费两种。
负载均衡策略下,比如共有4条queue,则consumer instant1和consumer instant2会分别被分配到2个queue,具体分配到哪两条需由算法决定。
广播消费策略下,假设 topic 有 100 条消息,则 consumer instance 1 和 consumer instance 2每一个消费者实例都会消费到 100 条消息,即同消费者组的每个消费者示例都会消费到全量的消息。
RocketMQ 生态项目
RocketMQ生态项目包含以下几个部分:
客户端:客户端主要分为Java客户端与非Java客户端,其中RocketMQ Java 客户端是最原生的客户端,与RocketMQ的编写语言一致,功能也最为齐全。
计算:RocketMQ支持轻量级的预计算,比如轻量级的 ETL。RocketMQ-Flink 能够直接对接 Flink,方便将RocketMQ数据传输到 Flink 做计算,利用 Flink 强大的生态同步到下游多种类型的目的地。RocketMQ-Connect与RocketMQ-Streams是轻量级的计算框架,功能更简单、轻量,部署运维也更容易。
管控:RocketMQ-Dashboard 拥有简单稳定且功能强大的管控端,能够支持常用的运维操作比如修改配置、禁用消费者等。
云原生:RocketMQ-Docker 支持打包 RocketMQ 源码成为Docker image 项目,能够支持各种不同平台的打包。RocketMQ-Operator支持RocketMQ上K8s,能够支持比如重启进程、下发配置、拉起集群等操作。
监控:RocketMQ-Exporter目前能够支持80+指标,可直接导入到Prometheus做告警和监控。开源项目可通过Prometheus的数据配置 Grafana 做大盘,实现监控能力。此外,Prometheus能够支持Hook回调,方便公司用户将RocketMQ指标监控对接到自己的告警平台。
云原生是技术行业的趋势,能够减少成本、方便运维和管理。RocketMQ新版本实现了存储计算分离,支持更快速、更方便地上 K8s 。EDA 事件驱动和无服务也是大势所趋,比如腾讯云的云函数、阿里云的 eventbridge 等产品都是 Serverless、EDA场景,能够直接集成RocketMQ。微服务领域,RocketMQ也提供了诸多原生支持。
电商、金融等传统领域正在进行数字化转型,消息传递、指标、日志传递等需求都能够利用 Rocket MQ 简单快速地实现。
总而言之,RocketMQ能够利用自己强大的生态项目,支持企业各种各样形态的数据传输和计算。
RocketMQ数据流构建
RocketMQ的数据流构建主要包含消息、CDC数据流、监控数据流以及湖仓数据流。CDC数据主要负责记录记录数据变更,监控数据流包括业务监控和常规监控。
消息的构建如上图所示。
以订单服务为例,订单服务收到创建订单的请求,创建成功后会将订单的基本信息通过RocketMQ发送给 B 服务。假设B服务为短信服务消费,由B服务向客户发送短信通知,包含订单相关的详细信息。
RocketMQ发送消息至B服务时,通过重试和死信实现最终一致性,以保证消息能够成功发送给消费者。RocketMQ有 16 次重试机会,且为阶梯性重试,能够持续十几个小时。
RocketMQ 支持通过Canal/Flink CDC、RocketMQ-collect 的方式,将 Binlog 等数据提供给计算平台,再由RocketMQ Flink、RocketMQ Streams 等进行轻量级的计算。计算完成后,将结果转发给下游数据库比如MySQL、ES、Redis等,进行异构或同构的数据同步。
RocketMQ支持从flume读取日志文件发送至RocketMQ,再通过 RocketMQ Collect 或RocketMQ Flink等将日志数据进行消费、ETL 转换或发送至 ES 。ES 已与ELK 产品打通,可以在 Kabana上查看日志。
除了日志,RocketMQ能够在业务系统做后端监控埋点,通过 RocketMQ client 将监控埋点数据发到RocketMQ,再通过RocketMQ Flink 或RocketMQ Streams 消费数据并发送给业务监控平台或数据湖仓库等,生成在线报表或实时报表。
前端监控大部分通过 HTTP 请求发送至RocketMQ,再通过RocketMQ相关的轻量级计算框架,根据不同诉求将数据汇总至不同的后端,比如 ES 或自建平台。
所有数据都能入到湖仓,因为所有数据都会有数据分析、数据挖掘或出统计报表的诉求。比如前后端的监控、 TP 数据库里的业务数据、日志文件的指标数据或日志文件都能通过对应的工具发到 RocketMQ,通过RocketMQ 提供的轻量级计算工具进行计算,然后发送到下游的 Hive、Doris、Clickhouse 或Hudi等数据库或数据仓库,产出报表、实时大盘、实时数据表等。
RocketMQ 能够采集各种数据,比如metrics、TP数据、log的数据,然后通过RocketMQ 提供的轻量级计算工具进行计算,最终汇总到同构/异构的数据库、数据仓库或数据湖等。
数据构建流程中,RocketMQ 作为中间核心的传输链路,是否能够借助本身的特性避免偶然性的因素影响数据的传输?
RocketMQ 的架构十分简单,而简单也意味着稳定和可靠。因此,使用RocketMQ 做核心数据链路时,其稳定性和可靠性能够避免很多意外,减少不可控因素。
网络抖动往往无法避免,它可能导致数据丢失,而RocketMQ 能够通过重试机制保证数据的最终一致。比如消息只发一半时发生了网络抖动,网络恢复如何保证数据最终能够被消费者完整地消费?
默认的消费机制下,RocketMQ 有16次重试机会,按阶梯重试,重试间隔逐渐增加,最大限度地让消费者能够消费到数据。如果 16 次重试后依然没有消费成功,则消息会进入死信队列,由人工介入处理。产生死信消息后,RocketMQ 能够产生告警,以快速发现并处理问题。
针对数据丢失,RocketMQ 提供了消息轨迹,帮助快速定位,找到问题所在。消费者消息是否成功发送、 broker 是否存储成功、消费者是否成功消费到等问题,都可以通过消息轨迹进行确认。
针对带宽打满的问题,RocketMQ提供了服务端过滤的功能。假设Topic内是访问日志,将 tag 设为域名,消费者组可以只订阅某个域名下的访问日志,RocketMQ能够在服务端对消息进行过滤,再发送给消费者组。Broker 只会将属于消费者的域名消息发送给消费者,不会发送所有消息,因此能节约大量带宽,可高达80%-90%。
RocketMQ 5.0
RocketMQ5.0 架构有两个重大改变,实现了存储计算分离以及轻量级客户端。
存储和计算分离主要为数据层面,将做存储和计算的 broker 拆分成了存储的 broker 和计算的 broker,两类broker各司其职,分别负责存储和计算。
此前,RocketMQ的客户端生态较为丰富,但各个客户端的功能差异较大,难以实现一致。RocketMQ5.0 彻底地解了该问题,实现了轻量级的基于gRPC的多语言客户端。RocketMQ5.0 将此前客户端的重逻辑比如 rebalance 等转移至由Cbroker负责处理,使客户端逻辑变得非常轻量,客户端只剩消息消费或发消息调用接口,各个语言的逻辑容易统一,兼容性更好,不会出现实现方式不同导致逻辑不一致。
RocketMQ5.0 除了存储和计算分离以外,还实现了数据面和控制面的拆分。控制面主要负责接入,此前只能通过NameServer的方式接入,而现在除了NameServer以外还提供了一种新的通过LB Group的方式接入,更简单易用。LB Group 能够方便大家用更简单的方式接入,逻辑集群的接入可以通过LB group 来实现,比如哪些客户端应该连到哪些集群,这也是NameServer难以实现的能力。一组NameServer会管理一个物理集群,物理集群可拆分为多个逻辑集群,每个逻辑集群能够分给不同的租户使用。
Rocket MQ 可以看作是一个通道,通道有上游和下游,且不同行业的上下游不一样,通道中的数据也不一样。
互联网已经涉及到每一个领域,但是垂直领域的发展依然非常欠缺。比如配送互联网涉及到交通运输等,需要有交通运输方面的专家与互联网技术进行深度结合,才能对配送行业引起深远广泛的影响。
未来,我们需要技术人员深耕于某一行业,做出真正适用于行业的优秀的互联网产品。
当前,RocketMQ已经能够支持事件和流。工业互联网行业非常重要的一个元素是IoT事件,它可能来自于各种终端设备,每天都会产生大量的、持续的事件,这些大量的数据都需要队列进行传输,提供给下游做计算。
因此,RocketMQ未来的发展将着力于事件和流。
RocketMQ已经推出了 RocketMQ Collect、RocketMQ Flink和RocketMQ streams ,在流计算上逐渐发力,形成了一整套完善的生态,能够帮助用户快速构建流式应用。而消息更是 RocketMQ 的擅长之处,能够帮助用户在不同场景的消息下方便、快速地接入使用。RocketMQ 已经开源了MQTT等协议,使接入设备更快速方便。
随着RocketMQ 5.0的发布,RocketMQ 在处理消息、事件和流上实现了统一,有了越来越强大的优势,存储和计算分离的特性也使其能提供更低的成本,使企业上云更省钱、更省力,也更省人力。
加入 Apache RocketMQ 社区
十年铸剑,Apache RocketMQ 的成长离不开全球接近 500 位开发者的积极参与贡献,相信在下个版本你就是 Apache RocketMQ 的贡献者,在社区不仅可以结识社区大牛,提升技术水平,也可以提升个人影响力,促进自身成长。
社区 5.0 版本正在进行着如火如荼的开发,另外还有接近 30 个 SIG(兴趣小组)等你加入,欢迎立志打造世界级分布式系统的同学加入社区,添加社区开发者微信:rocketmq666 即可进群,参与贡献,打造下一代消息、事件、流融合处理平台。
微信扫码添加小火箭进群
另外还可以加入钉钉群与 RocketMQ 爱好者一起广泛讨论:
钉钉扫码加群
关注「Apache RocketMQ」公众号,获取更多技术干货