高可用
在说高可用之前,先来完善一下前面的一些概念。
在前面介绍概念的时候也说过,一个RokcetMQ中可以有很多个Broker实例,相同的BrokerName称为一个组,同一个Broker组下每个Broker实例保存的消息是一样的,不同的Broker组保存的消息是不一样的。
如图所示,两个BrokerA实例组成了一个Broker组,两个BrokerB实例也组成了一个Broker组。
前面说过,每个Broker实例都有一个CommitLog文件来存储消息的。那么两个BrokerA实例他们CommitLog文件存储的消息是一样的,两个BrokerB实例他们CommitLog文件存储的消息也是一样的。
那么BrokerA和BrokerB存的消息不一样是什么意思呢?
其实很容易理解,假设现在有个topicA存在BrokerA和BrokerB上,那么topicA在BrokerA和BrokerB默认都会有4个队列。
前面在说发消息的时候需要选择一个队列进行消息的发送,假设第一次选择了BrokerA上的队列发送消息,那么此时这条消息就存在BrokerA上,假设第二次选择了BrokerB上的队列发送消息,那么那么此时这条消息就存在BrokerB上,所以说BrokerA和BrokerB存的消息是不一样的。
那么为什么同一个Broker组内的Broker存储的消息是一样的呢?其实比较容易猜到,就是为了保证Broker的高可用,这样就算Broker组中的某个Broker挂了,这个Broker组依然可以对外提供服务。
那么如何实现同Broker组的Broker存的消息数据相同的呢?这就不得不提到Broker的高可用模式。
RocketMQ提供了两种Broker的高可用模式
- 主从同步模式
- Dledger模式
主从同步模式
在主从同步模式下,在启动的时候需要在配置文件中指定BrokerId,在同一个Broker组中,BrokerId为0的是主节点(master),其余为从节点(slave)。
当生产者将消息写入到主节点是,主节点会将消息内容同步到从节点机器上,这样一旦主节点宕机,从节点机器依然可以提供服务。
主从同步主要同步两部分数据
- topic等数据
- 消息
topic等数据是从节点每隔10s钟主动去主节点拉取,然后更新本身缓存的数据。
消息是主节点主动推送到从节点的。当主节点收到消息之后,会将消息通过两者之间建立的网络连接发送出去,从节点接收到消息之后,写到CommitLog即可。
从节点有两种方式知道主节点所在服务器的地址,第一种就是在配置文件指定;第二种就是从节点在注册到NameServer的时候会返回主节点的地址。
主从同步模式有一个比较严重的问题就是如果集群中的主节点挂了,这时需要人为进行干预,手动进行重启或者切换操作,而非集群自己从从节点中选择一个节点升级为主节点。
为了解决上述的问题,所以RocketMQ在4.5.0就引入了Dledger模式。
Dledger模式
在Dledger模式下的集群会基于Raft协议选出一个节点作为leader节点,当leader节点挂了后,会从follower中自动选出一个节点升级成为leader节点。所以Dledger模式解决了主从模式下无法自动选择主节点的问题。
在Dledger集群中,leader节点负责写入消息,当消息写入leader节点之后,leader会将消息同步到follower节点,当集群中过半数(节点数/2 +1)节点都成功写入了消息,这条消息才算真正写成功。
至于选举的细节,这里就不多说了,有兴趣的可以自行谷歌,还是挺有意思的。
消息消费
终于,在生产者成功发送消息到Broker,Broker在成功存储消息之后,消费者要消费消息了。
消费者在启动的时候会从NameSrever拉取消费者订阅的topic的路由信息,这样就知道订阅的topic有哪些queue,以及queue所在Broker的地址信息。
为什么消费者需要知道topic对应的哪些queue呢?
其实主要是因为消费者在消费消息的时候是以队列为消费单元的,消费者需要告诉Broker拉取的是哪个队列的消息,至于如何拉到消息的,后面再说。
消费的两种模式
前面说过,消费者是有个消费者组的概念,在启动消费者的时候会指定该消费者属于哪个消费者组。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sanyouConsumer");
一个消费者组中可以有多个消费者,不同消费者组之间消费消息是互不干扰的。
在同一个消费者组中,消息消费有两种模式。
- 集群模式
- 广播模式
集群模式
同一条消息只能被同一个消费组下的一个消费者消费,也就是说,同一条消息在同一个消费者组底下只会被消费一次,这就叫集群消费。
集群消费的实现就是将队列按照一定的算法分配给消费者,默认是按照平均分配的。
如图所示,将每个队列分配只分配给同一个消费者组中的一个消费者,这样消息就只会被一个消费者消费,从而实现了集群消费的效果。
RocketMQ默认是集群消费的模式。
广播模式
广播模式就是同一条消息可以被同一个消费者组下的所有消费者消费。
其实实现也很简单,就是将所有队列分配给每个消费者,这样每个消费者都能读取topic底下所有的队列的数据,就实现了广播模式。
如果你想使用广播模式,只需要在代码中指定即可。
consumer.setMessageModel(MessageModel.BROADCASTING);
ConsumeQueue
上一节我们提到消费者是从队列中拉取消息的,但是这里不经就有一个疑问,那就是消息明明都存在CommitLog文件中的,那么是如何去队列中拉的呢?难道是去遍历所有的文件,找到对应队列的消息进行消费么?
答案是否定的,因为这种每次都遍历数据的效率会很低,所以为了解决这种问题,引入了ConsumeQueue的这个概念,而消费实际是从ConsumeQueue中拉取数据的。
用户在创建topic的时候,Broker会为topic创建队列,并且每个队列其实会有一个编号queueId,每个队列都会对应一个ConsumeQueue,比如说一个topic在某个Broker上有4个队列,那么就有4个ConsumeQueue。
前面说过,消息在发送的时候,会根据一定的算法选择一个队列,之后再发送消息的时候会携带选择队列的queueId,这样Broker就知道消息属于哪个队列的了。当消息被存到CommitLog之后,其实还会往这条消息所在的队列的ConsumeQueue插一条数据。
ConsumeQueue也是由多个文件组成,每个文件默认是存30万条数据。
插入ConsumeQueue中的每条数据由20个字节组成,包含3部分信息,消息在CommitLog的起始位置(8个字节),消息在CommitLog存储的长度(8个字节),还有就是tag的hashCode(4个字节)。
所以当消费者从Broker拉取消息的时候,会告诉Broker拉取哪个队列(queueId)的消息、这个队列的哪个位置的消息(queueOffset)。
queueOffset就是指上图中ConsumeQueue一条数据的编号,单调递增的。
Broker在接受到消息的时候,找个指定队列的ConsumeQueue,由于每条数据固定是20个字节,所以可以轻易地计算出queueOffset对应的那条数据在哪个文件的哪个位置上,然后读出20个字节,从这20个字节中在解析出消息在CommitLog的起始位置和存储的长度,之后再到CommitLog中去查找,这样就找到了消息,然后在进行一些处理操作返回给消费者。
到这,我们就清楚的知道消费者是如何从队列中拉取消息的了,其实就是先从这个队列对应的ConsumeQueue中找到消息所在CommmitLog中的位置,然后再从CommmitLog中读取消息的。
RocketMQ如何实现消息的顺序性
这里插入一个比较常见的一个面试,那么如何保证保证消息的顺序性。
其实要想保证消息的顺序只要保证以下三点即可
- 生产者将需要保证顺序的消息发送到同一个队列
- 消息队列在存储消息的时候按照顺序存储
- 消费者按照顺序消费消息
第一点如何保证生产者将消息发送到同一个队列?
上文提到过RocketMQ生产者在发送消息的时候需要选择一个队列,并且选择算法是可以自定义的,这样我们只需要在根据业务需要,自定义队列选择算法,将顺序消息都指定到同一个队列,在发送消息的时候指定该算法,这样就实现了生产者发送消息的顺序性。
第二点,RocketMQ在存消息的时候,是按照顺序保存消息在ConsumeQueue中的位置的,由于消费消息的时候是先从ConsumeQueue查找消息的位置,这样也就保证了消息存储的顺序性。
第三点消费者按照顺序消费消息,这个RocketMQ已经实现了,只需要在消费消息的时候指定按照顺序消息消费即可,如下面所示,注册消息的监听器的时候使用MessageListenerOrderly这个接口的实现。
consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { //按照顺序消费消息记录 return null; } });
消息清理
由于消息是存磁盘的,但是磁盘空间是有限的,所以对于磁盘上的消息是需要清理的。
当出现以下几种情况下时就会触发消息清理:
- 手动执行删除
- 默认每天凌晨4点会自动清理过期的文件
- 当磁盘空间占用率默认达到75%之后,会自动清理过期文件
- 当磁盘空间占用率默认达到85%之后,无论这个文件是否过期,都会被清理掉
上述过期的文件是指文件最后一次修改的时间超过72小时(默认情况下),当然如果你的老板非常有钱,服务器的磁盘空间非常大,可以将这个过期时间修改的更长一点。
有的小伙伴肯定会有疑问,如果消息没有被消息,那么会被清理么?
答案是会被清理的,因为清理消息是直接删除CommitLog文件,所以只要达到上面的条件就会直接删除CommitLog文件,无论文件内的消息是否被消费过。
当消息被清理完之后,消息也就结束了它精彩的一生。
消息的一生总结
为了更好地理解本文,这里再来总结一下RokcetMQ消息一生的各个环节。
消息发送
- 生产者产生消息
- 生产者在发送消息之前会拉取topic的路由信息
- 根据队列选择算法,从topic众多的队列中选择一个队列
- 跟队列所在的Broker机器建立网络连接,将消息发送到Broker上
消息存储
- Broker接收到生产者的消息将消息存到CommitLog中
- 在CosumeQueue中存储这条消息在CommitLog中的位置
由于CommitLog和CosumeQueue都涉及到磁盘文件的读写操作,为了提高读写效率,RokcetMQ使用到了零拷贝技术,其实就是调用了一下Java提供的api。。
高可用
如果是集群模式,那么消息会被同步到从节点,从节点会将消息存到自己的CommitLog文件中。这样就算主节点挂了,从节点仍然可以对外提供访问。
消息消费
- 消费者会拉取订阅的Topic的路由信息,根据集群消费或者广播消费的模式来选择需要拉取消息的队列
- 与队列所在的机器建立连接,向Broker发送拉取消息的请求
- Broker在接收到请求知道,找到队列对应的ConsumeQueue,然后计算出拉取消息的位置,再解析出消息在CommitLog中的位置
- 根据解析出的位置,从CommitLog中读出消息的内容返回给消费者
消息清理
由于消息是存在磁盘的,而磁盘的空间是有限的,所以RocketMQ会根据一些条件去清理CommitLog文件。