Rocketmq-spring入门与实践

简介: 本场景带您体验如何在 Spring 生态中优雅地使用 Apache RocketMQ,感受最受欢迎业务开发框架与最受欢迎消息平台结合的魅力。

Rocketmq-spring入门与实践

1. 创建资源

开始实验之前,您需要先创建ECS实例资源。

  1. 在实验室页面,单击创建资源
  2. (可选)在实验室页面左侧导航栏中,单击云产品资源列表,可查看本次实验资源相关信息(例如IP地址、用户信息等)。

说明 资源创建过程需要1~3分钟。


2. 启动名称服务器和代理

本步骤指导您如何在已搭建好RocketMQ实例中启动名称服务器和代理。

  1. 执行如下命令,切换目录至rocketmq-4.9.3/bin下。
cd /root/rocketmq-4.9.3/bin
  1. 执行如下命令,启动服务器。

说明 按CTRL+C可结束当前进程。

nohup sh mqnamesrv &

tail -f ~/logs/rocketmqlogs/namesrv.log

若启动代理失败,解决方案如下。

  1. 执行如下命令,查看 mqnamesrv 进程。
ps -ef |grep mqnamesrv

  1. 执行如下命令,清除掉该进程(该命令进程编号以上图示例)。
kill -9 6987
  1. 执行如下命令,清除Java进程。
sh mqshutdown namesrv
sh mqshutdown broker

  1. 执行如下命令,启动代理。

说明 按CTRL+C可结束当前进程。

nohup sh mqbroker -n localhost:9876 &

tail -f ~/logs/rocketmqlogs/broker.log

4.执行如下命令,创建topic。

cd /root/rocketmq-4.9.3/bin
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendDataType
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendOperationType
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t syncSendOrderly
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendAndReceive
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t broadcastConsumer
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t transactionMessage


3. 启动 java 工程

本步骤指导您如何启动java工程。

  1. 执行如下命令,启动Java工程。

说明 首次运行可能要花费3分钟左右的编译时间。

cd /root/rocketmq-handson-apply
mvn install -Dmaven.test.skip
export local=$RANDOM
java -jar  rocketmq-spring/target/rocketmq-spring-1.0.0.jar

启动成功,返回信息如下。

  1. 单击当前页面右上角,再开启一个会话


4. 访问并测试执行效果

本步骤指导您如何进行测试,并查看其相应效果。

  1. 消息类型测试。
  1. 发送对象。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/sendObject?id=1&name=xiaoming&action=go2&ags=3"

再观察会话1中的日志,会观察到如下的输出。

  1. 发送message。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/sendMessage?id=2&name=xiaoming&action=des&ags=3"

再观察会话1中的日志,会观察到如下的输出。

  1. 发送List。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/sendCollection?id=2&name=xiaoming&action=des&ags=3"

再观察会话1中的日志,会观察到如下的输出。

  1. 测试不同发送行为。
  1. 同步发送。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/syncSend?id=2&name=xiaoming&action=des&ags=3"

再观察会话1中的日志,会观察到如下的输出。

  1. 异步发送。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/asyncSend?id=2&name=xiaoming&action=des&ags=3"

再观察会话1中的日志,会观察到如下的输出。

  1. OneWay发送。

执行如下命令, 向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/oneWaySend?id=2&name=xiaoming&action=des&ags=3"

再观察会话1中的日志,会观察到如下的输出。

  1. 发送事务消息。

执行如下命令, 向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/transactionMessage?id=2&name=xiaoming&action=des&arg=1"
wget -q -O- "http://127.0.0.1:28082/messageSend/transactionMessage?id=2&name=xiaoming&action=des&arg=2"
wget -q -O- "http://127.0.0.1:28082/messageSend/transactionMessage?id=2&name=xiaoming&action=des&arg=3"

再观察会话1中的日志,会观察到如下的输出。

  1. 发送同步RPC消息。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/sendAndReceive?id=2&name=xiaoming&action=des&ags=3"

预期返回结果如下。

再观察会话1中的日志,会观察到如下的输出。


5. 细节点

本步骤将讲解rocketmq-spring的tag设置与原生不同以及关于 Message 对象的 header。

  1. rocketmq-spring的tag设置与原生不同。

现在把topic与tag合成了destination。destination=topic or topic":"tag。实例请看MessageSendController第152行。

  1. 关于 Message 对象的 header。

用于为消息提供附加信息,同时降低了消息体反序列化带来的额外开销。

  1. RocketMQ默认请求头。

请求头

作用

类型

RocketMQHeaders.KEYS

消息key,为发送消息设定keys

string


6. 生产者

