Rocketmq-spring入门与实践

简介: 本场景带您体验如何在 Spring 生态中优雅地使用 Apache RocketMQ,感受最受欢迎业务开发框架与最受欢迎消息平台结合的魅力。

Rocketmq-spring入门与实践

1. 创建资源

开始实验之前,您需要先创建ECS实例资源。

  1. 在实验室页面,单击创建资源
  2. (可选)在实验室页面左侧导航栏中,单击云产品资源列表,可查看本次实验资源相关信息(例如IP地址、用户信息等)。

说明 资源创建过程需要1~3分钟。


2. 启动名称服务器和代理

本步骤指导您如何在已搭建好RocketMQ实例中启动名称服务器和代理。

  1. 执行如下命令,切换目录至rocketmq-4.9.3/bin下。
cd /root/rocketmq-4.9.3/bin
  1. 执行如下命令,启动服务器。

说明 按CTRL+C可结束当前进程。

nohup sh mqnamesrv &

tail -f ~/logs/rocketmqlogs/namesrv.log

若启动代理失败,解决方案如下。

  1. 执行如下命令,查看 mqnamesrv 进程。
ps -ef |grep mqnamesrv

  1. 执行如下命令,清除掉该进程(该命令进程编号以上图示例)。
kill -9 6987
  1. 执行如下命令,清除Java进程。
sh mqshutdown namesrv
sh mqshutdown broker

  1. 执行如下命令,启动代理。

说明 按CTRL+C可结束当前进程。

nohup sh mqbroker -n localhost:9876 &

tail -f ~/logs/rocketmqlogs/broker.log

4.执行如下命令,创建topic。

cd /root/rocketmq-4.9.3/bin
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendDataType
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendOperationType
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t syncSendOrderly
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendAndReceive
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t broadcastConsumer
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t transactionMessage


3. 启动 java 工程

本步骤指导您如何启动java工程。

  1. 执行如下命令,启动Java工程。

说明 首次运行可能要花费3分钟左右的编译时间。

cd /root/rocketmq-handson-apply
mvn install -Dmaven.test.skip
export local=$RANDOM
java -jar  rocketmq-spring/target/rocketmq-spring-1.0.0.jar

启动成功,返回信息如下。

  1. 单击当前页面右上角,再开启一个会话


4. 访问并测试执行效果

本步骤指导您如何进行测试,并查看其相应效果。

  1. 消息类型测试。
  1. 发送对象。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/sendObject?id=1&name=xiaoming&action=go2&ags=3"

再观察会话1中的日志,会观察到如下的输出。

  1. 发送message。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/sendMessage?id=2&name=xiaoming&action=des&ags=3"

再观察会话1中的日志,会观察到如下的输出。

  1. 发送List。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/sendCollection?id=2&name=xiaoming&action=des&ags=3"

再观察会话1中的日志,会观察到如下的输出。

  1. 测试不同发送行为。
  1. 同步发送。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/syncSend?id=2&name=xiaoming&action=des&ags=3"

再观察会话1中的日志,会观察到如下的输出。

  1. 异步发送。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/asyncSend?id=2&name=xiaoming&action=des&ags=3"

再观察会话1中的日志,会观察到如下的输出。

  1. OneWay发送。

执行如下命令, 向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/oneWaySend?id=2&name=xiaoming&action=des&ags=3"

再观察会话1中的日志,会观察到如下的输出。

  1. 发送事务消息。

执行如下命令, 向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/transactionMessage?id=2&name=xiaoming&action=des&arg=1"
wget -q -O- "http://127.0.0.1:28082/messageSend/transactionMessage?id=2&name=xiaoming&action=des&arg=2"
wget -q -O- "http://127.0.0.1:28082/messageSend/transactionMessage?id=2&name=xiaoming&action=des&arg=3"

再观察会话1中的日志,会观察到如下的输出。

  1. 发送同步RPC消息。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/sendAndReceive?id=2&name=xiaoming&action=des&ags=3"

预期返回结果如下。

再观察会话1中的日志,会观察到如下的输出。


5. 细节点

