Spring Boot快速集成RocketMQ实战教程

简介: Spring Boot快速集成RocketMQ实战教程

前言

RocketMQ是目前主流的消息中间件之一,并且自身就支持分布式功能。最初由阿里巴巴团队开发,并且经历过双十一等海量消息场景的考验,后捐赠给Apache开源基金会,这也是为什么我们经常听说RocketMQ是阿里巴巴的消息中间件,项目却在Apache的顶级项目中。

网络上通过SpringBoot集成RocketMQ的教程很多,但大多数都无法做到快速、通用的进行集成。本篇文章带大家快速完成基于Spring Boot的集成使用,同时针对一些集成过程中的概念和使用方法以实例进行讲解。

RocketMQ的部署

关于RocketMQ的部署,通常有单Master模式、多Master模式、多Master多Slave模式(异步复制或同步双写)等。

本文重点介绍RocketMQ的集成部分,就不再这里讲解如何部署Master的部署过程,读者学习时只需部署单机模式或基于Docker部署即可。

依赖集成

首先创建一个SpringBoot项目,为了方便通过浏览器访问测试,引入web对应的starter。













<parent>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-parent</artifactId>    <version>2.4.0</version>    <relativePath/> </parent><dependencies>    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-web</artifactId>    </dependency></dependencies>

上面的依赖以及可以完成一个基于SpringBoot的web项目了。下面需要集成RocketMQ的依赖。

在此步骤中有两个选择,一个就是直接引入RocketMQ的依赖,比如:






<dependency>    <groupId>org.apache.rocketmq</groupId>    <artifactId>rocketmq-client</artifactId>    <version>4.3.0</version></dependency>

但此种方式需要进行大量的配置及实例化操作,并不能够达到快速集成、方便使用的目的。

这里我们采用RocketMQ官方提供的基于spring的集成。项目的源码及依赖使用位于GitHub上:https://github.com/apache/rocketmq-spring

在该项目的ReadMe中已经清晰的描述了如何引入依赖:







<!--add dependency in pom.xml--><dependency>    <groupId>org.apache.rocketmq</groupId>    <artifactId>rocketmq-spring-boot-starter</artifactId>    <version>${RELEASE.VERSION}</version></dependency>

我们只需按照说明,引入对应的依赖即可,这里采用2.1.1版本。因此,引入依赖文件如下:






<dependency>    <groupId>org.apache.rocketmq</groupId>    <artifactId>rocketmq-spring-boot-starter</artifactId>    <version>2.1.1</version></dependency>

引入依赖之后,剩下的就是配置文件的配置和使用了。

配置文件

我们知道,SpringBoot默认的starter内置了很多配置文件,直接通过yml文件进行配置即可使用。这里引入了rocketmq的starter,虽然并不是官方的,但使用方式基本一致。

在yml文件中配置如下参数:

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: test-group

name-server参数对应的就是部署的RocketMQ的Nameserver服务,如果有多个的话用英文分号(;)进行分割。

如果使用的是SpringBoot2.0+的框架或者是JDK10,可将name-server改成nameServer。否则,可能会出现一些奇怪的bug。

上面是简化了的最基础的配置,其他的配置均采用默认配置,如果需要定制化配置,可对具体参数按照统一形式进行配置。

生产者示例

当完成了上面的集成,生产者使用其实非常简单,只需要在使用的地方注入RocketMQTemplate对象,然后调用其对应的发送方法即可。简单示例如下:












@Componentpublic class TestSendService {
    @Resource    private RocketMQTemplate rocketMQTemplate;
    public void send() {        rocketMQTemplate.send("test-topic-1",                MessageBuilder.withPayload("Hello, World! I'm from spring message").build());    }}

但如果是在项目中,这样每次使用都注入一个RocketMQTemplate并不符合面向对象的思想,而且RocketMQTemplate还提供了多个常用的方法,比如同步、异步、直接发送等模式。

我们可以将其封装成为一个通用的Service,这样其他服务只需注入对应的Service,调用公共的方法即可,并且注明每个方法的使用场景。

