介绍
RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。
RocketMQ 优势
目前主流的 MQ 主要是 RocketMQ、kafka、RabbitMQ,其主要优势有:
- 去除对zk的依赖
- 支持异步和同步两种方式刷磁盘
- 单机支持的队列或者topic数量是5w
- 支持消息重试
- 支持严格按照一定的顺序发送消息
- 支持定时发送消息
- 支持根据消息ID来进行查询
- 支持根据某个时间点进行消息的回溯
- 支持对消息服务端的过滤
- 消费并行度:顺序消费 取决于queue数量,乱序消费 取决于consumer数量
Rocketmq与其他mq的区别
特性 |
ActiveMQ |
RabbitMQ |
RocketMQ |
kafka |
开发语言 |
java |
erlang |
java |
scala |
单机吞吐量 |
万级 |
万级 |
10万级 |
10万级 |
时效性 |
ms级 |
us级 |
ms级 |
ms级以内 |
可用性 |
高(主从架构) |
高(主从架构) |
非常高(分布式架构) |
非常高(分布式架构) |
功能特性 |
成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持较好 |
基于erlang开发,所以并发能力很强,性能极其好,延时很低;管理界面较丰富 |
MQ功能比较完备,扩展性佳 |
只支持主要的MQ功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广。 |
RocketMQ 基本概念
Rocketmq架构原理
主题(Topic)
Apache RocketMQ 中消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息。主题通过TopicName来做唯一标识和区分。更多信息,请参见主题(Topic)。
消息类型(MessageType)
Apache RocketMQ 中按照消息传输特性的不同而定义的分类,用于类型管理和安全校验。 Apache RocketMQ 支持的消息类型有普通消息、顺序消息、事务消息和定时/延时消息。
消息队列(MessageQueue)
队列是 Apache RocketMQ 中消息存储和传输的实际容器,也是消息的最小存储单元。 Apache RocketMQ 的所有主题都是由多个队列组成,以此实现队列数量的水平拆分和队列内部的流式存储。队列通过QueueId来做唯一标识和区分。更多信息,请参见队列(MessageQueue)。
消息(Message)
消息是 Apache RocketMQ 中的最小数据传输单元。生产者将业务数据的负载和拓展属性包装成消息发送到服务端,服务端按照相关语义将消息投递到消费端进行消费。更多信息,请参见消息(Message)。
消息视图(MessageView)
消息视图是 Apache RocketMQ 面向开发视角提供的一种消息只读接口。通过消息视图可以读取消息内部的多个属性和负载信息,但是不能对消息本身做任何修改。
消息标签(MessageTag)
消息标签是Apache RocketMQ 提供的细粒度消息分类属性,可以在主题层级之下做消息类型的细分。消费者通过订阅特定的标签来实现细粒度过滤。更多信息,请参见消息过滤。
消息位点(MessageQueueOffset)
消息是按到达Apache RocketMQ 服务端的先后顺序存储在指定主题的多个队列中,每条消息在队列中都有一个唯一的Long类型坐标,这个坐标被定义为消息位点。更多信息,请参见消费进度管理。
消费位点(ConsumerOffset)
一条消息被某个消费者消费完成后不会立即从队列中删除,Apache RocketMQ 会基于每个消费者分组记录消费过的最新一条消息的位点,即消费位点。更多信息,请参见消费进度管理。
消息索引(MessageKey)
消息索引是Apache RocketMQ 提供的面向消息的索引属性。通过设置的消息索引可以快速查找到对应的消息内容。
生产者(Producer)
生产者是Apache RocketMQ 系统中用来构建并传输消息到服务端的运行实体。生产者通常被集成在业务系统中,将业务消息按照要求封装成消息并发送至服务端。更多信息,请参见生产者(Producer)。
事务检查器(TransactionChecker)
Apache RocketMQ 中生产者用来执行本地事务检查和异常事务恢复的监听器。事务检查器应该通过业务侧数据的状态来检查和判断事务消息的状态。更多信息,请参见事务消息。
事务状态(TransactionResolution)
Apache RocketMQ 中事务消息发送过程中,事务提交的状态标识,服务端通过事务状态控制事务消息是否应该提交和投递。事务状态包括事务提交、事务回滚和事务未决。更多信息,请参见事务消息。
消费者分组(ConsumerGroup)
消费者分组是Apache RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组。和消费者不同,消费者分组并不是运行实体,而是一个逻辑资源。在 Apache RocketMQ 中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。更多信息,请参见消费者分组(ConsumerGroup)。
消费者(Consumer)
消费者是Apache RocketMQ 中用来接收并处理消息的运行实体。消费者通常被集成在业务系统中,从服务端获取消息,并将消息转化成业务可理解的信息,供业务逻辑处理。更多信息,请参见消费者(Consumer)。
消费结果(ConsumeResult)
Apache RocketMQ 中PushConsumer消费监听器处理消息完成后返回的处理结果,用来标识本次消息是否正确处理。消费结果包含消费成功和消费失败。
订阅关系(Subscription)
订阅关系是Apache RocketMQ 系统中消费者获取消息、处理消息的规则和状态配置。订阅关系由消费者分组动态注册到服务端系统,并在后续的消息传输中按照订阅关系定义的过滤规则进行消息匹配和消费进度维护。更多信息,请参见订阅关系(Subscription)。
消息过滤
消费者可以通过订阅指定消息标签(Tag)对消息进行过滤,确保最终只接收被过滤后的消息合集。过滤规则的计算和匹配在Apache RocketMQ 的服务端完成。更多信息,请参见消息过滤。
重置消费位点
以时间轴为坐标,在消息持久化存储的时间范围内,重新设置消费者分组对已订阅主题的消费进度,设置完成后消费者将接收设定时间点之后,由生产者发送到Apache RocketMQ 服务端的消息。更多信息,请参见重置消费位点。
消息轨迹
在一条消息从生产者发出到消费者接收并处理过程中,由各个相关节点的时间、地点等数据汇聚而成的完整链路信息。通过消息轨迹,您能清晰定位消息从生产者发出,经由Apache RocketMQ 服务端,投递给消费者的完整链路,方便定位排查问题。
消息堆积
生产者已经将消息发送到Apache RocketMQ 的服务端,但由于消费者的消费能力有限,未能在短时间内将所有消息正确消费掉,此时在服务端保存着未被消费的消息,该状态即消息堆积。
事务消息
事务消息是Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。
定时/延时消息
定时/延时消息是Apache RocketMQ 提供的一种高级消息类型,消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。
顺序消息
顺序消息是Apache RocketMQ 提供的一种高级消息类型,支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。
RocketMQ环境搭建
安装NameServer
搜索/拉取镜像
docker pull rocketmqinc/rocketmq
创建namesrv数据存储路径
mkdir -p /docker/rocketmq/nameserver/logs /docker/rocketmq/nameserver/store
构建namesrv容器
docker run -d \ --restart=always \ --name rmqnamesrv \ -p 9876:9876 \ -v /docker/rocketmq/data/namesrv/logs:/root/logs \ -v /docker/rocketmq/data/namesrv/store:/root/store \ -e "MAX_POSSIBLE_HEAP=100000000" \ rocketmqinc/rocketmq \ sh mqnamesrv
参数说明
-d |
以守护进程的方式启动 |
- -restart=always |
docker重启时候容器自动重启 |
- -name rmqnamesrv |
把容器的名字设置为rmqnamesrv |
-p 9876:9876 |
把容器内的端口9876挂载到宿主机9876上面 |
-v /docker/rocketmq/nameserver/logs:/root/logs |
目录挂载 |
-v /docker/rocketmq/nameserver/store |
目录挂载 |
rmqnamesrv |
容器的名字 |
-e “MAX_POSSIBLE_HEAP=100000000” |
设置容器的最大堆内存为100000000 |
rocketmqinc/rocketmq |
使用的镜像名称 |
sh mqnamesrv |
启动namesrv服务 |
安装broker
创建broker.conf配置文件,我的目录是/docker/rocketmq/conf/broker.conf,文件内容如下
brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH brokerIP1 = 主机的IP
构建broker容器
docker run -d \ --restart=always \ --name rmqbroker \ --link rmqnamesrv:namesrv \ -p 10911:10911 \ -p 10909:10909 \ -v /docker/rocketmq/data/broker/logs:/root/logs \ -v /docker/rocketmq/data/broker/store:/root/store \ -v /docker/rocketmq/conf/broker.conf:/docker/rocketmq/conf/broker.conf \ -e "NAMESRV_ADDR=namesrv:9876" \ -e "MAX_POSSIBLE_HEAP=200000000" \ rocketmqinc/rocketmq \ sh mqbroker -c /docker/rocketmq/conf/broker.conf
参数说明
-d |
以守护进程的方式启动 |
–restart=always |
docker重启时候镜像自动重启 |
- -name rmqbroker |
把容器的名字设置为rmqbroker |
- --link rmqnamesrv:namesrv |
和rmqnamesrv容器通信 |
-p 10911:10911 |
把容器的非vip通道端口挂载到宿主机 |
-p 10909:10909 |
把容器的vip通道端口挂载到宿主机 |
-e “NAMESRV_ADDR=namesrv:9876” |
指定namesrv的地址为本机namesrv的ip地址:9876 |
-e “MAX_POSSIBLE_HEAP=200000000” rocketmqinc/rocketmq sh mqbroker |
指定broker服务的最大堆内存 |
rocketmqinc/rocketmq |
使用的镜像名称 |
sh mqbroker -c /docker/rocketmq/conf/broker.conf |
指定配置文件启动broker节点 |
创建rockermq-console服务
拉取镜像
docker pull pangliang/rocketmq-console-ng
构建rockermq-console容器
docker run -d \ --restart=always \ --name rmqadmin \ -e "JAVA_OPTS=-Drocketmq.namesrv.addr=81.70.117.188:9876 \ -Dcom.rocketmq.sendMessageWithVIPChannel=false" \ -p 9999:8080 \ pangliang/rocketmq-console-ng
参数说明
-d |
以守护进程的方式启动 |
–restart=always |
docker重启时候镜像自动重启 |
- -name rmqadmin |
把容器的名字设置为rmqadmin |
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.52.136:9876 |
设置namesrv服务的ip地址 |
-Dcom.rocketmq.sendMessageWithVIPChannel=false |
不使用vip通道发送消息 |
–p 9900:8080 |
上的9999端口 |
ip:9900 访问出现如下界面
SpringBoot集成RocketMQ
maven依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.2</version> </dependency>
配置文件配置RocketMQ
# RocketMQ配置 rocketmq: name-server: 81.70.117.188:9876 producer: group: tiger-group
生产者
单向发送
/** * @Author iron.guo * @Date 2023/1/11 * @Description RocketMq生产者 */ @AllArgsConstructor @RestController @Slf4j @RequestMapping("/rocketmq") public class ProducerController { private final RocketMQTemplate rocketService; /** * 普通消息投递 单向发送 * @return */ @GetMapping("sendRocketMsg") public ResponseResult<Boolean> sendRocketMsg(){ rocketService.convertAndSend("topic-tiger","rocketmq message"+ UUID.randomUUID()); return ResponseResult.Success(); } }
同步发送
/** * 同步发送 * * @throws Exception */ @GetMapping("/sync") public void sync() { String msg= "sync send rocketmq message"+ UUID.fastUUID(); SendResult sendResult = rocketService.syncSend("topic-tiger", msg); log.info("同步发送字符串{}, 发送结果{}", msg, sendResult); }
异步发送
/** * 异步发送 * * @throws Exception */ @GetMapping("async") public void async() { String msg= "async send rocketmq message"+ UUID.fastUUID(); rocketService.asyncSend("topic-tiger", msg, new SendCallback() { @Override public void onSuccess(SendResult var1) { log.info("异步发送成功{}", var1); } @Override public void onException(Throwable var1) { log.info("异步发送失败{}", var1); } }); }