Spring-cloud-stream-binder-rocketmq入门与实践

简介: 本场景带您体验如何在 Spring 生态中优雅地使用 Apache RocketMQ,感受最受欢迎业务开发框架与最受欢迎消息平台结合的魅力。

Spring-cloud-stream-binder-rocketmq入门与实践


1. 创建资源

开始实验之前,您需要先创建ECS实例资源。

  1. 在实验室页面,单击创建资源
  2. (可选)在实验室页面左侧导航栏中,单击云产品资源列表,可查看本次实验资源相关信息(例如IP地址、用户信息等)。

说明 资源创建过程需要1~3分钟。

2. 启动名称服务器和代理

  1. 本步骤指导您如何在已搭建好RocketMQ实例中启动名称服务器和代理。
  2. 执行如下命令,切换目录至rocketmq-4.9.3/bin下。
cd /root/rocketmq-4.9.3/bin
  1. 执行如下命令,启动服务器。

说明 按CTRL+C可结束当前进程。

nohup sh mqnamesrv &

tail -f ~/logs/rocketmqlogs/namesrv.log

若启动代理失败,解决方案如下。

  1. 执行如下命令,查看 mqnamesrv 进程。
ps -ef |grep mqnamesrv

  1. 执行如下命令,清除掉该进程(该命令进程编号以上图示例)。
kill -9 6987
  1. 执行如下命令,清除Java进程。
sh mqshutdown namesrv
sh mqshutdown broker

  1. 执行如下命令,启动代理。

说明 按CTRL+C可结束当前进程。

nohup sh mqbroker -n localhost:9876 &

tail -f ~/logs/rocketmqlogs/broker.log

  1. 执行如下命令,创建topic。
cd /root/rocketmq-4.9.3/bin
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendDataType
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendOperationType
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t syncSendOrderly
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendAndReceive
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t broadcastConsumer
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t transactionMessage

3. 启动 java 工程

本步骤指导您如何启动java工程。

  1. 执行如下命令,启动Java工程。

说明 首次运行可能要花费3分钟左右的编译时间。

cd /root/rocketmq-handson-apply
mvn install -Dmaven.test.skip
export local=$RANDOM
java -jar  rocketmq-spring-cloud-stream/target/rocketmq-spring-cloud-stream-1.0.0.jar

启动成功,返回信息如下。

  1. 单击当前页面右上角,再开启一个会话窗口,进行响应测试。

4. 访问并测试执行效果

本步骤指导您如何进行测试,并查看其相应效果。

  1. 测试同步消息。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "127.0.0.1:28082/messageSend/sendSync?id=1&name=xiaoming&action=go"

再观察会话1中的日志,会观察到如下的输出。

  1. 测试异步消息。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "127.0.0.1:28082/messageSend/sendAsync?id=2&name=xiaohua&action=to"

再观察会话1中的日志,会观察到如下的输出。

  1. 测试事务消息。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

  • ags =1 是回调提交
  • ags =2 是回滚事务消息
  • ags =3 是直接提交
wget -q -O- "127.0.0.1:28082/messageSend/sendTransactional?id=1&name=xiaoming&action=go&ags=1"
wget -q -O- "127.0.0.1:28082/messageSend/sendTransactional?id=1&name=xiaoming&action=go&ags=2"
wget -q -O- "127.0.0.1:28082/sendTransactional?id=1&name=xiaoming&action=go&ags=3"

再观察会话1中的日志,会观察到如下的输出。

5. 开发流程

本步骤讲述了核心配置。

  1. maven依赖。

  1. 配置讲解。
${固定前缀}.${识别标记}.${consumer or producer or null}.${变量}
  1. 固定标记。

spring固定前缀是:spring.cloud.stream.bindings。

RocketMQ固定前最是: spring.cloud.stream.rocketmq。

  1. 识别标记。

识别标记是配置中最要的,配置里面同样识别为一组。这组数据为一组配置,实例化一个生产者或者消费者。比如识别标记为 topic。注解Output("topic"),Input("topic"),StreamListener("topic"),就会识别对应的标记。

  1. spring-bind配置。

配置名

类型

默认值

说明

destination

String

无(必须)

topic名

content-type

String

无(必须)

数据序列化方式。如果传递是对象建议使用application/json。如果传递是string建议使用text/plain

配置演示,如下所示。

6. 生产者

本步骤将描述生产者主要配置、发送配置、发送流程演示。

  1. 配置说明。
  1. 主要配置。

配置名

类型

默认值

说明

group

String

无(必须)

消费组

maxMessageSize

int

1024 * 1024 * 4

消息最大大小

transactional

boolean

false

是否是事务消息

sync

boolean

false

是否以同步方式发送

vipChannelEnabled

boolean

true

是否使用vip channel

sendMessageTimeout

int

3000

发送消息超时时间

compressMessageBodyThreshold

int

1024 * 4

消息压缩门限值, 超过该值会自动压缩

retryTimesWhenSendFailed

int

2

同步发送消息失败前内部重试发送的最大次数

retryTimesWhenSendAsyncFailed

int

2

异步发送消息失败前内部重试发送的最大次数

关于output1详细讲解下。

  1. 发送配置。

在message构建时写入到setHeader里面,如下所示。

配置名

类型

默认值

说明

MessageConst.PROPERTY_TAGS

String

标签

