Rocketmq-spring入门与实践
1. 创建资源
开始实验之前,您需要先创建ECS实例资源。
- 在实验室页面,单击创建资源。
- (可选)在实验室页面左侧导航栏中,单击云产品资源列表,可查看本次实验资源相关信息(例如IP地址、用户信息等)。
说明 资源创建过程需要1~3分钟。
2. 启动名称服务器和代理
本步骤指导您如何在已搭建好RocketMQ实例中启动名称服务器和代理。
- 执行如下命令,切换目录至rocketmq-4.9.3/bin下。
cd /root/rocketmq-4.9.3/bin
- 执行如下命令,启动服务器。
说明 按CTRL+C可结束当前进程。

tail -f ~/logs/rocketmqlogs/namesrv.log

若启动代理失败,解决方案如下。
- 执行如下命令,查看 mqnamesrv 进程。

- 执行如下命令,清除掉该进程(该命令进程编号以上图示例)。
- 执行如下命令,清除Java进程。
sh mqshutdown namesrv
sh mqshutdown broker

- 执行如下命令,启动代理。
说明 按CTRL+C可结束当前进程。
nohup sh mqbroker -n localhost:9876 &

tail -f ~/logs/rocketmqlogs/broker.log

4.执行如下命令,创建topic。
cd /root/rocketmq-4.9.3/bin
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendDataType
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendOperationType
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t syncSendOrderly
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendAndReceive
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t broadcastConsumer
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t transactionMessage
3. 启动 java 工程
本步骤指导您如何启动java工程。
- 执行如下命令,启动Java工程。
说明 首次运行可能要花费3分钟左右的编译时间。
cd /root/rocketmq-handson-apply
mvn install -Dmaven.test.skip
export local=$RANDOM
java -jar rocketmq-spring/target/rocketmq-spring-1.0.0.jar
启动成功,返回信息如下。

- 单击当前页面右上角,再开启一个会话

4. 访问并测试执行效果
本步骤指导您如何进行测试,并查看其相应效果。
- 消息类型测试。
- 发送对象。
执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。
wget -q -O- "http://127.0.0.1:28082/messageSend/sendObject?id=1&name=xiaoming&action=go2&ags=3"
再观察会话1中的日志,会观察到如下的输出。

- 发送message。
执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。
wget -q -O- "http://127.0.0.1:28082/messageSend/sendMessage?id=2&name=xiaoming&action=des&ags=3"
再观察会话1中的日志,会观察到如下的输出。

- 发送List。
执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。
wget -q -O- "http://127.0.0.1:28082/messageSend/sendCollection?id=2&name=xiaoming&action=des&ags=3"
再观察会话1中的日志,会观察到如下的输出。

- 测试不同发送行为。
- 同步发送。
执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。
wget -q -O- "http://127.0.0.1:28082/messageSend/syncSend?id=2&name=xiaoming&action=des&ags=3"
再观察会话1中的日志,会观察到如下的输出。

- 异步发送。
执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。
wget -q -O- "http://127.0.0.1:28082/messageSend/asyncSend?id=2&name=xiaoming&action=des&ags=3"
再观察会话1中的日志,会观察到如下的输出。

- OneWay发送。
执行如下命令, 向刚才启动的程序发送一条简单的请求,以观察响应。
wget -q -O- "http://127.0.0.1:28082/messageSend/oneWaySend?id=2&name=xiaoming&action=des&ags=3"
再观察会话1中的日志,会观察到如下的输出。

- 发送事务消息。
执行如下命令, 向刚才启动的程序发送一条简单的请求,以观察响应。
wget -q -O- "http://127.0.0.1:28082/messageSend/transactionMessage?id=2&name=xiaoming&action=des&arg=1"
wget -q -O- "http://127.0.0.1:28082/messageSend/transactionMessage?id=2&name=xiaoming&action=des&arg=2"
wget -q -O- "http://127.0.0.1:28082/messageSend/transactionMessage?id=2&name=xiaoming&action=des&arg=3"
再观察会话1中的日志,会观察到如下的输出。

- 发送同步RPC消息。
执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。
wget -q -O- "http://127.0.0.1:28082/messageSend/sendAndReceive?id=2&name=xiaoming&action=des&ags=3"
预期返回结果如下。

再观察会话1中的日志,会观察到如下的输出。

