吐血总结——消息队列之RocketMQ知识梳理

简介: 消息队列主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。 目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。今天我就首先分析一下RocketMQ,目前公司用的也是这个,因此在进行一下梳理,加深一下印象。

摘要

在说消息队列之前,我们要明白为啥需要消息队列,知乎上有一篇文章写的不错,链接: 为什么要使用消息队列?


消息队列主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。 目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。今天我就首先分析一下RocketMQ,目前公司用的也是这个,因此在进行一下梳理,加深一下印象。

RocketMQ概述

RocketMQ为分布式消息中间件,其高性能在于顺序写盘(CommitLog)、零拷贝和跳跃读(尽量命中PageCache),高可靠性在于刷盘机制和Master/Slave,另外NameServer如果全部挂掉都不会影响已经运行的Broker,Producer,Consumer。


RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消息中间件。具有以下特性:

1、支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型

2、在一个队列中可靠的先进先出(FIFO)和严格的顺序传递

3、支持拉(pull)和推(push)两种消息模式

4、单一队列百万消息的堆积能力

5、支持多种消息协议,如 JMS、MQTT 等

6、分布式高可用的部署架构,满足至少一次消息传递语义

7、提供 docker 镜像用于隔离测试和云集群部署

8、提供配置、指标和监控等功能丰富的Dashboard


那么首先先明白NameServer、Broker、Producer、Consumer都是什么,其实后面当在分析Kafka的时候,就会发现两点很像,因为RocketMQ是阿里根据Kafka架构进行的自研开发,在一些功能结构上面保留了Kafka的一些特性。首先我们先看一下RocketMQ的架构图:

从图中就能够看出来这是一个双主双从的集群模式结构图,我们根据这个图进行一下分析:

NameServer

为producer 和 consumer 提供路由信息,记录broker与topic的关系,然后在这个基础上对Broker进行每十秒的监测,判断Broker是否依然存活。

其优点如下:

1、可集群部署
2、相互之间独立,没有通信,不必保障节点间的数据强一致性
3、其他角色同时向多个NameServer机器上报状态信息
4、本身是无状态的,NameServer中的Broker、Topic等状态信息不会持久存储,由各个角色定时上报并存储到内存中的(NameServer支持配置参数的持久化,一般用不到)
5、采用每30s心跳机制
6、长连接持续提供给Producer和Consumer Topic信息
7、存储当前集群所有的Broker信息、Topic与Broker的对应关系(
1)broker的基本信息(ip port等)2)主题topic的地址信息3)broker集群信息4)存活的broker信息5)filter 过滤器
)
8、只做集群元数据存储和心跳工作,功能简单,稳定性高
9、多机热备,单台NameServer宕机不影响其他NameServer工作
10、每个NameServer注册的信息都是一样的,而且是当前系统中的所有broker的元数据信息

需要注意的是,即使整个NameServer集群宕机了,已经正常工作的Producer、Consumer、Broker仍然能正常工作,但新起的Producer、Consumer、Broker就无法工作。

Broker(Master):

MQ 中最核心的部分,是 MQ 的服务端,核心逻辑几乎全在这里,它为生产者和消费者提供 RPC 接口,负责消息的存储、备份和删除,以及消费关系的维护,同时用来消息存储和生产消费转发。

1、单个Broker跟所有Namesrv保持心跳请求,心跳间隔为30秒
2、心跳请求中包括当前Broker信息(IP+端口等)以及存储所有topic信息
3、RocketMQ消息代理服务器主节点,负责接收Producer发送的消息、消息存储、Consumer拉取消息;
Broker(Slave):
RocketMQ消息代理服务器备份节点,主要是通过同步/异步的方式将主节点的消息同步过来进行备份,为RocketMQ集群的高可用性提供保障;

Producer:

1、负责生产消息,一般由业务系统负责生产消息。
2、一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。
3、RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。
4、同步和异步方式均需要Broker返回确认信息,单向发送不需要

Consumer:


         

Queue

主要用于支持点对点的消息传递模式,即生产者将消息发送至队列,队列存在于Broker中,消息者从该队列中取出消息。这种传递方式的特点是,一个队列可以被多个生产者或消费者共用,但是某条消息一旦被某消费者取出,它将不再存在于队列中。即一条消息只能传递给一个消费者。

Topic(主题):

