一、概念
RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。主要功能是异步解耦和流量削峰。
二、组成部分
RocketMQ主要有四大核心组成部分:NameServer、Broker、Producer以及Consumer四部分。这些角色通常以集群的方式存在,RocketMQ 基于纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。
三、RocketMQ 特点
支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
在一个队列中可靠的先进先出(FIFO)和严格的顺序传递 (RocketMQ可以保证严格的消息顺序,而ActiveMQ无法保证)
支持拉(pull)和推(push)两种消息模式
pull其实就是消费者主动从MQ中去拉消息,而push则像rabbit MQ一样,是MQ给消费者推送消息。但是RocketMQ的push其实是基于pull来实现的。
它会先由一个业务代码从MQ中pull消息,然后再由业务代码push给特定的应用/消费者。其实底层就是一个pull模式
单一队列百万消息的堆积能力 (RocketMQ提供亿级消息的堆积能力,这不是重点,重点是堆积了亿级的消息后,依然保持写入低延迟)
支持多种消息协议,如 JMS、MQTT 等
分布式高可用的部署架构,满足至少一次消息传递语义(RocketMQ原生就是支持分布式的,而ActiveMQ原生存在单点性)
提供 docker 镜像用于隔离测试和云集群部署
提供配置、指标和监控等功能丰富的 Dashboard
四、部署方式
Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的Broker Name,不同的BrokerId来定义,BrokerId为0表Master,非0表示Slave。Master也可以部署多个。
从物理结构上看 Broker 的集群部署方式有四种:单 Master 、多 Master 、多 Master 多Slave(同步刷盘)、多 Master多 Slave(异步刷盘)。
Producer:就是消息生产者,可以集群部署。它会先和 NameServer 集群中的随机一台建立长连接,得知当前要发送的 Topic 存在哪台 Broker Master上,然后再与其建立长连接,支持多种负载平衡模式发送消息。
Consumer:消息消费者,也可以集群部署。它也会先和 NameServer 集群中的随机一台建立长连接,得知当前要消息的 Topic 存在哪台 Broker Master、Slave上,然后它们建立长连接,支持集群消费和广播消费消息。
Broker:主要负责消息的存储、查询消费,支持主从部署,一个 Master 可以对应多个 Slave,Master 支持读写,Slave 只支持读。Broker 会向集群中的每一台 NameServer 注册自己的路由信息。
NameServer:是一个很简单的 Topic 路由注册中心,支持 Broker 的动态注册和发现,保存 Topic 和 Borker 之间的关系。通常也是集群部署,但是各 NameServer 之间不会互相通信, 各 NameServer 都有完整的路由信息,即无状态。
示意图
先启动 NameServer 集群,各 NameServer 之间无任何数据交互,Broker 启动之后会向所有 NameServer 定期(每 30s)发送心跳包,包括:IP、Port、TopicInfo,NameServer 会定期扫描 Broker 存活列表,如果超过 120s 没有心跳则移除此 Broker 相关信息,代表下线。
这样每个 NameServer 就知道集群所有 Broker 的相关信息,此时 Producer 上线从 NameServer 就可以得知它要发送的某 Topic 消息在哪个 Broker 上,和对应的 Broker (Master 角色的)建立长连接,发送消息。
Consumer 上线也可以从 NameServer 得知它所要接收的 Topic 是哪个 Broker ,和对应的 Master、Slave 建立连接,接收消息。
#!/bin/bash
systemctl stop firewalld
mkdir -p /opt/rocketmq
cd /opt
tar -xvf jdk-8u131-linux-x64.tar.gz
cat <<EOF >>/etc/profile
JAVA_HOME=/opt/jdk1.8.0_131
PATH=${JAVA_HOME}/bin:$PATH
EOF
source /etc/profile
mv rocketmq-all-5.0.0-bin-release.zip /opt/rocketmq
unzip rocketmq-all-5.0.0-bin-release.zip
cd rocketmq-all-5.0.0-bin-release
cat <<EOF >>/etc/profile
# rocketmq 安装目录
export ROCKETMQ_HOME=/opt/rocketmq/rocketmq-all-5.0.0-bin-release
# 客户端NameServer的地址
export NAMESRV_ADDR=localhost:9876
# 将 rocketmq 环境变量加入到 PATH中
export PATH=$ROCKETMQ_HOME/bin:$PATH
EOF
source /etc/profile
cd /opt/rocketmq/rocketmq-all-5.0.0-bin-release/
nohup sh bin/mqnamesrv &
# 查看是否启动成功
cat <<EOF >>/opt/rocketmq/rocketmq-all-5.0.0-bin-release/bin/runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
EOF
cat <<EOF >>/opt/rocketmq/rocketmq-all-5.0.0-bin-release/bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx256m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
EOF
nohup sh bin/mqbroker &
jps
#关闭 broker
#sh bin/mqshutdown broker
# 关闭 namesrv
#sh bin/mqshutodwn namesrv
# 发送消息
#sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
# SendResult [sendStatus=SEND_OK, msgId= ...
# 接收消息
#sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
# ConsumeMessageThread_%d Receive New Messages: [MessageExt...