RocketMQ Spring 集成

简介: 本文介绍如何在 Spring 框架下用消息队列 RocketMQ 收发消息。

作者:俏巴

概述

本文介绍如何在 Spring 框架下用消息队列 RocketMQ 收发消息。主要包括以下两部分内容:

  • 普通消息生产者和 Spring 集成
  • 消息消费者和 Spring 集成

测试流程

资源创建
1、管理门户创建实例、Topic及Group;
2、注意:如果程序在本地测试运行,请选择在公网区域创建。

代码测试

1、pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>MavenSpringDemoMQ</groupId>
    <artifactId>MavenSpringDemoMQ</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <org.springframework.version>5.0.8.RELEASE</org.springframework.version>
    </properties>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.springframework/org.springframework.context -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>${org.springframework.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${org.springframework.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${org.springframework.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-expression</artifactId>
            <version>${org.springframework.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>${org.springframework.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-indexer</artifactId>
            <version>${org.springframework.version}</version>
        </dependency>
        <!--spring core end-->

        <!--spring aop start-->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>${org.springframework.version}</version>
        </dependency>
        <!--spirng aop end-->

        <!--spring aspects start-->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aspects</artifactId>
            <version>${org.springframework.version}</version>
        </dependency>
        <!--spring aspects end-->

        <!--spring instrumentation start -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-instrument</artifactId>
            <version>${org.springframework.version}</version>
        </dependency>
        <!--spring instrumentation end-->

        <!--RocketMQ jar依赖-->
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.7.9.Final</version>
        </dependency>
    </dependencies>
</project>

2、发送端配置文件producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    <bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean" init-method="start" destroy-method="shutdown">
        <!-- Spring 接入方式支持 Java SDK 支持的所有配置项 -->
        <property name="properties" > <!--生产者配置信息-->
            <props>
                <prop key="AccessKey">********</prop>
                <prop key="SecretKey">********</prop>
                <prop key="NAMESRV_ADDR">http://MQ_INST_********_BaQUuiNE.mq-internet-access.mq-internet.aliyuncs.com:80</prop>
            </props>
        </property>
    </bean>
</beans>

3、发送端代码

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class ProduceWithSpring {
    public static void main(String[] args) {
        /**
         * 生产者 Bean 配置在 producer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(比如具体的 Controller)中
         */
        ApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
        Producer producer = (Producer) context.getBean("producer");
        //循环发送消息
        for (int i = 0; i < 100; i++) {
            Message msg = new Message( //
                    // Message 所属的 Topic
                    "yutopic",
                    // Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在消息队列 RocketMQ 的服务器过滤
                    "TagSpring",
                    // Message Body 可以是任何二进制形式的数据, 消息队列 RocketMQ 不做任何干预
                    // 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
                    "Hello MQ".getBytes());
            // 设置代表消息的业务关键属性,请尽可能全局唯一
            // 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发
            // 注意:不设置也不会影响消息正常收发
            msg.setKey("ORDERID_100");
            // 发送消息,只要不抛异常就是成功
            try {
                SendResult sendResult = producer.send(msg);
                assert sendResult != null;
                System.out.println("send success: " + sendResult.getMessageId());
            }catch (ONSClientException e) {
                System.out.println("发送失败");
            }
        }
        //关系producer
        producer.shutdown();
    }
}

4、消费端配置文件consumer.xml

<?xml version="1.0" encoding="UTF-8"?>

   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="msgListener" class="demo.DemoMessageListener"></bean> <!--Listener 配置-->
<!-- Group ID 订阅同一个 Topic,可以创建多个 ConsumerBean-->
<bean id="consumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
    <property name="properties" > <!--消费者配置信息-->
        <props>
            <prop key="AccessKey">********</prop>
            <prop key="SecretKey">********</prop>
            <prop key="NAMESRV_ADDR">http://MQ_INST_********_BaQUuiNE.mq-internet-access.mq-internet.aliyuncs.com:80</prop>
            <prop key="GROUP_ID">GID_Spring</prop>
            <!--将消费者线程数固定为 50 个
            <prop key="ConsumeThreadNums">50</prop>
            -->
        </props>
    </property>
    <property name="subscriptionTable">
        <map>
            <entry value-ref="msgListener">
                <key>
                    <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                        <property name="topic" value="yutopic"/>
                        <property name="expression" value="*"/><!--expression 即 Tag,可以设置成具体的 Tag,如 taga||tagb||tagc,也可设置成 *。 * 仅代表订阅所有 Tag,不支持通配-->
                    </bean>
                </key>
            </entry>
            <!--更多的订阅添加 entry 节点即可,如下所示-->
            <!--<entry value-ref="msgListener">-->
                <!--<key>-->
                    <!--<bean class="com.aliyun.openservices.ons.api.bean.Subscription">-->
                        <!--<property name="topic" value="TopicTestMQ-Other"/> &lt;!&ndash;订阅另外一个 Topic &ndash;&gt;-->
                        <!--<property name="expression" value="taga||tagb"/> &lt;!&ndash; 订阅多个 Tag &ndash;&gt;-->
                    <!--</bean>-->
                <!--</key>-->
            <!--</entry>-->
        </map>
    </property>
</bean>

5、DemoMessageListener

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
public class DemoMessageListener implements MessageListener {

public Action consume(Message message, ConsumeContext context) {
    System.out.println("Receive: " + message.getMsgID());
    try {
        //do something..
        return Action.CommitMessage;
    }catch (Exception e) {
        //消费失败
        return Action.ReconsumeLater;
    }
}
}`

6、消费端启动程序

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class ConsumeWithSpring {
    public static void main(String[] args) {
        /**
         * 消费者 Bean 配置在 consumer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(比如具体的 Controller)中
         */
        ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
        System.out.println("Consumer Started");
    }
}

测试效果

  • 发送端
    image.png
  • 消费端
    image.png

更多参考
Spring 集成

订阅关系一致

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
Java Maven Docker
gitlab-ci 集成 k3s 部署spring boot 应用
gitlab-ci 集成 k3s 部署spring boot 应用
|
3月前
|
消息中间件 弹性计算 Kubernetes
RabbitMQ与容器化技术的集成实践
【8月更文第28天】RabbitMQ 是一个开源消息代理和队列服务器,用于在分布式系统中存储、转发消息。随着微服务架构的普及,容器化技术(如 Docker 和 Kubernetes)成为了部署和管理应用程序的标准方式。本文将探讨如何使用 Docker 和 Kubernetes 在生产环境中部署和管理 RabbitMQ 服务,同时保证高可用性和弹性伸缩能力。
69 3
|
3月前
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
定时任务在企业应用中至关重要,常用于异步数据处理、自动化运维等场景。在单体应用中,利用Java的`java.util.Timer`或Spring的`@Scheduled`即可轻松实现。然而,进入微服务架构后,任务可能因多节点并发执行而重复。Spring Cloud Alibaba为此发布了Scheduling模块,提供轻量级、高可用的分布式定时任务解决方案,支持防重复执行、分片运行等功能,并可通过`spring-cloud-starter-alibaba-schedulerx`快速集成。用户可选择基于阿里云SchedulerX托管服务或采用本地开源方案(如ShedLock)
129 1
|
1月前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
1月前
|
前端开发 Java 程序员
springboot 学习十五:Spring Boot 优雅的集成Swagger2、Knife4j
这篇文章是关于如何在Spring Boot项目中集成Swagger2和Knife4j来生成和美化API接口文档的详细教程。
109 1
|
1月前
|
存储 前端开发 Java
Spring Boot 集成 MinIO 与 KKFile 实现文件预览功能
本文详细介绍如何在Spring Boot项目中集成MinIO对象存储系统与KKFileView文件预览工具,实现文件上传及在线预览功能。首先搭建MinIO服务器,并在Spring Boot中配置MinIO SDK进行文件管理;接着通过KKFileView提供文件预览服务,最终实现文档管理系统的高效文件处理能力。
303 11
|
1月前
|
Java Spring
springboot 学习十一:Spring Boot 优雅的集成 Lombok
这篇文章是关于如何在Spring Boot项目中集成Lombok,以简化JavaBean的编写,避免冗余代码,并提供了相关的配置步骤和常用注解的介绍。
105 0
|
3月前
|
消息中间件 分布式计算 大数据
RabbitMQ与大数据平台的集成
【8月更文第28天】在现代的大数据处理架构中,消息队列作为数据传输的关键组件扮演着重要的角色。RabbitMQ 是一个开源的消息代理软件,它支持多种消息协议,能够为分布式系统提供可靠的消息传递服务。本篇文章将探讨如何使用 RabbitMQ 与 Hadoop 和 Spark 进行集成,以实现高效的数据处理和分析。
42 1
|
3月前
|
消息中间件 安全 Java
Spring Boot 基于 SCRAM 认证集成 Kafka 的详解
【8月更文挑战第4天】本文详解Spring Boot结合SCRAM认证集成Kafka的过程。SCRAM为Kafka提供安全身份验证。首先确认Kafka服务已启用SCRAM,并准备认证凭据。接着,在`pom.xml`添加`spring-kafka`依赖,并在`application.properties`中配置Kafka属性,包括SASL_SSL协议与SCRAM-SHA-256机制。创建生产者与消费者类以实现消息的发送与接收功能。最后,通过实际消息传递测试集成效果与认证机制的有效性。
152 4
|
3月前
|
人工智能 Java API
JeecgBoot 低代码平台快速集成 Spring AI
Spring 通过 Spring AI 项目正式启用了 AI(人工智能)生成提示功能。本文将带你了解如何在 Jeecg Boot 应用中集成生成式 AI,以及 Spring AI 如何与模型互动,包含 RAG 功能。
130 3
下一篇
无影云桌面