消息中间件核心实体(0)
最近两周在做的一个新项目,一个主从复制的组件,这两天刚跑通测试。
从之前讨论的架构来说,消息中间件也是有主从复制这个模块的,像Rocket就支持主从模式。
在做这个项目之前已经写过两个版本的主从复制模块,基本思路是:
- Slave主动和Master建立链接
- Slave从Master不断Pull数据
- 并ack进度给Master
- Master根据Slave的进度来支持异步复制、半同步复制的语义
- Slave上有replay线程根据复制数据恢复上层状态
也可以采用Master向Slave push数据的方式(如果自己做主从复制,一定要去了解MySQL的主从复制实现)。
已经做了两个版本的主从复制了,为什么最近又会起新项目去做这个事情呢?因为我们意识到主从复制其实是一个相对独立的模块,和上层的消息业务并不相关。比如DB、或者持久化KV存储去做高可用方案的话可能都会涉及到主从复制这样一个模块。
所以我们想说能不能把主从模块从消息中间件中剥离出来,写成一个相对独立的模块。从确定这么做到完成第一个可以run的版本,花了两周时间,其中有8、9天在进行设计和领域建模(核心实体的定义),编码也就4、5天的样子。这也是这个版本和之前版本最大的区别,我们花了大量的时间去抽象实体,最后在编码上反而会简单很多;而之的版本抽象层次太低,有太多过程化的编码,虽然能run也没什么问题,但总是不够“优雅”。
说了这么多其实是想说,定义好实体基本上可以说完成项目编码的百分之三四十了。好的实体定义(领域模型)会让之后系统的实现变得简单。
废话说了这么多,接着谈一谈消息中间件中一些重要的实体和组件。
消息
Message
消息实体是消息中间件中最重要的对象了,关乎到用户能写入什么、消费什么,关乎到索引结构的设计。
一条消息最基础的属性有:
- topic
- content\body
topic表示这条消息属于哪个主题,这样最终Consumer可以通过订阅这个topic来消费这条消息。content或body,消息内容或消息体(RocketMQ是body,我们习惯叫content),它是一个byte数组。因为作为消息中间件我们只会去存储数据,数据的编解码是有用户自己决定的。
出去最最基础的这两个属性,在使用消息中间件时往往会有过滤的需求。比如可能交易业务会将所有的订单发送到一个topic,这时下游的业务方需要关注自己的业务,可能一些是需要处理虚拟商品的业务,一些是需要处理特普通商品的,这样就需要每个业务方能过滤出自己需要的消息。
所以Message往往会有一个tag属性:
- topic
- tag
- content\body
用于做消息过滤。tag属性是一个String类型的,每条消息可以有一个tag,我们称为打标,Consumer在订阅消息的时候可以指定自己需要的一批tag。
RocketMQ中(开源版本)也这样去实现了,但是它将消息所有的属性放入到一个Map中:
- properties:Map<String,String>
(不知道RocketMQ有没有支持多tag的版本,我们遇到过希望消息有多个tag的情况。这也是个挺正常的需求,比如可以从不同维护划分消息,比如支付类型+商品类型等,过滤的时候是一个and的逻辑,这个可以作为一个功能提升的考虑)
RocketMQ的Message properties中还有key、delaylevel、waitstore等属性,分别用于查询消息、设置延迟投递、是否等待刷盘等。
以上是暴露给用户的Message对象的基础属性,也决定了用户能执行的操作无非是配置上面一些内容。
MessageExt
Message是基础的消息,对于系统内部发送和消费时这些属性是不够的,所以内部回去拓展一个MessageExt,包含额外的一些属性:
- id:消息的ID,可以考虑能否用一个long类型来做成全局唯一的,这样可以基于它做幂等之类的操作
- queueId:目标的队列ID(这条消息最终落到哪个分区中——分区即队列,每个分区都是一个先进先出的队列)
- bornTime:消息的产生时间
- bornAddress:消息的产生地址
- storeTime:消息的存储时间
- crc:crc校验
- ...
主题
Topic
主题相关的,最基础的实体是Topic,它描述了主题最基础的属性,比如名称、负责人等。
- name
- owner
owner信息可以是topic所属的团队或责任人等,主要用户在发生异常或其他需要人工反馈的场景下,能找到对应的人或者发送告警。
RocketMQ中对应的是TopicConfig实体,描述了主题最基础的属性:
- topicName
- readQueueNums
- writeQueueNums
- perm:读写模式
- topicFilterType:过滤类型(只支持单tag,尚不支持多tag)
- topicSysFlag:系统属性
- order:是否顺序?
其中一些属性并没有很理解,比如readQueueNums和writeQueueNums,一个topic不应该有多少读的队列就有多少写的队列吗?(没有实践中使用RocketMQ的经验,还望了解的同学指教)
Topic元数据
和主题相关的最重要的实体应该是队列的分布情况,即一个Topic包含了哪些队列,把这个元数据暂且成为TopicMeta。
一个TopicMeta对象需要有队列的部分情况,这样,
- 在发送时,根据消息的topic属性,获取到TopicMeta再从中获取队列信息,然后写入到特定的队列中
- 在消费时,获取队列信息,然后从每个队列中获取数据
在第一次考虑这个实体的时候,它大概是这个样子的:
- Topic topic:包含一个Topic实体,表明基础信息
- int queueNums:包含的队列数
- List broker:分布的Broker节点
这个结构能满足需求。
Producer拿到TopicMeta后,根据brokers.size * queueNums得到总分区数,每次发送消息时根据一定的路由策略选择一个分区(队列)作为目标分区进行写入。
Consumer拿到TopicMeta后,知道所有的broker,知道分区数,这样就知道了所有的分区情况(每个Broker上同一个Topic拥有相同数据的分区,编号为[0, queueNums-1]),能建立所有分区的消费关系。
但是在不断的实践中,发现这种模式并不是一种很好的抽象:
- 在对Topic进行扩容和缩容的时候,只能以Broker为单位,即每次扩容或缩容的分区数都是queueNums的倍数
- 隐含了一层关系,即客户端知道总分区数的计算规则和分区的分布规则
对TopicMeta的抽象应该是真实的描述Topic的队列的分布情况,所以TopicMeta应该包含所有的队列的分布情况,应该包含一个Set或List集合,里面包含了所有的队列。
TopicMeta
- Topic topic
- Set/List queues:队列信息(Queue描述了自身的信息)
Kafka的实现中是Topic信息包含了所有队列的信息,使用了一个Map去存储,Key是一个Integer,应该是Partition的ID。
队列
Queue是消息聚合的最小单位,一个Queue应该反映出自身所处的物理地址,这样可以进行写入和消费,另外应该包含一些状态来描述是否可读可写。另外应该有它的备份信息(高可用是每个部分都应该考虑的),即这个队列的备份队列分布等。
Kafka中这个对象叫TopicPartitionInfo,包含属性如下:
- int partition
- Node leader
- List<Node> replicas
- List<Node> isr
这个实体定义相对来说是比较好的,描述了这个队列当前的Leader,它的备份,也就是每个队列都是可以进行主备切换的(回想一下,Kafka中每个Broker相互备份Partition的,而不是Broker之间的主从备份)。在客户端也不会隐含什么规则,而是直接根据路由策略来使用分区(队列)。
小结
消息中间件模型中远远不止上面这一些实体,但是不希望篇幅太长(看起来太累),所以打算拆开成几篇。
这篇主要是基础的实体,下一篇会写和核心流程相关的一些实体,主要会是路由、数据读取等。
如果本文对您有帮助,点一下右下角的“推荐”