接上一篇《消息中间件核心实体(0)》,这一篇继续介绍消息中间件中的一些实体。
上一篇主要是Message、Topic、TopicMeta和Queue这样最基础的实体,这几篇介绍一些发送和消费的过程中会涉及到的实体和组件。
1. 发送
1.1 增强Message属性
Message一般只包含topic、tag、content这些属性,这些属性也是使用方在发送时会涉及到的内容。但是光有这些属性往往是不够的,比如我们会需要记录产生这条消息的Producer的信息;记录消息的产生时间和产生的IP信息等等。这些信息都是在Client中给消息附加上去的,对发送方来说是透明的,所以不会在Message实体中暴露,而是我们会增加一个实体:EnhancedMessage。
EnhancedMessage继承自Message,并会增加一些如下的属性:
bornTime
bornAddress
producer
etc
引申一点,Producer发送消息的大致过程如下:
增强Message属性,得到EnhancedMessage的实例
获取可以写入的队列(也可以理解成获取分区)
向队列写入消息(可以是队列暴露写入接口或者由专门的写入工具写入到队列中)
伪代码:
EnhancedMessage msg = enhance(message);
// 根据消息选择一个可以写入的目标队列
WritableQueue queue = router.select(msg);
// 写入消息(queue实现write方法进行写入)
Result result = queue.write(msg);
// write过程
// 将消息序列化成自定义协议的网络包
Packet messagePacket = Serializer.encode(msg);
// 发送网络包
bootstrap.write(messagePacket);
上面的WritableQueue暴露了API去写入,具体实现可以是写入到网络,即远端的一个Partition。而在做单元测试或者本地测试的时候,可以覆盖write的实现,而不用真正写入到网络中,这会使代码更容易测试测试。
上面两幅图是Rocket开源版本中发送相关的一些代码,私以为这段代码非常的不优雅,读起来特别累,特别是requestHeader的各种属性设置。
这段是Rocket开源版本中真正将消息写入到网络的实现,看起来总是非常臃肿,另外不知道是如何mock这些实现以达到在本地做测试的目的的。
1.2 Queue的路由选择
发送过程中会涉及到队列的选择(分区的选择),一条消息最终会根据一定的策略落到一个分区中,这里需要一个组件来完成选择(把这个组件单独抽象出来,这样便于控制写入的目标来进行测试,抽象出来也可以由使用方来实现,这样可以按照使用方自己的场景做特定的路由)。
路由组件非常的简单,一般是Router会根据topic获取到topic的元数据(元数据包含了多有分区的信息),然后根据消息的属性或者用户的参数计算出落到哪个分区,比如可以根据用户的参数对分区总数取模来选择分区,这样可以做到将某一类消息发送到一个分区,比如同一个用户的消息或同一笔订单的不同消息。
这个组件会比较简单,但是在集成的时候需要注意一点,这个组件用户可以自己注入到Producer中来达到控制分区选择策略的目的。
RocketMQ在TopicPublishInfo中实现分区的选择,TopicPublishInfo包含了队列信息(List<MessageQueue> messageQueueList属性),笔者更倾向于抽象出独立的路由组件,以便在特定的场景用户可以自己实现路由,或者在测试时可以做到使用特定路由规则。
2. 消费
消费可以分为多种方式,从获取消息的方式上可以分为Pull和Push两种类型的Consumer;从消费消息的方式上可以分为集群消费和广播消费。这里不展开讨论各种模式的实现(以后单独会讨论Consumer该实现那些内容),会以Push模式&集群消费的Consumer为例,把消费流程中涉及到的一些组件进行介绍。
2.1 分配分区
集群消费中需要保证每个分区有且只有一个Consumer在进行消费。如果某个分区没有Consumer消费,那么使用方拿不到完整的数据;如果某个分区被两个Consumer消费,那么会产生大量的重复消息。所以这里需要实现一个分区分配策略,使在分布式环境中,每个Consumer拿到属于自己的分区,且相互交叉。下面是四个分区两个Consumer默认情况下的分配结果。
实现的策略一般是:
拿到一个Topic所有的分区,对这个列表进行排序
拿到当前所有的Consumer,对Consumer列表进行排序
根据自己所处的Consumer列表的位置和Consumer总数,从分区列表中获取对应的一部分
每个分区和Consumer都有唯一的ID,这样各自按照排序后的结果进行分配,可以达到相互不交叉且不遗漏的目的。(在Consumer总数或分区数发生变化的过程中可能分配结果不正确,这个过程是短暂的,且在消费时还会结合锁去保证分区只有一个Consumer消费,所以不会对实际消费产生影响)。
同样记住一点,这个分配策略是需要暴露出去的,系统可以默认实现集群消费和广播消费的基础策略,用户可以实现自己的分配策略注入到系统中。
2.2 消息缓存
消费端一个重要的组件是消息缓存。为了提升性能,在消费端消息的获取和消息的消费是异步的。Consumer内部有线程专门从服务端获取消息写入到消息缓存中,另外有线程从缓存中获取消息调用用户的回调接口来执行业务操作。
消息缓存除了提供基础的put和take来实现存入消息和取出消息,还需要自身容量,水位控制等配置。
本身Buffer不是很复杂的部分,但是需要考虑一些流控策略,比如Buffer使用率到多少时降低从服务端获取数据的频率。
RocketMQ中实现消息缓存由ProcessQueue实现,笔者倾向于独立出Buffer模块,另外Buffer需要提供锁,以实现顺序消费。
2.3 消费进度
还有一个重要的实体是消费进度,系统需要记录“每个”Consumer的消费进度,且这个数据需要被持久化。
消费进度需要记录某个Group对某个Topic的某个分区的消费位点。进度是按照Topic维度去组织的(持久化在服务端),结构如下:
topic
group0
cursor0、cursor1、cursor2...
group1
...
实现的对象应该是:
class Cursors {
String topic;
Cursor cursor;
class Cursor {
String group;
// 用数组来存储一个group消费的一个topic的所有分区的进度
// 分区数一般情况下不会变更(变更场景很少),用数据就可以
long[] cursors;
}
}
Consumer可以在每一次获取消息时将消费进度提交到服务端,在服务端来更新Cursors内部的数据。
3. 结语
最近两篇内容将一些基础实体和组件简单的介绍了一下,下一篇讨论一下消息应该由Server Push给Consumer还是Consumer主动来Pull消息。
往期文章:
欢迎关注公众号来交流MQ相关问题。