springboot 如何优雅的使用 alibaba rocketmq 消息队列

简介: springboot 如何优雅的使用 alibaba rocketmq 消息队列
文章保证,一路执行下去不会报错,报错你找我。如果觉得有用,希望屏幕前的大佬,点赞➕关注

只有想到的方面多时,才能在够实际运用中发现更多的问题,这也是作者给读者最好的礼物

使用这篇文章 docker 安装 rocketmq,肯定没问题

<br/>

新建 springboot 项目


新建一个 springboot 项目,pom 文件如下,rocketmq 使用 apache 封装的 jar

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.11.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>cn.machen.study</groupId>
    <artifactId>cloud-alibaba-rocketmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <modelVersion>4.0.0</modelVersion>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

</project>

<br/>

创建配置管理类


我们 nameserver 直接使用本地安装的就可以,topic 是 rocketmq 服务端默认就会创建的

public class RmqConfig {
    /**
     * Name Server 地址,因为是集群部署 所以有多个用 分号 隔开
     */
    public static final String NAME_SERVER = "localhost:9876";
    /**
     * 主题名称 主题一般是服务器设置好 而不能在代码里去新建topic( 如果没有创建好,生产者往该主题发送消息 会报找不到topic错误)
     */
    public static final String TOPIC = "TestTopic";
}

<br/>

Producer


新建消息生产端 Producer,负责生产消息

import lombok.Getter;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;

@Getter
@Component
public class Producer {
    /**
     * 生产者组
     */
    private static final String producerGroup = "test_producer";
    private DefaultMQProducer producer;

    public Producer() throws MQClientException {
        producerInit().start();
    }

    private DefaultMQProducer producerInit() {
        producer = new DefaultMQProducer(producerGroup);
        // 不开启 vip 通道 开通口端口会减 2
        producer.setVipChannelEnabled(false);
        // 绑定 name server
        producer.setNamesrvAddr(RmqConfig.NAME_SERVER);
        return producer;
    }
}

<br/>

Consumer


建立消息消费端 Consumer,负责消费消息

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

@Slf4j
@Component
public class Consumer {
    /**
     * 消费者组
     */
    private static final String CONSUMER_GROUP = "test_consumer";

    public Consumer() throws MQClientException {
        consumerInit().start();
    }

    private DefaultMQPushConsumer consumerInit() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.setNamesrvAddr(RmqConfig.NAME_SERVER);
        // 消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // 订阅主题和 标签( * 代表所有标签)下信息
        consumer.subscribe(RmqConfig.TOPIC, "*");
        // 注册消费的监听 并在此监听中消费信息, 并返回消费的状态信息
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            // msgs 中只收集同一个topic, 同一个 tag, 并且 key 相同的message
            // 会把不同的消息分别放置到不同的队列中
            try {
                for (Message msg : msgs) {
                    // 消费者获取消息 这里只输出 不做后面逻辑处理
                    String body = new String(msg.getBody(), "utf-8");
                    log.info("  >>> [消费端] 获取消息主题 topic :: {}, 消费消息 body :: {} ", msg.getTopic(), body);
                }
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        return consumer;
    }
}

<br/>

测试 Controller


创建对应 Controller 测试类,生产 100 条消息由消费者执行,最终把返回信息进行打印

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class Controller {

    @Autowired
    private Producer producer;

    @RequestMapping("/rocketmq/send")
    public Object callback() throws Exception {
        for (int i = 0; i < 100; i++) {
            // 创建生产信息
            Message message = new Message(RmqConfig.TOPIC, "testTag", (String.format("[生产者] 发送第 【%d】 次 mq 消息", i)).getBytes());
            // 发送消息
            SendResult sendResult = producer.getProducer().send(message);
            log.info("  >>> [生产者] 发送消息返回对象 :: {}", sendResult);
        }

        return "执行成功";
    }
}

<br/>

截取部分日志,输出打印如下:

