Rocketmq-spring入门与实践

简介: 本场景带您体验如何在 Spring 生态中优雅地使用 Apache RocketMQ,感受最受欢迎业务开发框架与最受欢迎消息平台结合的魅力。

Rocketmq-spring入门与实践

1. 创建资源

开始实验之前,您需要先创建ECS实例资源。

  1. 在实验室页面,单击创建资源
  2. (可选)在实验室页面左侧导航栏中,单击云产品资源列表,可查看本次实验资源相关信息(例如IP地址、用户信息等)。

说明 资源创建过程需要1~3分钟。


2. 启动名称服务器和代理

本步骤指导您如何在已搭建好RocketMQ实例中启动名称服务器和代理。

  1. 执行如下命令,切换目录至rocketmq-4.9.3/bin下。
cd /root/rocketmq-4.9.3/bin
  1. 执行如下命令,启动服务器。

说明 按CTRL+C可结束当前进程。

nohup sh mqnamesrv &

tail -f ~/logs/rocketmqlogs/namesrv.log

若启动代理失败,解决方案如下。

  1. 执行如下命令,查看 mqnamesrv 进程。
ps -ef |grep mqnamesrv

  1. 执行如下命令,清除掉该进程(该命令进程编号以上图示例)。
kill -9 6987
  1. 执行如下命令,清除Java进程。
sh mqshutdown namesrv
sh mqshutdown broker

  1. 执行如下命令,启动代理。

说明 按CTRL+C可结束当前进程。

nohup sh mqbroker -n localhost:9876 &

tail -f ~/logs/rocketmqlogs/broker.log

4.执行如下命令,创建topic。

cd /root/rocketmq-4.9.3/bin
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendDataType
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendOperationType
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t syncSendOrderly
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendAndReceive
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t broadcastConsumer
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t transactionMessage


3. 启动 java 工程

本步骤指导您如何启动java工程。

  1. 执行如下命令,启动Java工程。

说明 首次运行可能要花费3分钟左右的编译时间。

cd /root/rocketmq-handson-apply
mvn install -Dmaven.test.skip
export local=$RANDOM
java -jar  rocketmq-spring/target/rocketmq-spring-1.0.0.jar

启动成功,返回信息如下。

  1. 单击当前页面右上角,再开启一个会话


4. 访问并测试执行效果

本步骤指导您如何进行测试,并查看其相应效果。

  1. 消息类型测试。
  1. 发送对象。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/sendObject?id=1&name=xiaoming&action=go2&ags=3"

再观察会话1中的日志,会观察到如下的输出。

  1. 发送message。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/sendMessage?id=2&name=xiaoming&action=des&ags=3"

再观察会话1中的日志,会观察到如下的输出。

  1. 发送List。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/sendCollection?id=2&name=xiaoming&action=des&ags=3"

再观察会话1中的日志,会观察到如下的输出。

  1. 测试不同发送行为。
  1. 同步发送。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/syncSend?id=2&name=xiaoming&action=des&ags=3"

再观察会话1中的日志,会观察到如下的输出。

  1. 异步发送。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/asyncSend?id=2&name=xiaoming&action=des&ags=3"

再观察会话1中的日志,会观察到如下的输出。

  1. OneWay发送。

执行如下命令, 向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/oneWaySend?id=2&name=xiaoming&action=des&ags=3"

再观察会话1中的日志,会观察到如下的输出。

  1. 发送事务消息。

执行如下命令, 向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/transactionMessage?id=2&name=xiaoming&action=des&arg=1"
wget -q -O- "http://127.0.0.1:28082/messageSend/transactionMessage?id=2&name=xiaoming&action=des&arg=2"
wget -q -O- "http://127.0.0.1:28082/messageSend/transactionMessage?id=2&name=xiaoming&action=des&arg=3"

再观察会话1中的日志,会观察到如下的输出。

  1. 发送同步RPC消息。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/sendAndReceive?id=2&name=xiaoming&action=des&ags=3"

预期返回结果如下。

再观察会话1中的日志,会观察到如下的输出。


5. 细节点

本步骤将讲解rocketmq-spring的tag设置与原生不同以及关于 Message 对象的 header。

  1. rocketmq-spring的tag设置与原生不同。

