Rocketmq-spring入门与实践

本文涉及的产品
云消息队列RocketMQ,TPS总和2000次/秒
简介: 本场景带您体验如何在 Spring 生态中优雅地使用 Apache RocketMQ,感受最受欢迎业务开发框架与最受欢迎消息平台结合的魅力。
+关注继续查看

Rocketmq-spring入门与实践

1. 创建资源

开始实验之前,您需要先创建ECS实例资源。

  1. 在实验室页面,单击创建资源
  2. (可选)在实验室页面左侧导航栏中,单击云产品资源列表,可查看本次实验资源相关信息(例如IP地址、用户信息等)。

说明 资源创建过程需要1~3分钟。


2. 启动名称服务器和代理

本步骤指导您如何在已搭建好RocketMQ实例中启动名称服务器和代理。

  1. 执行如下命令,切换目录至rocketmq-4.9.3/bin下。
cd /root/rocketmq-4.9.3/bin
  1. 执行如下命令,启动服务器。

说明 按CTRL+C可结束当前进程。

nohup sh mqnamesrv &

image

tail -f ~/logs/rocketmqlogs/namesrv.log

image

若启动代理失败,解决方案如下。

  1. 执行如下命令,查看 mqnamesrv 进程。
ps -ef |grep mqnamesrv

image

  1. 执行如下命令,清除掉该进程(该命令进程编号以上图示例)。
kill -9 6987
  1. 执行如下命令,清除Java进程。
sh mqshutdown namesrv
sh mqshutdown broker

image

  1. 执行如下命令,启动代理。

说明 按CTRL+C可结束当前进程。

nohup sh mqbroker -n localhost:9876 &

image

tail -f ~/logs/rocketmqlogs/broker.log

image

4.执行如下命令,创建topic。

cd /root/rocketmq-4.9.3/bin
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendDataType
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendOperationType
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t syncSendOrderly
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendAndReceive
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t broadcastConsumer
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t transactionMessage


3. 启动 java 工程

本步骤指导您如何启动java工程。

  1. 执行如下命令,启动Java工程。

说明 首次运行可能要花费3分钟左右的编译时间。

cd /root/rocketmq-handson-apply
mvn install -Dmaven.test.skip
export local=$RANDOM
java -jar  rocketmq-spring/target/rocketmq-spring-1.0.0.jar

启动成功,返回信息如下。

image

  1. 单击当前页面右上角,再开启一个会话

image


4. 访问并测试执行效果

本步骤指导您如何进行测试,并查看其相应效果。

  1. 消息类型测试。
    1. 发送对象。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/sendObject?id=1&name=xiaoming&action=go2&ags=3"

再观察会话1中的日志,会观察到如下的输出。

image

  1. 发送message。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/sendMessage?id=2&name=xiaoming&action=des&ags=3"

再观察会话1中的日志,会观察到如下的输出。

image

  1. 发送List。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/sendCollection?id=2&name=xiaoming&action=des&ags=3"

再观察会话1中的日志,会观察到如下的输出。

image

  1. 测试不同发送行为。
    1. 同步发送。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/syncSend?id=2&name=xiaoming&action=des&ags=3"

再观察会话1中的日志,会观察到如下的输出。

image

  1. 异步发送。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/asyncSend?id=2&name=xiaoming&action=des&ags=3"

再观察会话1中的日志,会观察到如下的输出。

image

  1. OneWay发送。

执行如下命令, 向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/oneWaySend?id=2&name=xiaoming&action=des&ags=3"

再观察会话1中的日志,会观察到如下的输出。

image

  1. 发送事务消息。

执行如下命令, 向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/transactionMessage?id=2&name=xiaoming&action=des&arg=1"
wget -q -O- "http://127.0.0.1:28082/messageSend/transactionMessage?id=2&name=xiaoming&action=des&arg=2"
wget -q -O- "http://127.0.0.1:28082/messageSend/transactionMessage?id=2&name=xiaoming&action=des&arg=3"

再观察会话1中的日志,会观察到如下的输出。

image

  1. 发送同步RPC消息。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "http://127.0.0.1:28082/messageSend/sendAndReceive?id=2&name=xiaoming&action=des&ags=3"

预期返回结果如下。

image

再观察会话1中的日志,会观察到如下的输出。

image


5. 细节点

本步骤将讲解rocketmq-spring的tag设置与原生不同以及关于 Message 对象的 header。

  1. rocketmq-spring的tag设置与原生不同。

