Rocketmq-spring入门与实践

简介: 本场景带您体验如何在 Spring 生态中优雅地使用 Apache RocketMQ,感受最受欢迎业务开发框架与最受欢迎消息平台结合的魅力。

Rocketmq-spring入门与实践

1. 创建资源

开始实验之前,您需要先创建ECS实例资源。

  1. 在实验室页面,单击创建资源
  2. (可选)在实验室页面左侧导航栏中,单击云产品资源列表,可查看本次实验资源相关信息(例如IP地址、用户信息等)。

说明 资源创建过程需要1~3分钟。


2. 启动名称服务器和代理

本步骤指导您如何在已搭建好RocketMQ实例中启动名称服务器和代理。

  1. 执行如下命令,切换目录至rocketmq-4.9.3/bin下。
cd /root/rocketmq-4.9.3/bin
  1. 执行如下命令,启动服务器。

说明 按CTRL+C可结束当前进程。

nohup sh mqnamesrv &

tail -f ~/logs/rocketmqlogs/namesrv.log

若启动代理失败,解决方案如下。

  1. 执行如下命令,查看 mqnamesrv 进程。
ps -ef |grep mqnamesrv

  1. 执行如下命令,清除掉该进程(该命令进程编号以上图示例)。
kill -9 6987
  1. 执行如下命令,清除Java进程。
sh mqshutdown namesrv
sh mqshutdown broker

  1. 执行如下命令,启动代理。

说明 按CTRL+C可结束当前进程。

nohup sh mqbroker -n localhost:9876 &

tail -f ~/logs/rocketmqlogs/broker.log

4.执行如下命令,创建topic。

cd /root/rocketmq-4.9.3/bin
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendDataType
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendOperationType
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t syncSendOrderly
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendAndReceive
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t broadcastConsumer
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t transactionMessage


3. 启动 java 工程

本步骤指导您如何启动java工程。

  1. 执行如下命令,启动Java工程。

说明 首次运行可能要花费3分钟左右的编译时间。

cd /root/rocketmq-handson-apply
mvn install -Dmaven.test.skip
export local=$RANDOM
java -jar  rocketmq-spring/target/rocketmq-spring-1.0.0.jar

启动成功,返回信息如下。

  1. 单击当前页面右上角,再开启一个会话


4. 访问并测试执行效果

本步骤指导您如何进行测试,并查看其相应效果。

  1. 消息类型测试。
  1. 发送对象。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/sendObject?id=1&name=xiaoming&action=go2&ags=3"

再观察会话1中的日志,会观察到如下的输出。

  1. 发送message。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/sendMessage?id=2&name=xiaoming&action=des&ags=3"

再观察会话1中的日志,会观察到如下的输出。

  1. 发送List。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/sendCollection?id=2&name=xiaoming&action=des&ags=3"

再观察会话1中的日志,会观察到如下的输出。

  1. 测试不同发送行为。
  1. 同步发送。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/syncSend?id=2&name=xiaoming&action=des&ags=3"

再观察会话1中的日志,会观察到如下的输出。

  1. 异步发送。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/asyncSend?id=2&name=xiaoming&action=des&ags=3"

再观察会话1中的日志,会观察到如下的输出。

  1. OneWay发送。

执行如下命令, 向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/oneWaySend?id=2&name=xiaoming&action=des&ags=3"

再观察会话1中的日志,会观察到如下的输出。

  1. 发送事务消息。

执行如下命令, 向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/transactionMessage?id=2&name=xiaoming&action=des&arg=1"
wget -q -O- "http://127.0.0.1:28082/messageSend/transactionMessage?id=2&name=xiaoming&action=des&arg=2"
wget -q -O- "http://127.0.0.1:28082/messageSend/transactionMessage?id=2&name=xiaoming&action=des&arg=3"

再观察会话1中的日志,会观察到如下的输出。

  1. 发送同步RPC消息。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/sendAndReceive?id=2&name=xiaoming&action=des&ags=3"

预期返回结果如下。

再观察会话1中的日志,会观察到如下的输出。


5. 细节点

本步骤将讲解rocketmq-spring的tag设置与原生不同以及关于 Message 对象的 header。

  1. rocketmq-spring的tag设置与原生不同。

现在把topic与tag合成了destination。destination=topic or topic":"tag。实例请看MessageSendController第152行。

  1. 关于 Message 对象的 header。

用于为消息提供附加信息,同时降低了消息体反序列化带来的额外开销。

  1. RocketMQ默认请求头。

请求头

作用

类型

RocketMQHeaders.KEYS

消息key,为发送消息设定keys

string


6. 生产者

本步骤将讲解生产者的创捷和各属性信息。

  1. 创建生产者。
  1. 默认生产者创建如下。

  1. 自定义生产者创建如下。

  1. 继承RocketMQTemplate对象。
  2. 使用ExtRocketMQTemplateConfiguration 注解。

配置名

作用

类型

默认值

是否必须

存在配置文件

默认配置名

存在注解

nameServer

注册服务地址

String

rocketmq.name-server

group

生产组

String

rocketmq.producer.group

sendMessageTimeout

发送超时

int

3000

rocketmq.producer.send-message-timeout

compressMessageBodyThreshold

数据压缩限制

int

1024*4

rocketmq.producer.compress-message-body-threshold

retryTimesWhenSendFailed

同步发送失败重试次数

int

2

rocketmq.producer.retry-times-when-send-failed

retryTimesWhenSendAsyncFailed

异步发送失败重试次数

int

