消息中间件核心实体(0)

本文涉及的产品
云原生网关 MSE Higress,422元/月
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 消息中间件核心实体(0) 最近两周在做的一个新项目,一个主从复制的组件,这两天刚跑通测试。 从之前讨论的架构来说,消息中间件也是有主从复制这个模块的,像Rocket就支持主从模式。 在做这个项目之前已经写过两个版本的主从复制模块,基本思路是: Slave主动和Master建立链接 Sla...

消息中间件核心实体(0)

最近两周在做的一个新项目,一个主从复制的组件,这两天刚跑通测试。

从之前讨论的架构来说,消息中间件也是有主从复制这个模块的,像Rocket就支持主从模式。

在做这个项目之前已经写过两个版本的主从复制模块,基本思路是:

  1. Slave主动和Master建立链接
  2. Slave从Master不断Pull数据
  3. 并ack进度给Master
  4. Master根据Slave的进度来支持异步复制、半同步复制的语义
  5. Slave上有replay线程根据复制数据恢复上层状态

也可以采用Master向Slave push数据的方式(如果自己做主从复制,一定要去了解MySQL的主从复制实现)。

已经做了两个版本的主从复制了,为什么最近又会起新项目去做这个事情呢?因为我们意识到主从复制其实是一个相对独立的模块,和上层的消息业务并不相关。比如DB、或者持久化KV存储去做高可用方案的话可能都会涉及到主从复制这样一个模块。

所以我们想说能不能把主从模块从消息中间件中剥离出来,写成一个相对独立的模块。从确定这么做到完成第一个可以run的版本,花了两周时间,其中有8、9天在进行设计和领域建模(核心实体的定义),编码也就4、5天的样子。这也是这个版本和之前版本最大的区别,我们花了大量的时间去抽象实体,最后在编码上反而会简单很多;而之的版本抽象层次太低,有太多过程化的编码,虽然能run也没什么问题,但总是不够“优雅”。

说了这么多其实是想说,定义好实体基本上可以说完成项目编码的百分之三四十了。好的实体定义(领域模型)会让之后系统的实现变得简单。

废话说了这么多,接着谈一谈消息中间件中一些重要的实体和组件。

消息

Message

消息实体是消息中间件中最重要的对象了,关乎到用户能写入什么、消费什么,关乎到索引结构的设计。

一条消息最基础的属性有:

  • topic
  • content\body

topic表示这条消息属于哪个主题,这样最终Consumer可以通过订阅这个topic来消费这条消息。content或body,消息内容或消息体(RocketMQ是body,我们习惯叫content),它是一个byte数组。因为作为消息中间件我们只会去存储数据,数据的编解码是有用户自己决定的。

出去最最基础的这两个属性,在使用消息中间件时往往会有过滤的需求。比如可能交易业务会将所有的订单发送到一个topic,这时下游的业务方需要关注自己的业务,可能一些是需要处理虚拟商品的业务,一些是需要处理特普通商品的,这样就需要每个业务方能过滤出自己需要的消息。

所以Message往往会有一个tag属性:

  • topic
  • tag
  • content\body

用于做消息过滤。tag属性是一个String类型的,每条消息可以有一个tag,我们称为打标,Consumer在订阅消息的时候可以指定自己需要的一批tag。

RocketMQ中(开源版本)也这样去实现了,但是它将消息所有的属性放入到一个Map中:

  • properties:Map<String,String>

(不知道RocketMQ有没有支持多tag的版本,我们遇到过希望消息有多个tag的情况。这也是个挺正常的需求,比如可以从不同维护划分消息,比如支付类型+商品类型等,过滤的时候是一个and的逻辑,这个可以作为一个功能提升的考虑)

RocketMQ的Message properties中还有key、delaylevel、waitstore等属性,分别用于查询消息、设置延迟投递、是否等待刷盘等。

以上是暴露给用户的Message对象的基础属性,也决定了用户能执行的操作无非是配置上面一些内容。

MessageExt

Message是基础的消息,对于系统内部发送和消费时这些属性是不够的,所以内部回去拓展一个MessageExt,包含额外的一些属性:

  • id:消息的ID,可以考虑能否用一个long类型来做成全局唯一的,这样可以基于它做幂等之类的操作
  • queueId:目标的队列ID(这条消息最终落到哪个分区中——分区即队列,每个分区都是一个先进先出的队列)
  • bornTime:消息的产生时间
  • bornAddress:消息的产生地址
  • storeTime:消息的存储时间
  • crc:crc校验
  • ...

主题

Topic

主题相关的,最基础的实体是Topic,它描述了主题最基础的属性,比如名称、负责人等。

  • name
  • owner

owner信息可以是topic所属的团队或责任人等,主要用户在发生异常或其他需要人工反馈的场景下,能找到对应的人或者发送告警。

RocketMQ中对应的是TopicConfig实体,描述了主题最基础的属性:

  • topicName
  • readQueueNums
  • writeQueueNums
  • perm:读写模式
  • topicFilterType:过滤类型(只支持单tag,尚不支持多tag)
  • topicSysFlag:系统属性
  • order:是否顺序?

