SpringBoot集成RocketMq

简介: RocketMQ 是一款开源的分布式消息中间件,采用纯 Java 编写,支持事务消息、顺序消息、批量消息、定时消息及消息回溯等功能。其优势包括去除对 ZooKeeper 的依赖、支持异步和同步刷盘、高吞吐量及消息过滤等特性。RocketMQ 具备高可用性和高可靠性,适用于大规模分布式系统,能有效保障消息传输的一致性和顺序性。

介绍

RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。

RocketMQ 优势

目前主流的 MQ 主要是 RocketMQ、kafka、RabbitMQ,其主要优势有:

  • 去除对zk的依赖
  • 支持异步和同步两种方式刷磁盘
  • 单机支持的队列或者topic数量是5w
  • 支持消息重试
  • 支持严格按照一定的顺序发送消息
  • 支持定时发送消息
  • 支持根据消息ID来进行查询
  • 支持根据某个时间点进行消息的回溯
  • 支持对消息服务端的过滤
  • 消费并行度:顺序消费 取决于queue数量,乱序消费 取决于consumer数量

Rocketmq与其他mq的区别

特性

ActiveMQ

RabbitMQ

RocketMQ

kafka

开发语言

java

erlang

java

scala

单机吞吐量

万级

万级

10万级

10万级

时效性

ms级

us级

ms级

ms级以内

可用性

高(主从架构)

高(主从架构)

非常高(分布式架构)

非常高(分布式架构)

功能特性

成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持较好

基于erlang开发,所以并发能力很强,性能极其好,延时很低;管理界面较丰富

MQ功能比较完备,扩展性佳

只支持主要的MQ功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广。

RocketMQ 基本概念

Rocketmq架构原理

主题(Topic)

Apache RocketMQ 中消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息。主题通过TopicName来做唯一标识和区分。更多信息,请参见主题(Topic)

消息类型(MessageType)

Apache RocketMQ 中按照消息传输特性的不同而定义的分类,用于类型管理和安全校验。 Apache RocketMQ 支持的消息类型有普通消息、顺序消息、事务消息和定时/延时消息。

消息队列(MessageQueue)

队列是 Apache RocketMQ 中消息存储和传输的实际容器,也是消息的最小存储单元。 Apache RocketMQ 的所有主题都是由多个队列组成,以此实现队列数量的水平拆分和队列内部的流式存储。队列通过QueueId来做唯一标识和区分。更多信息,请参见队列(MessageQueue)

消息(Message)

消息是 Apache RocketMQ 中的最小数据传输单元。生产者将业务数据的负载和拓展属性包装成消息发送到服务端,服务端按照相关语义将消息投递到消费端进行消费。更多信息,请参见消息(Message)

消息视图(MessageView)

消息视图是 Apache RocketMQ 面向开发视角提供的一种消息只读接口。通过消息视图可以读取消息内部的多个属性和负载信息,但是不能对消息本身做任何修改。

消息标签(MessageTag)

消息标签是Apache RocketMQ 提供的细粒度消息分类属性,可以在主题层级之下做消息类型的细分。消费者通过订阅特定的标签来实现细粒度过滤。更多信息,请参见消息过滤

消息位点(MessageQueueOffset)

消息是按到达Apache RocketMQ 服务端的先后顺序存储在指定主题的多个队列中,每条消息在队列中都有一个唯一的Long类型坐标,这个坐标被定义为消息位点。更多信息,请参见消费进度管理

消费位点(ConsumerOffset)

一条消息被某个消费者消费完成后不会立即从队列中删除,Apache RocketMQ 会基于每个消费者分组记录消费过的最新一条消息的位点,即消费位点。更多信息,请参见消费进度管理

消息索引(MessageKey)

消息索引是Apache RocketMQ 提供的面向消息的索引属性。通过设置的消息索引可以快速查找到对应的消息内容。

生产者(Producer)

生产者是Apache RocketMQ 系统中用来构建并传输消息到服务端的运行实体。生产者通常被集成在业务系统中,将业务消息按照要求封装成消息并发送至服务端。更多信息,请参见生产者(Producer)

事务检查器(TransactionChecker)

Apache RocketMQ 中生产者用来执行本地事务检查和异常事务恢复的监听器。事务检查器应该通过业务侧数据的状态来检查和判断事务消息的状态。更多信息,请参见事务消息

事务状态(TransactionResolution)

Apache RocketMQ 中事务消息发送过程中,事务提交的状态标识,服务端通过事务状态控制事务消息是否应该提交和投递。事务状态包括事务提交、事务回滚和事务未决。更多信息,请参见事务消息

消费者分组(ConsumerGroup)

消费者分组是Apache RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组。和消费者不同,消费者分组并不是运行实体,而是一个逻辑资源。在 Apache RocketMQ 中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。更多信息,请参见消费者分组(ConsumerGroup)

消费者(Consumer)

