Springcloud之Rocketmq发送消息

简介: Springcloud之Rocketmq发送消息

文章目录

简介

前面已经介绍了rocketmq的简单使用,本篇我们介绍使用订单服务下单成功之后发送短信消息。上篇docker下使用rocketmq,估计是版本问题,本篇使用一个低版本的rocketmq。

rocketmq下载

官网下载一个4.4的安装包,

解压安装

d9e1ca7c74964e96812a93fff29c7d2f.png

解压

unzip rocketmq-all-4.4.0-bin-release.zip

修改名称

MV rocketmq-all-4.4.0-bin-release rocketmq

01249802d2b1491cbd5cab76b128460c.png

启动namesvr

启动

nohup ./bin/mqnamesrv &

查看日志

tail -f /root/logs/rocketmqlogs/namesrv.log

启动broker

broker.conf需要修改增加

namesrvAddr=127.0.0.1:9876

brokerIP1=192.168.5.130  

修改固定的jvm参数 JAVA_OPT=“${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g” 根据实际情况改小点

-Xms256m -Xmx256m -Xmn128m

[root@elite rocketmq]#  vim bin/runbroker.sh  

[root@elite rocketmq]# vim bin/runserver.sh  

启动

nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true -c ../conf/broker.conf  &

查看日志

tail -f /root/logs/rocketmqlogs/broker.log

关闭服务

bin/mqshutdown namesrv

bin/mqshutdown broker

测试

发送消息

bin/tools.sh  org.apache.rocketmq.example.quickstart.Producer

456003c7648f47fb814f46b9f0a24271.png

消费消息

bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer


3bcd2e3bf07f41c981c15a38cbb5e20c.png

手动创建topic

./mqadmin updateTopic -n localhost:9876  -b localhost:10911  -t order-topic

项目搭建

依赖

用户与订单模块中都需要添加

<dependency>

           <groupId>org.apache.rocketmq</groupId>

           <artifactId>rocketmq-spring-boot-starter</artifactId>

           <version>2.0.2</version>

       </dependency>

       <dependency>

           <groupId>org.apache.rocketmq</groupId>

           <artifactId>rocketmq-client</artifactId>

           <version>4.4.0</version>

       </dependency>

yml配置地址

#rocketmq配置

rocketmq:

 name-server: 192.168.5.130:9876

 producer:

   group: springcloud-order

订单模块

package com.elite.springcloud.controller;

import com.elite.springcloud.entity.Order;

import com.elite.springcloud.entity.Product;

import com.elite.springcloud.entity.User;

import com.elite.springcloud.interfaces.ProductService;

import com.elite.springcloud.service.IOrderService;

import org.apache.rocketmq.spring.core.RocketMQTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.PathVariable;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

import java.math.BigDecimal;

/**

* <p>

* 订单表 前端控制器

* 下单控制

* </p>

*

* @author elite

* @since 2022-09-10

*/

@RestController

@RequestMapping("/springcloud/order/rocketmq")

public class OrderRocketMqController {

   //订单服务

   @Autowired

   IOrderService orderService;

   @Autowired

   ProductService productService;

   @Autowired

   RocketMQTemplate rocketMQTemplate;

   /**

    * 模拟下单 传入商品id,用户随机

    * @param product_id

    * @return

    */

   @GetMapping("/saveOrder/{product_id}")

   public String saveOrder(@PathVariable("product_id")Integer product_id ){

       //获取商品

       Product product = productService.getProductById(product_id);

       if (product == null){

           return "商品信息不存在";

       }

       //用户信息

       User user = new User();

       user.setUserId(1);

       //订单信息

       Order order = new Order();

       order.setOrderNo(10);

       order.setProductId(product_id);

       order.setUserId(user.getUserId());

       order.setOrderNum(1);

       order.setOrderAmt(product.getProductPrice());

       order.setOrderStatus("下单");

       order.setPayStatus("支付成功");

       order.setCreateBy("牛奶糖");

       order.setUpdateBy("牛奶糖");

       orderService.save(order);

       //下单成功之后,将消息放到mq中

       rocketMQTemplate.syncSend("order-topic", order,6000);

       //发布下单消息

       return "下单成功"+order.toString();

   }

}


用户模块添加监听

//监听消息

@Slf4j

@Service

@RocketMQMessageListener(consumerGroup = "springcloud-user", topic = "order-topic")

public class MessageLsner implements RocketMQListener<Order> {

   @Override

   public void onMessage(Order order) {

       log.info("订单信息:", JSON.toJSONString(order));

   }

}


测试模块

发送下订单请求:

74a5500fdabb4fe1bb4abf9c8fbd5366.png

存数据库订单

3d680e1615a544f194fff3719fef8def.png

监听消息

6036f0100f294e3994202f20aacc64ed.png

由于Sms开通有问题,这里就不讲解了。


重点问题:

启动broker前修改内存配置

修改固定的jvm参数 JAVA_OPT=“${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g” 根据实际情况改小点

##-Xms256m -Xmx256m -Xmn128m

[root@elite rocketmq]#  vim bin/runbroker.sh  

[root@elite rocketmq]# vim bin/runserver.sh  

  1. 无法自动创建主题

[root@elite conf]# cat broker.conf  

brokerClusterName = DefaultCluster

brokerName = broker-a

brokerId = 0

deleteWhen = 04

fileReservedTime = 48

brokerRole = ASYNC_MASTER

flushDiskType = ASYNC_FLUSH

namesrvAddr=localhost:9876 ## 添加namesrv

brokerIP1=192.168.5.130 ##添加ip

autoCreateTopicEnable=true ###开启自动创建topic

启动broker加参数指向配置文件,否则配置文件不生效。

bin/mqbroker -n localhost:9876 -c conf/broker.conf

  1. 发送消息提示内存不足
  2. org.apache.rocketmq.client.exception.MQBrokerException: CODE: 14  DESC: service not available now, maybe disk full, CL:  0.97 CQ:  0.97 INDEX:  0.97, maybe your broker machine memory too small.

For more information, please visit the url, http://rocketmq.apache.org/docs/faq/

###切换到到 rocketmq 配置文件所在路径

vim /usr/rocketmq/conf/2m-2s-async/broker-a.properties

###最后一行增加 diskMaxUsedSpaceRatio=99,表示剩余磁盘比例不足99%才报错

diskMaxUsedSpaceRatio=99

###wq 保存退出

改了,重启namesrv,broker.

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
8月前
|
消息中间件 缓存 物联网
MQTT常见问题之MQTT发送消息到阿里云服务器被拒如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
8月前
|
消息中间件
SpringCloud Stream集成RabbitMQ
SpringCloud Stream集成RabbitMQ
425 0
|
消息中间件 Java Maven
微服务技术系列教程(34) - SpringCloud-使用RabbitMQ实现消息驱动
微服务技术系列教程(34) - SpringCloud-使用RabbitMQ实现消息驱动
211 0
|
7月前
|
消息中间件 测试技术 RocketMQ
消息队列 MQ产品使用合集之在异步发送消息函数sendMessage()中出现了错误,错误代码为-3,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 存储 Java
SpringCloud基础4——RabbitMQ和SpringAMQP
消息队列MQ、RabbitMQ、SpringAMQP高级消息队列协议、发布/订阅模型、fanout、direct、topic模式
SpringCloud基础4——RabbitMQ和SpringAMQP
|
5月前
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
75 0
|
8月前
|
消息中间件 Java RocketMQ
Spring Cloud RocketMQ:构建可靠消息驱动的微服务架构
【4月更文挑战第28天】消息队列在微服务架构中扮演着至关重要的角色,能够实现服务之间的解耦、异步通信以及数据分发。Spring Cloud RocketMQ作为Apache RocketMQ的Spring Cloud集成,为微服务架构提供了可靠的消息传输机制。
268 1
|
消息中间件 存储 canal
3分钟白话RocketMQ系列—— 如何发送消息
3分钟白话RocketMQ系列—— 如何发送消息
311 0
|
8月前
|
消息中间件 Java 数据安全/隐私保护
Spring Cloud 项目中实现推送消息到 RabbitMQ 消息中间件
Spring Cloud 项目中实现推送消息到 RabbitMQ 消息中间件
|
8月前
|
存储 缓存 物联网
MQTT常见问题之MQTT发送消息过多内存不够处理不过来如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总: