Spring-cloud-stream-binder-rocketmq入门与实践

简介: 本场景带您体验如何在 Spring 生态中优雅地使用 Apache RocketMQ,感受最受欢迎业务开发框架与最受欢迎消息平台结合的魅力。

Spring-cloud-stream-binder-rocketmq入门与实践


1. 创建资源

开始实验之前,您需要先创建ECS实例资源。

  1. 在实验室页面,单击创建资源
  2. (可选)在实验室页面左侧导航栏中,单击云产品资源列表,可查看本次实验资源相关信息(例如IP地址、用户信息等)。

说明 资源创建过程需要1~3分钟。

2. 启动名称服务器和代理

  1. 本步骤指导您如何在已搭建好RocketMQ实例中启动名称服务器和代理。
  2. 执行如下命令,切换目录至rocketmq-4.9.3/bin下。
cd /root/rocketmq-4.9.3/bin
  1. 执行如下命令,启动服务器。

说明 按CTRL+C可结束当前进程。

nohup sh mqnamesrv &

tail -f ~/logs/rocketmqlogs/namesrv.log

若启动代理失败,解决方案如下。

  1. 执行如下命令,查看 mqnamesrv 进程。
ps -ef |grep mqnamesrv

  1. 执行如下命令,清除掉该进程(该命令进程编号以上图示例)。
kill -9 6987
  1. 执行如下命令,清除Java进程。
sh mqshutdown namesrv
sh mqshutdown broker

  1. 执行如下命令,启动代理。

说明 按CTRL+C可结束当前进程。

nohup sh mqbroker -n localhost:9876 &

tail -f ~/logs/rocketmqlogs/broker.log

  1. 执行如下命令,创建topic。
cd /root/rocketmq-4.9.3/bin
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendDataType
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendOperationType
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t syncSendOrderly
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendAndReceive
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t broadcastConsumer
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t transactionMessage

3. 启动 java 工程

本步骤指导您如何启动java工程。

  1. 执行如下命令,启动Java工程。

说明 首次运行可能要花费3分钟左右的编译时间。

cd /root/rocketmq-handson-apply
mvn install -Dmaven.test.skip
export local=$RANDOM
java -jar  rocketmq-spring-cloud-stream/target/rocketmq-spring-cloud-stream-1.0.0.jar

启动成功,返回信息如下。

  1. 单击当前页面右上角,再开启一个会话窗口,进行响应测试。

4. 访问并测试执行效果

本步骤指导您如何进行测试,并查看其相应效果。

  1. 测试同步消息。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "127.0.0.1:28082/messageSend/sendSync?id=1&name=xiaoming&action=go"

再观察会话1中的日志,会观察到如下的输出。

  1. 测试异步消息。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "127.0.0.1:28082/messageSend/sendAsync?id=2&name=xiaohua&action=to"

再观察会话1中的日志,会观察到如下的输出。

  1. 测试事务消息。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

  • ags =1 是回调提交
  • ags =2 是回滚事务消息
  • ags =3 是直接提交
wget -q -O- "127.0.0.1:28082/messageSend/sendTransactional?id=1&name=xiaoming&action=go&ags=1"
wget -q -O- "127.0.0.1:28082/messageSend/sendTransactional?id=1&name=xiaoming&action=go&ags=2"
wget -q -O- "127.0.0.1:28082/sendTransactional?id=1&name=xiaoming&action=go&ags=3"

再观察会话1中的日志,会观察到如下的输出。

5. 开发流程

本步骤讲述了核心配置。

  1. maven依赖。

  1. 配置讲解。
${固定前缀}.${识别标记}.${consumer or producer or null}.${变量}
  1. 固定标记。

spring固定前缀是:spring.cloud.stream.bindings。

RocketMQ固定前最是: spring.cloud.stream.rocketmq。

  1. 识别标记。

识别标记是配置中最要的,配置里面同样识别为一组。这组数据为一组配置,实例化一个生产者或者消费者。比如识别标记为 topic。注解Output("topic"),Input("topic"),StreamListener("topic"),就会识别对应的标记。

  1. spring-bind配置。

配置名

类型

默认值

说明

destination

String

无(必须)

topic名

content-type

String

无(必须)

数据序列化方式。如果传递是对象建议使用application/json。如果传递是string建议使用text/plain

配置演示,如下所示。

6. 生产者

本步骤将描述生产者主要配置、发送配置、发送流程演示。

  1. 配置说明。
  1. 主要配置。

配置名

类型

默认值

说明

group

String

无(必须)

消费组

maxMessageSize

int

1024 * 1024 * 4

消息最大大小

transactional

boolean

false

是否是事务消息

sync

boolean

false

是否以同步方式发送

vipChannelEnabled

boolean

true

是否使用vip channel

sendMessageTimeout

int

3000

发送消息超时时间

compressMessageBodyThreshold

int

1024 * 4

消息压缩门限值, 超过该值会自动压缩

retryTimesWhenSendFailed

int

2

同步发送消息失败前内部重试发送的最大次数

retryTimesWhenSendAsyncFailed

int

2

异步发送消息失败前内部重试发送的最大次数