2

rocketmq.producer.retry-times-when-send-async-fsailed

retryNextServer

内部发送失败时重试另一个代理

boolean

false

rocketmq.producer.retry-next-server

maxMessageSize

消息最大字节数

int

4M

rocketmq.producer.max-message-size

enableMsgTrace

开启消息轨迹

boolean

true

rocketmq.producer.enable-msg-trace

customizedTraceTopic

消息轨迹topic

String

RMQSYSTRACE_TOPIC

rocketmq.producer.customized-trace-topic

accessKey

acl需要的ak

String

rocketmq.producer.access-key

secretKey

acl需要的sk

String

  1. 配置属性信息。
  2. 声明自定义消费者的全局变量,并使用@Autowired注解。
  3. 消息生产。
  4. 发送形参说明如下。

类型

说明

destination

String

消息topic或者topic与tag组合

message

Message

发送对象

payload

Object

发送对象

timeout

long

发送超时时长,如果没有,使用全局配置或者默认配置

delayLevel

int

数据压缩级别

messages

发送集合消息

集合内部数据只能是message或者子类

hashKey

String

循序消费的消息需要, 用于确定消息的队列

sendCallback

SendCallback

异步发送回调对象

rocketMQLocalRequestCallback

RocketMQLocalRequestCallback

异步RPC回调对象

type

Type

同步RPC消息返回对象

  1. 消息类型解读。

消息类型

出处

是否返回SendResult

优势

劣势

Object

rocketmq-spring

简单,方便

扩展度低

message

rocketmq-spring

可以扩展

相对object使用复杂

convertAndSend

spring-cloud-steam

spring标准

说明 三种消息都是使用convert进行数据序列化。本质上一样,不建议使用convertAndSend方式。

  1. 生产方式。

发送方式

循序消息

RPC消息

批量消息

事务消息

同步

syncSendOrderly

支持(sendAndReceive)

支持

sendMessageInTransaction

异步

asyncSendOrderly

支持(sendAndReceive)

不支持

不支持

OneWay

sendOneWayOrderly

不支持

不支持

不支持

  1. 事务消息。
  1. 实现 RocketMQLocalTransactionListener 接口。
  2. 配置 @RocketMQTransactionListener 注解。
  3. rocketMQTemplateBeanName 拦截的 RocketMQTemplate 的事务,值对应ExtRocketMQTemplateConfiguration.value
  1. 实现接口方法。

实验链接:https://developer.aliyun.com/adc/scenario/470077b9392a482b91fa1a2bf4c423fe

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
22天前
|
数据采集 Java 数据安全/隐私保护
Spring Boot 3.3中的优雅实践:全局数据绑定与预处理
【10月更文挑战第22天】 在Spring Boot应用中,`@ControllerAdvice`是一个强大的工具,它允许我们在单个位置处理多个控制器的跨切面关注点,如全局数据绑定和预处理。这种方式可以大大减少重复代码,提高开发效率。本文将探讨如何在Spring Boot 3.3中使用`@ControllerAdvice`来实现全局数据绑定与预处理。
58 2
|
23天前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
1月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
67 6
|
25天前
|
SQL Java 数据库
Spring Boot与Flyway:数据库版本控制的自动化实践
【10月更文挑战第19天】 在软件开发中,数据库的版本控制是一个至关重要的环节,它确保了数据库结构的一致性和项目的顺利迭代。Spring Boot结合Flyway提供了一种自动化的数据库版本控制解决方案,极大地简化了数据库迁移管理。本文将详细介绍如何使用Spring Boot和Flyway实现数据库版本的自动化控制。
24 2
|
1月前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
1月前
|
消息中间件 存储 弹性计算
云消息队列 RabbitMQ 版实践解决方案评测
随着企业业务的增长,对消息队列的需求日益提升。阿里云的云消息队列 RabbitMQ 版通过架构优化,解决了消息积压、内存泄漏等问题,并支持弹性伸缩和按量计费,大幅降低资源和运维成本。本文从使用者角度详细评测这一解决方案,涵盖实践原理、部署体验、实际优势及应用场景。
|
1月前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
64 4
|
1月前
|
Java API Spring
springboot学习七:Spring Boot2.x 拦截器基础入门&实战项目场景实现
这篇文章是关于Spring Boot 2.x中拦截器的入门教程和实战项目场景实现的详细指南。
26 0
springboot学习七:Spring Boot2.x 拦截器基础入门&实战项目场景实现
|
1月前
|
Java API Spring
springboot学习六:Spring Boot2.x 过滤器基础入门&实战项目场景实现
这篇文章是关于Spring Boot 2.x中过滤器的基础知识和实战项目应用的教程。
24 0
springboot学习六:Spring Boot2.x 过滤器基础入门&实战项目场景实现
|
1月前
|
消息中间件
实践部署《云消息队列RabbitMQ实践》测评
《云消息队列RabbitMQ实践》解决方案原理清晰,尤其在异步通信和解耦方面解释详尽。对初学者而言,部分术语如消息持久化、确认机制及集群性能优化可更细致。部署过程文档详实,涵盖主要环节,但插件配置等细节存在环境问题,需查阅社区资料解决。该方案展示了RabbitMQ的高吞吐量、灵活路由和可靠消息传递能力,但在高可用性和消息丢失处理上可提供更深入配置建议。适用于高并发和解耦场景,如订单处理、日志收集,有助于提升系统可扩展性。总体部署体验良好,实用性较强。
49 0

相关产品

  • 云消息队列 MQ