Apache RocketMQ 的 Service Mesh 开源之旅

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
传统型负载均衡 CLB,每月750个小时 15LCU
简介: 自 19 年底开始,支持 Apache RocketMQ 的 Network Filter 历时 4 个月的 Code Review([Pull Request](https://github.com/envoyproxy/envoy/pull/9503)),于本月正式合入 CNCF Envoy 官方社区([RocketMQ Proxy Filter 官方文档](https://www.

云原生消息中间件团队招聘中,职位需求见 JD,欢迎大家踊跃投递!

写在前面

自 19 年底开始,支持 Apache RocketMQ 的 Network Filter 历时 4 个月的 Code Review(Pull Request),于本月正式合入 CNCF Envoy 官方社区(RocketMQ Proxy Filter 官方文档),这使得 RocketMQ 成为继 Dubbo 之后,国内第二个成功进入 Service Mesh 官方社区的中间件产品。

Service Mesh 下的消息收发

主要流程如下图:

sub_and_pop

图 1 : Service Mesh 下的消息收发

简述一下 Service Mesh 下 RocketMQ 消息的发送与消费过程:

  • Pilot 获取到 Topic 的路由信息并通过 xDS 的形式下发给数据平面/Envoy ,Envoy 会代理 SDK 向 Broker/Nameserver 发送的所有的网络请求。
  • 发送时,Envoy 通过 request code 判断出请求为发送,并根据 topic 和 request code 选出对应的 CDS,然后通过 Envoy 提供的负载均衡策略选出对应的 Broker 并发送,这里会使用数据平面的 subset 机制来确保选出的 Broker 是可写的。
  • 消费时,Envoy 通过 request code 判断出请求为消费,并根据 topic 和 request code 选出对应的 CDS,然后和发送一样选出对应的 Broker 进行消费(与发送类似,这里也会使用 subset 来确保选出的 Broker 是可读的),并记录相应的元数据,当消息消费 SDK 发出 ACK 请求时会取出相应的元数据信息进行比对,再通过路由来准确将 ACK 请求发往上次消费时所使用的 Broker。

RocketMQ Mesh 化所遭遇的难题

Service Mesh 常常被称为下一代微服务,这一方面揭示了在早期 Mesh 化浪潮中微服务是绝对的主力军,另一方面,微服务的 Mesh 化也相对更加便利,而随着消息队列和一些数据库产品也逐渐走向 Service Mesh,各个产品在这个过程中也会有各自的问题亟需解决,RocketMQ 也没有例外。

有状态的网络模型

RocketMQ 的网络模型比 RPC 更加复杂,是一套有状态的网络交互,这主要体现在两点:

  • RocketMQ 目前的网络调用高度依赖于有状态的 IP
  • 原生 SDK 中消费时的负载均衡使得每个消费者的状态不可以被忽略

对于前者,使得现有的 SDK 完全无法使用分区顺序消息,因为发送请求和消费请求 RPC 的内容中并不包含 IP/(BrokerName + BrokerId) 等信息,导致使用了 Mesh 之后的 SDK 不能保证发送和消费的 Queue 在同一台 Broker 上,即 Broker 信息本身在 Mesh 化的过程中被抹除了。当然这一点,对于只有一台 Broker 的全局顺序消息而言是不存在的,因为数据平面在负载均衡的时候并没有其他 Broker 的选择,因此在路由层面上,全局顺序消息是不存在问题的。

对于后者,RocketMQ 的 Pull/Push Consumer 中 Queue 是负载均衡的基本单位,原生的 Consumer 中其实是要感知与自己处于同一 ConsumerGroup 下消费同一 Topic 的 Consumer 数目的,每个 Consumer 根据自己的位置来选择相应的 Queue 来进行消费,这些 Queue 在一个 Topic-ConsumerGroup 映射下是被每个 Consumer 独占的,而这一点在现有的数据平面是很难实现的,而且,现有数据平面的负载均衡没法做到 Queue 粒度,这使得 RocketMQ 中的负载均衡策略已经不再适用于 Service Mesh 体系下。

此时我们将目光投向了 RocketMQ 为支持 HTTP 而开发的 Pop 消费接口,在 Pop 接口下,每个 Queue 可以不再是被当前 Topic-ConsumerGroup 的 Consumer 独占的,不同的消费者可以同时消费一个 Queue 里的数据,这为我们使用 Envoy 中原生的负载均衡策略提供了可能。

pop_consumer

图 2 : Before/After Mesh 的针对 Consumer 的 Queue 分配

图 2 右侧即为 Service Mesh 中 Pop Consumer 的消费情况,在 Envoy 中我们会忽略掉 SDK 传来的 Queue 信息。

弹内海量的 Topic 路由信息

在集团内部,Nameserver 中保存着上 GB 的 Topic 路由信息,在 Mesh 中,我们将这部分抽象成 CDS,这使得对于无法预先知道应用所使用的 Topic 的情形而言,控制平面只能全量推送 CDS,这无疑会给控制平面带来巨大的稳定性压力。

在 Envoy 更早期,是完全的全量推送,在数据平面刚启动时,控制平面会下发全量的 xDS 信息,之后控制平面则可以主动控制数据的下发频率,但是无疑下发的数据依旧是全量的。后续 Envoy 支持了部分的 delta xDS API,即可以下发增量的 xDS 数据给数据平面,这当然使得对于已有的 sidecar ,新下发的数据量大大降低,但是 sidecar 中拥有的 xDS 数据依然是全量的,对应到 RocketMQ ,即全量的 CDS 信息都放在内存中,这是我们不可接受的。于是我们希望能够有 on-demand CDS 的方式使得 sidecar 可以仅仅获取自己想要的 CDS 。而此时正好 Envoy 支持了 delta CDS,并仅支持了这一种 delta xDS。其实此时拥有 delta CDS 的 xDS 协议本身已经提供了 on-demand CDS 的能力,但是无论是控制平面还是数据平面并没有暴露这种能力,于是在这里对 Envoy 进行了修改并暴露了相关接口使得数据平面可以主动向控制平面发起对指定 CDS 的请求,并基于 delta gRPC 的方式实现了一个简单的控制平面。Envoy 会主动发起对指定 CDS 资源的请求,并提供了相应的回调接口供资源返回时进行调用。

对于 on-demand CDS 的叙述对应到 RocketMQ 的流程中是这样的,当 GetTopicRoute 或者 SendMessage 的请求到达 Envoy 时,Envoy 会 hang 住这个流程并发起向控制平面中相应 CDS 资源的请求并直到资源返回后重启这个流程。

关于 on-demand CDS 的修改,之前还向社区发起了 Pull Request ,现在看来当时的想法还是太不成熟了。原因是我们这样的做法完全忽略了 RDS 的存在,而将 CDS 和 Topic 实现了强绑定,甚至名称也一模一样,关于这一点,社区的 Senior Maintainer @htuch 对我们的想法进行了反驳,大意就是实际上的 CDS 资源名可能带上了负载均衡方式,inbound/outbound 等各种 prefix 和 suffix,不能直接等同于 Topic 名,更重要的是社区赋予 CDS 本身的定义是脱离于业务的,而我们这样的做法过于 tricky ,是与社区的初衷背道而驰的。

因此我们就需要加上 RDS 来进行抽象,RDS 通过 topic 和其他信息来定位到具体所需要的 CDS 名,由于作为数据平面,无法预先在代码层面就知道所需要找的 CDS 名,那么如此一来,通过 CDS 名来做 on-demand CDS 就更无从谈起了,因此从这一点出发只能接受全量方案,不过好在这并不会影响代码贡献给社区。

route_config:
  name: default_route
  routes:
  - match:
      topic:
        exact: mesh
      headers:
        - name: code
          exact_match: 105
    route:
      cluster: foo-v145-acme-tau-beta-lambda

图 3 : RocketMQ Filter 的 RDS 配置示例

上面可以看到对于 topic 名为 mesh 的请求会被 RDS 路由到 foo-v145-acme-tau-beta-lambda 这个 CDS 上,事先我们只知道 topic 名,无法知道被匹配到的 CDS 资源名。

如今站在更高的视角,发现这个错误很简单,但是其实这个问题我们直到后续 code review 时才及时纠正,确实可以更早就做得更好。

不过从目前社区的动态来看,on-demand xDS 或许已经是一个 roadmap,起码目前 xDS 已经全系支持 delta ,VHDS 更是首度支持了 on-demand 的特性。

Mesh 为 RocketMQ 带来了什么?

A service mesh is a dedicated infrastructure layer for handling service-to-service communication. It’s responsible for the reliable delivery of requests through the complex topology of services that comprise a modern, cloud native application. In practice, the service mesh is typically implemented as an array of lightweight network proxies that are deployed alongside application code, without the application needing to be aware.

这是 Service Mesh 这个词的创造者 William Morgan 对其做出的定义,概括一下:作为网络代理,并对用户透明,承担作为基础设施的职责。

providers.png

图 4 : Service Mesh Providers

这里的职责在 RocketMQ 中包括服务发现,负载均衡,流量监控等职责,使得调用方和被代理方的职责大大降低了。

当然目前的 RocketMQ Filter 为了保证兼容性做出了很多让步,比如为了保证 SDK 可以成功获取到路由,将路由信息聚合包装成了TopicRouteData返回给了 SDK ,但是在理想情况下,SDK 本身已经不需要关心路由了,纯为 Mesh 情景设计的 SDK 是更加精简的,不再会有消费侧 Rebalance,发送和消费的服务发现,甚至在未来像消息体压缩和 schema 校验这些功能 SDK 和 Broker 或许都可以不用再关心,来了就发送/消费,发送/消费完就走或许才是 RocketMQ Mesh 的终极形态。

pop_consumer

图 5 : 应用维度下终态的 RocketMQ Service Mesh

What's Next ?

目前 RocketMQ Filter 具备了普通消息的发送和 Pop 消费能力,但是如果想要具备更加完整的产品形态,功能上还有一些需要补充:

  • 支持 Pull 请求:现在 Envoy Proxy 只接收 Pop 类型的消费请求,之后会考虑支持普通的 Pull 类型,Envoy 会将 Pull 请求转换成 Pop 请求,从而做到让用户无感知。
  • 支持全局顺序消息:目前在 Mesh 体系下,虽然全局顺序消息的路由不存在问题,但是如果多个 Consumer 同时消费全局顺序消息,其中一个消费者突然下线导致消息没有 ACK 而会导致另一个消费者的消息产生乱序,这一点需要在 Envoy 中进行保证。
  • Broker 侧的 Proxy:对 Broker 侧的请求也进行代理和调度。

蜿蜒曲折的社区历程

起初,RocketMQ Filter 的初次 Pull Request 就包含了当前几乎全部的功能,导致了一个超过 8K 行的超大 PR,感谢@天千 在 Code Review 中所做的工作,非常专业,帮助了我们更快地合入社区。

另外,Envoy 社区的 CI 实在太严格了,严格要求 97% 以上的单测行覆盖率,Bazel 源码级依赖,纯静态链接,本身无 cache 编译 24 逻辑核心 CPU 和 load 均打满至少半个小时才能编完,社区的各种 CI 跑完一次则少说两三个小时,多则六七个小时,并对新提交的代码有着极其严苛的语法和 format 要求,这使得在 PR 中修改一小部分代码就可能带来大量的单测变动和 format 需求,不过好的是单测可以很方便地帮助我们发现一些内存 case 。客观上来说,官方社区以这么高的标准来要求 contributors 确实可以很大程度上控制住代码质量,我们在补全单测的过程中,还是发现并解决了不少自身的问题,总得来说还是有一定必要的,毕竟对于 C++ 代码而言,一旦生产环境出问题,调试和追踪起来会困难得多。

最后,RocketMQ Filter 的代码由我和@叔田 共同完成,对于一个没什么开源经验的我来说,为这样的热门社区贡献代码是一次非常宝贵的经历,同时也感谢叔田在此过程中给予的帮助和建议。

相关链接

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
20天前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
77 2
|
2月前
|
消息中间件 人工智能 Apache
Apache RocketMQ 中文社区全新升级!
RocketMQ 中文社区升级发布只是起点,我们将持续优化体验细节,推出更多功能和服务,更重要的是提供更多全面、深度、高质量的内容。
498 11
|
3月前
|
消息中间件 安全 API
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(1)
阿里云瑶池数据库云原生化和一体化产品能力升级,多款产品更新迭代
262 1
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(1)
|
3月前
|
消息中间件 安全 Apache
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(2)
阿里云瑶池数据库云原生化和一体化产品能力升级,多款产品更新迭代
226 0
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(2)
|
2月前
|
消息中间件 新零售 弹性计算
云消息队列 RabbitMQ 版入门训练营,解锁对比开源优势与零基础实战
欢迎加入「云消息队列 RabbitMQ 版入门训练营」。
|
2月前
|
消息中间件 安全 API
Apache RocketMQ ACL 2.0 全新升级
RocketMQ 作为一款流行的分布式消息中间件,被广泛应用于各种大型分布式系统和微服务中,承担着异步通信、系统解耦、削峰填谷和消息通知等重要的角色。随着技术的演进和业务规模的扩大,安全相关的挑战日益突出,消息系统的访问控制也变得尤为重要。然而,RocketMQ 现有的 ACL 1.0 版本已经无法满足未来的发展。因此,我们推出了 RocketMQ ACL 2.0 升级版,进一步提升 RocketMQ 数据的安全性。本文将介绍 RocketMQ ACL 2.0 的新特性、工作原理,以及相关的配置和实践。
13545 2
|
3月前
|
消息中间件 RocketMQ
消息队列 MQ产品使用合集之在开源延时消息插件方案中和原生延时消息方案中,同时设置参数是否会出现错乱
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
22天前
|
存储 消息中间件 Java
Apache Flink 实践问题之原生TM UI日志问题如何解决
Apache Flink 实践问题之原生TM UI日志问题如何解决
31 1
|
6天前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
22天前
|
消息中间件 分布式计算 Hadoop
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
32 3

推荐镜像

更多