文章保证,一路执行下去不会报错,报错你找我。如果觉得有用,希望屏幕前的大佬,点赞➕关注
只有想到的方面多时,才能在够实际运用中发现更多的问题,这也是作者给读者最好的礼物
使用这篇文章 docker 安装 rocketmq,肯定没问题
<br/>
新建 springboot 项目
新建一个 springboot 项目,pom 文件如下,rocketmq 使用 apache 封装的 jar
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.11.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>cn.machen.study</groupId>
<artifactId>cloud-alibaba-rocketmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<modelVersion>4.0.0</modelVersion>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
<br/>
创建配置管理类
我们 nameserver 直接使用本地安装的就可以,topic 是 rocketmq 服务端默认就会创建的
public class RmqConfig {
/**
* Name Server 地址,因为是集群部署 所以有多个用 分号 隔开
*/
public static final String NAME_SERVER = "localhost:9876";
/**
* 主题名称 主题一般是服务器设置好 而不能在代码里去新建topic( 如果没有创建好,生产者往该主题发送消息 会报找不到topic错误)
*/
public static final String TOPIC = "TestTopic";
}
<br/>
Producer
新建消息生产端 Producer,负责生产消息
import lombok.Getter;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;
@Getter
@Component
public class Producer {
/**
* 生产者组
*/
private static final String producerGroup = "test_producer";
private DefaultMQProducer producer;
public Producer() throws MQClientException {
producerInit().start();
}
private DefaultMQProducer producerInit() {
producer = new DefaultMQProducer(producerGroup);
// 不开启 vip 通道 开通口端口会减 2
producer.setVipChannelEnabled(false);
// 绑定 name server
producer.setNamesrvAddr(RmqConfig.NAME_SERVER);
return producer;
}
}
<br/>
Consumer
建立消息消费端 Consumer,负责消费消息
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
@Slf4j
@Component
public class Consumer {
/**
* 消费者组
*/
private static final String CONSUMER_GROUP = "test_consumer";
public Consumer() throws MQClientException {
consumerInit().start();
}
private DefaultMQPushConsumer consumerInit() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
consumer.setNamesrvAddr(RmqConfig.NAME_SERVER);
// 消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 订阅主题和 标签( * 代表所有标签)下信息
consumer.subscribe(RmqConfig.TOPIC, "*");
// 注册消费的监听 并在此监听中消费信息, 并返回消费的状态信息
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// msgs 中只收集同一个topic, 同一个 tag, 并且 key 相同的message
// 会把不同的消息分别放置到不同的队列中
try {
for (Message msg : msgs) {
// 消费者获取消息 这里只输出 不做后面逻辑处理
String body = new String(msg.getBody(), "utf-8");
log.info(" >>> [消费端] 获取消息主题 topic :: {}, 消费消息 body :: {} ", msg.getTopic(), body);
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
return consumer;
}
}
<br/>
测试 Controller
创建对应 Controller 测试类,生产 100 条消息由消费者执行,最终把返回信息进行打印
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
public class Controller {
@Autowired
private Producer producer;
@RequestMapping("/rocketmq/send")
public Object callback() throws Exception {
for (int i = 0; i < 100; i++) {
// 创建生产信息
Message message = new Message(RmqConfig.TOPIC, "testTag", (String.format("[生产者] 发送第 【%d】 次 mq 消息", i)).getBytes());
// 发送消息
SendResult sendResult = producer.getProducer().send(message);
log.info(" >>> [生产者] 发送消息返回对象 :: {}", sendResult);
}
return "执行成功";
}
}
<br/>
截取部分日志,输出打印如下:
2020-12-12 15:31:44.718 INFO 45296 --- [essageThread_11] c.m.study.cloudalibabarocketmq.Consumer : >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【90】 次 mq 消息
2020-12-12 15:31:44.720 INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller : >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDCD005B, offsetMsgId=C0A8010200002A9F000000000027CF47, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=0], queueOffset=3060]
2020-12-12 15:31:44.721 INFO 45296 --- [essageThread_12] c.m.study.cloudalibabarocketmq.Consumer : >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【91】 次 mq 消息
2020-12-12 15:31:44.722 INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller : >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDD0005C, offsetMsgId=C0A8010200002A9F000000000027D018, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=1], queueOffset=3062]
2020-12-12 15:31:44.724 INFO 45296 --- [essageThread_13] c.m.study.cloudalibabarocketmq.Consumer : >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【92】 次 mq 消息
2020-12-12 15:31:44.725 INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller : >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDD3005D, offsetMsgId=C0A8010200002A9F000000000027D0E9, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=2], queueOffset=3059]
2020-12-12 15:31:44.726 INFO 45296 --- [essageThread_14] c.m.study.cloudalibabarocketmq.Consumer : >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【93】 次 mq 消息
2020-12-12 15:31:44.728 INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller : >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDD5005E, offsetMsgId=C0A8010200002A9F000000000027D1BA, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=3], queueOffset=3058]
2020-12-12 15:31:44.729 INFO 45296 --- [essageThread_15] c.m.study.cloudalibabarocketmq.Consumer : >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【94】 次 mq 消息
2020-12-12 15:31:44.731 INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller : >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDD8005F, offsetMsgId=C0A8010200002A9F000000000027D28B, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=0], queueOffset=3061]
2020-12-12 15:31:44.732 INFO 45296 --- [essageThread_16] c.m.study.cloudalibabarocketmq.Consumer : >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【95】 次 mq 消息
2020-12-12 15:31:44.734 INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller : >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDDB0060, offsetMsgId=C0A8010200002A9F000000000027D35C, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=1], queueOffset=3063]
2020-12-12 15:31:44.735 INFO 45296 --- [essageThread_17] c.m.study.cloudalibabarocketmq.Consumer : >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【96】 次 mq 消息
2020-12-12 15:31:44.737 INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller : >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDDE0061, offsetMsgId=C0A8010200002A9F000000000027D42D, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=2], queueOffset=3060]
2020-12-12 15:31:44.738 INFO 45296 --- [essageThread_18] c.m.study.cloudalibabarocketmq.Consumer : >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【97】 次 mq 消息
2020-12-12 15:31:44.740 INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller : >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDE10062, offsetMsgId=C0A8010200002A9F000000000027D4FE, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=3], queueOffset=3059]
2020-12-12 15:31:44.741 INFO 45296 --- [essageThread_19] c.m.study.cloudalibabarocketmq.Consumer : >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【98】 次 mq 消息
2020-12-12 15:31:44.742 INFO 45296 --- [nio-8080-exec-1] c.m.s.cloudalibabarocketmq.Controller : >>> [生产者] 发送消息返回对象 :: SendResult [sendStatus=SEND_OK, msgId=0A10000EB0F018B4AAC23BFAFDE40063, offsetMsgId=C0A8010200002A9F000000000027D5CF, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=0], queueOffset=3062]
2020-12-12 15:31:44.745 INFO 45296 --- [essageThread_20] c.m.study.cloudalibabarocketmq.Consumer : >>> [消费端] 获取消息主题 topic :: TestTopic, 消费消息 body :: [生产者] 发送第 【99】 次 mq 消息
<br/>
根据文章这么一套走下来,可以说是玩过 rocketmq 了,后面就需要更深入去了解 rocketmq 特性以及具体使用了