5. 细节点
本步骤将讲解rocketmq-spring的tag设置与原生不同以及关于 Message 对象的 header。
- rocketmq-spring的tag设置与原生不同。
现在把topic与tag合成了destination。destination=topic or topic":"tag。实例请看MessageSendController第152行。
- 关于 Message 对象的 header。
用于为消息提供附加信息,同时降低了消息体反序列化带来的额外开销。
- RocketMQ默认请求头。
请求头 | 作用 | 类型 |
RocketMQHeaders.KEYS | 消息key,为发送消息设定keys | string |
6. 生产者
本步骤将讲解生产者的创捷和各属性信息。
- 创建生产者。
- 默认生产者创建如下。

- 自定义生产者创建如下。

- 继承RocketMQTemplate对象。
- 使用ExtRocketMQTemplateConfiguration 注解。
配置名 | 作用 | 类型 | 默认值 | 是否必须 | 存在配置文件 | 默认配置名 | 存在注解 |
nameServer | 注册服务地址 | String | | 是 | 是 | rocketmq.name-server | 是 |
group | 生产组 | String | | 是 | 是 | rocketmq.producer.group | 是 |
sendMessageTimeout | 发送超时 | int | 3000 | 否 | 是 | rocketmq.producer.send-message-timeout | 是 |
compressMessageBodyThreshold | 数据压缩限制 | int | 1024*4 | 否 | 是 | rocketmq.producer.compress-message-body-threshold | |
retryTimesWhenSendFailed | 同步发送失败重试次数 | int | 2 | 否 | 是 | rocketmq.producer.retry-times-when-send-failed | 是 |
retryTimesWhenSendAsyncFailed | 异步发送失败重试次数 | int | 2 | 否 | 是 | rocketmq.producer.retry-times-when-send-async-fsailed | |
retryNextServer | 内部发送失败时重试另一个代理 | boolean | false | 否 | 是 | rocketmq.producer.retry-next-server | 是 |
maxMessageSize | 消息最大字节数 | int | 4M | 否 | 是 | rocketmq.producer.max-message-size | 是 |
enableMsgTrace | 开启消息轨迹 | boolean | true | 否 | 是 | rocketmq.producer.enable-msg-trace | 是 |
customizedTraceTopic | 消息轨迹topic | String | RMQSYSTRACE_TOPIC | 否 | 是 | rocketmq.producer.customized-trace-topic | 是 |
accessKey | acl需要的ak | String | | 否 | 是 | rocketmq.producer.access-key | 是 |
secretKey | acl需要的sk | String | | | | | |
- 配置属性信息。
- 声明自定义消费者的全局变量,并使用@Autowired注解。
- 消息生产。
- 发送形参说明如下。
名 | 类型 | 说明 |
destination | String | 消息topic或者topic与tag组合 |
message | Message | 发送对象 |
payload | Object | 发送对象 |
timeout | long | 发送超时时长,如果没有,使用全局配置或者默认配置 |
delayLevel | int | 数据压缩级别 |
messages | 发送集合消息 | 集合内部数据只能是message或者子类 |
hashKey | String | 循序消费的消息需要, 用于确定消息的队列 |
sendCallback | SendCallback | 异步发送回调对象 |
rocketMQLocalRequestCallback | RocketMQLocalRequestCallback | 异步RPC回调对象 |
type | Type | 同步RPC消息返回对象 |
- 消息类型解读。
消息类型 | 出处 | 是否返回SendResult | 优势 | 劣势 |
Object | rocketmq-spring | 是 | 简单,方便 | 扩展度低 |
message | rocketmq-spring | 是 | 可以扩展 | 相对object使用复杂 |
convertAndSend | spring-cloud-steam | 否 | spring标准 | |
说明 三种消息都是使用convert进行数据序列化。本质上一样,不建议使用convertAndSend方式。
- 生产方式。
发送方式 | 循序消息 | RPC消息 | 批量消息 | 事务消息 |
同步 | syncSendOrderly | 支持(sendAndReceive) | 支持 | sendMessageInTransaction |
异步 | asyncSendOrderly | 支持(sendAndReceive) | 不支持 | 不支持 |
OneWay | sendOneWayOrderly | 不支持 | 不支持 | 不支持 |
- 事务消息。
- 实现 RocketMQLocalTransactionListener 接口。
- 配置 @RocketMQTransactionListener 注解。
- rocketMQTemplateBeanName 拦截的 RocketMQTemplate 的事务,值对应ExtRocketMQTemplateConfiguration.value。
- 实现接口方法。

实验链接:https://developer.aliyun.com/adc/scenario/470077b9392a482b91fa1a2bf4c423fe