本文将详细介绍RocketMQ中的核心概念(领域模型),包括主题,队列,消息,生产者,消费者和消费者组,订阅关系等核心概念。
1. 模型关系
在详细介绍各个领域模型之前,首先让我们整体来预览下各个模型之间的关系图。
从图中可以看出整体关系是,生产者发送消息到某个Topic中的某个队列中,消费者通过订阅关系订阅指定Topic中消息。
1. 主题(Topic)
主题是RocketMQ中消息传输和存储的的顶层容器,用于标识同一类型业务逻辑的消息。主题只是一个逻辑概念,它并不是一个实际的消息容器。
主题的作用有两个:
- 定义数据的分类隔离:RocketMQ官方建议将不同业务类型的数据拆分到不同的主题中,比如线上商品购物场景下,订单交易如创建订单,支付,取消等订单消息使用同一个主题,物流相关的消息可以使用同一个主题,积分相关的消息可以使用同一个主题
- 定义数据的身份和权限:RocketMQ中的消息本身是匿名无身份的,同一分类的消息使用相同的主题来做身份识别和权限管理。
1.1. 主题的内部属性:
- 主题名称:主题名称用于标识主题,主题名称在集群内部全局唯一。
- 队列列表:队列是主题的组成单元,是消息实际存储的容器, 一个主题下有一个或多个队列。在RocketMQ集群中实际上是通过broker来管理队列列表的。
- 消息类型:RocketMQ在创建主题的时候可以指定主题中存放的消息类型。默认有: Normal (普通消息),FIFO (顺序消息),Delay(定时/延迟消息),Transaction(事务消息)
1.2. 使用建议
由于主题(Topic)非常重要,RocketMQ官方建议在生产环境中不能开启自动创建主题的配置,以免产生大量垃圾主题,无法管理和回收浪费系统资源。
RocketMQ官方推荐在RocketMQ 5.0版本下使用 myadmin命令来创建主题,创建命令是:
./bin/mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=<message_type>
nameserver_address: 是Nameserver集群的地址,比如:172.31.184.89:9876
topic_name:是主题名称
cluster_name:是broker集群的名称
message_type:表示消息类型,可以填入: Normal ,FIFO ,Delay,Transaction,不填默认就是普通消息。
比如下面创建一个名称为 feige_FIFOTopic的存放顺序消息的主题
./bin/mqadmin updateTopic -n 172.31.184.89:9876 -t feige_FIFOTopic -c rocketmq-cluster -a +message.type=FIFO
2. 队列
队列是RocketMQ中消息传输和存储的实际容器,是RocketMQ中消息存储的最小单元。RocketMQ中所有主题都是由多个队列组成。
队列具有天然的顺序性,即按照消息写入的顺序存入队列中。队头存的是最先写入的消息,队尾存的是最近写入的消息。
消息在队列里的顺序和消息之间的顺序通过位点(offset)来进行标记管理,RocketMQ中消息被消费会记录当前已经消费到的消息的offset,下次则从此offset位点继续消费。
RocketMQ支持在任意位点消费任意数量的消息,RocketMQ中消息在队列中被消费之后并不会随即出队列,而是会默认保留48小时,这也保证了在RocketMQ中消息可以被回溯消费,以及进行消息失败重试等操作。
一个Topic的MessageQueue中的消息只能被一个消费者组中的一个消费者消费。一个MessageQueue中的消息不允许同一个消费者组中的多个消费者同时消费。
3. 消息
消息是RocketMQ中最小的数据传输单元,生产者将需要发送的数据包装成消息发送给RocketMQ的服务端。
消息具有两大特性:
- 消息不可变性:消息一旦产生,消息的内容即不可改变,即使经过传输链路的控制也不会发生变化。
- 消息持久化:RocketMQ默认会对消息进行持久化,即消息会被保存到RocketMQ服务端的存储文件中,从而保证了消息的可回溯性和系统故障场景下的可恢复性。
3.1. 消息的内部属性
在消息被发送成功之后我们可以看到如下输出信息:
SendResult [sendStatus=SEND_OK, msgId=0A299C7A551414DAD5DC2C3A61960000, offsetMsgId=AC1FB85900002A9F00000000002D9A88, messageQueue=MessageQueue [topic=SQLFilterTest, brokerName=broker-b, queueId=2], queueOffset=2]
- 主题名称:当前消息所属的主题的名称。集群内全局唯一。
- 消息类型:当前消息的类型
- 消息队列(queueId):实际存储当前消息的队列
- 消息位点(queueOffset): 当前消息存储在队列中的位置。
- 消息ID(msgId):消息的唯一标识,集群内每条消息的ID全局唯一。
- 索引Key列表(可选):消息的索引键,可通过设置不同的key区分消息和快速查找消息
- 过滤标签Tag(可选):消息的过滤标签。消费者可通过Tag对消息进行过滤,仅接收指定标签的消息。
- 定时时间:定时场景下,消息触发延时投递的毫秒级时间戳。
- 消息发送时间:消息发送时,生产者客户端系统的本地毫秒级时间戳。
- 消息保存时间戳:消息在 RocketMQ 服务端完成存储时,服务端系统的本地毫秒级时间戳。 对于定时消息和事务消息,消息保存时间指的是消息生效对消费方可见的服务端系统时间。
- 消费重试次数:息消费失败后,RocketMQ 服务端重新投递的次数。每次重试后,重试次数加1。由服务端系统标记。首次消费,重试次数为0;消费失败首次重试时,重试次数为1。
- 消息负载:业务消息的实际报文数据。
RocketMQ限制消息大小,普通消息限制在4MB以内,事务和定时消息限制在 64KB。
4. 生产者
用来构建并发送消息到RocketMQ服务端的运行实体,一般是集成到业务系统中。
生产者可以发送普通消息,顺序消息,定时消息以及事务消息。
由于创建和销毁生产比较耗费系统资源, 故RocketMQ官方不建议在单一进程中创建大量生产者。
5. 消费者组
RocketMQ系统中承载多个消费行为一致的消费者的负载均衡分组,消费者组是一个逻辑概念。
5.1. 内部属性
- 消费者分组名称:用于区分不同消费者分组,消费者分组名称在集群内全局唯一。
- 投递顺序性:消费者消费消息时, RocketMQ 向消费者客户端投递消息的顺序。
- 消费重试策略:消费者消费消息失败时,系统的重试策略。消费者消费消息失败时,系统会按照重试策略,将指定消息投递给消费者重新消费。
- 订阅关系:当前消费者分组关联的订阅关系集合。包括消费者订阅的主题,以及消息的过滤规则等。
RocketMQ通过消费者分组来实现消费者的管理,同一分组内的消费者共同分摊消息并进行消费,因此,为了保证分组内消息的正常负载和消费。RocketMQ要求同一个消费者分组下所有消费者的消费行为要保持一致。
5.2. 使用建议
- 分组内消费者的投递顺序一致
同一消费者分组下所有消费者的消费投递顺序是相同的,统一都是顺序投递或并发投递,不同业务场景不能混用消费者分组。 - 分组内消费者的业务类型一致
一般消费者分组和主题对应不同业务域对消息消费的要求不同,因此,不同业务域主题的消费建议使用不同的消费者分组,避免一个消费者分组消费超过10个主题。
6. 消费者
消费者是RocketMQ中用来接收并处理消息的运行实体,消费者从RocketMQ服务端获取消息并进行解析。消费者通常被集成到业务系统中。
RocketMQ中提供了推模式的消费者DefaultMQPushConsumer,以及拉模式的消费者
7.订阅关系
RocketMQ系统中消费者获取消息,处理消息的规则和状态配置。
7.1 .订阅关系判断原则
RocketMQ 的订阅关系按照消费者分组和主题粒度设计,因此,一个订阅关系指的是指定某个消费者分组对于某个主题的订阅,判断原则如下:
不同消费者分组对于同一个主题的订阅相互独立如下图所示,消费者分组Group A和消费者分组Group B分别以不同的订阅关系订阅了同一个主题Topic A,这两个订阅关系互相独立,可以各自定义,不受影响。
同一个消费者分组对于不同主题的订阅也相互独立如下图所示,消费者分组Group A订阅了两个主题Topic A和Topic B,对于Group A中的消费者来说,订阅的Topic A为一个订阅关系,订阅的Topic B为另外一个订阅关系,且这两个订阅关系互相独立,可以各自定义,不受影响。
7.2 .内部属性
- 过滤类型消息过滤规则的类型。订阅关系中设置消息过滤规则后,系统将按照过滤规则匹配主题中的消息,只将符合条件的消息投递给消费者消费,实现消息的再次分类。
- TAG过滤:按照Tag字符进行全文过滤匹配。
- SQL92过滤:按照SQL语法对消息属性进行过滤匹配。
- 过滤表达式
- 自定义的过滤规则表达式。
7.3. 使用建议
- 建议不要频繁修改订阅关系。