现在把topic与tag合成了destination。destination=topic or topic":"tag。实例请看MessageSendController第152行。

  1. 关于 Message 对象的 header。

用于为消息提供附加信息,同时降低了消息体反序列化带来的额外开销。

  1. RocketMQ默认请求头。

请求头

作用

类型

RocketMQHeaders.KEYS

消息key,为发送消息设定keys

string


6. 生产者

本步骤将讲解生产者的创捷和各属性信息。

  1. 创建生产者。
    1. 默认生产者创建如下。

image

  1. 自定义生产者创建如下。

image

  1. 继承RocketMQTemplate对象。
  2. 使用ExtRocketMQTemplateConfiguration 注解。

配置名

作用

类型

默认值

是否必须

存在配置文件

默认配置名

存在注解

nameServer

注册服务地址

String

rocketmq.name-server

group

生产组

String

rocketmq.producer.group

sendMessageTimeout

发送超时

int

3000

rocketmq.producer.send-message-timeout

compressMessageBodyThreshold

数据压缩限制

int

1024*4

rocketmq.producer.compress-message-body-threshold

retryTimesWhenSendFailed

同步发送失败重试次数

int

2

rocketmq.producer.retry-times-when-send-failed

retryTimesWhenSendAsyncFailed

异步发送失败重试次数

int

2

rocketmq.producer.retry-times-when-send-async-fsailed

retryNextServer

内部发送失败时重试另一个代理

boolean

false

rocketmq.producer.retry-next-server

maxMessageSize

消息最大字节数

int

4M

rocketmq.producer.max-message-size

enableMsgTrace

开启消息轨迹

boolean

true

rocketmq.producer.enable-msg-trace

customizedTraceTopic

消息轨迹topic

String

RMQSYSTRACE_TOPIC

rocketmq.producer.customized-trace-topic

accessKey

acl需要的ak

String

rocketmq.producer.access-key

secretKey

acl需要的sk

String

  1. 配置属性信息。
  2. 声明自定义消费者的全局变量,并使用@Autowired注解。
  3. 消息生产。
  4. 发送形参说明如下。

类型

说明

destination

String

消息topic或者topic与tag组合

message

Message

发送对象

payload

Object

发送对象

timeout

long

发送超时时长,如果没有,使用全局配置或者默认配置

delayLevel

int

数据压缩级别

messages

发送集合消息

集合内部数据只能是message或者子类

hashKey

String

循序消费的消息需要, 用于确定消息的队列

sendCallback

SendCallback

异步发送回调对象

rocketMQLocalRequestCallback

RocketMQLocalRequestCallback

异步RPC回调对象

type

Type

同步RPC消息返回对象

  1. 消息类型解读。

消息类型

出处

是否返回SendResult

优势

劣势

Object

rocketmq-spring

简单,方便

扩展度低

message

rocketmq-spring

可以扩展

相对object使用复杂

convertAndSend

spring-cloud-steam

spring标准

说明 三种消息都是使用convert进行数据序列化。本质上一样,不建议使用convertAndSend方式。

  1. 生产方式。

发送方式

循序消息

RPC消息

批量消息

事务消息

同步

syncSendOrderly

支持(sendAndReceive)

支持

sendMessageInTransaction

异步

asyncSendOrderly

支持(sendAndReceive)

不支持

不支持

OneWay

sendOneWayOrderly

不支持

不支持

不支持

  1. 事务消息。
    1. 实现 RocketMQLocalTransactionListener 接口。
    2. 配置 @RocketMQTransactionListener 注解。
    3. rocketMQTemplateBeanName 拦截的 RocketMQTemplate 的事务,值对应ExtRocketMQTemplateConfiguration.value
  1. 实现接口方法。

image

