架构及组件说明
name server:注册中心
broker:消息处理
procucer:生成消息
consumer:消费消息
一、下载安装包
https://rocketmq.apache.org/zh/download
1、RocketMQ下载
2、RocketMQ仪表盘下载
二、修改RocketMQ配置参数
将 ocketmq-all-4.9.4-bin-release文件复制到合适的位置
1、修改conf/broker.conf配置
查询自己IP
添加如下配置(IP使用自己的),并保存。
brokerIP1=192.168.31.199 namesrvAddr=192.168.31.199:9876
2、设置ROCKETMQ_HOME环境变量
文件路径使用自己的
set ROCKETMQ_HOME=D:\ProgramFiles\rocketmq-all-4.9.4-bin-release
三、启动服务
1、启动Namesrv
在rocketmq文件的bin目录下,进入cmd
start mqnamesrv.cmd
2、启动Broker
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
三、启动仪表盘
1、修改配置文件
2、启动项目
3、打开仪表盘
http://localhost:8080/#/
按需修改为中文
查看消费者(非必须)
四、分别创建springboot生成者和消费者
1、生产者
创建普通springboot项目,添加依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.1</version> </dependency>
修改配置文件
# 应用名称 spring: application: name: rocket-producer # 应用服务 WEB 访问端口 server: port: 8002 rocketmq: name-server: localhost:9876 producer: group: my-group
创建测试代码
import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.Resource; @Component public class SendMessage { @Resource private RocketMQTemplate rocketMQTemplate; @Scheduled(fixedRate = 5000) public void run(){ //发送消息 rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!"); } }
启动类添加@EnableScheduling注解
项目目录
2、消费者
创建普通springboot项目,添加依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.1</version> </dependency>
修改配置文件
# 应用名称 spring: application: name: rocket-consumer server: port: 8001 rocketmq: name-server: localhost:9876
创建测试代码
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; @Service @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1") class MyConsumer1 implements RocketMQListener<String> { /** *需要注意的是,onMessage()封装了ACK机制,消费者往外抛异常时,RocketMQ认为消费失败,重新发送该条消息,否则默认消费成功 */ @Override public void onMessage(String s) { System.out.println(s); } }
项目目录
五、测试
1、启动生产者、消费者
接受消息正常
2、查看控制台