摘要
在说消息队列之前,我们要明白为啥需要消息队列,知乎上有一篇文章写的不错,链接: 为什么要使用消息队列?
消息队列主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。 目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。今天我就首先分析一下RocketMQ,目前公司用的也是这个,因此在进行一下梳理,加深一下印象。
RocketMQ概述
RocketMQ为分布式消息中间件,其高性能在于顺序写盘(CommitLog)、零拷贝和跳跃读(尽量命中PageCache),高可靠性在于刷盘机制和Master/Slave,另外NameServer如果全部挂掉都不会影响已经运行的Broker,Producer,Consumer。
RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消息中间件。具有以下特性:
1、支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
2、在一个队列中可靠的先进先出(FIFO)和严格的顺序传递
3、支持拉(pull)和推(push)两种消息模式
4、单一队列百万消息的堆积能力
5、支持多种消息协议,如 JMS、MQTT 等
6、分布式高可用的部署架构,满足至少一次消息传递语义
7、提供 docker 镜像用于隔离测试和云集群部署
8、提供配置、指标和监控等功能丰富的Dashboard
那么首先先明白NameServer、Broker、Producer、Consumer都是什么,其实后面当在分析Kafka的时候,就会发现两点很像,因为RocketMQ是阿里根据Kafka架构进行的自研开发,在一些功能结构上面保留了Kafka的一些特性。首先我们先看一下RocketMQ的架构图:
从图中就能够看出来这是一个双主双从的集群模式结构图,我们根据这个图进行一下分析:
NameServer
为producer 和 consumer 提供路由信息,记录broker与topic的关系,然后在这个基础上对Broker进行每十秒的监测,判断Broker是否依然存活。
其优点如下:
1、可集群部署 2、相互之间独立,没有通信,不必保障节点间的数据强一致性 3、其他角色同时向多个NameServer机器上报状态信息 4、本身是无状态的,NameServer中的Broker、Topic等状态信息不会持久存储,由各个角色定时上报并存储到内存中的(NameServer支持配置参数的持久化,一般用不到) 5、采用每30s心跳机制 6、长连接持续提供给Producer和Consumer Topic信息 7、存储当前集群所有的Broker信息、Topic与Broker的对应关系( 1)broker的基本信息(ip port等)2)主题topic的地址信息3)broker集群信息4)存活的broker信息5)filter 过滤器 ) 8、只做集群元数据存储和心跳工作,功能简单,稳定性高 9、多机热备,单台NameServer宕机不影响其他NameServer工作 10、每个NameServer注册的信息都是一样的,而且是当前系统中的所有broker的元数据信息
需要注意的是,即使整个NameServer集群宕机了,已经正常工作的Producer、Consumer、Broker仍然能正常工作,但新起的Producer、Consumer、Broker就无法工作。
Broker(Master):
MQ 中最核心的部分,是 MQ 的服务端,核心逻辑几乎全在这里,它为生产者和消费者提供 RPC 接口,负责消息的存储、备份和删除,以及消费关系的维护,同时用来消息存储和生产消费转发。
1、单个Broker跟所有Namesrv保持心跳请求,心跳间隔为30秒 2、心跳请求中包括当前Broker信息(IP+端口等)以及存储所有topic信息 3、RocketMQ消息代理服务器主节点,负责接收Producer发送的消息、消息存储、Consumer拉取消息; Broker(Slave): RocketMQ消息代理服务器备份节点,主要是通过同步/异步的方式将主节点的消息同步过来进行备份,为RocketMQ集群的高可用性提供保障;
Producer:
1、负责生产消息,一般由业务系统负责生产消息。 2、一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。 3、RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。 4、同步和异步方式均需要Broker返回确认信息,单向发送不需要
Consumer:
Queue
主要用于支持点对点的消息传递模式,即生产者将消息发送至队列,队列存在于Broker中,消息者从该队列中取出消息。这种传递方式的特点是,一个队列可以被多个生产者或消费者共用,但是某条消息一旦被某消费者取出,它将不再存在于队列中。即一条消息只能传递给一个消费者。
Topic(主题):
1、主要用于发布/订阅的传递模式。 2、生产者可将消息发布到Topic中,该Topic可由多个消费者订阅,所有订阅该Topic的消费者都能收到生产者发布的消息。 3、所有订阅的消费者都接收消息后,消息才会从Topic中移除。即一条消息可以传递给多个消费者。 4、来代表一种数据的集合,Topic 并不具有真正的属性,它只是一类数据的集合,不同类型的数据我们应该放到不同的 Topic 中 5、Topic 会分布式的进行存储;
Topic与Broker的对应关系
注意: 当我们真正使用 MQ 时,第一步应该总是先创建一些 Topic,作为数据集合存放不同类型的消息,其实本质上来讲和使用数据库时总是先创建表结构是一样的。
什么是长轮询机制?
Consumer从消息队列获取消息的方式主要有两种:pull和push。两种都有一些问题,比如说pull的情况下,有时候可能导致消息在服务端堆积,消息处理延时较高,有时候又可能因为消息队列中没有消息而导致空拉取,造成资源浪费,而在push的情况下,可能导致超出客户端压力,造成客户端卡死甚至宕机。于是,把pull和push相结合,得到了长轮询。
长轮询的机制是由客户端发起pull请求,服务端接收到客户端的请求后,如果发现队列中没有消息,并不立即返回,而是持有该请求一段时间,在此期间,服务端不断轮询队列中是否有新消息,如果有,则用现有连接将消息返回给客户端,如果一段时间内还是没有新消息,则返回空。长轮询机制的好处在于,其本质还是pull,所以,消息处理的主动权还是在客户端手中,客户端就可以根据自己的能力去做消息处理。而服务端持有请求一段时间的机制又很大程序的避免了空拉取,减少了资源的浪费。但是,这种机制也有一定问题,当客户端数量过多时,服务端可能在时间段内需要持有过多的连接,这种请求下,也会对服务端造成压力。不过,一般来说,消息队列的承压能力还是比较可靠的,再加上集群的保障,基本不用担心这个问题。
Rocketmq的工作流程是怎样的?
1、先启动Namesrv,Namesrver启动后监听端口,等待Broker、Produer、Consumer连接上来,相当于一个路由控制中心。
2、Broker启动,跟所有的Namesrver保持长连接,每30s发送一次心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有topic信息。注册成功后,Namesrv集群中就有Topic跟Broker的映射关系。
3、收发消息前,先创建topic,创建topic时需要指定该topic要存储在哪些Broker上。也可以在发送消息时自动创建Topic。
4、Producer启动并且发送消息,启动时先跟Namesrv集群中的其中一台建立长连接,并从Namesrv中获取当前发送的Topic存在哪些Broker上,然后跟对应的Broker建长连接,直接向Broker发消息。
5、Consumer跟Producer类似,跟其中一台Namesrv建立长连接,获取当前订阅Topic存在哪些Broker,然后直接跟Broker建立连接通道,开始消费消息。
RocketMq使用哪种方式消费消息,pull还是push?
RocketMq提供两种方式:pull和push进行消息的消费 而RocketMq的push方式,本质上也是采用pull的方式进行实现的。也就是说这两种方式本质上都是采用consumer轮询从broker拉取消息的 push方式里,consumer把轮询过程封装了一层,并注册了MessageListener监听器。当轮询取到消息后,便唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉好像消息是被推送过来的 其实想想,消息统一都发到了broker,而broker又不会主动去push消息,那么消息肯定都是需要消费者主动去拉的喽~
后面知识点太多了 还是自己看吧,有时间再总结!!!
RocketMQ中的延迟消息
RocketMQ的消费模式
RocketMQ练习:
如果以集群的形式来进行演示的话必定会使得我电脑不堪重负,所以我采用单机的模式来进行。
@rocketMQ配置
106.12.50.23 rocketmq-nameserver-1
106.12.50.23 rocketmq-master-1
18.191.180.186 rocketmq-nameserver-2
18.191.180.186 rocketmq-master-2
broker-a.properties
# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. brokerClusterName=DefaultCluster brokerName=broker-a brokerId=0 deleteWhen=04 fileReservedTime=48 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH
启动命令
sed -i 's#${user.home}#/usr/local/mq/rocketmq#g' *.xml 启动nameserver nohup sh mqnamesrv & 启动broker nohup sh mqbroker -c /usr/local/mq/rocketmq/conf/2m-2s-async/broker-a.properties > /dev/null 2>&1 & jps
单机模式启动配置broker-a.properties
# 集群名称 brokerClusterName=rocketmq-cluster # broker名字,注意此处不同的配置文件填写的不一样 brokerName=broker-a # 0 表示Master,>0 表示Slave brokerId=0 # nameServer地址,分号分割 namesrvAddr=rocketmq-nameserver-1:9876 # 在发送消息时,自动创建服务器不存在的Topic,默认创建的队列数 defaultTopicQueueNums=4 # 是否允许Broker 自动创建Topic, 建议线下开启, 线上关闭 autoCreateTopicEnable=true # 是否允许Broker 自动创建订阅组, 建议线下开启, 线上关闭 autoCreateSubscriptionGroup=true # Broker 对外服务的监听端口 listenPort=10911 # 删除文件时间点,默认是凌晨4点 deleteWhen=04 # 文件保留时间,默认是48小时 fileReservedTime=48 # commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 # ConsumeQueue每个文件默认存30w条, 根据业务情况调整 mapedFileSizeConsumeQueue=30000 # destroyMapedFileIntervalForcibly=12000 # redeleteHangedFileInterval=12000 # 检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 # 存储路径 storePathRootDir=/usr/local/mq/rocketmq/store # commitLog存储路径 storePathCommitLog=/usr/local/mq/rocketmq/store/commitlog # 消息队列储存路径 storePathConsumeQueue=/usr/local/mq/rocketmq/store/consumequeue # 消息索引粗存路径 storePathIndex=/usr/local/mq/rocketmq/store/index # checkpoint 文件储存路径 storeCheckpoint=/usr/local/mq/rocketmq/store/checkpoint # abort 文件存储路径 abortFile=/usr/local/mq/rocketmq/store/abort # 限制的消息大小 maxMessageSize=65536 # flushCommitLogLeastPages=4 # flushConsumeQueueLeastPages=2 # flushCommitLogThoroughInterval=10000 # flushConsumeQueueThoroughInterval=60000 # Broker的角色 # -ASYNC_MASTER 异步复制Master # -SYNC_MASTER 同步双写Master # -SLAVE brokerRole=ASYNC_MASTER # 刷盘方式 # - ASYNC_FLUSH 异步刷盘 # - SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH # checkTransactionMessageEnable=false # 发消息线程池数量 # sendMessageTreadPoolNums=128 # 拉消息线程池数量 # pullMessageTreadPoolNums=128lushDiskType=ASYNC_FLUSHH