页缓存其实就是操作系统对文件的缓存,用来加速文件的读写,也就是说对文件的写入先写到页缓存中,操作系统会不定期刷盘(时间不可控),对文件的读会先加载到页缓存中,并且根据局部性原理还会预读临近块的内容。
其实也是因为使用内存映射机制,所以 RocketMQ 的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存中。
文件预分配和文件预热
而内存映射也只是做了映射,只有当真正读取页面的时候产生缺页中断,才会将数据真正加载到内存中,所以 RocketMQ 做了一些优化,防止运行时的性能抖动。
文件预分配
CommitLog 的大小默认是1G,当超过大小限制的时候需要准备新的文件,而 RocketMQ 就起了一个后台线程 AllocateMappedFileService,不断的处理 AllocateRequest,AllocateRequest 其实就是预分配的请求,会提前准备好下一个文件的分配,防止在消息写入的过程中分配文件,产生抖动。
文件预热
有一个 warmMappedFile 方法,它会把当前映射的文件,每一页遍历多去,写入一个0字节,然后再调用mlock 和 madvise(MADV_WILLNEED)。
mlock:可以将进程使用的部分或者全部的地址空间锁定在物理内存中,防止其被交换到 swap 空间。
madvise:给操作系统建议,说这文件在不久的将来要访问的,因此,提前读几页可能是个好主意。
小结一下
CommitLog 采用混合型存储,也就是所有 Topic 都存在一起,顺序追加写入,文件名用起始偏移量命名。
消息先写入 CommitLog 再通过后台线程分发到 ConsumerQueue 和 IndexFile 中。
消费者先读取 ConsumerQueue 得到真正消息的物理地址,然后访问 CommitLog 得到真正的消息。
利用了 mmap 机制减少一次拷贝,利用文件预分配和文件预热提高性能。
提供同步和异步刷盘,根据场景选择合适的机制。
Broker 的 HA
从 Broker 会和主 Broker 建立长连接,然后获取主 Broker commitlog 最大偏移量,开始向主 Broker 拉取消息,主 Broker 会返回一定数量的消息,循环进行,达到主从数据同步。
消费者消费消息会先请求主 Broker ,如果主 Broker 觉得现在压力有点大,则会返回从 Broker 拉取消息的建议,然后消费者就去从服务器拉取消息。
Consumer
消费有两种模式,分别是广播模式和集群模式。
广播模式:一个分组下的每个消费者都会消费完整的Topic 消息。
集群模式:一个分组下的消费者瓜分消费Topic 消息。
一般我们用的都是集群模式。
而消费者消费消息又分为推和拉模式,详细看我这篇文章消息队列推拉模式,分别从源码级别分析了 RokcetMQ 和 Kafka 的消息推拉,以及推拉模式的优缺点。
Consumer 端的负载均衡机制
Consumer 会定期的获取 Topic 下的队列数,然后再去查找订阅了该 Topic 的同一消费组的所有消费者信息,默认的分配策略是类似分页排序分配。
将队列排好序,然后消费者排好序,比如队列有 9 个,消费者有 3 个,那消费者-1 消费队列 0、1、2 的消息,消费者-2 消费队列 3、4、5,以此类推。
所以如果负载太大,那么就加队列,加消费者,通过负载均衡机制就可以感知到重平衡,均匀负载。
Consumer 消息消费的重试
难免会遇到消息消费失败的情况,所以需要提供消费失败的重试,而一般的消费失败要么就是消息结构有误,要么就是一些暂时无法处理的状态,所以立即重试不太合适。
RocketMQ 会给每个消费组都设置一个重试队列,Topic 是 %RETRY%+consumerGroup
,并且设定了很多重试级别来延迟重试的时间。
为了利用 RocketMQ 的延时队列功能,重试的消息会先保存在 Topic 名称为“SCHEDULE_TOPIC_XXXX”的延迟队列,在消息的扩展字段里面会存储原来所属的 Topic 信息。
delay 一段时间后再恢复到重试队列中,然后 Consumer 就会消费这个重试队列主题,得到之前的消息。
如果超过一定的重试次数都消费失败,则会移入到死信队列,即 Topic %DLQ%" + ConsumerGroup
中,存储死信队列即认为消费成功,因为实在没辙了,暂时放过。
然后我们可以通过人工来处理死信队列的这些消息。
消息的全局顺序和局部顺序
全局顺序就是消除一切并发,一个 Topic 一个队列,Producer 和 Consuemr 的并发都为一。
局部顺序其实就是指某个队列顺序,多队列之间还是能并行的。
可以通过 MessageQueueSelector 指定 Producer 某个业务只发这一个队列,然后 Comsuer 通过MessageListenerOrderly 接受消息,其实就是加锁消费。
在 Broker 会有一个 mqLockTable ,顺序消息在创建拉取消息任务的时候需要在 Broker 锁定该消息队列,之后加锁成功的才能消费。
而严格的顺序消息其实很难,假设现在都好好的,如果有个 Broker 宕机了,然后发生了重平衡,队列对应的消费者实例就变了,就会有可能会出现乱序的情况,如果要保持严格顺序,那此时就只能让整个集群不可用了。
一些注意点
1、订阅消息是以 ConsumerGroup 为单位存储的,所以ConsumerGroup 中的每个 Consumer 需要有相同的订阅。
因为订阅消息是随着心跳上传的,如果一个 ConsumerGroup 中 Consumer 订阅信息不一样,那么就会出现互相覆盖的情况。
比如消费者 A 订阅 Topic a,消费者 B 订阅 Topic b,此时消费者 A 去 Broker 拿消息,然后 B 的心跳包发出了,Broker 更新了,然后接到 A 的请求,一脸懵逼,没这订阅关系啊。
2、RocketMQ 主从读写分离
从只能读,不能写,并且只有当前客户端读的 offset 和 当前 Broker 已接受的最大 offset 超过限制的物理内存大小时候才会去从读,所以正常情况下从分担不了流量
3、单单加机器提升不了消费速度,队列的数量也需要跟上。
4、之前提到的,不要允许自动创建主题
RocketMQ 的最佳实践
这些最佳实践部分参考自官网。
Tags的使用
建议一个应用一个 Topic,利用 tages 来标记不同业务,因为 tages 设置比较灵活,且一个应用一个 Topic 很清晰,能直观的辨别。
Keys的使用
如果有消息业务上的唯一标识,请填写到 keys 字段中,方便日后的定位查找。
提高 Consumer 的消费能力
1、提高消费并行度:增加队列数和消费者数量,提高单个消费者的并行消费线程,参数 consumeThreadMax。
2、批处理消费,设置 consumeMessageBatchMaxSize 参数,这样一次能拿到多条消息,然后比如一个 update语句之前要执行十次,现在一次就执行完。
3、跳过非核心的消息,当负载很重的时候,为了保住那些核心的消息,设置那些非核心的消息,例如此时消息堆积 1W 条了之后,就直接返回消费成功,跳过非核心消息。
NameServer 的寻址
请使用 HTTP 静态服务器寻址(默认),这样 NameServer 就能动态发现。
JVM选项
以下抄自官网:
如果不关心 RocketMQ Broker的启动时间,通过“预触摸” Java 堆以确保在 JVM 初始化期间每个页面都将被分配。
那些不关心启动时间的人可以启用它: -XX:+AlwaysPreTouch 禁用偏置锁定可能会减少JVM暂停, -XX:-UseBiasedLocking 至于垃圾回收,建议使用带JDK 1.8的G1收集器。
-XX:+UseG1GC -XX:G1HeapRegionSize=16m
-XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30
另外不要把-XX:MaxGCPauseMillis的值设置太小,否则JVM将使用一个小的年轻代来实现这个目标,这将导致非常频繁的minor GC,所以建议使用rolling GC日志文件:
-XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m
Linux内核参数
以下抄自官网:
- vm.extra_free_kbytes,告诉VM在后台回收(kswapd)启动的阈值与直接回收(通过分配进程)的阈值之间保留额外的可用内存。RocketMQ使用此参数来避免内存分配中的长延迟。(与具体内核版本相关)
- vm.min_free_kbytes,如果将其设置为低于1024KB,将会巧妙的将系统破坏,并且系统在高负载下容易出现死锁。
- vm.max_map_count,限制一个进程可能具有的最大内存映射区域数。RocketMQ将使用mmap加载CommitLog和ConsumeQueue,因此建议将为此参数设置较大的值。(agressiveness --> aggressiveness)
- vm.swappiness,定义内核交换内存页面的积极程度。较高的值会增加攻击性,较低的值会减少交换量。建议将值设置为10来避免交换延迟。
- File descriptor limits,RocketMQ需要为文件(CommitLog和ConsumeQueue)和网络连接打开文件描述符。我们建议设置文件描述符的值为655350。
- Disk scheduler,RocketMQ建议使用I/O截止时间调度器,它试图为请求提供有保证的延迟。
最后
其实还有很多没讲,比如流量控制、消息的过滤、定时消息的实现,包括底层通信 1+N+M1+M2 的 Reactor 多线程设计等等。
主要是内容太多了,而且也不太影响主流程,所以还是剥离出来之后写吧,大致的一些实现还是讲了的。
包括元信息的交互、消息的发送、存储、消费等等。
关于事务消息的那一块我之前文章也分析过了,所以这个就不再贴了。
可以看到要实现一个生产级别的消息队列还是有很多很多东西需要考虑的,不过大致的架构和涉及到的模块差不多就这些了。
至于具体的细节深入,还是得靠大家自行研究了,我就起个抛砖引玉的作用。
最后个人能力有限,如果哪里有纰漏请抓紧联系鞭挞我!