关于output1详细讲解下。

  1. 发送配置。

在message构建时写入到setHeader里面,如下所示。

配置名

类型

默认值

说明

MessageConst.PROPERTY_TAGS

String

标签

MessageConst.PROPERTYDELAYTIME_LEVEL

int

0

延迟消费

BinderHeaders.PARTITION_HEADER

boolean

false

是否为分区顺序消息

RocketMQBinderConstants.ROCKETTRANSACTIONALARG

Object

发送事务消息时用于传入自定义参数

  1. 发送流程演示。
  1. 第一步:配置RocketMQ-binder 核心配置。

  1. 第二步:配置发送对象。

  1. 第三步:创建发送对象。

另外一种方式:

对发送对象进行封装,优势是代码简单,有模块层次感。如果方法多个地方调用可以提高代码可维护性。

细节:

  • 在封装类上必须标记@Component注解或者子注解。
  • 引入有Output注解的类。

  1. 第四步:配置发送对象。

建议在spring-boot的启动类上使用EnableBinding注解。

  1. 第五步:调用发送对象。

  1. 消息事务。

7. 消费者

本步骤将描述消费者主要配置、消费流程演示。

  1. 配置说明。
  1. 主要配置。

配置名

类型

默认值

说明

orderly

boolean

false

是否为顺序消费

broadcasting

boolean

false

是否使用广播模式

sql

String

null

SelectorType 模式为 sql

tags

String

null

SelectorType 模式为 tag

  1. Spring-bind配置。

配置名

类型

默认值

说明

group

String

无(必须)

消费组

concurrency

int

1

消费线程数

max-attempts

int

3

消费失败,spring重试次数。不建议使用, rocketmq默认有重试机制

back-off-initial-interval

int

100(毫秒)

第一次重试间隔

back-off-multiplier

int

2

重试时间因子。重试时间=back-off-initial-interval*back-off-multiplier

  1. 消费流程演示。
  1. 第一步:配置RocketMQ-binder 核心配置。

  1. 第二步:配置发送对象。

  1. 第三步:创建消费对象。

创建消费接口:必须创建消费接口,@StreamListener只读取@Input的配置,比如@StreamListener("xxxx") 里面的xxxx不存在Input里面,直接无效。

建议命名方式 {业务作用}StreamBindingConsumer。

Push模式:在方法上使用StreamListener注解表示消费。

建议命名方式 {业务作用}PushModeConsumer。

  1. MessageSource 支持。
  1. 创建pull对象。
  • 返回对象一定是PollableMessageSource。
  • 返回对象一定是PollableMessageSource。
  • 返回对象一定是PollableMessageSource。
  • 建议一个线程执行。
  • 建议在无限循环(while(true) or for(;;))调用pull方法。
  • 建议命名方式 {业务作用}PushModeConsumer。

实验地址:https://developer.aliyun.com/adc/scenario/5d7eb51b5b2348f2a30197ce7bc3ae4b

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
缓存 Java API
【云原生】Spring Cloud Gateway的底层原理与实践方法探究
【云原生】Spring Cloud Gateway的底层原理与实践方法探究
|
2月前
|
存储 安全 Java
SpringBoot搭建Spring Security 入门
SpringBoot搭建Spring Security 入门
114 0
|
17天前
|
前端开发 Java 数据库连接
Spring系列文章1:Spring入门程序
Spring系列文章1:Spring入门程序
|
3天前
|
Java Nacos 开发者
Java从入门到精通:4.2.1学习新技术与框架——以Spring Boot和Spring Cloud Alibaba为例
Java从入门到精通:4.2.1学习新技术与框架——以Spring Boot和Spring Cloud Alibaba为例
|
3天前
|
Dubbo Java 应用服务中间件
Java从入门到精通:3.2.2分布式与并发编程——了解分布式系统的基本概念,学习使用Dubbo、Spring Cloud等分布式框架
Java从入门到精通:3.2.2分布式与并发编程——了解分布式系统的基本概念,学习使用Dubbo、Spring Cloud等分布式框架
|
8天前
|
消息中间件 监控 负载均衡
RocketMQ实践问题收集
RocketMQ实践问题收集
|
11天前
|
消息中间件 供应链 Java
RabbitMQ入门指南(九):消费者可靠性
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容。
28 0
RabbitMQ入门指南(九):消费者可靠性
|
11天前
|
消息中间件 存储 Java
RabbitMQ入门指南(八):MQ可靠性
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了MQ数据持久化、LazyQueue模式、管理控制台配置Lazy模式、代码配置Lazy模式、更新已有队列为lazy模式等内容。
39 0
|
11天前
|
消息中间件 微服务
RabbitMQ入门指南(四):交换机与案例解析
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了交换机在RabbitMQ中的作用与类型、交换机案例(Fanout交换机、Direct交换机、Topic交换机)等内容。
22 0
|
11天前
|
消息中间件 存储 数据库
RabbitMQ入门指南(二):架构和管理控制台的使用
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了RabbitMQ架构和管理控制台的使用等内容。
33 0
RabbitMQ入门指南(二):架构和管理控制台的使用

相关产品

  • 云消息队列 MQ