本步骤将讲解生产者的创捷和各属性信息。

  1. 创建生产者。
  1. 默认生产者创建如下。

  1. 自定义生产者创建如下。

  1. 继承RocketMQTemplate对象。
  2. 使用ExtRocketMQTemplateConfiguration 注解。

配置名

作用

类型

默认值

是否必须

存在配置文件

默认配置名

存在注解

nameServer

注册服务地址

String

rocketmq.name-server

group

生产组

String

rocketmq.producer.group

sendMessageTimeout

发送超时

int

3000

rocketmq.producer.send-message-timeout

compressMessageBodyThreshold

数据压缩限制

int

1024*4

rocketmq.producer.compress-message-body-threshold

retryTimesWhenSendFailed

同步发送失败重试次数

int

2

rocketmq.producer.retry-times-when-send-failed

retryTimesWhenSendAsyncFailed

异步发送失败重试次数

int

2

rocketmq.producer.retry-times-when-send-async-fsailed

retryNextServer

内部发送失败时重试另一个代理

boolean

false

rocketmq.producer.retry-next-server

maxMessageSize

消息最大字节数

int

4M

rocketmq.producer.max-message-size

enableMsgTrace

开启消息轨迹

boolean

true

rocketmq.producer.enable-msg-trace

customizedTraceTopic

消息轨迹topic

String

RMQSYSTRACE_TOPIC

rocketmq.producer.customized-trace-topic

accessKey

acl需要的ak

String

rocketmq.producer.access-key

secretKey

acl需要的sk

String

  1. 配置属性信息。
  2. 声明自定义消费者的全局变量,并使用@Autowired注解。
  3. 消息生产。
  4. 发送形参说明如下。

类型

说明

destination

String

消息topic或者topic与tag组合

message

Message

发送对象

payload

Object

发送对象

timeout

long

发送超时时长,如果没有,使用全局配置或者默认配置

delayLevel

int

数据压缩级别

messages

发送集合消息

集合内部数据只能是message或者子类

hashKey

String

循序消费的消息需要, 用于确定消息的队列

sendCallback

SendCallback

异步发送回调对象

rocketMQLocalRequestCallback

RocketMQLocalRequestCallback

异步RPC回调对象

type

Type

同步RPC消息返回对象

  1. 消息类型解读。

消息类型

出处

是否返回SendResult

优势

劣势

Object

rocketmq-spring

简单,方便

扩展度低

message

rocketmq-spring

可以扩展

相对object使用复杂

convertAndSend

spring-cloud-steam

spring标准

说明 三种消息都是使用convert进行数据序列化。本质上一样,不建议使用convertAndSend方式。

  1. 生产方式。

发送方式

循序消息

RPC消息

批量消息

事务消息

同步

syncSendOrderly

支持(sendAndReceive)

支持

sendMessageInTransaction

异步

asyncSendOrderly

支持(sendAndReceive)

不支持

不支持

OneWay

sendOneWayOrderly

不支持

不支持

不支持

  1. 事务消息。
  1. 实现 RocketMQLocalTransactionListener 接口。
  2. 配置 @RocketMQTransactionListener 注解。
  3. rocketMQTemplateBeanName 拦截的 RocketMQTemplate 的事务,值对应ExtRocketMQTemplateConfiguration.value
  1. 实现接口方法。

实验链接:https://developer.aliyun.com/adc/scenario/470077b9392a482b91fa1a2bf4c423fe

