从0到1 手把手搭建spring cloud alibaba 微服务大型应用框架(十三)rocketmq 篇(3): 消息读写队列,消息存储,消息发送,消息消费关联流程和原理

简介: 从0到1 手把手搭建spring cloud alibaba 微服务大型应用框架(十三)rocketmq 篇(3): 消息读写队列,消息存储,消息发送,消息消费关联流程和原理

本文承接上文《从0到1 手把手搭建spring cloud alibaba 微服务大型应用框架(十三)rocketmq 篇(2):路由注册,消息发送核心流程原理》

ae48956613064dabae7290d6474de3c4.png

闲话少说,我们直接上图,我这特意用颜色标注了一下,注意观察颜色相同的部分


d9999dc8c74c4d62a5d5d2d6b668b61a.png

流程描述

消息生产-存储流程

1.首选生产者从本地缓存或者从nameserver 获取到对应topic 对应的broker路由以及quene 写队列

2.生产者本地使用负载均衡策略选择一个broker和队列进行发送

3.broker 接到消息后会直接保存或者通过page cache 和内存映射首先将消息保存如内存中,

然后定时去保存到commitLog里,具体看是同步保存还是异步保存

4.broker 会启动定时任务监听commitLog 文件更新,如果有更新,

会同步到consumeQuene和index中,comsumeQuene结构为/topic名/queneid/xxx

消息消费-存储流程

1.消费者从nameserver 获取到对应topic 对应的broker路由以及quene 读队列

2.然后开启一个线程去批量拉取消息,将消息放入消息租possessMessage 内

3.处理possessMessage ,处理完一批后保存消费进度到本地

4.启动定时任务发送消费进度到broker端

5.broker 同步进度文件consumeOffset.json

消息存储结构

消息存储结构图

RocketMQ存储路径为${ROCKET_HOME}/store

5714c557fcc340d3b281e768257be72b.png

核心文件数据结构介绍

commitLog 数据结构

消息主体以及元数据的存储主体,存储消息生产端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1GB,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824。第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件  

81592cd38a6147b0a84a9edd3414d4a2.png


673069028ec74661a5cb4576ce7f8e38.png

RocketMQ基于主题订阅模式实现消息消费,消费者关心的是一个主题下的所有消息,但同一主题的消息是不连续地存储在CommitLog文件中的。如果消息消费者直接从消息存储文件中遍历查找订阅主题下的消息,效率将极其低下。RocketMQ为了适应消息消费的检索需求,设计了ConsumeQueue文件,该文件可以看作CommitLog关于消息消费的“索引”文件,ConsumeQueue的第一级目录为消息主题,第二级目录为主题的消息队列


单个ConsumeQueue文件中默认包含30万个条目,单个文件的长度为3×106×20字节,单个ConsumeQueue文件可以看作一个ConsumeQueue条目的数组,其下标为ConsumeQueue的逻辑偏移量,消息消费进度存储的偏移量即逻辑偏移量。ConsumeQueue即为CommitLog文件的索引文件,其构建机制是当消息到达CommitLog文件后,由专门的线程产生消息转发任务  

index 数据结构


ConsumeQueue是RocketMQ专门为消息订阅构建的索引文件,目的是提高根据主题与消息队列检索消息的速度。另外,RocketMQ引入哈希索引机制为消息建立索引,HashMap的设计包含两个基本点:哈希槽与哈希冲突的链表结构。




Index包含Index文件头、哈希槽、Index条目(数据)。Index文件头包含40字节,记录该Index的统计信息,其结构如下。

1)beginTimestamp:Index文件中消息的最小存储时间。

2)endTimestamp:Index文件中消息的最大存储时间。

3)beginPhyoffset:Index文件中消息的最小物理偏移量(CommitLog文件偏移量)。

4)endPhyoffset:Index文件中消息的最大物理偏移量(CommitLog文件偏移量)。

5)hashslotCount:hashslot个数,并不是哈希槽使用的个数,在这里意义不大。

6)indexCount:Index条目列表当前已使用的个数,Index条目在Index条目列表中按顺序存储。

一个Index默认包含500万个哈希槽。哈希槽存储的是落在该哈希槽的哈希码最新的Index索引。默认一个Index文件包含2000万个条目,每个Index条目结构如下。

1)hashcode:key的哈希码。

2)phyoffset:消息对应的物理偏移量。


3)timedif:该消息存储时间与第一条消息的时间戳的差值,若小于0,则该消息无效。

4)pre index no:该条目的前一条记录的Index索引,当出现哈希冲突时,构建链表结构。

接下来重点分析如何将Map<String/*消息索引key*/,long phyOffset/*消息物理偏移量*/>存入Index文件,以及如何根据消息索引key快速查找消息。

RocketMQ将消息索引键与消息偏移量的映射关系写入Index的实现方法为public boolean putKey(final String key, final long phyOffset, final long storeTimestamp),参数含义分别为消息索引、消息物理偏移量、消息存储时间

消息读写队列的概念

每个tpoic 在broker 中创建的时候都会默认创建4个读队列和4个写队列

独写队列不是我们传统意义理解的独写分离实际存在的队列,实际上只是两个数字变量,

用来返回给消息生产者和消息消费者选择发送队列用的,

比如生产者连接broker topic-1的时候如果写队列设置4,那么就会返回broker-0 ,broker-1,broker-2,broker-3

这时候就会从0~3选择一个发送到broker ,消费者连接borker topic-1的时候如果读队列设置未4,根据nameserver 负载均衡后