1、主要用于发布/订阅的传递模式。
2、生产者可将消息发布到Topic中,该Topic可由多个消费者订阅,所有订阅该Topic的消费者都能收到生产者发布的消息。
3、所有订阅的消费者都接收消息后,消息才会从Topic中移除。即一条消息可以传递给多个消费者。
4、来代表一种数据的集合,Topic 并不具有真正的属性,它只是一类数据的集合,不同类型的数据我们应该放到不同的 Topic 中
5、Topic 会分布式的进行存储;

Topic与Broker的对应关系

注意: 当我们真正使用 MQ 时,第一步应该总是先创建一些 Topic,作为数据集合存放不同类型的消息,其实本质上来讲和使用数据库时总是先创建表结构是一样的。

什么是长轮询机制?

Consumer从消息队列获取消息的方式主要有两种:pull和push。两种都有一些问题,比如说pull的情况下,有时候可能导致消息在服务端堆积,消息处理延时较高,有时候又可能因为消息队列中没有消息而导致空拉取,造成资源浪费,而在push的情况下,可能导致超出客户端压力,造成客户端卡死甚至宕机。于是,把pull和push相结合,得到了长轮询。

长轮询的机制是由客户端发起pull请求,服务端接收到客户端的请求后,如果发现队列中没有消息,并不立即返回,而是持有该请求一段时间,在此期间,服务端不断轮询队列中是否有新消息,如果有,则用现有连接将消息返回给客户端,如果一段时间内还是没有新消息,则返回空。长轮询机制的好处在于,其本质还是pull,所以,消息处理的主动权还是在客户端手中,客户端就可以根据自己的能力去做消息处理。而服务端持有请求一段时间的机制又很大程序的避免了空拉取,减少了资源的浪费。但是,这种机制也有一定问题,当客户端数量过多时,服务端可能在时间段内需要持有过多的连接,这种请求下,也会对服务端造成压力。不过,一般来说,消息队列的承压能力还是比较可靠的,再加上集群的保障,基本不用担心这个问题。

Rocketmq的工作流程是怎样的?

1、先启动Namesrv,Namesrver启动后监听端口,等待Broker、Produer、Consumer连接上来,相当于一个路由控制中心。

2、Broker启动,跟所有的Namesrver保持长连接,每30s发送一次心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有topic信息。注册成功后,Namesrv集群中就有Topic跟Broker的映射关系。

3、收发消息前,先创建topic,创建topic时需要指定该topic要存储在哪些Broker上。也可以在发送消息时自动创建Topic。

4、Producer启动并且发送消息,启动时先跟Namesrv集群中的其中一台建立长连接,并从Namesrv中获取当前发送的Topic存在哪些Broker上,然后跟对应的Broker建长连接,直接向Broker发消息。

5、Consumer跟Producer类似,跟其中一台Namesrv建立长连接,获取当前订阅Topic存在哪些Broker,然后直接跟Broker建立连接通道,开始消费消息。

RocketMq使用哪种方式消费消息,pull还是push?

RocketMq提供两种方式:pull和push进行消息的消费 而RocketMq的push方式,本质上也是采用pull的方式进行实现的。也就是说这两种方式本质上都是采用consumer轮询从broker拉取消息的 push方式里,consumer把轮询过程封装了一层,并注册了MessageListener监听器。当轮询取到消息后,便唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉好像消息是被推送过来的 其实想想,消息统一都发到了broker,而broker又不会主动去push消息,那么消息肯定都是需要消费者主动去拉的喽~


后面知识点太多了 还是自己看吧,有时间再总结!!!


RocketMQ中的延迟消息

RocketMQ-延时消息Demo及实现原理分析

RocketMQ的消费模式

细谈RocketMQ的消费模式

RocketMQ 的核心 NameServer

不要和陌生人说话,消息中间件之 Topic

还在纠结秒杀?看看 MQ 如何搞定

为什么要使用消息中间件?

《吃透MQ系列》核心基础全在这里了,一文啃透!

分布式消息队列

RocketMQ相关问题研究

Broker 主从同步机制

RocketMQ刷盘机制

RocketMQ练习:

如果以集群的形式来进行演示的话必定会使得我电脑不堪重负,所以我采用单机的模式来进行。

@rocketMQ配置

106.12.50.23 rocketmq-nameserver-1

