基本概念
Topic:
一类消息的集合,消息发送者将一类消息发送到一个主题中,例如订单模块将订单发送到
order_topic
中,而用户登录时,将登录事件发送到user_login_topic
中。
标签(Tag):
消息设置的标志,用于同一Topic下区分不同类型的消息,可以根据Topic+Tag实现消息的精细化生产和消费。
ConsumeGroup:
消息消费组,一个消费组拥有多个消费者,消费组首先在启动时需要订阅需要消费的Topic。
- 一个Topic可以被多个消费组订阅,同样一个消费组也可以订阅多个主题。
队列(Queue):
一个Topic可以有很多队列,消息都存在Queue上,默认是一个Topic在同一个Broker组中是4个。
- 如果一个Topic现在在2个Broker组中,那么就有可能有8个队列。
由于一个Topic可能会有很多的队列,消息发送到哪个队列上?
RocketMQ提供了两种消息队列的选择算法:
- 轮询算法。
- 最小投递延迟算法。
轮询算法:
- 一个队列一个队列发送消息,这些就能保证消息能够均匀分布在不同的队列下,默认的队列选择算法。
最小投递延迟算法:
- 每次消息投递的时候会统计投递的时间延迟,在选择队列的时候会优先选择投递延迟时间小的队列。
- 这种算法可能会导致消息分布不均匀的问题。
消费模式
集群模式:
同一Topic下的一条消息只会被同一消费组中的一个消费者消费。
- 消息被负载均衡到了同一个消费组的多个消费者实例上。
广播消费:
每条消息推送给集群内所有的消费者,保证消息至少被每个消费者消费一次。
- 通常用于刷新内存缓存。
Confirm机制
消息生产者把消息发送给MQ,如果接收成功,MQ会返回一个ack消息给生产者。
如果消息接收不成功,MQ会返回一个nack消息给生产者。
重试机制
集群消费下,重试机制的本质是 RocketMQ 的延迟消息功能。
Broker 端会为每个 Topic 创建一个重试队列 ,队列名称是:
%RETRY% + 消费者组名
,达到重试时间后将消息投递到重试队列中进行消费重试(消费者组会自动订阅重试 Topic)。最多重试消费 16 次,重试的时间间隔逐渐变长,若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列。
死信队列
死信队列用于处理无法被正常消费的消息。
- 当一条消息初次消费失败,消息队列会自动进行消息重试。
- 达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
RocketMQ将这种正常情况下无法被消费的消息称为死信消息,将存储死信消息的特殊队列称为死信队列。
Queue分配算法
一个Topic中的Queue只能由Consumer Group中的一个Consumer进行消费,而一个Consumer可以同时消费多个Queue中的消息。
那么Queue与Consumer间的配对关系是如何确定的,即Queue要分配给哪个Consumer进行消费?
常见的有四种策略,分别是:
- 平均分配策略、环形平均策略、一致性hash策略、同机房策略。
- 这些策略是通过在创建Consumer时的构造器传进去的。
平均分配策略(默认):
该算法是根据:
avg = QueueCount / ConsumerCount
的计算结果进行分配的。如果能够整除,则按顺序将avg个Queue逐个分配,如果不能整除,则将多余出的Queue按照Consumer顺序逐个分配。
环形分配策略:
环形平均算法是指,根据消费者的顺序,依次由Queue队列组成的环形图逐个分配,该方法不需要提前计算。
一致性哈希分配策略:
该算法会将consumer的hash值作为Node节点存放到hash环上,然后将queue的hash值也放到hash环上。
通过顺时针方向,距离queue最近的那个consumer就是该queue要分配的consumer。
一致性哈希算法可以有效减少由于消费者组扩容或缩容所带来的大量的Rebalance,所以它适合用在Consumer数量变化较频繁的场景。
但是一致性哈希算法也存在不足,就是分配效率较低,容易导致分配不均的情况。
即每个消费者消费的队列数,有可能相差很大,这样就会造成个别消费者压力过大。
- 可以引入虚拟桶,让queue在hash环中尽可能分配均匀。
机房分配策略:
该算法会根据queue的部署机房位置和consumer的位置,过滤出当前consumer相同机房的queue。
然后按照平均分配策略或环形平均策略对同机房queue进行分配。
如果没有同机房queue,则按照平均配策略或环形平均策略对所有queue进行分配。
Rebalance机制
Rebalance即再均衡:
- 指的是将⼀个Topic下的多个Queue在同⼀个Consumer Group中的多个Consumer间进行重新分配的过程。
它能够提升消息的并行消费能力。
哪些场景会触发Rebalance?
消费者所订阅Topic的队列数量发生变化。
比如动态调整了Topic对应的队列数量,那么此时肯定是要重新分配一下,也就是触发Rebalance再均衡。
- 例如⼀个Topic下5个队列,有2个消费者的情况下,那么就可以给其中⼀个消费者分配2个队列,给另⼀个分配3个队列。
- 假设调整到Topic下有7个队列,还是2个消费者的情况下,那么就可以给其中⼀个消费者分配4个队列,给另⼀个分配3个队列。
- 从而提升消息的并行消费能力。
像Broker扩容或缩容、Broker与NameServer间发生网络异常、Queue扩容或缩容等场景都可能导致消费者所订阅Topic的队列数量发生变化。
消费者组中消费者的数量发生变化。
比如动态添加了Consumer进行消费,那么此时肯定是要重新分配一下,也就是触发Rebalance再均衡。
例如,⼀个Topic下5个队列,在只有1个消费者的情况下,这个消费者将负责消费这5个队列的消息。
- 如果此时增加⼀个消费者,那么就可以给其中⼀个消费者分配2个队列,给另⼀个分配3个队列,从而提升消息的并行消费能力。
像Consumer Group扩容或缩容、Consumer与NameServer间发生网络异常、Consumer发生宕机等都会导致消费者组中消费者的数量发生变化。
由于⼀个队列最多分配给⼀个消费者,因此当某个消费者组下的消费者实例数量大于队列的数量时,多余的消费者实例将分配不到任何队列,等于是多余的消费者什么都不做,白白浪费。
Rebalance的危害?
消费暂停:
- 在只有一个Consumer时,其负责消费所有队列。
- 在新增了一个Consumer后会触发 Rebalance的发生。
- 此时原Consumer就需要暂停部分队列的消费,等到这些队列分配给新的Consumer后,这些暂停消费的队列才能继续被消费。
消费重复:
- Consumer在消费新分配给自己的队列时,必须接着之前Consumer 提交的消费进度的offset 继续消费。
- 然而默认情况下,offset是异步提交的,这个异步性导致提交到Broker的offset与Consumer实际消费的消息并不一致。
- 这个不一致的差值就是可能会重复消费的消息。
消费突刺:
- 由于Rebalance可能导致重复消费,如果需要重复消费的消息过多,或者因为Rebalance暂停时间过长从而导致积压了部分消息。
- 那么有可能会导致在Rebalance结束之后瞬间需要消费很多消息。
消息过滤
RocketMQ的消费者可以根据Tag进行消息过滤,也支持自定义属性过滤。
消息过滤目前是在Broker端实现的:
- 优点是减少了对于Consumer无用消息的网络传输。
- 缺点是增加了Broker的负担、而且实现相对复杂。
RocketMQ支持两种方式的消息过滤:一种是Tag过滤,另外一种是SQL过滤。
基本组件
Nameserver:
Nameserver集群,Topic的路由注册中心,为客户端根据Topic提供路由服务,从而引导客户端向Broker发送消息。
Nameserver之间的节点不通信,路由信息在Nameserver集群中采取的最终一致性。
Broker:
消息存储服务器,分为两种角色:Master与Slave。
在RocketMQ中,主服务承担读写操作,从服务器作为一个备份,当主服务器存在压力时,从服务器可以承担读服务(消息消费)。
所有Broker,包含Slave服务器每隔30s会向Nameserver发送心跳包,心跳包中会包含存在在Broker上所有的Topic的路由信息。
Client:
消息客户端,包括Producer(消息发送者)和Consumer(消费消费者),客户端在同一时间只会连接一台NameServer,只有在连接出现异常时才会尝试连接另外一台,客户端每隔30s向NameServer发起Topic的路由信息查询。
集群模式
集群模式中,Broker 分为 Master 与 Slave,一个 Master 可对应多个 Slave,一个 Slave 只能对应一个 Master。
每个 Broker 与 Name Server 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 Name Server。
Master 节点负责接收客户端的写入请求,并将消息持久化到磁盘上。
Slave 节点则负责从 Master 节点复制消息数据,并保持与 Master 节点的同步。
同步复制
生产者发送消息后,Master 接收到存储消息请求,将消息数据同步给 Slave 后,才将存储结果返回给生产者。
同步复制模式下,发送消息会有一定延迟,系统吞吐量也会降低。
异步复制
生产者发送消息后,Master 接收到存储消息请求,将消息存储后,直接将存储结果返回给生产者。
Master 和 Slave 再通过异步的方式同步数据,这种复制模式具有较小的延迟,可以实现比较高的吞吐量。
若 Master 出现故障,有些数据可能未写入 Slave,未同步的数据可能丢失。
消息类型
延迟消息
RocketMQ支持18个等级,每个等级代表一个延迟时间。
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
原理解析:
Brocker启动时,根据不同等级开启定时器,即分别对每个延迟等级都开启一个定时器。
- RocketMQ是基于
java.util.Timer
的定时器。
事务消息
支持在分布式场景下保障消息生产和本地事务的最终一致性。
1、生产者将消息发送至 Broker。
2、Broker 将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。
3、生产者开始执行本地事务逻辑。
4、生产者根据本地事务执行结果向服务端提交二次确认结果(Commit 或是 Rollback),Broker 收到确认结果后处理逻辑如下:
- 二次确认结果为 Commit :Broker 将半事务消息标记为可投递,并投递给消费者。
- 二次确认结果为 Rollback :Broker 将回滚事务,不会将半事务消息投递给消费者。
5、在断网或者是生产者应用重启的特殊情况下,若 Broker 未收到发送者提交的二次确认结果,或 Broker 收到的二次确认结果为 Unknown 未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
- 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
- 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。
为什么要先发送Half Message(半消息):
可以先确认Broker服务器是否正常,如果半消息都发送失败了,那说明Broker挂了。
什么情况会回查:
执行本地事务的时候,由于突然网络等原因一直没有返回执行事务的结果(Commit或者Rollback)导致最终返回UNKNOW,就会回查。
本地事务执行成功后,返回Commit进行消息二次确认的时候的服务挂了。
- 这个时候在Broker端,它还是个Half Message(半消息),这也会回查。
基本原理
总体架构图
零拷贝
零拷贝技术是一个思想,指的是指计算机执行操作时,CPU不需要先将数据从某处内存复制到另一个特定区域。
- RocketMQ内部使用基于mmap实现的零拷贝,用来读写文件。
mmap():
mmap(memory map)是一种内存映射文件的方法。
- 即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。
简单地说就是内核缓冲区和应用缓冲区共享,从而减少了从读缓冲区到用户缓冲区的一次CPU拷贝。
消息存储
CommitLog:
CommitLog
,消息存储文件,所有主题的消息都存储在CommitLog
文件中。业务系统向
RocketMQ
发送一条消息,最终这条消息会被持久化到CommitLog
文件。一台
Broker服务器
只有一个CommitLog
文件(组),RocketMQ
会将所有主题的消息存储在同一个文件中,这个文件中就存储着一条条Message,每条Message都会按照顺序写入。
ConsumeQueue:
它是为了高效检索主题消息的。
通过
ConsumerQueue
可以知道消息的长度和偏移量,那么查找消息就比较容易了。消息偏移量的差值等于 =
消息长度 * 队列长度
。
Index:
除了通过消息偏移量来查找消息的方式,还提供了其他几种方式可以查询消息:
- 通过Message Key查询
- 通过Unique Key查询
- 通过Message Id查询
Message Key和Unique Key
都是在消息发送之前,由客户端生成的。可以自己设置,也可以由客户端自动生成,
Message Id
是在Broker
端存储消息的时候生成。
消费方式
在RocketMQ里消费方式虽有PUSH与PULL两种,但实现机制实为 PULL 模式,PUSH 模式是一种伪推送,是对 PULL 模式的封装,每拉去一批消息后,提交到消费端的线程池(异步),然后马上向 Broker 拉取消息,即实现类似 推 的效果。
拉取式消费(Pull Consumer):
Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。
一旦获取了批量消息,应用就会启动消费过程。
- Pull指的是客户端主动向服务端请求,拉取数据。
由消费者客户端主动向消息中间件(MQ消息服务器代理)拉取消息。
采用Pull方式,如何设置Pull消息的频率需要重点去考虑:
举个例子来说,可能1分钟内连续来了1000条消息,然后2小时内没有新消息产生(消息延迟与忙等待)。
如果每次Pull的时间间隔比较久,会增加消息的延迟,即消息到达消费者的时间加长,MQ中消息的堆积量变大。
若每次Pull的时间间隔较短,但是在一段时间内MQ中并没有任何消息可以消费,那么会产生很多无效的Pull请求的RPC开销,影响MQ整体的网络性能。
推动式消费(Push Consumer):
该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。
Push指的是客户端与服务端建立好网络长连接,当服务端有数据时立即通过连接将数据推送给客户端。
由消息中间件(MQ消息服务器代理)主动地将消息推送给消费者。
采用Push方式,可以尽可能实时地将消息发送给消费者进行消费。
但是,在消费者的处理消息的能力较弱的时候(比如,消费者端的业务系统处理一条消息的流程比较复杂,其中的调用链路比较多导致消费时间比较久:慢消费问题),而MQ不断地向消费者Push消息,消费者端的缓冲区可能会溢出,导致异常。
长轮询:
在PULL模式下为了保证消费的实时性,采起了长轮询消息服务器拉取消息的方式,每隔一定时间客户端向服务端发起一次请求。
如果有数据则取回进行消费,如果服务端没数据客户端线程会阻塞,阻塞时间为15S,有数据了就会被唤醒。
长轮询还是由Consumer发起的,因此就算Broker端有大量数据也不会主动推送给Consumer。
RocketMQ 中 PULL 模式和 PUSH 模式的区别:
PULL 模式是从 Broker 拉取消息后放入缓存,然后消费端不停地从缓存取出消息来执行客户端定义的处理逻辑,而 PUSH 模式是在死循环中不停的从 Broker 拉取消息,拉取到后调用回调函数进行处理,回调函数中调用客户端定义的处理逻辑。
- 消费者订阅主题,然后自动进行集群内消息队列的动态负载,自动拉取消息,准实时。
PUSH 模式拉取消息依赖死循环来不停唤起业务,而 PULL 模式拉取消息是通过 MessageQueue 监听器来触发消息拉取线程,消息拉取线程会在拉取完一次后接着下一次拉取。
- 消费者无需订阅主题,由业务方(应用程序)直接根据MessageQueue拉取消息。
应用场景
消息重试(消费者)
Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。
Consumer消费消息失败有以下几种情况:
- 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理。
- 由于依赖的下游应用服务不可用,例如DB连接不可用,外系统网络不可达等。
RocketMQ会为每个消费组都设置一个Topic名称为
%RETRY%+consumerGroup
的重试队列:
- 这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的。
- 用于暂时保存因为各种异常而导致Consumer端无法消费的消息。
考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别:
- 每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。
RocketMQ对于重试消息的处理:
- 先保存至Topic名称为
SCHEDULE_TOPIC_XXXX
的延迟队列中。- 后台定时任务按照对应的时间进行Delay后重新保存至
%RETRY%+consumerGroup
的重试队列中。
消息重投(生产者)
生产者在发送消息时,同步消息失败会重投,异步消息有重试,
oneway
没有任何保证。
- 消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在RocketMQ中是无法避免的问题。
消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。
- 生产者主动重发Consumer负载变化也会导致重复消息。
如下方法可以设置消息重试策略:
- retryTimesWhenSendFailed:
- 同步发送失败重投次数,默认为2,因此生产者会最多尝试发送
retryTimesWhenSendFailed + 1
次。- 不会选择上次失败的broker,尝试向其他broker发送,最大程度保证消息不丢。
- 超过重投次数,抛出异常,由客户端保证消息不丢。
- 当出现
RemotingException、MQClientException
和部分MQBrokerException
时会重投。
- retryTimesWhenSendAsyncFailed:
- 异步发送失败重试次数,异步重试不会选择其他broker,仅在同一个broker上做重试,不保证消息不丢。
- retryAnotherBrokerWhenNotStoreOK:
- 消息刷盘(主或备)超时或slave不可用(返回状态非
SEND_OK
),是否尝试发送到其他broker,默认false。
顺序消费
顺序消息是指对于一个指定的 Topic ,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。
分区顺序消息
对于指定的一个 Topic ,所有消息根据 Sharding Key 进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。
同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。
适用场景:
- 适用于性能要求高,以 Sharding Key 作为分区字段,在同一个区块中严格地按照先进先出(FIFO)原则进行消息发布和消费的场景。
示例:
- 电商的订单创建,以订单 ID 作为 Sharding Key ,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。
全局顺序消息
对于指定的一个 Topic ,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。
适用场景:
- 适用于性能要求不高,所有的消息严格按照 FIFO 原则来发布和消费的场景。
示例:
- 在证券处理中,以人民币兑换美元为 Topic,在价格相同的情况下,先出价者优先处理,则可以按照 FIFO 的方式发布和消费全局顺序消息。