那么就会会返回broker-0 ,broker-1,broker-2,broker-3,一个或者多个,


注意点:无论一发送端还是消费端,实际上都是针对文件的操作,

也就是上面提到的commitLog 和consumeQuene,而不是针对的java的实际几个队列,主要流程图下图

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
负载均衡 Java API
《深入理解Spring》Spring Cloud 构建分布式系统的微服务全家桶
Spring Cloud为微服务架构提供一站式解决方案,涵盖服务注册、配置管理、负载均衡、熔断限流等核心功能,助力开发者构建高可用、易扩展的分布式系统,并持续向云原生演进。
|
4月前
|
监控 Java 数据库
从零学 Dropwizard:手把手搭轻量 Java 微服务,告别 Spring 臃肿
Dropwizard 整合 Jetty、Jersey 等成熟组件,开箱即用,无需复杂配置。轻量高效,启动快,资源占用少,内置监控、健康检查与安全防护,搭配 Docker 部署便捷,是构建生产级 Java 微服务的极简利器。
411 3
|
9月前
|
消息中间件 存储 缓存
RocketMQ原理—4.消息读写的性能优化
本文详细解析了RocketMQ消息队列的核心原理与性能优化机制,涵盖Producer消息分发、Broker高并发写入、Consumer拉取消息流程等内容。重点探讨了基于队列的消息分发、Hash有序分发、CommitLog内存写入优化、ConsumeQueue物理存储设计等关键技术点。同时分析了数据丢失场景及解决方案,如同步刷盘与JVM OffHeap缓存分离策略,并总结了写入与读取流程的性能优化方法,为理解和优化分布式消息系统提供了全面指导。
RocketMQ原理—4.消息读写的性能优化
|
10月前
|
安全 Java Apache
微服务——SpringBoot使用归纳——Spring Boot中集成 Shiro——Shiro 身份和权限认证
本文介绍了 Apache Shiro 的身份认证与权限认证机制。在身份认证部分,分析了 Shiro 的认证流程,包括应用程序调用 `Subject.login(token)` 方法、SecurityManager 接管认证以及通过 Realm 进行具体的安全验证。权限认证部分阐述了权限(permission)、角色(role)和用户(user)三者的关系,其中用户可拥有多个角色,角色则对应不同的权限组合,例如普通用户仅能查看或添加信息,而管理员可执行所有操作。
533 0
|
10月前
|
安全 Java 数据安全/隐私保护
微服务——SpringBoot使用归纳——Spring Boot中集成 Shiro——Shiro 三大核心组件
本课程介绍如何在Spring Boot中集成Shiro框架,主要讲解Shiro的认证与授权功能。Shiro是一个简单易用的Java安全框架,用于认证、授权、加密和会话管理等。其核心组件包括Subject(认证主体)、SecurityManager(安全管理员)和Realm(域)。Subject负责身份认证,包含Principals(身份)和Credentials(凭证);SecurityManager是架构核心,协调内部组件运作;Realm则是连接Shiro与应用数据的桥梁,用于访问用户账户及权限信息。通过学习,您将掌握Shiro的基本原理及其在项目中的应用。
383 0
|
9月前
|
消息中间件 存储 设计模式
RocketMQ原理—5.高可用+高并发+高性能架构
本文主要从高可用架构、高并发架构、高性能架构三个方面来介绍RocketMQ的原理。
2986 21
RocketMQ原理—5.高可用+高并发+高性能架构
|
9月前
|
存储 消息中间件 缓存
RocketMQ原理—3.源码设计简单分析下
本文介绍了Producer作为生产者是如何创建出来的、启动时是如何准备好相关资源的、如何从拉取Topic元数据的、如何选择MessageQueue的、与Broker是如何进行网络通信的,Broker收到一条消息后是如何存储的、如何实时更新索引文件的、如何实现同步刷盘以及异步刷盘的、如何清理存储较久的磁盘数据的,Consumer作为消费者是如何创建和启动的、消费者组的多个Consumer会如何分配消息、Consumer会如何从Broker拉取一批消息。
428 11
RocketMQ原理—3.源码设计简单分析下
|
9月前
|
存储 消息中间件 网络协议
RocketMQ原理—1.RocketMQ整体运行原理
本文详细解析了RocketMQ的整体运行原理,涵盖从生产者到消费者的全流程。首先介绍生产者发送消息的机制,包括Topic与MessageQueue的关系及写入策略;接着分析Broker如何通过CommitLog和ConsumeQueue实现消息持久化,并探讨同步与异步刷盘的优缺点。同时,讲解基于DLedger技术的主从同步原理,确保高可用性。消费者部分则重点讨论消费模式(集群 vs 广播)、拉取消息策略及负载均衡机制。网络通信层面,基于Netty的高性能架构通过多线程池分工协作提升并发能力。最后,揭示mmap与PageCache技术优化文件读写的细节,总结了RocketMQ的核心运行机制。
RocketMQ原理—1.RocketMQ整体运行原理
|
9月前
|
消息中间件 Java 数据管理
RocketMQ原理—2.源码设计简单分析上
本文介绍了NameServer的启动脚本、启动时会解析哪些配置、如何初始化Netty网络服务器、如何启动Netty网络服务器,介绍了Broker启动时是如何初始化配置的、BrokerController的创建以及包含的组件、BrokerController的初始化、启动、Broker如何把自己注册到NameServer上、BrokerOuterAPI是如何发送注册请求的,介绍了NameServer如何处理Broker的注册请求、Broker如何发送定时心跳