2020-12-12 15:31:44.718  INFO 45296 --- [essageThread_11] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【90】 次 mq 消息 
2020-12-12 15:31:44.720  INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller    :   >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDCD005B, offsetMsgId=C0A8010200002A9F000000000027CF47, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=0], queueOffset=3060]
2020-12-12 15:31:44.721  INFO 45296 --- [essageThread_12] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【91】 次 mq 消息 
2020-12-12 15:31:44.722  INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller    :   >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDD0005C, offsetMsgId=C0A8010200002A9F000000000027D018, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=1], queueOffset=3062]
2020-12-12 15:31:44.724  INFO 45296 --- [essageThread_13] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【92】 次 mq 消息 
2020-12-12 15:31:44.725  INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller    :   >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDD3005D, offsetMsgId=C0A8010200002A9F000000000027D0E9, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=2], queueOffset=3059]
2020-12-12 15:31:44.726  INFO 45296 --- [essageThread_14] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【93】 次 mq 消息 
2020-12-12 15:31:44.728  INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller    :   >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDD5005E, offsetMsgId=C0A8010200002A9F000000000027D1BA, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=3], queueOffset=3058]
2020-12-12 15:31:44.729  INFO 45296 --- [essageThread_15] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【94】 次 mq 消息 
2020-12-12 15:31:44.731  INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller    :   >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDD8005F, offsetMsgId=C0A8010200002A9F000000000027D28B, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=0], queueOffset=3061]
2020-12-12 15:31:44.732  INFO 45296 --- [essageThread_16] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【95】 次 mq 消息 
2020-12-12 15:31:44.734  INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller    :   >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDDB0060, offsetMsgId=C0A8010200002A9F000000000027D35C, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=1], queueOffset=3063]
2020-12-12 15:31:44.735  INFO 45296 --- [essageThread_17] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【96】 次 mq 消息 
2020-12-12 15:31:44.737  INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller    :   >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDDE0061, offsetMsgId=C0A8010200002A9F000000000027D42D, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=2], queueOffset=3060]
2020-12-12 15:31:44.738  INFO 45296 --- [essageThread_18] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【97】 次 mq 消息 
2020-12-12 15:31:44.740  INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller    :   >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDE10062, offsetMsgId=C0A8010200002A9F000000000027D4FE, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=3], queueOffset=3059]
2020-12-12 15:31:44.741  INFO 45296 --- [essageThread_19] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【98】 次 mq 消息 
2020-12-12 15:31:44.742  INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller    :   >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDE40063, offsetMsgId=C0A8010200002A9F000000000027D5CF, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=0], queueOffset=3062]
2020-12-12 15:31:44.745  INFO 45296 --- [essageThread_20] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【99】 次 mq 消息 

<br/>

根据文章这么一套走下来,可以说是玩过 rocketmq 了,后面就需要更深入去了解 rocketmq 特性以及具体使用了

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 Java RocketMQ
消息队列 MQ产品使用合集之当SpringBoot应用因网络不通而启动失败时,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
3月前
|
缓存 NoSQL Java
SpringBoot的三种缓存技术(Spring Cache、Layering Cache 框架、Alibaba JetCache 框架)
Spring Cache 是 Spring 提供的简易缓存方案,支持本地与 Redis 缓存。通过添加 `spring-boot-starter-data-redis` 和 `spring-boot-starter-cache` 依赖,并使用 `@EnableCaching` 开启缓存功能。JetCache 由阿里开源,功能更丰富,支持多级缓存和异步 API,通过引入 `jetcache-starter-redis` 依赖并配置 YAML 文件启用。Layering Cache 则提供分层缓存机制,需引入 `layering-cache-starter` 依赖并使用特定注解实现缓存逻辑。
996 1
SpringBoot的三种缓存技术(Spring Cache、Layering Cache 框架、Alibaba JetCache 框架)
|
3月前
|
消息中间件 存储 Java
【揭秘】RocketMQ内部运作大揭秘:一探究竟,原来消息队列是这样工作的!
【8月更文挑战第19天】RocketMQ是一款高性能、高可用的消息中间件,在分布式系统中至关重要。它采用发布/订阅模式,支持高吞吐量的消息传递。核心组件包括管理元数据的NameServer、存储消息的Broker以及Producer和Consumer。RocketMQ支持发布/订阅与点对点两种模型,并具备复杂的消息持久化和路由机制。通过Java API示例,可轻松实现消息的发送与接收。RocketMQ凭借其出色的特性和可靠性,成为大型分布式系统首选的消息解决方案。
73 5
|
3月前
|
Java 微服务 Spring
SpringBoot+Vue+Spring Cloud Alibaba 实现大型电商系统【分布式微服务实现】
文章介绍了如何利用Spring Cloud Alibaba快速构建大型电商系统的分布式微服务,包括服务限流降级等主要功能的实现,并通过注解和配置简化了Spring Cloud应用的接入和搭建过程。
SpringBoot+Vue+Spring Cloud Alibaba 实现大型电商系统【分布式微服务实现】
|
3月前
|
消息中间件 Java Maven
|
4月前
|
消息中间件 缓存 IDE
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
|
4月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
325 1
|
4月前
|
消息中间件 存储 Java
消息队列 MQ使用问题之如何将RocketMQ中某个集群的topic迁移到另一个集群
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 存储 RocketMQ
MetaQ/RocketMQ 原理问题之在解耦场景中,消息队列工作的问题如何解决
MetaQ/RocketMQ 原理问题之在解耦场景中,消息队列工作的问题如何解决
|
4月前
|
消息中间件 Java RocketMQ
Spring Boot与RocketMQ的集成
Spring Boot与RocketMQ的集成

相关产品

  • 云消息队列 MQ
  • 下一篇
    无影云桌面