消费者是Apache RocketMQ 中用来接收并处理消息的运行实体。消费者通常被集成在业务系统中,从服务端获取消息,并将消息转化成业务可理解的信息,供业务逻辑处理。更多信息,请参见消费者(Consumer)

消费结果(ConsumeResult)

Apache RocketMQ 中PushConsumer消费监听器处理消息完成后返回的处理结果,用来标识本次消息是否正确处理。消费结果包含消费成功和消费失败。

订阅关系(Subscription)

订阅关系是Apache RocketMQ 系统中消费者获取消息、处理消息的规则和状态配置。订阅关系由消费者分组动态注册到服务端系统,并在后续的消息传输中按照订阅关系定义的过滤规则进行消息匹配和消费进度维护。更多信息,请参见订阅关系(Subscription)

消息过滤

消费者可以通过订阅指定消息标签(Tag)对消息进行过滤,确保最终只接收被过滤后的消息合集。过滤规则的计算和匹配在Apache RocketMQ 的服务端完成。更多信息,请参见消息过滤

重置消费位点

以时间轴为坐标,在消息持久化存储的时间范围内,重新设置消费者分组对已订阅主题的消费进度,设置完成后消费者将接收设定时间点之后,由生产者发送到Apache RocketMQ 服务端的消息。更多信息,请参见重置消费位点

消息轨迹

在一条消息从生产者发出到消费者接收并处理过程中,由各个相关节点的时间、地点等数据汇聚而成的完整链路信息。通过消息轨迹,您能清晰定位消息从生产者发出,经由Apache RocketMQ 服务端,投递给消费者的完整链路,方便定位排查问题。

消息堆积

生产者已经将消息发送到Apache RocketMQ 的服务端,但由于消费者的消费能力有限,未能在短时间内将所有消息正确消费掉,此时在服务端保存着未被消费的消息,该状态即消息堆积。

事务消息

事务消息是Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。

定时/延时消息

定时/延时消息是Apache RocketMQ 提供的一种高级消息类型,消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。

顺序消息

顺序消息是Apache RocketMQ 提供的一种高级消息类型,支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。

RocketMQ环境搭建

安装NameServer

搜索/拉取镜像

docker pull rocketmqinc/rocketmq

创建namesrv数据存储路径

mkdir -p /docker/rocketmq/nameserver/logs /docker/rocketmq/nameserver/store

构建namesrv容器

docker run -d \
--restart=always \
--name rmqnamesrv \
-p 9876:9876 \
-v /docker/rocketmq/data/namesrv/logs:/root/logs \
-v /docker/rocketmq/data/namesrv/store:/root/store \
-e "MAX_POSSIBLE_HEAP=100000000" \
rocketmqinc/rocketmq \
sh mqnamesrv

参数说明

-d

以守护进程的方式启动

- -restart=always

docker重启时候容器自动重启

- -name rmqnamesrv

把容器的名字设置为rmqnamesrv

-p 9876:9876

把容器内的端口9876挂载到宿主机9876上面

-v /docker/rocketmq/nameserver/logs:/root/logs

目录挂载

-v /docker/rocketmq/nameserver/store

目录挂载

rmqnamesrv

容器的名字

-e “MAX_POSSIBLE_HEAP=100000000”

设置容器的最大堆内存为100000000

rocketmqinc/rocketmq

使用的镜像名称

sh mqnamesrv

启动namesrv服务

安装broker

创建broker.conf配置文件,我的目录是/docker/rocketmq/conf/broker.conf,文件内容如下

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1 = 主机的IP

构建broker容器

docker run -d  \
--restart=always \
--name rmqbroker \
--link rmqnamesrv:namesrv \
-p 10911:10911 \
-p 10909:10909 \
-v  /docker/rocketmq/data/broker/logs:/root/logs \
-v  /docker/rocketmq/data/broker/store:/root/store \
-v /docker/rocketmq/conf/broker.conf:/docker/rocketmq/conf/broker.conf \
-e "NAMESRV_ADDR=namesrv:9876" \
-e "MAX_POSSIBLE_HEAP=200000000" \
rocketmqinc/rocketmq \
sh mqbroker -c /docker/rocketmq/conf/broker.conf

参数说明

-d

以守护进程的方式启动

–restart=always

docker重启时候镜像自动重启

- -name rmqbroker

把容器的名字设置为rmqbroker

- --link rmqnamesrv:namesrv

和rmqnamesrv容器通信

-p 10911:10911

把容器的非vip通道端口挂载到宿主机

-p 10909:10909

把容器的vip通道端口挂载到宿主机

-e “NAMESRV_ADDR=namesrv:9876”

指定namesrv的地址为本机namesrv的ip地址:9876

-e “MAX_POSSIBLE_HEAP=200000000” rocketmqinc/rocketmq sh mqbroker

指定broker服务的最大堆内存

rocketmqinc/rocketmq

使用的镜像名称