本步骤将讲解rocketmq-spring的tag设置与原生不同以及关于 Message 对象的 header。

  1. rocketmq-spring的tag设置与原生不同。

现在把topic与tag合成了destination。destination=topic or topic":"tag。实例请看MessageSendController第152行。

  1. 关于 Message 对象的 header。

用于为消息提供附加信息,同时降低了消息体反序列化带来的额外开销。

  1. RocketMQ默认请求头。

请求头

作用

类型

RocketMQHeaders.KEYS

消息key,为发送消息设定keys

string


6. 生产者

本步骤将讲解生产者的创捷和各属性信息。

  1. 创建生产者。
  1. 默认生产者创建如下。

  1. 自定义生产者创建如下。

  1. 继承RocketMQTemplate对象。
  2. 使用ExtRocketMQTemplateConfiguration 注解。

配置名

作用

类型

默认值

是否必须

存在配置文件

默认配置名

存在注解

nameServer

注册服务地址

String

rocketmq.name-server

group

生产组

String

rocketmq.producer.group

sendMessageTimeout

发送超时

int

3000

rocketmq.producer.send-message-timeout

compressMessageBodyThreshold

数据压缩限制

int

1024*4

rocketmq.producer.compress-message-body-threshold

retryTimesWhenSendFailed

同步发送失败重试次数

int

2

rocketmq.producer.retry-times-when-send-failed

retryTimesWhenSendAsyncFailed

异步发送失败重试次数

int

2

rocketmq.producer.retry-times-when-send-async-fsailed

retryNextServer

内部发送失败时重试另一个代理

boolean

false

rocketmq.producer.retry-next-server

maxMessageSize

消息最大字节数

int

4M

rocketmq.producer.max-message-size

enableMsgTrace

开启消息轨迹

boolean

true

rocketmq.producer.enable-msg-trace

customizedTraceTopic

消息轨迹topic

String

RMQSYSTRACE_TOPIC

rocketmq.producer.customized-trace-topic

accessKey

acl需要的ak

String

rocketmq.producer.access-key

secretKey

acl需要的sk

String

  1. 配置属性信息。
  2. 声明自定义消费者的全局变量,并使用@Autowired注解。
  3. 消息生产。
  4. 发送形参说明如下。

类型

说明

destination

String

消息topic或者topic与tag组合

message

Message

发送对象

payload

Object

发送对象

timeout

long

发送超时时长,如果没有,使用全局配置或者默认配置

delayLevel

int

数据压缩级别

messages

发送集合消息

集合内部数据只能是message或者子类

hashKey

String

循序消费的消息需要, 用于确定消息的队列

sendCallback

SendCallback

异步发送回调对象

rocketMQLocalRequestCallback

RocketMQLocalRequestCallback

异步RPC回调对象

type

Type

同步RPC消息返回对象

  1. 消息类型解读。

消息类型

出处

是否返回SendResult

优势

劣势

Object

rocketmq-spring

简单,方便

扩展度低

message

rocketmq-spring

可以扩展

相对object使用复杂

convertAndSend

spring-cloud-steam

spring标准

说明 三种消息都是使用convert进行数据序列化。本质上一样,不建议使用convertAndSend方式。

  1. 生产方式。

发送方式

循序消息

RPC消息

批量消息

事务消息

同步

syncSendOrderly

支持(sendAndReceive)

支持

sendMessageInTransaction

异步

asyncSendOrderly

支持(sendAndReceive)

不支持

不支持

OneWay

sendOneWayOrderly

不支持

不支持

不支持

  1. 事务消息。
  1. 实现 RocketMQLocalTransactionListener 接口。
  2. 配置 @RocketMQTransactionListener 注解。
  3. rocketMQTemplateBeanName 拦截的 RocketMQTemplate 的事务,值对应ExtRocketMQTemplateConfiguration.value
  1. 实现接口方法。