现在把topic与tag合成了destination。destination=topic or topic":"tag。实例请看MessageSendController第152行。

  1. 关于 Message 对象的 header。

用于为消息提供附加信息,同时降低了消息体反序列化带来的额外开销。

  1. RocketMQ默认请求头。

请求头

作用

类型

RocketMQHeaders.KEYS

消息key,为发送消息设定keys

string


6. 生产者

本步骤将讲解生产者的创捷和各属性信息。

  1. 创建生产者。
  1. 默认生产者创建如下。

  1. 自定义生产者创建如下。

  1. 继承RocketMQTemplate对象。
  2. 使用ExtRocketMQTemplateConfiguration 注解。

配置名

作用

类型

默认值

是否必须

存在配置文件

默认配置名

存在注解

nameServer

注册服务地址

String

rocketmq.name-server

group

生产组

String

rocketmq.producer.group

sendMessageTimeout

发送超时

int

3000

rocketmq.producer.send-message-timeout

compressMessageBodyThreshold

数据压缩限制

int

1024*4

rocketmq.producer.compress-message-body-threshold

retryTimesWhenSendFailed

同步发送失败重试次数

int

2

rocketmq.producer.retry-times-when-send-failed

retryTimesWhenSendAsyncFailed

异步发送失败重试次数

int

2

rocketmq.producer.retry-times-when-send-async-fsailed

retryNextServer

内部发送失败时重试另一个代理

boolean

false

rocketmq.producer.retry-next-server

maxMessageSize

消息最大字节数

int

4M

rocketmq.producer.max-message-size

enableMsgTrace

开启消息轨迹

boolean

true

rocketmq.producer.enable-msg-trace

customizedTraceTopic

消息轨迹topic

String

RMQSYSTRACE_TOPIC

rocketmq.producer.customized-trace-topic

accessKey

acl需要的ak

String

rocketmq.producer.access-key

secretKey

acl需要的sk

String

  1. 配置属性信息。
  2. 声明自定义消费者的全局变量,并使用@Autowired注解。
  3. 消息生产。
  4. 发送形参说明如下。

类型

说明

destination

String

消息topic或者topic与tag组合

message

Message

发送对象

payload

Object

发送对象

timeout

long

发送超时时长,如果没有,使用全局配置或者默认配置

delayLevel

int

数据压缩级别

messages

发送集合消息

集合内部数据只能是message或者子类

hashKey

String

循序消费的消息需要, 用于确定消息的队列

sendCallback

SendCallback

异步发送回调对象

rocketMQLocalRequestCallback

RocketMQLocalRequestCallback

异步RPC回调对象

type

Type

同步RPC消息返回对象

  1. 消息类型解读。

消息类型

出处

是否返回SendResult

优势

劣势

Object

rocketmq-spring

简单,方便

扩展度低

message

rocketmq-spring

可以扩展

相对object使用复杂

convertAndSend

spring-cloud-steam

spring标准

说明 三种消息都是使用convert进行数据序列化。本质上一样,不建议使用convertAndSend方式。

  1. 生产方式。

发送方式

循序消息

RPC消息

批量消息

事务消息

同步

syncSendOrderly

支持(sendAndReceive)

支持

sendMessageInTransaction

异步

asyncSendOrderly

支持(sendAndReceive)

不支持

不支持

OneWay

sendOneWayOrderly

不支持

不支持

不支持

  1. 事务消息。
  1. 实现 RocketMQLocalTransactionListener 接口。
  2. 配置 @RocketMQTransactionListener 注解。
  3. rocketMQTemplateBeanName 拦截的 RocketMQTemplate 的事务,值对应ExtRocketMQTemplateConfiguration.value
  1. 实现接口方法。