实验链接:https://developer.aliyun.com/adc/scenario/470077b9392a482b91fa1a2bf4c423fe

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 弹性计算 Java
Spring-cloud-bus-rocketmq入门与实践
本场景带您体验如何在 Spring 生态中优雅地使用 Apache RocketMQ,感受最受欢迎业务开发框架与最受欢迎消息平台结合的魅力。
178 0
|
4月前
|
消息中间件 Java 程序员
SpringBoot整合RocketMQ,尝尝几大高级特性!
作为一名程序员,您一定熟悉RocketMQ的功能,包括支持事务、顺序和延迟消息等。在程序员界有一句名言,“Talk is cheap. Show me the code” 。本文将通过实际案例来引出解决方案,并通过代码实现,让您在学习本节的过程中能够确切地掌握实际编码技能
84 0
SpringBoot整合RocketMQ,尝尝几大高级特性!
|
5月前
|
消息中间件 Java API
RocketMQ极简入门-在SpringBoot中使用RocketMQ
现在开发项目都是基于SpringBoot,新项目很少使用Spring,所以我们学习一门技术除了要会原生API,还不得不考虑和SpringBoot集成,本篇文章为SpirngBoot整合RocketMQ案例
167 0
|
5月前
|
消息中间件 Java API
八.RocketMQ极简入门-在SpringBoot中使用RocketMQ
RocketMQ极简入门-在SpringBoot中使用RocketMQ
|
8月前
|
消息中间件 Java 大数据
rocketmq-spring : 实战与源码解析一网打尽
RocketMQ 是大家耳熟能详的消息队列,开源项目 rocketmq-spring 可以帮助开发者在 Spring Boot 项目中快速整合 RocketMQ。 这篇文章会介绍 Spring Boot 项目使用 rocketmq-spring SDK 实现消息收发的操作流程,同时笔者会从开发者的角度解读 SDK 的设计逻辑。
rocketmq-spring : 实战与源码解析一网打尽
|
10月前
|
消息中间件 Java RocketMQ
RocketMQ-Spring学习
们可以它的一系列动作操作,会首先将相关配置MessageConverterConfiguration、ListenerContainerConfiguration、ExtProducerResetConfiguration、ExtConsumerResetConfiguration、RocketMQTransactionConfiguration、RocketMQListenerConfiguration进行导入。在自动注入前处理事务配置RocketMQTransactionConfiguration,在自动注入后处理消息转换器配置MessageConverterConfiguration。
821 0
RocketMQ-Spring学习
|
10月前
|
消息中间件 Java RocketMQ
《在 Spring 生态中玩转 RocketMQ》电子版地址
本书凭借作者多年技术经验,通过图文和实操地方式教各位开发者玩转在 Spring 生态中使用 RocketMQ 的三种主流方式,让各位开发者对几种经典场景有宏观的了解。此外,还详细介绍上述各个项目的具体使用方法和应用场景,真正地在 Spring 生态中玩转 RocketMQ!
59 0
《在 Spring 生态中玩转 RocketMQ》电子版地址
|
10月前
|
消息中间件 Java 中间件
如何在 Spring 生态中玩转 RocketMQ?
RocketMQ 作为业务消息的首选,在消息和流处理领域被广泛应用。而微服务生态 Spring 框架也是业务开发中最受欢迎的框架,两者的完美契合使得 RocketMQ 成为 Spring Messaging 实现中最受欢迎的消息实现。本文展示了 5 种在 Spring 生态中文玩转 RocketMQ 的方式,并描述了每个项目的特点和使用场景。
135 0
如何在 Spring 生态中玩转 RocketMQ?
|
10月前
|
消息中间件 Cloud Native Java
RocketMQ-Spring 毕业两周年,为什么能成为 Spring 生态中最受欢迎的 messaging 实现?
2019 年 1 月,孵化 6 个月的 RocketMQ-Spring 作为 Apache RocketMQ 的子项目正式毕业,发布了第一个 Release 版本 2.0.1。该项目是把 RocketMQ 的客户端使用 Spring Boot 的方式进行了封装,可以让用户通过简单的 annotation 和标准的 Spring Messaging API 编写代码来进行消息的发送和消费。当时 RocketMQ 社区同学请 Spring 社区的同学对 RocketMQ-Spring 代码进行 review,引出一段罗美琪(RocketMQ)和春波特(Spring Boot)的故事。
116 0
 RocketMQ-Spring 毕业两周年,为什么能成为 Spring 生态中最受欢迎的 messaging 实现?
|
消息中间件 Kubernetes Java
在 Spring 生态中玩转 RocketMQ
RocketMQ 作为业务消息的首选,在消息和流处理领域被广泛应用。而 Spring 框架也是业务开发中最受欢迎的框架,两者的完美契合使得 RocketMQ 成为 Spring Messaging 实现中最受欢迎的消息实现。一起来看看如何在 Spring 生态中玩转 RocketMQ 吧~
150 0
在 Spring 生态中玩转 RocketMQ
相关产品
云消息队列 Kafka 版
消息服务
云消息队列 MQ
推荐文章
更多