Springboot整合RocketMq

简介: Springboot整合RocketMq

文章目录

介绍

MQ,Message Queue,是一种提供消息队列服务的中间件,也称为消息中间件,是一套提供了消息生产、存储、消费全过程API的软件系统。消息即数据。RocketMQ是⼀款阿⾥巴巴开源的消息中间件,主要用于限流,异步解耦操作,如付款之后短信通知,订单发货通知等等,都是异步进行执行。

rocketmq搭建

查找rockermq镜像

docker search rocketmq

c15b930e689749db98074e67505a5c9a.png

拉取镜像

docker pull rocketmqinc/rocketmq

207813438c694339ad111a291f5c0d80.png

创建目录

mkdir -p  /docker/rocketmq/data/namesrv/logs   /docker/rocketmq/data/namesrv/store

构建namesvr容器

#构建namesrv容器

docker run -d --restart=always --name rmqnamesrv --privileged=true -p 9876:9876  -v /docker/rocketmq/nameserver/logs:/root/logs -v /docker/rocketmq/nameserver/store:/root/store -e "MAX_POSSIBLE_HEAP=100000000" rocketmqinc/rocketmq sh mqnamesrv

启动broker

docker run -d --restart=always --name rmqbroker --link rmqnamesrv:namesrv -p 10911:10911 -p 10909:10909 --privileged=true -v /docker/rocketmq/data/broker/logs:/root/logs -v /docker/rocketmq/data/broker/store:/root/store -v /docker/rocketmq/conf/broker.conf:/opt/docker/rocketmq/broker.conf -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq sh mqbroker -c /opt/docker/rocketmq/broker.conf

安装控制台

docker pull pangliang/rocketmq-console-ng

控制台

docker run -d --restart=always --name rmqadmin -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.5.130:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 pangliang/rocketmq-console-ng

核心的概念

消息(Message)

消息是指,消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。

队列(Queue)

存储消息的物理实体。一个Topic中可以包含多个Queue,每个Queue中存放的就是该Topic的消息。一个Topic的Queue也被称为一个Topic中消息的分区(Partition)。

主题(Topic)

Topic表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位

标签(Tag)

为消息设置的标签,用于同一主题下区分不同类型的消息。

Producer

消息生产者,负责一般由业务系统。

Consumer

消息消费者,负责消费消息。一个消息消费者会从Broker服务器中获取到消息,并对消息进行相关业务

处理。

项目构建

目录结构



8d444b0783f1400698b492569a9cb3d4.png

依赖引入

 <!--rocketmq-->

     <dependency>

       <groupId>org.apache.rocketmq</groupId>

       <artifactId>rocketmq-spring-boot-starter</artifactId>

       <version>2.2.2</version>

     </dependency>


消息提供者

yml配置

server:

 port: 8022

#应用名字

spring:

 application:

   name: RocketMqOrderProvider

#rocketmq配置

rocketmq:

 name-server: 192.168.5.130:9876

 producer:

   group: order-producer-group

模板发送消息

@RestController

public class MqOrderController {

   @Autowired

   private RocketMQTemplate rocketMQTemplate;

   @RequestMapping("/sendOrderMessage")

   public String sendOrderMessage() {

       Order order = new Order();

       order.setOrderId(1);

       order.setOrderNo(1);

       order.setProductId(1);

       order.setUserId(1);

       order.setOrderNum(5);

       order.setOrderAmt(new BigDecimal("100.0"));

       order.setOrderStatus("下单");

       order.setPayStatus("未支付");

       order.setCreateUser("elite");

       order.setCreateTime(LocalDateTime.now());

       rocketMQTemplate.syncSend("order-topic",order.toString(),6000);

       return order.toString();

   }

}

消息接收者

yml配置

server:

 port: 8023

#应用名字

spring:

 application:

   name: RocketMqOrderConsumer

rocketmq:

 name-server: 192.168.5.130:9876

 producer:

   group: order-consumer-group

消息监听

@Component

@RocketMQMessageListener(topic = "order-topic",consumerGroup = "my-consumer-group")

@Slf4j

public class ConsumerListener implements RocketMQListener<String> {

   @Override

   public void onMessage(String orderInfo) {

       System.out.println(orderInfo);

   }

}

测试连接不上服务问题

See http://rocketmq.apache.org/docs/faq/ for further details.; nested exception is org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [6288]ms, Topic: order-topic, BrokersSent: [2e116b166e4d, 2e116b166e4d, 2e116b166e4d]

See http://rocketmq.apache.org/docs/faq/ for further details.] with root cause

org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to 172.17.0.4:10911 failed

at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:407) ~[rocketmq-remoting-4.9.3.jar:4.9.3]

问题排查中。。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 Java 网络架构
|
1月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
35 6
|
5月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
5月前
|
网络协议 Java 物联网
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
1037 3
|
5月前
|
消息中间件 Java Maven
|
6月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
405 1
|
6月前
|
消息中间件 Java 数据安全/隐私保护
Spring Boot与RabbitMQ的集成
Spring Boot与RabbitMQ的集成
|
6月前
|
消息中间件 Java RocketMQ
Spring Boot与RocketMQ的集成
Spring Boot与RocketMQ的集成
|
6月前
|
消息中间件 Java Spring
实现Spring Boot与RabbitMQ消息中间件的无缝集成
实现Spring Boot与RabbitMQ消息中间件的无缝集成
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
790 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!