sh mqbroker -c /docker/rocketmq/conf/broker.conf

指定配置文件启动broker节点

创建rockermq-console服务

拉取镜像

docker pull pangliang/rocketmq-console-ng

构建rockermq-console容器

docker run -d \
--restart=always \
--name rmqadmin \
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=81.70.117.188:9876 \
-Dcom.rocketmq.sendMessageWithVIPChannel=false" \
-p 9999:8080 \
pangliang/rocketmq-console-ng

参数说明

-d

以守护进程的方式启动

–restart=always

docker重启时候镜像自动重启

- -name rmqadmin

把容器的名字设置为rmqadmin

-e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.52.136:9876

设置namesrv服务的ip地址

-Dcom.rocketmq.sendMessageWithVIPChannel=false

不使用vip通道发送消息

–p 9900:8080

上的9999端口

ip:9900 访问出现如下界面

SpringBoot集成RocketMQ

maven依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>

配置文件配置RocketMQ

# RocketMQ配置
rocketmq:
  name-server: 81.70.117.188:9876
  producer:
    group: tiger-group

生产者

单向发送

/**
 * @Author iron.guo
 * @Date 2023/1/11
 * @Description RocketMq生产者
 */
@AllArgsConstructor
@RestController
@Slf4j
@RequestMapping("/rocketmq")
public class ProducerController {
    private final RocketMQTemplate rocketService;
    /**
     * 普通消息投递  单向发送
     * @return
     */
    @GetMapping("sendRocketMsg")
    public ResponseResult<Boolean> sendRocketMsg(){
        rocketService.convertAndSend("topic-tiger","rocketmq message"+ UUID.randomUUID());
        return ResponseResult.Success();
    }
}

同步发送

/**
     * 同步发送
     *
     * @throws Exception
     */
    @GetMapping("/sync")
    public void sync() {
        String msg= "sync send rocketmq message"+ UUID.fastUUID();
        SendResult sendResult = rocketService.syncSend("topic-tiger", msg);
        log.info("同步发送字符串{}, 发送结果{}", msg, sendResult);
    }

异步发送

/**
     * 异步发送
     *
     * @throws Exception
     */
    @GetMapping("async")
    public void async() {
        String msg= "async send rocketmq message"+ UUID.fastUUID();
        rocketService.asyncSend("topic-tiger", msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult var1) {
                log.info("异步发送成功{}", var1);
            }
            @Override
            public void onException(Throwable var1) {
                log.info("异步发送失败{}", var1);
            }
        });
    }
相关文章
|
4天前
|
人工智能 运维 安全
|
2天前
|
人工智能 异构计算
敬请锁定《C位面对面》,洞察通用计算如何在AI时代持续赋能企业创新,助力业务发展!
敬请锁定《C位面对面》,洞察通用计算如何在AI时代持续赋能企业创新,助力业务发展!
|
9天前
|
人工智能 JavaScript 测试技术
Qwen3-Coder入门教程|10分钟搞定安装配置
Qwen3-Coder 挑战赛简介:无论你是编程小白还是办公达人,都能通过本教程快速上手 Qwen-Code CLI,利用 AI 轻松实现代码编写、文档处理等任务。内容涵盖 API 配置、CLI 安装及多种实用案例,助你提升效率,体验智能编码的乐趣。
823 109
|
3天前
|
机器学习/深度学习 人工智能 自然语言处理
B站开源IndexTTS2,用极致表现力颠覆听觉体验
在语音合成技术不断演进的背景下,早期版本的IndexTTS虽然在多场景应用中展现出良好的表现,但在情感表达的细腻度与时长控制的精准性方面仍存在提升空间。为了解决这些问题,并进一步推动零样本语音合成在实际场景中的落地能力,B站语音团队对模型架构与训练策略进行了深度优化,推出了全新一代语音合成模型——IndexTTS2 。
420 9
|
3天前
|
人工智能 测试技术 API
智能体(AI Agent)搭建全攻略:从概念到实践的终极指南
在人工智能浪潮中,智能体(AI Agent)正成为变革性技术。它们具备自主决策、环境感知、任务执行等能力,广泛应用于日常任务与商业流程。本文详解智能体概念、架构及七步搭建指南,助你打造专属智能体,迎接智能自动化新时代。
|
4天前
|
机器学习/深度学习 传感器 算法
Edge Impulse:面向微型机器学习的MLOps平台——论文解读
Edge Impulse 是一个面向微型机器学习(TinyML)的云端MLOps平台,致力于解决嵌入式与边缘设备上机器学习开发的碎片化与异构性难题。它提供端到端工具链,涵盖数据采集、信号处理、模型训练、优化压缩及部署全流程,支持资源受限设备的高效AI实现。平台集成AutoML、量化压缩与跨硬件编译技术,显著提升开发效率与模型性能,广泛应用于物联网、可穿戴设备与边缘智能场景。
188 127