相关实践学习
5分钟轻松打造应对流量洪峰的稳定商城交易系统
本实验通过SAE极速部署一个微服务电商商城,同时结合RocketMQ异步解耦、削峰填谷的能力,带大家体验面对流量洪峰仍旧稳定可靠的商城交易系统!
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
16天前
|
人工智能 Java API
Spring AI 实战|Spring AI入门之DeepSeek调用
本文介绍了Spring AI框架如何帮助Java开发者轻松集成和使用大模型API。文章从Spring AI的初探开始,探讨了其核心能力及应用场景,包括手动与自动发起请求、流式响应实现打字机效果,以及兼容不同AI服务(如DeepSeek、通义千问)的方法。同时,还详细讲解了如何在生产环境中添加监控以优化性能和成本管理。通过Spring AI,开发者可以简化大模型调用流程,降低复杂度,为企业智能应用开发提供强大支持。最后,文章展望了Spring AI在未来AI时代的重要作用,鼓励开发者积极拥抱这一技术变革。
492 71
Spring AI 实战|Spring AI入门之DeepSeek调用
|
2月前
|
JSON 前端开发 Java
深入理解 Spring Boot 中日期时间格式化:@DateTimeFormat 与 @JsonFormat 完整实践
在 Spring Boot 开发中,日期时间格式化是前后端交互的常见痛点。本文详细解析了 **@DateTimeFormat** 和 **@JsonFormat** 两个注解的用法,分别用于将前端传入的字符串解析为 Java 时间对象,以及将时间对象序列化为指定格式返回给前端。通过完整示例代码,展示了从数据接收、业务处理到结果返回的全流程,并总结了解决时区问题和全局配置的最佳实践,助你高效处理日期时间需求。
228 0
|
2月前
|
安全 Java 数据库
Spring Security 实战指南:从入门到精通
本文详细介绍了Spring Security在Java Web项目中的应用,涵盖登录、权限控制与安全防护等功能。通过Filter Chain过滤器链实现请求拦截与认证授权,核心组件包括AuthenticationProvider和UserDetailsService,负责用户信息加载与密码验证。文章还解析了项目结构,如SecurityConfig配置类、User实体类及自定义登录逻辑,并探讨了Method-Level Security、CSRF防护、Remember-Me等进阶功能。最后总结了Spring Security的核心机制与常见配置,帮助开发者构建健壮的安全系统。
174 0
|
2月前
|
存储 Java 数据库
Spring Boot 注册登录系统:问题总结与优化实践
在Spring Boot开发中,注册登录模块常面临数据库设计、密码加密、权限配置及用户体验等问题。本文以便利店销售系统为例,详细解析四大类问题:数据库字段约束(如默认值缺失)、密码加密(明文存储风险)、Spring Security配置(路径权限不当)以及表单交互(数据丢失与提示不足)。通过优化数据库结构、引入BCrypt加密、完善安全配置和改进用户交互,提供了一套全面的解决方案,助力开发者构建更 robust 的系统。
86 0
|
8天前
|
消息中间件 存储 Kafka
一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。
219 6
 一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
|
8天前
|
Java 关系型数据库 MySQL
【Spring】【事务】初学者直呼学会了的Spring事务入门
本文深入解析了Spring事务的核心概念与使用方法。Spring事务是一种数据库事务管理机制,通过确保操作的原子性、一致性、隔离性和持久性(ACID),维护数据完整性。文章详细讲解了声明式事务(@Transactional注解)和编程式事务(TransactionTemplate、PlatformTransactionManager)的区别与用法,并探讨了事务传播行为(如REQUIRED、REQUIRES_NEW等)及隔离级别(如READ_COMMITTED、REPEATABLE_READ)。
72 1
|
8天前
|
人工智能 Java 决策智能
Spring AI Alibaba Graph:多智能体框架实践
Spring AI Alibaba 是一个面向 Java 开发者的开源人工智能框架,旨在简化 AI 应用开发。本文重点介绍其 Graph 组件,用于解决工作流与多智能体协作问题。Graph 组件通过声明式编程接口,提供统一的上下文管理、消息记忆、人工确认节点等功能,支持复杂 AI 应用的构建。
|
2月前
|
Java 开发者 微服务
Spring Cloud OpenFeign详解与实践
总结起来说,Spring Cloud OpenFeign提供了一种简单易懂且高效的方式去实现微服务之间通信.它隐藏了许多复杂性,并且允许开发者以声明式方式编写HTTP客户端代码.如果你正在开发基于Spring Cloud 的微服务架构系统,Spring Cloud Open Feign是一个非常好用且强大工具.
178 33
|
2月前
|
JSON 前端开发 Java
深入理解 Spring Boot 中日期时间格式化:@DateTimeFormat 与 @JsonFormat 完整实践
在 Spring Boot 开发中,处理前后端日期交互是一个常见问题。本文通过 **@DateTimeFormat** 和 **@JsonFormat** 两个注解,详细讲解了如何解析前端传来的日期字符串以及以指定格式返回日期数据。文章从实际案例出发,结合代码演示两者的使用场景与注意事项,解决解析失败、时区偏差等问题,并提供全局配置与局部注解的实践经验。帮助开发者高效应对日期时间格式化需求,提升开发效率。
394 2
|
2月前
|
存储 安全 Java
Spring Security 入门与详解
Spring Security 是 Spring 框架中的核心安全模块,提供认证、授权及防护功能。本文详解其核心概念,包括认证(Authentication)、授权(Authorization)和过滤器链(Security Filter Chain)。同时,通过代码示例介绍基本配置,如 PasswordEncoder、UserDetailsService 和自定义登录页面等。最后总结常见问题与解决方法,助你快速掌握 Spring Security 的使用与优化。
285 0

相关产品

  • 云消息队列 MQ