抽象出来的Service接口如下:



































/** * Rocket MQ 对应服务封装 * **/public interface RocketMqService {
    /**     * 同步发送消息<br/>     * <p>     * 当发送的消息很重要是,且对响应时间不敏感的时候采用sync方式;     *     * @param mqMsg 发送消息实体类     */    void send(MqMsg mqMsg);
    /**     * 异步发送消息,异步返回消息结果<br/>     * <p>     * 当发送的消息很重要,且对响应时间非常敏感的时候采用async方式;     *     * @param mqMsg 发送消息实体类     */    void asyncSend(MqMsg mqMsg);
    /**     * 直接发送发送消息,不关心返回结果,容易消息丢失,适合日志收集、不精确统计等消息发送;<br/>     * <p>     * 当发送的消息不重要时,采用one-way方式,以提高吞吐量;     *     * @param mqMsg 发送消息实体类     */    void syncSendOrderly(MqMsg mqMsg);
}

定义了不同类型消息发送的方法,同时在注释部分说明具体方法的使用场景。其中将发送的参数封装为MqMsg对象,MqMsg的结构如下:




















public class MqMsg {
    /**     * 一级消息:消息topic     */    private String topic;
    /**     * 二级消息:消息topic对应的tags     */    private String tags;
    /**     * 消息内容     */    private String content;        // 省略getter/setter方法}

其中,topic为消息的主题,content为消息的内容,具体内容可根据生产者和消费者之间进行协定。

针对上述的接口,提供具体的方法实现:







































@Service("rocketMqService")public class RocketMqServiceImpl implements RocketMqService {
    private static final Logger log = LoggerFactory.getLogger(RocketMqServiceImpl.class);
    @Resource    private RocketMQTemplate rocketMQTemplate;
    @Override    public void send(MqMsg mqMsg) {        log.info("send发送消息到mqMsg={}", mqMsg);        rocketMQTemplate.send(mqMsg.getTopic() + ":" + mqMsg.getTags(),                MessageBuilder.withPayload(mqMsg.getContent()).build());    }
    @Override    public void asyncSend(MqMsg mqMsg) {        log.info("asyncSend发送消息到mqMsg={}", mqMsg);        rocketMQTemplate.asyncSend(mqMsg.getTopic() + ":" + mqMsg.getTags(), mqMsg.getContent(),                new SendCallback() {                    @Override                    public void onSuccess(SendResult sendResult) {                        // 成功不做日志记录或处理                    }
                    @Override                    public void onException(Throwable throwable) {                        log.info("mqMsg={}消息发送失败", mqMsg);                    }                });    }
    @Override    public void syncSendOrderly(MqMsg mqMsg) {        log.info("syncSendOrderly发送消息到mqMsg={}", mqMsg);        rocketMQTemplate.sendOneWay(mqMsg.getTopic() + ":" + mqMsg.getTags(), mqMsg.getContent());    }}

其中异步发送方法asyncSend的异步返回结果中可以根据具体的业务场景进行针对性的处理。

上述方法中均默认使用了tag对topic进行分类。如果具体的业务中不需要tag,则可对上述方法中拼接的冒号+tag部分去除。本实例使用tag进行分类,方便用到时可以借鉴。

完成了上面的封装之后,在业务使用场景中只需注入对应的Service即可,后面测试时我们会进行演示。

消费者示例

关于消费者我们可以直接实现RocketMQListener接口,然后通过@RocketMQMessageListener注解来匹配目标消息。

这里为了演示统一topic下不同的tag的使用方法,分两个消费者来进行演示,直接看代码:


















/** * 消息队列消费端使用示例 * **/@Service@RocketMQMessageListener(topic = MqTopicConstant.DEMO_TOPIC        , consumerGroup = MqTopicConstant.DEMO_CONSUMER_GROUP_REGISTERED        , selectorExpression = MqTopicConstant.DEMO_TAG_REGISTERED)public class MqRegisteredListenerDemo implements RocketMQListener<String> {
    private static final Logger log = LoggerFactory.getLogger(MqRegisteredListenerDemo.class);
    @Override    public void onMessage(String message) {        log.info("received registered message: {}", message);    }}

上面的消费者监听器是针对DEMO_TAG_REGISTERED(自定义变量)这个tag来进行处理的,如果不需要tag,可去掉selectorExpression元素配置即可。@RocketMQMessageListener注解里selectorExpression默认是*,也就是接收topic下全部消息。

上面用到了常量类MqTopicConstant,主要是统一定义对应的topic和tag,内容如下:






























/** * 消息队列相关常亮配置,包括group、topic、tag * **/public class MqTopicConstant {
    /**     * 示例消息队列,topic1个     */    public static final String DEMO_TOPIC = "test-top-1";
    /**     * 注册tag     */    public static final String DEMO_TAG_REGISTERED = "registered";
    /**     * 修改tag     */    public static final String DEMO_TAG_MODIFY = "modify";
    /**     * consumer group     */    public static final String DEMO_CONSUMER_GROUP_REGISTERED = "consumer_test-top-1_registered";
    public static final String DEMO_CONSUMER_GROUP_MODIFY = "consumer_test-top-1_modify";
}

下面再来看看第二个监听“更新”功能的tag代码实现:


















/** * 消息队列消费端使用示例 * **/@Service@RocketMQMessageListener(topic = MqTopicConstant.DEMO_TOPIC        , consumerGroup = MqTopicConstant.DEMO_CONSUMER_GROUP_MODIFY        , selectorExpression = MqTopicConstant.DEMO_TAG_MODIFY)public class MqModifyListenerDemo implements RocketMQListener<String> {
    private static final Logger log = LoggerFactory.getLogger(MqRegisteredListenerDemo.class);
    @Override    public void onMessage(String message) {        log.info("received modify message: {}", message);    }}

这个消费者与第一个不同的地方有两个,均在注解部分。其中topic两个是相同的,selectorExpression一个针对注册功能的tag进行过滤,一个针对修改信息的功能进行过滤。而消费组consumerGroup对应的值不能相同,否则启动时会抛出异常。

经过上述步骤,已经完成了生产者、消费者的配置。下面就同一个测试来验证服务的执行情况。

测试验证

定义一个Controller,用于外部请求,触发用户注册和修改操作的消息发送。关于测试也可以使用单元测试,但需要让线层阻塞,防止消费者还没接收到消息时,单元测试已经执行完成。































@RestController@RequestMapping("/demo")public class DemoController {
    @Resource    private RocketMqService rocketMqService;
    @GetMapping("/send")    public void send() {        MqMsg mqMsg = new MqMsg();        mqMsg.setTopic(MqTopicConstant.DEMO_TOPIC);        mqMsg.setTags(MqTopicConstant.DEMO_TAG_REGISTERED);
        // 此处可为其他VO对象,替换掉Map        Map<String, String> userInfo = new HashMap<>();        userInfo.put("username", "zhangsan");        userInfo.put("age", "12");        // 此处可封装为json等格式        mqMsg.setContent(userInfo.toString());        // 第一个发送注册消息        rocketMqService.asyncSend(mqMsg);
        mqMsg.setTags(MqTopicConstant.DEMO_TAG_MODIFY);        userInfo.put("age", "18");        // 此处可封装为json等格式        mqMsg.setContent(userInfo.toString());        // 发送修改消息        rocketMqService.asyncSend(mqMsg);    }}

在上述测试中第一部分发送了注册用户的消息,第二部分针对注册的消息进行了修改,又发送了一个消息。消息内容直接将Map转换为字符串了,在实战的过程中可根据双方协商,比如采用Json或其他序列化方法。

然后在浏览器访问:http://localhost:8080/demo/send ,触发消息的发送。

此时查看控制台,几乎在瞬间,就可以看到如下日志信息:





2020-11-24 19:31:17.900  INFO 92401 --- [nio-8080-exec-1] c.e.r.service.impl.RocketMqServiceImpl   : asyncSend发送消息到mqMsg=com.example.rocketmq.vo.MqMsg@7cda781f2020-11-24 19:31:17.906  INFO 92401 --- [nio-8080-exec-1] c.e.r.service.impl.RocketMqServiceImpl   : asyncSend发送消息到mqMsg=com.example.rocketmq.vo.MqMsg@7cda781f2020-11-24 19:31:17.942  INFO 92401 --- [MessageThread_1] c.e.r.listener.MqRegisteredListenerDemo  : received registered message: {age=12, username=zhangsan}2020-11-24 19:31:17.942  INFO 92401 --- [MessageThread_1] c.e.r.listener.MqRegisteredListenerDemo  : received modify message: {age=18, username=zhangsan}

很显然,消费者已经成功接收到消息,并且同一个topic,根据不同的tag分别进行处理。至此,一个完整是示例演示完毕。

小结

关于消息队列,还是其他很多方法都位于RocketMQTemplate当中,根据业务需要可以继续封装对应的Service。关于tag也还有不同的使用方式,大家可基于该文提供的基本框架和基本思路进一步完善。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3天前
|
Java 应用服务中间件 测试技术
深入探索Spring Boot Web应用源码及实战应用
【5月更文挑战第11天】本文将详细解析Spring Boot Web应用的源码架构,并通过一个实际案例,展示如何构建一个基于Spring Boot的Web应用。本文旨在帮助读者更好地理解Spring Boot的内部工作机制,以及如何利用这些机制优化自己的Web应用开发。
14 3
|
4天前
|
安全 Java 开发者
深入理解Spring Boot配置绑定及其实战应用
【4月更文挑战第10天】本文详细探讨了Spring Boot中配置绑定的核心概念,并结合实战示例,展示了如何在项目中有效地使用这些技术来管理和绑定配置属性。
11 1
|
15天前
|
安全 Java 测试技术
Spring Boot集成支付宝支付:概念与实战
【4月更文挑战第29天】在电子商务和在线业务应用中,集成有效且安全的支付解决方案是至关重要的。支付宝作为中国领先的支付服务提供商,其支付功能的集成可以显著提升用户体验。本篇博客将详细介绍如何在Spring Boot应用中集成支付宝支付功能,并提供一个实战示例。
36 2
|
3天前
|
NoSQL Java MongoDB
【MongoDB 专栏】MongoDB 与 Spring Boot 的集成实践
【5月更文挑战第11天】本文介绍了如何将非关系型数据库MongoDB与Spring Boot框架集成,以实现高效灵活的数据管理。Spring Boot简化了Spring应用的构建和部署,MongoDB则以其对灵活数据结构的处理能力受到青睐。集成步骤包括:添加MongoDB依赖、配置连接信息、创建数据访问对象(DAO)以及进行数据操作。通过这种方式,开发者可以充分利用两者优势,应对各种数据需求。在实际应用中,结合微服务架构等技术,可以构建高性能、可扩展的系统。掌握MongoDB与Spring Boot集成对于提升开发效率和项目质量至关重要,未来有望在更多领域得到广泛应用。
【MongoDB 专栏】MongoDB 与 Spring Boot 的集成实践
|
3天前
|
消息中间件 JSON Java
RabbitMQ的springboot项目集成使用-01
RabbitMQ的springboot项目集成使用-01
|
3天前
|
搜索推荐 Java 数据库
springboot集成ElasticSearch的具体操作(系统全文检索)
springboot集成ElasticSearch的具体操作(系统全文检索)
|
4天前
|
消息中间件 Java Spring
Springboot 集成Rabbitmq之延时队列
Springboot 集成Rabbitmq之延时队列
4 0
|
4天前
|
网络协议 Java Spring
Springboot 集成websocket
Springboot 集成websocket
8 0
|
4天前
|
Java 测试技术 API
Spring Boot 单元测试 0基础教程
Spring Boot 单元测试 0基础教程
8 0
|
5天前
|
Java Spring 容器
深入理解Spring Boot启动流程及其实战应用
【5月更文挑战第9天】本文详细解析了Spring Boot启动流程的概念和关键步骤,并结合实战示例,展示了如何在实际开发中运用这些知识。
17 2