Rocketmq-spring入门与实践
1. 创建资源
开始实验之前,您需要先创建ECS实例资源。
- 在实验室页面,单击创建资源。
- (可选)在实验室页面左侧导航栏中,单击云产品资源列表,可查看本次实验资源相关信息(例如IP地址、用户信息等)。
说明 资源创建过程需要1~3分钟。
2. 启动名称服务器和代理
本步骤指导您如何在已搭建好RocketMQ实例中启动名称服务器和代理。
- 执行如下命令,切换目录至rocketmq-4.9.3/bin下。
cd /root/rocketmq-4.9.3/bin
- 执行如下命令,启动服务器。
说明 按CTRL+C可结束当前进程。
nohup sh mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
若启动代理失败,解决方案如下。
- 执行如下命令,查看 mqnamesrv 进程。
ps -ef |grep mqnamesrv
- 执行如下命令,清除掉该进程(该命令进程编号以上图示例)。
kill -9 6987
- 执行如下命令,清除Java进程。
sh mqshutdown namesrv sh mqshutdown broker
- 执行如下命令,启动代理。
说明 按CTRL+C可结束当前进程。
nohup sh mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
4.执行如下命令,创建topic。
cd /root/rocketmq-4.9.3/bin ./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendDataType ./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendOperationType ./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t syncSendOrderly ./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendAndReceive ./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t broadcastConsumer ./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t transactionMessage
3. 启动 java 工程
本步骤指导您如何启动java工程。
- 执行如下命令,启动Java工程。
说明 首次运行可能要花费3分钟左右的编译时间。
cd /root/rocketmq-handson-apply mvn install -Dmaven.test.skip export local=$RANDOM java -jar rocketmq-spring/target/rocketmq-spring-1.0.0.jar
启动成功,返回信息如下。
- 单击当前页面右上角,再开启一个会话
4. 访问并测试执行效果
本步骤指导您如何进行测试,并查看其相应效果。
- 消息类型测试。
- 发送对象。
执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。
wget -q -O- "http://127.0.0.1:28082/messageSend/sendObject?id=1&name=xiaoming&action=go2&ags=3"
再观察会话1中的日志,会观察到如下的输出。
- 发送message。
执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。
wget -q -O- "http://127.0.0.1:28082/messageSend/sendMessage?id=2&name=xiaoming&action=des&ags=3"
再观察会话1中的日志,会观察到如下的输出。
- 发送List。
执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。
wget -q -O- "http://127.0.0.1:28082/messageSend/sendCollection?id=2&name=xiaoming&action=des&ags=3"
再观察会话1中的日志,会观察到如下的输出。
- 测试不同发送行为。
- 同步发送。
执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。
wget -q -O- "http://127.0.0.1:28082/messageSend/syncSend?id=2&name=xiaoming&action=des&ags=3"
再观察会话1中的日志,会观察到如下的输出。
- 异步发送。
执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。
wget -q -O- "http://127.0.0.1:28082/messageSend/asyncSend?id=2&name=xiaoming&action=des&ags=3"
再观察会话1中的日志,会观察到如下的输出。
- OneWay发送。
执行如下命令, 向刚才启动的程序发送一条简单的请求,以观察响应。
wget -q -O- "http://127.0.0.1:28082/messageSend/oneWaySend?id=2&name=xiaoming&action=des&ags=3"
再观察会话1中的日志,会观察到如下的输出。
- 发送事务消息。
执行如下命令, 向刚才启动的程序发送一条简单的请求,以观察响应。
wget -q -O- "http://127.0.0.1:28082/messageSend/transactionMessage?id=2&name=xiaoming&action=des&arg=1" wget -q -O- "http://127.0.0.1:28082/messageSend/transactionMessage?id=2&name=xiaoming&action=des&arg=2" wget -q -O- "http://127.0.0.1:28082/messageSend/transactionMessage?id=2&name=xiaoming&action=des&arg=3"
再观察会话1中的日志,会观察到如下的输出。
- 发送同步RPC消息。
执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。
wget -q -O- "http://127.0.0.1:28082/messageSend/sendAndReceive?id=2&name=xiaoming&action=des&ags=3"
预期返回结果如下。
再观察会话1中的日志,会观察到如下的输出。
5. 细节点
本步骤将讲解rocketmq-spring的tag设置与原生不同以及关于 Message 对象的 header。
- rocketmq-spring的tag设置与原生不同。
现在把topic与tag合成了destination。destination=topic or topic":"tag。实例请看MessageSendController第152行。
- 关于 Message 对象的 header。
用于为消息提供附加信息,同时降低了消息体反序列化带来的额外开销。
- RocketMQ默认请求头。
请求头 |
作用 |
类型 |
RocketMQHeaders.KEYS |
消息key,为发送消息设定keys |
string |
6. 生产者
本步骤将讲解生产者的创捷和各属性信息。
- 创建生产者。
- 默认生产者创建如下。
- 自定义生产者创建如下。
- 继承RocketMQTemplate对象。
- 使用ExtRocketMQTemplateConfiguration 注解。
配置名 |
作用 |
类型 |
默认值 |
是否必须 |
存在配置文件 |
默认配置名 |
存在注解 |
nameServer |
注册服务地址 |
String |
是 |
是 |
rocketmq.name-server |
是 |
|
group |
生产组 |
String |
是 |
是 |
rocketmq.producer.group |
是 |
|
sendMessageTimeout |
发送超时 |
int |
3000 |
否 |
是 |
rocketmq.producer.send-message-timeout |
是 |
compressMessageBodyThreshold |
数据压缩限制 |
int |
1024*4 |
否 |
是 |
rocketmq.producer.compress-message-body-threshold |
|
retryTimesWhenSendFailed |
同步发送失败重试次数 |
int |
2 |
否 |
是 |
rocketmq.producer.retry-times-when-send-failed |
是 |
retryTimesWhenSendAsyncFailed |
异步发送失败重试次数 |
int |
2 |
否 |
是 |
rocketmq.producer.retry-times-when-send-async-fsailed |
|
retryNextServer |
内部发送失败时重试另一个代理 |
boolean |
false |
否 |
是 |
rocketmq.producer.retry-next-server |
是 |
maxMessageSize |
消息最大字节数 |
int |
4M |
否 |
是 |
rocketmq.producer.max-message-size |
是 |
enableMsgTrace |
开启消息轨迹 |
boolean |
true |
否 |
是 |
rocketmq.producer.enable-msg-trace |
是 |
customizedTraceTopic |
消息轨迹topic |
String |
RMQSYSTRACE_TOPIC |
否 |
是 |
rocketmq.producer.customized-trace-topic |
是 |
accessKey |
acl需要的ak |
String |
否 |
是 |
rocketmq.producer.access-key |
是 |
|
secretKey |
acl需要的sk |
String |
- 配置属性信息。
- 声明自定义消费者的全局变量,并使用@Autowired注解。
- 消息生产。
- 发送形参说明如下。
名 |
类型 |
说明 |
destination |
String |
消息topic或者topic与tag组合 |
message |
Message |
发送对象 |
payload |
Object |
发送对象 |
timeout |
long |
发送超时时长,如果没有,使用全局配置或者默认配置 |
delayLevel |
int |
数据压缩级别 |
messages |
发送集合消息 |
集合内部数据只能是message或者子类 |
hashKey |
String |
循序消费的消息需要, 用于确定消息的队列 |
sendCallback |
SendCallback |
异步发送回调对象 |
rocketMQLocalRequestCallback |
RocketMQLocalRequestCallback |
异步RPC回调对象 |
type |
Type |
同步RPC消息返回对象 |
- 消息类型解读。
消息类型 |
出处 |
是否返回SendResult |
优势 |
劣势 |
Object |
rocketmq-spring |
是 |
简单,方便 |
扩展度低 |
message |
rocketmq-spring |
是 |
可以扩展 |
相对object使用复杂 |
convertAndSend |
spring-cloud-steam |
否 |
spring标准 |
说明 三种消息都是使用convert进行数据序列化。本质上一样,不建议使用convertAndSend方式。
- 生产方式。
发送方式 |
循序消息 |
RPC消息 |
批量消息 |
事务消息 |
同步 |
syncSendOrderly |
支持(sendAndReceive) |
支持 |
sendMessageInTransaction |
异步 |
asyncSendOrderly |
支持(sendAndReceive) |
不支持 |
不支持 |
OneWay |
sendOneWayOrderly |
不支持 |
不支持 |
不支持 |
- 事务消息。
- 实现 RocketMQLocalTransactionListener 接口。
- 配置 @RocketMQTransactionListener 注解。
- rocketMQTemplateBeanName 拦截的 RocketMQTemplate 的事务,值对应ExtRocketMQTemplateConfiguration.value。
- 实现接口方法。
实验链接:https://developer.aliyun.com/adc/scenario/470077b9392a482b91fa1a2bf4c423fe