实验链接:https://developer.aliyun.com/adc/scenario/470077b9392a482b91fa1a2bf4c423fe

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
16天前
|
存储 安全 Java
Spring Security 入门
Spring Security 是 Spring 框架中的安全模块,提供强大的认证和授权功能,支持防止常见攻击(如 CSRF 和会话固定攻击)。它通过过滤器链拦截请求,核心概念包括认证、授权和自定义过滤器。配置方面,涉及密码加密、用户信息服务、认证提供者及过滤器链设置。示例代码展示了如何配置登录、注销、CSRF防护等。常见问题包括循环重定向、静态资源被拦截和登录失败未返回错误信息,解决方法需确保路径正确和添加错误提示逻辑。
Spring Security 入门
|
3天前
|
消息中间件 存储 监控
活动实践 | 快速体验云消息队列RocketMQ版
本方案介绍如何使用阿里云消息队列RocketMQ版Serverless实例进行消息管理。主要步骤包括获取接入点、创建Topic和订阅组、收发消息、查看消息轨迹及仪表盘监控。通过这些操作,用户可以轻松实现消息的全生命周期管理,确保消息收发的高效与可靠。此外,还提供了消费验证、下载消息等功能,方便用户进行详细的消息处理与调试。
|
4天前
|
人工智能 自然语言处理 Java
Spring Cloud Alibaba AI 入门与实践
本文将介绍 Spring Cloud Alibaba AI 的基本概念、主要特性和功能,并演示如何完成一个在线聊天和在线画图的 AI 应用。
97 7
|
21天前
|
消息中间件 Java 开发工具
【实践】快速学会使用云消息队列RabbitMQ版
本次分享的主题是快速学会使用云消息队列RabbitMQ版的实践。内容包括:如何创建和配置RabbitMQ实例,如Vhost、Exchange、Queue等;如何通过阿里云控制台管理静态用户名密码和AccessKey;以及如何使用RabbitMQ开源客户端进行消息生产和消费测试。最后介绍了实验资源的回收步骤,确保资源合理利用。通过详细的操作指南,帮助用户快速上手并掌握RabbitMQ的使用方法。
87 10
|
1月前
|
Java 开发者 微服务
Spring Boot 入门:简化 Java Web 开发的强大工具
Spring Boot 是一个开源的 Java 基础框架,用于创建独立、生产级别的基于Spring框架的应用程序。它旨在简化Spring应用的初始搭建以及开发过程。
67 6
Spring Boot 入门:简化 Java Web 开发的强大工具
|
23天前
|
XML Java 数据格式
🌱 深入Spring的心脏:Bean配置的艺术与实践 🌟
本文深入探讨了Spring框架中Bean配置的奥秘,从基本概念到XML配置文件的使用,再到静态工厂方式实例化Bean的详细步骤,通过实际代码示例帮助读者更好地理解和应用Spring的Bean配置。希望对你的Spring开发之旅有所助益。
89 3
|
1月前
|
XML Java 数据格式
Spring Core核心类库的功能与应用实践分析
【12月更文挑战第1天】大家好,今天我们来聊聊Spring Core这个强大的核心类库。Spring Core作为Spring框架的基础,提供了控制反转(IOC)和依赖注入(DI)等核心功能,以及企业级功能,如JNDI和定时任务等。通过本文,我们将从概述、功能点、背景、业务点、底层原理等多个方面深入剖析Spring Core,并通过多个Java示例展示其应用实践,同时指出对应实践的优缺点。
57 14
|
1月前
|
缓存 Java 数据库连接
Spring框架中的事件机制:深入理解与实践
Spring框架是一个广泛使用的Java企业级应用框架,提供了依赖注入、面向切面编程(AOP)、事务管理、Web应用程序开发等一系列功能。在Spring框架中,事件机制是一种重要的通信方式,它允许不同组件之间进行松耦合的通信,提高了应用程序的可维护性和可扩展性。本文将深入探讨Spring框架中的事件机制,包括不同类型的事件、底层原理、应用实践以及优缺点。
67 8
|
1月前
|
负载均衡 Java 开发者
深入探索Spring Cloud与Spring Boot:构建微服务架构的实践经验
深入探索Spring Cloud与Spring Boot:构建微服务架构的实践经验
150 5
|
1月前
|
XML 前端开发 安全
Spring MVC:深入理解与应用实践
Spring MVC是Spring框架提供的一个用于构建Web应用程序的Model-View-Controller(MVC)实现。它通过分离业务逻辑、数据、显示来组织代码,使得Web应用程序的开发变得更加简洁和高效。本文将从概述、功能点、背景、业务点、底层原理等多个方面深入剖析Spring MVC,并通过多个Java示例展示其应用实践,同时指出对应实践的优缺点。
85 2

相关产品

  • 云消息队列 MQ