实验链接:https://developer.aliyun.com/adc/scenario/470077b9392a482b91fa1a2bf4c423fe

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
存储 人工智能 Java
AI 超级智能体全栈项目阶段四:学术分析 AI 项目 RAG 落地指南:基于 Spring AI 的本地与阿里云知识库实践
本文介绍RAG(检索增强生成)技术,结合Spring AI与本地及云知识库实现学术分析AI应用,利用阿里云Qwen-Plus模型提升回答准确性与可信度。
798 90
AI 超级智能体全栈项目阶段四:学术分析 AI 项目 RAG 落地指南:基于 Spring AI 的本地与阿里云知识库实践
|
3月前
|
前端开发 Java API
利用 Spring WebFlux 技术打造高效非阻塞 API 的完整开发方案与实践技巧
本文介绍了如何使用Spring WebFlux构建高效、可扩展的非阻塞API,涵盖响应式编程核心概念、技术方案设计及具体实现示例,适用于高并发场景下的API开发。
346 0
|
1月前
|
人工智能 监控 Java
Spring AI Alibaba实践|后台定时Agent
基于Spring AI Alibaba框架,可构建自主运行的AI Agent,突破传统Chat模式限制,支持定时任务、事件响应与人工协同,实现数据采集、分析到决策的自动化闭环,提升企业智能化效率。
Spring AI Alibaba实践|后台定时Agent
|
1月前
|
XML Java 应用服务中间件
【SpringBoot(一)】Spring的认知、容器功能讲解与自动装配原理的入门,带你熟悉Springboot中基本的注解使用
SpringBoot专栏开篇第一章,讲述认识SpringBoot、Bean容器功能的讲解、自动装配原理的入门,还有其他常用的Springboot注解!如果想要了解SpringBoot,那么就进来看看吧!
334 2
|
6月前
|
人工智能 Java API
Spring AI 实战|Spring AI入门之DeepSeek调用
本文介绍了Spring AI框架如何帮助Java开发者轻松集成和使用大模型API。文章从Spring AI的初探开始,探讨了其核心能力及应用场景,包括手动与自动发起请求、流式响应实现打字机效果,以及兼容不同AI服务(如DeepSeek、通义千问)的方法。同时,还详细讲解了如何在生产环境中添加监控以优化性能和成本管理。通过Spring AI,开发者可以简化大模型调用流程,降低复杂度,为企业智能应用开发提供强大支持。最后,文章展望了Spring AI在未来AI时代的重要作用,鼓励开发者积极拥抱这一技术变革。
2427 71
Spring AI 实战|Spring AI入门之DeepSeek调用
|
3月前
|
Java 应用服务中间件 开发者
Spring Boot 技术详解与应用实践
本文档旨在全面介绍 Spring Boot 这一广泛应用于现代企业级应用开发的框架。内容将涵盖 Spring Boot 的核心概念、核心特性、项目自动生成与结构解析、基础功能实现(如 RESTful API、数据访问)、配置管理以及最终的构建与部署。通过本文档,读者将能够理解 Spring Boot 如何简化 Spring 应用的初始搭建和开发过程,并掌握其基本使用方法。
331 2
|
5月前
|
消息中间件 存储 Kafka
一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。
4069 9
 一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
|
4月前
|
前端开发 Java API
基于 Spring Boot 3 与 React 的 Java 学生信息管理系统从入门到精通实操指南
本项目基于Spring Boot 3与React 18构建学生信息管理系统,涵盖前后端开发、容器化部署及测试监控,提供完整实操指南与源码,助你掌握Java全栈开发技能。
224 0
|
4月前
|
缓存 安全 Java
Spring 框架核心原理与实践解析
本文详解 Spring 框架核心知识,包括 IOC(容器管理对象)与 DI(容器注入依赖),以及通过注解(如 @Service、@Autowired)声明 Bean 和注入依赖的方式。阐述了 Bean 的线程安全(默认单例可能有安全问题,需业务避免共享状态或设为 prototype)、作用域(@Scope 注解,常用 singleton、prototype 等)及完整生命周期(实例化、依赖注入、初始化、销毁等步骤)。 解析了循环依赖的解决机制(三级缓存)、AOP 的概念(公共逻辑抽为切面)、底层动态代理(JDK 与 Cglib 的区别)及项目应用(如日志记录)。介绍了事务的实现(基于 AOP
170 0

相关产品

  • 云消息队列 MQ