springboot 如何优雅的使用 alibaba rocketmq 消息队列

简介: springboot 如何优雅的使用 alibaba rocketmq 消息队列
文章保证,一路执行下去不会报错,报错你找我。如果觉得有用,希望屏幕前的大佬,点赞➕关注

只有想到的方面多时,才能在够实际运用中发现更多的问题,这也是作者给读者最好的礼物

使用这篇文章 docker 安装 rocketmq,肯定没问题

<br/>

新建 springboot 项目


新建一个 springboot 项目,pom 文件如下,rocketmq 使用 apache 封装的 jar

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.11.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>cn.machen.study</groupId>
    <artifactId>cloud-alibaba-rocketmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <modelVersion>4.0.0</modelVersion>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

</project>

<br/>

创建配置管理类


我们 nameserver 直接使用本地安装的就可以,topic 是 rocketmq 服务端默认就会创建的

public class RmqConfig {
    /**
     * Name Server 地址,因为是集群部署 所以有多个用 分号 隔开
     */
    public static final String NAME_SERVER = "localhost:9876";
    /**
     * 主题名称 主题一般是服务器设置好 而不能在代码里去新建topic( 如果没有创建好,生产者往该主题发送消息 会报找不到topic错误)
     */
    public static final String TOPIC = "TestTopic";
}

<br/>

Producer


新建消息生产端 Producer,负责生产消息

import lombok.Getter;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;

@Getter
@Component
public class Producer {
    /**
     * 生产者组
     */
    private static final String producerGroup = "test_producer";
    private DefaultMQProducer producer;

    public Producer() throws MQClientException {
        producerInit().start();
    }

    private DefaultMQProducer producerInit() {
        producer = new DefaultMQProducer(producerGroup);
        // 不开启 vip 通道 开通口端口会减 2
        producer.setVipChannelEnabled(false);
        // 绑定 name server
        producer.setNamesrvAddr(RmqConfig.NAME_SERVER);
        return producer;
    }
}

<br/>

Consumer


建立消息消费端 Consumer,负责消费消息

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

@Slf4j
@Component
public class Consumer {
    /**
     * 消费者组
     */
    private static final String CONSUMER_GROUP = "test_consumer";

    public Consumer() throws MQClientException {
        consumerInit().start();
    }

    private DefaultMQPushConsumer consumerInit() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.setNamesrvAddr(RmqConfig.NAME_SERVER);
        // 消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // 订阅主题和 标签( * 代表所有标签)下信息
        consumer.subscribe(RmqConfig.TOPIC, "*");
        // 注册消费的监听 并在此监听中消费信息, 并返回消费的状态信息
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            // msgs 中只收集同一个topic, 同一个 tag, 并且 key 相同的message
            // 会把不同的消息分别放置到不同的队列中
            try {
                for (Message msg : msgs) {
                    // 消费者获取消息 这里只输出 不做后面逻辑处理
                    String body = new String(msg.getBody(), "utf-8");
                    log.info("  >>> [消费端] 获取消息主题 topic :: {}, 消费消息 body :: {} ", msg.getTopic(), body);
                }
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        return consumer;
    }
}

<br/>

测试 Controller


创建对应 Controller 测试类,生产 100 条消息由消费者执行,最终把返回信息进行打印

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class Controller {

    @Autowired
    private Producer producer;

    @RequestMapping("/rocketmq/send")
    public Object callback() throws Exception {
        for (int i = 0; i < 100; i++) {
            // 创建生产信息
            Message message = new Message(RmqConfig.TOPIC, "testTag", (String.format("[生产者] 发送第 【%d】 次 mq 消息", i)).getBytes());
            // 发送消息
            SendResult sendResult = producer.getProducer().send(message);
            log.info("  >>> [生产者] 发送消息返回对象 :: {}", sendResult);
        }

        return "执行成功";
    }
}

<br/>

截取部分日志,输出打印如下:

2020-12-12 15:31:44.718  INFO 45296 --- [essageThread_11] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【90】 次 mq 消息 
2020-12-12 15:31:44.720  INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller    :   >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDCD005B, offsetMsgId=C0A8010200002A9F000000000027CF47, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=0], queueOffset=3060]
2020-12-12 15:31:44.721  INFO 45296 --- [essageThread_12] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【91】 次 mq 消息 
2020-12-12 15:31:44.722  INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller    :   >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDD0005C, offsetMsgId=C0A8010200002A9F000000000027D018, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=1], queueOffset=3062]
2020-12-12 15:31:44.724  INFO 45296 --- [essageThread_13] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【92】 次 mq 消息 
2020-12-12 15:31:44.725  INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller    :   >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDD3005D, offsetMsgId=C0A8010200002A9F000000000027D0E9, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=2], queueOffset=3059]
2020-12-12 15:31:44.726  INFO 45296 --- [essageThread_14] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【93】 次 mq 消息 
2020-12-12 15:31:44.728  INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller    :   >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDD5005E, offsetMsgId=C0A8010200002A9F000000000027D1BA, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=3], queueOffset=3058]
2020-12-12 15:31:44.729  INFO 45296 --- [essageThread_15] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【94】 次 mq 消息 
2020-12-12 15:31:44.731  INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller    :   >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDD8005F, offsetMsgId=C0A8010200002A9F000000000027D28B, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=0], queueOffset=3061]
2020-12-12 15:31:44.732  INFO 45296 --- [essageThread_16] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【95】 次 mq 消息 
2020-12-12 15:31:44.734  INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller    :   >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDDB0060, offsetMsgId=C0A8010200002A9F000000000027D35C, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=1], queueOffset=3063]
2020-12-12 15:31:44.735  INFO 45296 --- [essageThread_17] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【96】 次 mq 消息 
2020-12-12 15:31:44.737  INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller    :   >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDDE0061, offsetMsgId=C0A8010200002A9F000000000027D42D, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=2], queueOffset=3060]
2020-12-12 15:31:44.738  INFO 45296 --- [essageThread_18] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【97】 次 mq 消息 
2020-12-12 15:31:44.740  INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller    :   >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDE10062, offsetMsgId=C0A8010200002A9F000000000027D4FE, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=3], queueOffset=3059]
2020-12-12 15:31:44.741  INFO 45296 --- [essageThread_19] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【98】 次 mq 消息 
2020-12-12 15:31:44.742  INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller    :   >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDE40063, offsetMsgId=C0A8010200002A9F000000000027D5CF, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=0], queueOffset=3062]
2020-12-12 15:31:44.745  INFO 45296 --- [essageThread_20] c.m.study.cloudalibabarocketmq.Consumer  :   >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【99】 次 mq 消息 

<br/>

根据文章这么一套走下来,可以说是玩过 rocketmq 了,后面就需要更深入去了解 rocketmq 特性以及具体使用了

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4天前
|
消息中间件 NoSQL Java
springboot redis 实现消息队列
springboot redis 实现消息队列
70 1
|
4天前
|
消息中间件 Java Maven
一文搞懂Spring Boot整合RocketMQ
一文搞懂Spring Boot整合RocketMQ
135 0
|
4天前
|
消息中间件 NoSQL 数据库
一文讲透消息队列RocketMQ实现消费幂等
这篇文章,我们聊聊消息队列中非常重要的最佳实践之一:消费幂等。
一文讲透消息队列RocketMQ实现消费幂等
|
4天前
|
消息中间件 NoSQL Java
Redis Streams在Spring Boot中的应用:构建可靠的消息队列解决方案【redis实战 二】
Redis Streams在Spring Boot中的应用:构建可靠的消息队列解决方案【redis实战 二】
305 1
|
4天前
|
消息中间件 存储 Java
Alibaba开发十年,写出这本“MQ技术手册”,看完我愣住了
消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。虽然说,目前状况是Kafka更为火热,但更为广泛的应该还属老牌的RabbtiMQ和Alibaba自主研发的RocketMQ。
|
4天前
|
消息中间件 Java
springboot整合消息队列——RabbitMQ
springboot整合消息队列——RabbitMQ
82 0
|
4天前
|
消息中间件 存储 运维
为什么选择云消息队列 RocketMQ 版
为什么选择云消息队列 RocketMQ 版
10 1
|
4天前
|
消息中间件 存储 监控
解析RocketMQ:高性能分布式消息队列的原理与应用
RocketMQ是阿里开源的高性能分布式消息队列,具备低延迟、高吞吐和高可靠性,广泛应用于电商、金融等领域。其核心概念包括Topic、Producer、Consumer、Message和Name Server/Broker。RocketMQ支持异步通信、系统解耦、异步处理和流量削峰。关键特性有分布式架构、顺序消息、高可用性设计和消息事务。提供发布/订阅和点对点模型,以及消息过滤功能。通过集群模式、存储方式、发送和消费方式的选择进行性能优化。RocketMQ易于部署,可与Spring集成,并与Kafka等系统对比各有优势,拥有丰富的生态系统。
172 4
|
4天前
|
消息中间件 存储 缓存
消息队列学习之rocketmq
【4月更文挑战第1天】消息队列学习之rocketmq
18 0
|
4天前
|
消息中间件 数据库 RocketMQ
Springboot+RocketMQ通过事务消息优雅的实现订单支付功能
RocketMQ的事务消息,是指发送消息事件和其他事件需要同时成功或同时失败。比如银行转账, A银行的某账户要转一万元到B银行的某账户。A银行发送“B银行账户增加一万元”这个消息,要和“从A银 行账户扣除一万元”这个操作同时成功或者同时失败。RocketMQ采用两阶段提交的方式实现事务消息。
142 0

热门文章

最新文章

相关产品

  • 云消息队列 MQ