MessageConst.PROPERTYDELAYTIME_LEVEL

int

0

延迟消费

BinderHeaders.PARTITION_HEADER

boolean

false

是否为分区顺序消息

RocketMQBinderConstants.ROCKETTRANSACTIONALARG

Object

发送事务消息时用于传入自定义参数

  1. 发送流程演示。
  1. 第一步:配置RocketMQ-binder 核心配置。

  1. 第二步:配置发送对象。

  1. 第三步:创建发送对象。

另外一种方式:

对发送对象进行封装,优势是代码简单,有模块层次感。如果方法多个地方调用可以提高代码可维护性。

细节:

  • 在封装类上必须标记@Component注解或者子注解。
  • 引入有Output注解的类。

  1. 第四步:配置发送对象。

建议在spring-boot的启动类上使用EnableBinding注解。

  1. 第五步:调用发送对象。

  1. 消息事务。

7. 消费者

本步骤将描述消费者主要配置、消费流程演示。

  1. 配置说明。
  1. 主要配置。

配置名

类型

默认值

说明

orderly

boolean

false

是否为顺序消费

broadcasting

boolean

false

是否使用广播模式

sql

String

null

SelectorType 模式为 sql

tags

String

null

SelectorType 模式为 tag

  1. Spring-bind配置。

配置名

类型

默认值

说明

group

String

无(必须)

消费组

concurrency

int

1

消费线程数

max-attempts

int

3

消费失败,spring重试次数。不建议使用, rocketmq默认有重试机制

back-off-initial-interval

int

100(毫秒)

第一次重试间隔

back-off-multiplier

int

2

重试时间因子。重试时间=back-off-initial-interval*back-off-multiplier

  1. 消费流程演示。
  1. 第一步:配置RocketMQ-binder 核心配置。

  1. 第二步:配置发送对象。

  1. 第三步:创建消费对象。

创建消费接口:必须创建消费接口,@StreamListener只读取@Input的配置,比如@StreamListener("xxxx") 里面的xxxx不存在Input里面,直接无效。

建议命名方式 {业务作用}StreamBindingConsumer。

Push模式:在方法上使用StreamListener注解表示消费。

建议命名方式 {业务作用}PushModeConsumer。

  1. MessageSource 支持。
  1. 创建pull对象。
  • 返回对象一定是PollableMessageSource。
  • 返回对象一定是PollableMessageSource。
  • 返回对象一定是PollableMessageSource。
  • 建议一个线程执行。
  • 建议在无限循环(while(true) or for(;;))调用pull方法。
  • 建议命名方式 {业务作用}PushModeConsumer。

实验地址:https://developer.aliyun.com/adc/scenario/5d7eb51b5b2348f2a30197ce7bc3ae4b

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
8天前
|
XML Java UED
使用 Spring Boot 实现重试和补偿功能:从理论到实践
【6月更文挑战第17天】在分布式系统中,服务之间的调用可能会因为网络故障、服务器负载等原因偶尔失败。为了提高系统的可靠性和稳定性,我们经常需要实现重试和补偿功能。
33 6
|
6天前
|
XML 缓存 Java
Spring Boot 优雅实现降级功能:Hystrix 与 Resilience4j 的实践
【6月更文挑战第19天】在分布式系统中,服务降级是一种重要的容错机制。当某个服务不可用或响应慢时,降级机制可以保证系统的整体稳定性。本文将详细介绍如何在 Spring Boot 中使用 Hystrix 和 Resilience4j 实现降级功能。
31 7
|
7天前
|
NoSQL 算法 Java
使用 Spring Boot 实现限流功能:从理论到实践
【6月更文挑战第18天】在微服务和高并发系统中,限流(Rate Limiting)是一种非常重要的技术手段,用于保护系统免受过载,确保服务的稳定性。限流可以控制请求的速率,防止单个客户端或恶意用户消耗过多的资源,从而影响其他用户。
16 5
|
17天前
|
XML 存储 Java
Spring 6(一)【Spring 入门】
Spring 6(一)【Spring 入门】
|
19天前
|
Java 数据库连接 数据库
Spring日志完结篇,MyBatis操作数据库(入门)
Spring日志完结篇,MyBatis操作数据库(入门)
|
20天前
|
存储 自动驾驶 Java
Spring IoC&DI(1)—入门
Spring IoC&DI(1)—入门
15 1
|
20天前
|
JSON 前端开发 Java
Spring Web MVC入门(3)——响应
Spring Web MVC入门(3)——响应
26 1
|
14小时前
|
消息中间件 存储 Kafka
01.RabbitMQ入门
01.RabbitMQ入门
|
4天前
|
Java 程序员 开发工具
Spring - SpringBoot入门之环境搭建
Spring - SpringBoot入门之环境搭建
8 0
|
14天前
|
消息中间件 存储 前端开发
RabbitMQ在Java中的完美实现:从入门到精通
本文由木头左介绍如何在Java项目中使用RabbitMQ。RabbitMQ是开源的AMQP实现,支持多种客户端,适合分布式系统中的消息传递。首先需安装Erlang和RabbitMQ,接着在Java项目中添加RabbitMQ客户端库依赖。通过创建连接工厂和连接,建立与RabbitMQ的通信,并展示了创建连接和通道的代码示例。

热门文章

最新文章

相关产品

  • 云消息队列 MQ