其中一些属性并没有很理解,比如readQueueNums和writeQueueNums,一个topic不应该有多少读的队列就有多少写的队列吗?(没有实践中使用RocketMQ的经验,还望了解的同学指教)

Topic元数据

和主题相关的最重要的实体应该是队列的分布情况,即一个Topic包含了哪些队列,把这个元数据暂且成为TopicMeta。

一个TopicMeta对象需要有队列的部分情况,这样,

  • 在发送时,根据消息的topic属性,获取到TopicMeta再从中获取队列信息,然后写入到特定的队列中
  • 在消费时,获取队列信息,然后从每个队列中获取数据

在第一次考虑这个实体的时候,它大概是这个样子的:

  • Topic topic:包含一个Topic实体,表明基础信息
  • int queueNums:包含的队列数
  • List broker:分布的Broker节点

这个结构能满足需求。

Producer拿到TopicMeta后,根据brokers.size * queueNums得到总分区数,每次发送消息时根据一定的路由策略选择一个分区(队列)作为目标分区进行写入。

Consumer拿到TopicMeta后,知道所有的broker,知道分区数,这样就知道了所有的分区情况(每个Broker上同一个Topic拥有相同数据的分区,编号为[0, queueNums-1]),能建立所有分区的消费关系。

但是在不断的实践中,发现这种模式并不是一种很好的抽象:

  1. 在对Topic进行扩容和缩容的时候,只能以Broker为单位,即每次扩容或缩容的分区数都是queueNums的倍数
  2. 隐含了一层关系,即客户端知道总分区数的计算规则和分区的分布规则

对TopicMeta的抽象应该是真实的描述Topic的队列的分布情况,所以TopicMeta应该包含所有的队列的分布情况,应该包含一个Set或List集合,里面包含了所有的队列。

TopicMeta

  • Topic topic
  • Set/List queues:队列信息(Queue描述了自身的信息)

Kafka的实现中是Topic信息包含了所有队列的信息,使用了一个Map去存储,Key是一个Integer,应该是Partition的ID。

队列

Queue是消息聚合的最小单位,一个Queue应该反映出自身所处的物理地址,这样可以进行写入和消费,另外应该包含一些状态来描述是否可读可写。另外应该有它的备份信息(高可用是每个部分都应该考虑的),即这个队列的备份队列分布等。

Kafka中这个对象叫TopicPartitionInfo,包含属性如下:

  • int partition
  • Node leader
  • List<Node> replicas
  • List<Node> isr

这个实体定义相对来说是比较好的,描述了这个队列当前的Leader,它的备份,也就是每个队列都是可以进行主备切换的(回想一下,Kafka中每个Broker相互备份Partition的,而不是Broker之间的主从备份)。在客户端也不会隐含什么规则,而是直接根据路由策略来使用分区(队列)。

小结

消息中间件模型中远远不止上面这一些实体,但是不希望篇幅太长(看起来太累),所以打算拆开成几篇。

这篇主要是基础的实体,下一篇会写和核心流程相关的一些实体,主要会是路由、数据读取等。

如果本文对您有帮助,点一下右下角的“推荐”
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 缓存 测试技术
消息中间件核心实体(1)
接上一篇《消息中间件核心实体(0)》,这一篇继续介绍消息中间件中的一些实体。 上一篇主要是Message、Topic、TopicMeta和Queue这样最基础的实体,这几篇介绍一些发送和消费的过程中会涉及到的实体和组件。
1090 6
|
7月前
|
消息中间件 存储 负载均衡
消息中间件的选择:RabbitMQ是一个明智的选择
消息中间件的选择:RabbitMQ是一个明智的选择
120 0
|
6月前
|
消息中间件 存储 中间件
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
1700 0
|
5月前
|
消息中间件 编解码 Docker
Docker部署RabbitMQ消息中间件
【7月更文挑战第4天】Docker部署RabbitMQ消息中间件
285 3
|
2月前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
124 1
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
4月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
4月前
|
消息中间件 Docker 容器
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
这篇文章提供了RabbitMQ的安装和基本使用教程,包括如何使用Docker拉取RabbitMQ镜像、创建容器、通过浏览器访问管理界面,以及如何创建交换机、队列、绑定和使用direct、fanout和topic三种类型的交换器进行消息发布和接收的测试。
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
|
4月前
|
消息中间件 存储 网络协议
消息中间件RabbitMQ---概述和概念 【一】
该文章提供了对消息中间件RabbitMQ的全面概述,包括其核心概念、工作原理以及与AMQP和JMS的关系。
消息中间件RabbitMQ---概述和概念 【一】
|
5月前
|
消息中间件 监控 负载均衡
中间件RabbitMQ性能瓶颈
【7月更文挑战第13天】
330 11
|
5月前
|
消息中间件 NoSQL Kafka
消息中间件(RocketMQ、RabbitMQ、ActiveMQ、Redis、kafka、ZeroMQ)以及之间的区别
消息中间件(RocketMQ、RabbitMQ、ActiveMQ、Redis、kafka、ZeroMQ)以及之间的区别