106.12.50.23 rocketmq-master-1

18.191.180.186 rocketmq-nameserver-2

18.191.180.186 rocketmq-master-2


broker-a.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH

启动命令

sed -i 's#${user.home}#/usr/local/mq/rocketmq#g' *.xml
启动nameserver
nohup sh mqnamesrv &
启动broker
nohup sh mqbroker -c  /usr/local/mq/rocketmq/conf/2m-2s-async/broker-a.properties > /dev/null 2>&1 &
jps

单机模式启动配置broker-a.properties

# 集群名称
brokerClusterName=rocketmq-cluster
# broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
# 0 表示Master,>0 表示Slave
brokerId=0
# nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver-1:9876
# 在发送消息时,自动创建服务器不存在的Topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许Broker 自动创建Topic, 建议线下开启, 线上关闭
autoCreateTopicEnable=true
# 是否允许Broker 自动创建订阅组, 建议线下开启, 线上关闭
autoCreateSubscriptionGroup=true
# Broker 对外服务的监听端口
listenPort=10911
# 删除文件时间点,默认是凌晨4点
deleteWhen=04
# 文件保留时间,默认是48小时
fileReservedTime=48
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30w条, 根据业务情况调整
mapedFileSizeConsumeQueue=30000
# destroyMapedFileIntervalForcibly=12000
# redeleteHangedFileInterval=12000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/usr/local/mq/rocketmq/store
# commitLog存储路径
storePathCommitLog=/usr/local/mq/rocketmq/store/commitlog
# 消息队列储存路径
storePathConsumeQueue=/usr/local/mq/rocketmq/store/consumequeue
# 消息索引粗存路径
storePathIndex=/usr/local/mq/rocketmq/store/index
# checkpoint 文件储存路径
storeCheckpoint=/usr/local/mq/rocketmq/store/checkpoint
# abort 文件存储路径
abortFile=/usr/local/mq/rocketmq/store/abort
# 限制的消息大小
maxMessageSize=65536
# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000
# Broker的角色
# -ASYNC_MASTER 异步复制Master
# -SYNC_MASTER 同步双写Master
# -SLAVE
brokerRole=ASYNC_MASTER
# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
# checkTransactionMessageEnable=false
# 发消息线程池数量
# sendMessageTreadPoolNums=128
# 拉消息线程池数量
# pullMessageTreadPoolNums=128lushDiskType=ASYNC_FLUSHH


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
100 7
|
2月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
91 8
|
1月前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
2月前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
2月前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
2月前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
72 4
|
3月前
|
消息中间件 弹性计算 运维
阿里云云消息队列RabbitMQ实践解决方案评测报告
阿里云云消息队列RabbitMQ实践解决方案评测报告
82 9
|
3月前
|
消息中间件 监控 数据处理
解决方案 | 云消息队列RabbitMQ实践
解决方案 | 云消息队列RabbitMQ实践
55 1
|
3月前
|
消息中间件 弹性计算 运维
云消息队列RabbitMQ实践
本评测报告详细分析了阿里云云消息队列 RabbitMQ 版的实践原理、部署体验及核心优势。报告认为其在解决消息积压、脑裂难题及弹性伸缩方面表现优秀,但建议进一步细化架构优化策略和技术细节描述。部署文档详尽,对初学者友好,但仍需加强网络配置和版本兼容性说明。实际部署展示了其高可用性和成本优化能力,适用于高并发消息处理和分布式系统数据同步。为进一步提升方案,建议增加安全性配置指导、性能调优建议及监控告警系统设置。
|
2月前
|
消息中间件 监控 测试技术
云消息队列RabbitMQ实践 - 评测
根据反馈,对本解决方案的实践原理已有一定理解,描述整体清晰但需在消息队列配置与使用上增加更多示例和说明以助理解。部署体验中获得了一定的引导和文档支持,尽管文档仍有待完善;期间出现的配置文件错误及依赖库缺失等问题已通过查阅资料解决。设计验证展示了云消息队列RabbitMQ的核心优势,包括高可用性和灵活性,未来可通过增加自动化测试来提高系统稳定性。实践后,用户对方案解决问题的能力及适用场景有了明确认识,认为其具有实际生产价值,不过仍需在性能优化、安全性增强及监控功能上进行改进以适应高并发和大数据量环境。
47 0