springboot 如何优雅的使用 alibaba rocketmq 消息队列

简介: springboot 如何优雅的使用 alibaba rocketmq 消息队列
文章保证,一路执行下去不会报错,报错你找我。如果觉得有用,希望屏幕前的大佬,点赞➕关注

只有想到的方面多时,才能在够实际运用中发现更多的问题,这也是作者给读者最好的礼物

使用这篇文章 docker 安装 rocketmq,肯定没问题

<br/>

新建 springboot 项目


新建一个 springboot 项目,pom 文件如下,rocketmq 使用 apache 封装的 jar

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.11.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>cn.machen.study</groupId>
    <artifactId>cloud-alibaba-rocketmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <modelVersion>4.0.0</modelVersion>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

</project>

<br/>

创建配置管理类


我们 nameserver 直接使用本地安装的就可以,topic 是 rocketmq 服务端默认就会创建的

public class RmqConfig {
    /**
     * Name Server 地址,因为是集群部署 所以有多个用 分号 隔开
     */
    public static final String NAME_SERVER = "localhost:9876";
    /**
     * 主题名称 主题一般是服务器设置好 而不能在代码里去新建topic( 如果没有创建好,生产者往该主题发送消息 会报找不到topic错误)
     */
    public static final String TOPIC = "TestTopic";
}

<br/>

Producer


新建消息生产端 Producer,负责生产消息

import lombok.Getter;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;

@Getter
@Component
public class Producer {
    /**
     * 生产者组
     */
    private static final String producerGroup = "test_producer";
    private DefaultMQProducer producer;

    public Producer() throws MQClientException {
        producerInit().start();
    }

    private DefaultMQProducer producerInit() {
        producer = new DefaultMQProducer(producerGroup);
        // 不开启 vip 通道 开通口端口会减 2
        producer.setVipChannelEnabled(false);
        // 绑定 name server
        producer.setNamesrvAddr(RmqConfig.NAME_SERVER);
        return producer;
    }
}

<br/>

Consumer


建立消息消费端 Consumer,负责消费消息

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

@Slf4j
@Component
public class Consumer {
    /**
     * 消费者组
     */
    private static final String CONSUMER_GROUP = "test_consumer";

    public Consumer() throws MQClientException {
        consumerInit().start();
    }

    private DefaultMQPushConsumer consumerInit() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.setNamesrvAddr(RmqConfig.NAME_SERVER);
        // 消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // 订阅主题和 标签( * 代表所有标签)下信息
        consumer.subscribe(RmqConfig.TOPIC, "*");
        // 注册消费的监听 并在此监听中消费信息, 并返回消费的状态信息
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            // msgs 中只收集同一个topic, 同一个 tag, 并且 key 相同的message
            // 会把不同的消息分别放置到不同的队列中
            try {
                for (Message msg : msgs) {
                    // 消费者获取消息 这里只输出 不做后面逻辑处理
                    String body = new String(msg.getBody(), "utf-8");
                    log.info("  >>> [消费端] 获取消息主题 topic :: {}, 消费消息 body :: {} ", msg.getTopic(), body);
                }
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        return consumer;
    }
}

<br/>

测试 Controller


创建对应 Controller 测试类,生产 100 条消息由消费者执行,最终把返回信息进行打印

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class Controller {

    @Autowired
    private Producer producer;

    @RequestMapping("/rocketmq/send")
    public Object callback() throws Exception {
        for (int i = 0; i < 100; i++) {
            // 创建生产信息
            Message message = new Message(RmqConfig.TOPIC, "testTag", (String.format("[生产者] 发送第 【%d】 次 mq 消息", i)).getBytes());
            // 发送消息
            SendResult sendResult = producer.getProducer().send(message);
            log.info("  >>> [生产者] 发送消息返回对象 :: {}", sendResult);
        }

        return "执行成功";
    }
}

<br/>

截取部分日志,输出打印如下:

2020-12-12 15:31:44.718  INFO 45296 --- [essageThread_11] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【90】 次 mq 消息 
2020-12-12 15:31:44.720  INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller    :   >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDCD005B, offsetMsgId=C0A8010200002A9F000000000027CF47, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=0], queueOffset=3060]
2020-12-12 15:31:44.721  INFO 45296 --- [essageThread_12] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【91】 次 mq 消息 
2020-12-12 15:31:44.722  INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller    :   >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDD0005C, offsetMsgId=C0A8010200002A9F000000000027D018, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=1], queueOffset=3062]
2020-12-12 15:31:44.724  INFO 45296 --- [essageThread_13] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【92】 次 mq 消息 
2020-12-12 15:31:44.725  INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller    :   >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDD3005D, offsetMsgId=C0A8010200002A9F000000000027D0E9, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=2], queueOffset=3059]
2020-12-12 15:31:44.726  INFO 45296 --- [essageThread_14] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【93】 次 mq 消息 
2020-12-12 15:31:44.728  INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller    :   >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDD5005E, offsetMsgId=C0A8010200002A9F000000000027D1BA, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=3], queueOffset=3058]
2020-12-12 15:31:44.729  INFO 45296 --- [essageThread_15] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【94】 次 mq 消息 
2020-12-12 15:31:44.731  INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller    :   >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDD8005F, offsetMsgId=C0A8010200002A9F000000000027D28B, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=0], queueOffset=3061]
2020-12-12 15:31:44.732  INFO 45296 --- [essageThread_16] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【95】 次 mq 消息 
2020-12-12 15:31:44.734  INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller    :   >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDDB0060, offsetMsgId=C0A8010200002A9F000000000027D35C, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=1], queueOffset=3063]
2020-12-12 15:31:44.735  INFO 45296 --- [essageThread_17] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【96】 次 mq 消息 
2020-12-12 15:31:44.737  INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller    :   >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDDE0061, offsetMsgId=C0A8010200002A9F000000000027D42D, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=2], queueOffset=3060]
2020-12-12 15:31:44.738  INFO 45296 --- [essageThread_18] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【97】 次 mq 消息 
2020-12-12 15:31:44.740  INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller    :   >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDE10062, offsetMsgId=C0A8010200002A9F000000000027D4FE, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=3], queueOffset=3059]
2020-12-12 15:31:44.741  INFO 45296 --- [essageThread_19] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【98】 次 mq 消息 
2020-12-12 15:31:44.742  INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller    :   >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDE40063, offsetMsgId=C0A8010200002A9F000000000027D5CF, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=0], queueOffset=3062]
2020-12-12 15:31:44.745  INFO 45296 --- [essageThread_20] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【99】 次 mq 消息 

<br/>

根据文章这么一套走下来,可以说是玩过 rocketmq 了,后面就需要更深入去了解 rocketmq 特性以及具体使用了

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
42 6
|
3月前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
220 9
|
3月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
116 10
|
2月前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
3月前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
3月前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
3月前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
83 4
|
4月前
|
消息中间件 运维 监控
云消息队列RabbitMQ实践解决方案评测报告
本报告旨在对《云消息队列RabbitMQ实践》解决方案进行综合评测。通过对该方案的原理理解、部署体验、设计验证以及实际应用价值等方面进行全面分析,为用户提供详尽的反馈与建议。
100 16
|
4月前
|
消息中间件 弹性计算 运维
阿里云云消息队列RabbitMQ实践解决方案评测报告
阿里云云消息队列RabbitMQ实践解决方案评测报告
86 9
|
4月前
|
消息中间件 弹性计算 运维
云消息队列RabbitMQ实践
本评测报告详细分析了阿里云云消息队列 RabbitMQ 版的实践原理、部署体验及核心优势。报告认为其在解决消息积压、脑裂难题及弹性伸缩方面表现优秀,但建议进一步细化架构优化策略和技术细节描述。部署文档详尽,对初学者友好,但仍需加强网络配置和版本兼容性说明。实际部署展示了其高可用性和成本优化能力,适用于高并发消息处理和分布式系统数据同步。为进一步提升方案,建议增加安全性配置指导、性能调优建议及监控告警系统设置。

相关产品

  • 云消息队列 MQ