重大发现!消息中间件——RocketMQ(一) 环境搭建(完整版)下

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介: 重大发现!消息中间件——RocketMQ(一) 环境搭建(完整版)下

3. Console监控平台说明

这里不做过多介绍,可以参考以下文章

官网地址:https://github.com/apache/roc...

其他博客地址:https://guozh.net/rocketmqzhi...

3. 案例测试

案例整合环境:SpringBoot环境案例来源于网络

3.1 pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.coderprogramming.rocketmq</groupId>
    <artifactId>rocketmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rocketmq</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

3.2 Producer生产者

**
 * @Description: 生产者
 * @author Coder编程
 * @date 2019/5/8 17:08
 */
@Component
public class Producer {
    /**
     * 生产者的组名
     */
    @Value("${apache.rocketmq.producer.producerGroup}")
    private String producerGroup;
    /**
     * NameServer 地址
     */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;
    public void orderedProducer() throws MQClientException, InterruptedException {
        /**
         * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例
         * 注意:ProducerGroupName需要由应用来保证唯一
         * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
         * 因为服务器会回查这个Group下的任意一个Producer
         */
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        producer.setNamesrvAddr(namesrvAddr);
        /**
         * Producer对象在使用之前必须要调用start初始化,初始化一次即可 注意:切记不可以在每次发送消息时,都调用start方法
         */
        producer.start();
        /**
         * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。
         * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态
         * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,
         * 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。
         */
        try {
            for (int i = 0; i < 10; i++) {
                    Message msg = new Message("Topic1",// topic
                            "TagA",// tag
                            "001",// key
                            ("Send Msg:Hello MetaQ1").getBytes());// body
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                    Message msg2 = new Message("Topic2",// topic
                            "TagB",// tag
                            "002",// key
                            ("Send Msg:Hello MetaQ2").getBytes());// body
                    SendResult sendResult2 = producer.send(msg2);
                    System.out.println(sendResult2);
                    Message msg3 = new Message("Topic3",// topic
                            "TagC",// tag
                            "003",// key
                            ("Send Msg:Hello MetaQ3").getBytes());// body
                    SendResult sendResult3 = producer.send(msg3);
                    System.out.println(sendResult3);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        /**
         * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
         * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法
         */
        producer.shutdown();
    }
}

3.3 Consumer消费者

/**
 * @Description: 消费者
 * @author Coder编程
 * @date 2019/5/8 17:08
 */
@Component
public class Consumer {
    /**
     * 生产者的组名
     */
    @Value("${apache.rocketmq.producer.producerGroup}")
    private String producerGroup;
    /**
     * NameServer 地址
     */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;
    /**
     * 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。
     * 但是实际PushConsumer内部是使用长轮询Pull方式从Broker拉消息,然后再回调用户Listener方法
     */
    public void orderedConsumer() throws InterruptedException,MQClientException {
        /**
         * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例
         * 注意:ConsumerGroupName需要由应用来保证唯一
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(producerGroup);
        // consumer.setNamesrvAddr("10.10.0.102:9876");
        consumer.setNamesrvAddr(namesrvAddr);
        /**
         * 订阅指定topic下tags分别等于TagA或TagC或TagD
         */
        consumer.subscribe("Topic1", "TagA || TagC || TagD");
        /**
         * 订阅指定topic下所有消息<br>
         * 注意:一个consumer对象可以订阅多个topic
         */
        consumer.subscribe("Topic2", "*");
        consumer.subscribe("Topic3", "*");
        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            /**
             * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                MessageExt msg = msgs.get(0);
                if (msg.getTopic().equals("Topic1")) {
                    if (null != msg.getTags()) {
                        // 执行Topic1的消费逻辑
                        if (msg.getTags().equals("TagA")) {
                            // 执行TagA的消费
                            System.out.println("TagA开始。");
                        } else if (msg.getTags().equals("TagC")) {
                            System.out.println("TagC开始。");
                            // 执行TagC的消费
                        } else if (msg.getTags().equals("TagD")) {
                            // 执行TagD的消费
                            System.out.println("TagD开始。");
                        }
                    }
                } else if (msg.getTopic().equals("Topic2")) {
                    // 执行Topic2的消费逻辑
                    System.out.println("Topic2");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        /**
         * Consumer对象在使用之前必须要调用start初始化,初始化一次即可
         */
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

3.3 properties配置文件

# 消费者的组名
apache.rocketmq.consumer.PushConsumer=PushConsumer
# 生产者的组名
apache.rocketmq.producer.producerGroup=Producer
# NameServer地址
apache.rocketmq.namesrvAddr=192.168.220.72:9876
# 设置应用端口
server.port=8089

3.4 测试代码

/**
 * @author Coder编程
 * @Title: HelloWord
 * @ProjectName rocketmq
 * @Description: Hello World
 * @date 2019/5/814:14
 */
@RestController
public class Test {
    @Autowired
    private Producer producer;
    @Autowired
    private Consumer consumer;
    @RequestMapping("/test")
    public String testMQ2() {
        try {
            System.out.println("-----------------开始生产-----------------");
            producer.orderedProducer();
            System.out.println("-----------------开始消费-----------------");
            consumer.orderedConsumer();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "success";
    }
}

4.奉上源码

以上安装jar包和案例测试源码已经上传至GitHub/Gitee

a9b36f6ca8f1d08d8494c7de3e73d8d.png

源码地址:

Github地址

Gitee地址

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
消息中间件 存储 Java
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。
|
4月前
|
消息中间件 存储 RocketMQ
消息中间件-RocketMQ技术(二)
消息中间件-RocketMQ技术(二)
|
4月前
|
消息中间件 存储 中间件
消息中间件-RocketMQ技术(一)
消息中间件-RocketMQ技术(一)
|
2月前
|
消息中间件 存储 Apache
探索 RocketMQ:企业级消息中间件的选择与应用
RocketMQ 是一个高性能、高可靠、可扩展的分布式消息中间件,它是由阿里巴巴开发并贡献给 Apache 软件基金会的一个开源项目。RocketMQ 主要用于处理大规模、高吞吐量、低延迟的消息传递,它是一个轻量级的、功能强大的消息队列系统,广泛应用于金融、电商、日志系统、数据分析等领域。
138 0
探索 RocketMQ:企业级消息中间件的选择与应用
|
3月前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
159 1
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
2月前
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!
|
3月前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
228 10
|
3月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
119 10
|
2月前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
3月前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!

相关产品

  • 云消息队列 MQ