本文承接上文《从0到1 手把手搭建spring cloud alibaba 微服务大型应用框架(十三)rocketmq 篇(2):路由注册,消息发送核心流程原理》
闲话少说,我们直接上图,我这特意用颜色标注了一下,注意观察颜色相同的部分
流程描述
消息生产-存储流程
1.首选生产者从本地缓存或者从nameserver 获取到对应topic 对应的broker路由以及quene 写队列
2.生产者本地使用负载均衡策略选择一个broker和队列进行发送
3.broker 接到消息后会直接保存或者通过page cache 和内存映射首先将消息保存如内存中,
然后定时去保存到commitLog里,具体看是同步保存还是异步保存
4.broker 会启动定时任务监听commitLog 文件更新,如果有更新,
会同步到consumeQuene和index中,comsumeQuene结构为/topic名/queneid/xxx
消息消费-存储流程
1.消费者从nameserver 获取到对应topic 对应的broker路由以及quene 读队列
2.然后开启一个线程去批量拉取消息,将消息放入消息租possessMessage 内
3.处理possessMessage ,处理完一批后保存消费进度到本地
4.启动定时任务发送消费进度到broker端
5.broker 同步进度文件consumeOffset.json
消息存储结构
消息存储结构图
RocketMQ存储路径为${ROCKET_HOME}/store
核心文件数据结构介绍
commitLog 数据结构
消息主体以及元数据的存储主体,存储消息生产端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1GB,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824。第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件
RocketMQ基于主题订阅模式实现消息消费,消费者关心的是一个主题下的所有消息,但同一主题的消息是不连续地存储在CommitLog文件中的。如果消息消费者直接从消息存储文件中遍历查找订阅主题下的消息,效率将极其低下。RocketMQ为了适应消息消费的检索需求,设计了ConsumeQueue文件,该文件可以看作CommitLog关于消息消费的“索引”文件,ConsumeQueue的第一级目录为消息主题,第二级目录为主题的消息队列
单个ConsumeQueue文件中默认包含30万个条目,单个文件的长度为3×106×20字节,单个ConsumeQueue文件可以看作一个ConsumeQueue条目的数组,其下标为ConsumeQueue的逻辑偏移量,消息消费进度存储的偏移量即逻辑偏移量。ConsumeQueue即为CommitLog文件的索引文件,其构建机制是当消息到达CommitLog文件后,由专门的线程产生消息转发任务
index 数据结构
ConsumeQueue是RocketMQ专门为消息订阅构建的索引文件,目的是提高根据主题与消息队列检索消息的速度。另外,RocketMQ引入哈希索引机制为消息建立索引,HashMap的设计包含两个基本点:哈希槽与哈希冲突的链表结构。
Index包含Index文件头、哈希槽、Index条目(数据)。Index文件头包含40字节,记录该Index的统计信息,其结构如下。
1)beginTimestamp:Index文件中消息的最小存储时间。
2)endTimestamp:Index文件中消息的最大存储时间。
3)beginPhyoffset:Index文件中消息的最小物理偏移量(CommitLog文件偏移量)。
4)endPhyoffset:Index文件中消息的最大物理偏移量(CommitLog文件偏移量)。
5)hashslotCount:hashslot个数,并不是哈希槽使用的个数,在这里意义不大。
6)indexCount:Index条目列表当前已使用的个数,Index条目在Index条目列表中按顺序存储。
一个Index默认包含500万个哈希槽。哈希槽存储的是落在该哈希槽的哈希码最新的Index索引。默认一个Index文件包含2000万个条目,每个Index条目结构如下。
1)hashcode:key的哈希码。
2)phyoffset:消息对应的物理偏移量。
3)timedif:该消息存储时间与第一条消息的时间戳的差值,若小于0,则该消息无效。
4)pre index no:该条目的前一条记录的Index索引,当出现哈希冲突时,构建链表结构。
接下来重点分析如何将Map<String/*消息索引key*/,long phyOffset/*消息物理偏移量*/>存入Index文件,以及如何根据消息索引key快速查找消息。
RocketMQ将消息索引键与消息偏移量的映射关系写入Index的实现方法为public boolean putKey(final String key, final long phyOffset, final long storeTimestamp),参数含义分别为消息索引、消息物理偏移量、消息存储时间
消息读写队列的概念
每个tpoic 在broker 中创建的时候都会默认创建4个读队列和4个写队列
独写队列不是我们传统意义理解的独写分离实际存在的队列,实际上只是两个数字变量,
用来返回给消息生产者和消息消费者选择发送队列用的,
比如生产者连接broker topic-1的时候如果写队列设置4,那么就会返回broker-0 ,broker-1,broker-2,broker-3
这时候就会从0~3选择一个发送到broker ,消费者连接borker topic-1的时候如果读队列设置未4,根据nameserver 负载均衡后
那么就会会返回broker-0 ,broker-1,broker-2,broker-3,一个或者多个,
注意点:无论一发送端还是消费端,实际上都是针对文件的操作,
也就是上面提到的commitLog 和consumeQuene,而不是针对的java的实际几个队列,主要流程图下图