Spring-cloud-stream-binder-rocketmq入门与实践
1. 创建资源
开始实验之前,您需要先创建ECS实例资源。
- 在实验室页面,单击创建资源。
- (可选)在实验室页面左侧导航栏中,单击云产品资源列表,可查看本次实验资源相关信息(例如IP地址、用户信息等)。
说明 资源创建过程需要1~3分钟。
2. 启动名称服务器和代理
- 本步骤指导您如何在已搭建好RocketMQ实例中启动名称服务器和代理。
- 执行如下命令,切换目录至rocketmq-4.9.3/bin下。
cd /root/rocketmq-4.9.3/bin
- 执行如下命令,启动服务器。
说明 按CTRL+C可结束当前进程。
nohup sh mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
若启动代理失败,解决方案如下。
- 执行如下命令,查看 mqnamesrv 进程。
ps -ef |grep mqnamesrv
- 执行如下命令,清除掉该进程(该命令进程编号以上图示例)。
kill -9 6987
- 执行如下命令,清除Java进程。
sh mqshutdown namesrv sh mqshutdown broker
- 执行如下命令,启动代理。
说明 按CTRL+C可结束当前进程。
nohup sh mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
- 执行如下命令,创建topic。
cd /root/rocketmq-4.9.3/bin ./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendDataType ./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendOperationType ./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t syncSendOrderly ./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendAndReceive ./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t broadcastConsumer ./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t transactionMessage
3. 启动 java 工程
本步骤指导您如何启动java工程。
- 执行如下命令,启动Java工程。
说明 首次运行可能要花费3分钟左右的编译时间。
cd /root/rocketmq-handson-apply mvn install -Dmaven.test.skip export local=$RANDOM java -jar rocketmq-spring-cloud-stream/target/rocketmq-spring-cloud-stream-1.0.0.jar
启动成功,返回信息如下。
- 单击当前页面右上角,再开启一个会话窗口,进行响应测试。
4. 访问并测试执行效果
本步骤指导您如何进行测试,并查看其相应效果。
- 测试同步消息。
执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。
wget -q -O- "127.0.0.1:28082/messageSend/sendSync?id=1&name=xiaoming&action=go"
再观察会话1中的日志,会观察到如下的输出。
- 测试异步消息。
执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。
wget -q -O- "127.0.0.1:28082/messageSend/sendAsync?id=2&name=xiaohua&action=to"
再观察会话1中的日志,会观察到如下的输出。
- 测试事务消息。
执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。
- ags =1 是回调提交
- ags =2 是回滚事务消息
- ags =3 是直接提交
wget -q -O- "127.0.0.1:28082/messageSend/sendTransactional?id=1&name=xiaoming&action=go&ags=1" wget -q -O- "127.0.0.1:28082/messageSend/sendTransactional?id=1&name=xiaoming&action=go&ags=2" wget -q -O- "127.0.0.1:28082/sendTransactional?id=1&name=xiaoming&action=go&ags=3"
再观察会话1中的日志,会观察到如下的输出。
5. 开发流程
本步骤讲述了核心配置。
- maven依赖。
- 配置讲解。
${固定前缀}.${识别标记}.${consumer or producer or null}.${变量}
- 固定标记。
spring固定前缀是:spring.cloud.stream.bindings。
RocketMQ固定前最是: spring.cloud.stream.rocketmq。
- 识别标记。
识别标记是配置中最要的,配置里面同样识别为一组。这组数据为一组配置,实例化一个生产者或者消费者。比如识别标记为 topic。注解Output("topic"),Input("topic"),StreamListener("topic"),就会识别对应的标记。
- spring-bind配置。
配置名 |
类型 |
默认值 |
说明 |
destination |
String |
无(必须) |
topic名 |
content-type |
String |
无(必须) |
数据序列化方式。如果传递是对象建议使用application/json。如果传递是string建议使用text/plain |
配置演示,如下所示。
6. 生产者
本步骤将描述生产者主要配置、发送配置、发送流程演示。
- 配置说明。
- 主要配置。
配置名 |
类型 |
默认值 |
说明 |
group |
String |
无(必须) |
消费组 |
maxMessageSize |
int |
1024 * 1024 * 4 |
消息最大大小 |
transactional |
boolean |
false |
是否是事务消息 |
sync |
boolean |
false |
是否以同步方式发送 |
vipChannelEnabled |
boolean |
true |
是否使用vip channel |
sendMessageTimeout |
int |
3000 |
发送消息超时时间 |
compressMessageBodyThreshold |
int |
1024 * 4 |
消息压缩门限值, 超过该值会自动压缩 |
retryTimesWhenSendFailed |
int |
2 |
同步发送消息失败前内部重试发送的最大次数 |
retryTimesWhenSendAsyncFailed |
int |
2 |
异步发送消息失败前内部重试发送的最大次数 |
关于output1详细讲解下。
- 发送配置。
在message构建时写入到setHeader里面,如下所示。
配置名 |
类型 |
默认值 |
说明 |
MessageConst.PROPERTY_TAGS |
String |
无 |
标签 |
MessageConst.PROPERTYDELAYTIME_LEVEL |
int |
0 |
延迟消费 |
BinderHeaders.PARTITION_HEADER |
boolean |
false |
是否为分区顺序消息 |
RocketMQBinderConstants.ROCKETTRANSACTIONALARG |
Object |
无 |
发送事务消息时用于传入自定义参数 |
- 发送流程演示。
- 第一步:配置RocketMQ-binder 核心配置。
- 第二步:配置发送对象。
- 第三步:创建发送对象。
另外一种方式:
对发送对象进行封装,优势是代码简单,有模块层次感。如果方法多个地方调用可以提高代码可维护性。
细节:
- 在封装类上必须标记@Component注解或者子注解。
- 引入有Output注解的类。
- 第四步:配置发送对象。
建议在spring-boot的启动类上使用EnableBinding注解。
- 第五步:调用发送对象。
- 消息事务。
7. 消费者
本步骤将描述消费者主要配置、消费流程演示。
- 配置说明。
- 主要配置。
配置名 |
类型 |
默认值 |
说明 |
orderly |
boolean |
false |
是否为顺序消费 |
broadcasting |
boolean |
false |
是否使用广播模式 |
sql |
String |
null |
SelectorType 模式为 sql |
tags |
String |
null |
SelectorType 模式为 tag |
- Spring-bind配置。
配置名 |
类型 |
默认值 |
说明 |
group |
String |
无(必须) |
消费组 |
concurrency |
int |
1 |
消费线程数 |
max-attempts |
int |
3 |
消费失败,spring重试次数。不建议使用, rocketmq默认有重试机制 |
back-off-initial-interval |
int |
100(毫秒) |
第一次重试间隔 |
back-off-multiplier |
int |
2 |
重试时间因子。重试时间=back-off-initial-interval*back-off-multiplier |
- 消费流程演示。
- 第一步:配置RocketMQ-binder 核心配置。
- 第二步:配置发送对象。
- 第三步:创建消费对象。
创建消费接口:必须创建消费接口,@StreamListener只读取@Input的配置,比如@StreamListener("xxxx") 里面的xxxx不存在Input里面,直接无效。
建议命名方式 {业务作用}StreamBindingConsumer。
Push模式:在方法上使用StreamListener注解表示消费。
建议命名方式 {业务作用}PushModeConsumer。
- MessageSource 支持。
- 创建pull对象。
- 返回对象一定是PollableMessageSource。
- 返回对象一定是PollableMessageSource。
- 返回对象一定是PollableMessageSource。
- 建议一个线程执行。
- 建议在无限循环(while(true) or for(;;))调用pull方法。
- 建议命名方式 {业务作用}PushModeConsumer。
实验地址:https://developer.aliyun.com/adc/scenario/5d7eb51b5b2348f2a30197ce7bc3ae4b