一、方案背景
RocketMQ(以下简称MQ)作为消息中间件在事务管理,异步解耦,削峰填谷,数据同步等应用场景中有着广泛使用。当业务系统进行灰度发布时,Dubbo与HTTP的调用可以基于业界通用的灰度方式在我们的微服务治理与网关平台来实现,但MQ已有的灰度方案都不能完全解决消息的隔离与切换衔接问题,为此,我们在鲁班MQ平台(包含根因分析、资源管理、订阅关系校验、延时优化等等的扩展)增加了MQ灰度功能的扩展实现。
二、RocketMQ技术特点
为什么MQ的灰度方案迟迟没有实现呢?我们先来回顾一下RocketMQ的几个核心技术点。
2.1 存储模型的简述
(图2.1 MQ的存储模型)
CommitLog:消息体实际存储的地方,当我们发送的任一业务消息的时候,它最终会存储在commitLog上。MQ在Broker进行集群部署(这里为也简洁,不涉及主从部分)时,同一业务消息只会落到集群的某一个Broker节点上。而这个Broker上的commitLog就会存储所有Topic路由到它的消息,当消息数据量到达1个G后会重新生成一个新的commitLog。
Topic:消息主题,表示一类消息的逻辑集合。每条消息只属于一个Topic,Topic中包含多条消息,是MQ进行消息发送订阅的基本单位。属于一级消息类型,偏重于业务逻辑设计。
Tag:消息标签,二级消息类型,每一个具体的消息都可以选择性地附带一个Tag,用于区分同一个Topic中的消息类型,例如订单Topic, 可以使用Tag=tel来区分手机订单,使用Tag=iot来表示智能设备。在生产者发送消息时,可以给这个消息指定一个具体的Tag, 在消费方可以从Broker中订阅获取感兴趣的Tag,而不是全部消息(注:严谨的拉取过程,并不全是在Broker端过滤,也有可能部分在消费方过滤,在这里不展开描述)。
Queue:实际上Topic更像是一个逻辑概念供我们使用,在源码层级看,Topic以Queue的形式分布在多个Broker上,一个topic往往包含多条Queue(注:全局顺序消息的Topic只有一条Queue,所以才能保证全局的顺序性),Queue与commitLog存在映射关系。可以理解为消息的索引,且只有通过指定Topic的具体某个Queue,才能找到消息。(注:熟悉kafka的同学可以类比partition)。
消费组及其ID:表示一类Producer或Consumer,这类Producer或Consumer通常生产或消费同应用域的消息,且消息生产与消费的逻辑一致。每个消费组可以定义全局维一的GroupID来标识,由它来代表消费组。不同的消费组在消费时互相隔离,不会影响彼此的消费位点计算。
2.2 消息发送与消费
(图2.2 消息发送与拉取模型)
2.2.1 客户端标识
在生产者或消费者集群中,每一个MQ客户端的运行实例,在MQ的客户端会保证产生唯一的ClientID。注:同一应用实例中,既充当生产者,也充当消费者时,其ClientID实际上是同一个取值。
2.2.2 消息发送
当向某个Topic发送消息的时候,会首先获得这个Topic的元数据,元数据会包括它有多少个Queue,每个Queue属于哪个Broker。默认的情况下,发送方会选择一条Queue发送当前消息,算法类型是轮询,也就是下一条消息会选择另一条Queue进行发送。另外MQ也提供了指定某条Queue或者自定义选择Queue的方法进行消息的发送,自定义选择Queue需实现MessageQueueSelector接口。
2.2.3 消息消费
进行消息的消费时,同一消费组(GroupID)的消费者会订阅Topic,消费者首先获得Topic的元数据,也就是会获得这个Topic的所有Queue信息。然后这些Queue按规则分配给各个具体的客户端(ClientID),各个客户端根据分配到的Queue计算对应的需要拉取消息的offset并生成PullRequest,拉取消息并消费完成后,客户端会生成ACK并更新消费进度。
这里的消费进度是该批消息未消费成功的最小offset,如图2.3所示,一批消息中如果1、5未消费,其余的消息已消费,此时更新的offset仍是1,消费者如果宕机重启,会从1号开始消费消息,此时2、3、4号消息将会重复消费。
(图2.3 消费进度更新示意图)
因此RocketMQ只保证了消息不会丢失,无法保证消息不会重复消费,消息的幂等性需要业务自己实现。
另外,消费方可以指定消费某些Tag的消息,在pullRequest进行拉取时,会在Broker里会按照存储模型的Queue索引信息按hash值进行快速过滤,返回到消费方,消费方也会对返回的消息进行精准的过滤。
2.3 订阅关系一致性
在消费端,同一消费组内(GroupID相同,本节所描述的前提都是同一消费组内)的各个应用实例MQ客户端需要保持订阅关系的一致性,所谓订阅关系的一致性就是同一消费组内的所有客户端所订阅的Topic和Tag必须完全一致,如果组内订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。
2.3.1 订阅关系的维护
每一个应用实例MQ客户端都有独立的ClientID,我们简单地讲解一下订阅关系的维护:
- 各个MQ消费客户端会带着它的ClientID向所有Broker发送心跳包,心跳包中含有具体的本客户端订阅的Topic & Tag等信息,registerConsumer方法会按照消费组将客户端分组存储,同一消费组内的ClientID信息在同一个ConsumerGroupInfo中;
- Broker在接收到任何消费客户端发过来的心跳包后,会在ConsumerManager类中的ConcurrentMapconsumerTable按照消费组名称GroupID作为key存储不同的消费组信息,同一消费组的订阅信息会在每一次收到心跳包后,都会按当前的订阅Topic & Tag信息进行更新维护,也就是相当于只保存最新的心跳包订阅信息(心跳包中的subVersion会标记心跳包版本,当重平衡结果发生改变后,subVersion会更新,Broker只会保存最新版本的心跳包中的订阅信息),不管这个心跳包来源于这个消费组的哪个ClientID。(由于Tag是依赖于Topic的属性,Topic和Tag订阅关系不一致时Broker对应的处理结果也略有不同,具体可见updateSubscription方法)。
2.3.2 订阅不一致影响
在这里使用例子的方式来阐述订阅关系不一致带来的部分问题。假设同一组内的消费者clientA订阅了TOPIC_A,clientB订阅了TOPIC_B;TOPIC_A、TOPIC_B均为4条Queue,在进行消费分配时,最终Queue的分配结果如下:
(表2.1 消息费者的Queue分配结果表)
因为clientB没有订阅TOPIC_A,clientA也没有订阅TOPIC_B,所以TOPIA_A中的Queue-2、Queue-3,TOPIC_B中的Queue-0、Queue-1中的消息则无法正常消费。而且在实际过程中,clientA也没法正常消费TOPIC_B的消息,这时会在客户端看到很多异常,有些Queue的消息得不到消费产生堆积。
此外,在订阅关系中,不同client所订阅的Tag不一样时,也会发生拉取的消息与所需要订阅的消息不匹配的问题。