Spring-cloud-stream-binder-rocketmq入门与实践

简介: 本场景带您体验如何在 Spring 生态中优雅地使用 Apache RocketMQ,感受最受欢迎业务开发框架与最受欢迎消息平台结合的魅力。

Spring-cloud-stream-binder-rocketmq入门与实践


1. 创建资源

开始实验之前,您需要先创建ECS实例资源。

  1. 在实验室页面,单击创建资源
  2. (可选)在实验室页面左侧导航栏中,单击云产品资源列表,可查看本次实验资源相关信息(例如IP地址、用户信息等)。

说明 资源创建过程需要1~3分钟。

2. 启动名称服务器和代理

  1. 本步骤指导您如何在已搭建好RocketMQ实例中启动名称服务器和代理。
  2. 执行如下命令,切换目录至rocketmq-4.9.3/bin下。
cd /root/rocketmq-4.9.3/bin
  1. 执行如下命令,启动服务器。

说明 按CTRL+C可结束当前进程。

nohup sh mqnamesrv &

tail -f ~/logs/rocketmqlogs/namesrv.log

若启动代理失败,解决方案如下。

  1. 执行如下命令,查看 mqnamesrv 进程。
ps -ef |grep mqnamesrv

  1. 执行如下命令,清除掉该进程(该命令进程编号以上图示例)。
kill -9 6987
  1. 执行如下命令,清除Java进程。
sh mqshutdown namesrv
sh mqshutdown broker

  1. 执行如下命令,启动代理。

说明 按CTRL+C可结束当前进程。

nohup sh mqbroker -n localhost:9876 &

tail -f ~/logs/rocketmqlogs/broker.log

  1. 执行如下命令,创建topic。
cd /root/rocketmq-4.9.3/bin
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendDataType
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendOperationType
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t syncSendOrderly
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendAndReceive
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t broadcastConsumer
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t transactionMessage

3. 启动 java 工程

本步骤指导您如何启动java工程。

  1. 执行如下命令,启动Java工程。

说明 首次运行可能要花费3分钟左右的编译时间。

cd /root/rocketmq-handson-apply
mvn install -Dmaven.test.skip
export local=$RANDOM
java -jar  rocketmq-spring-cloud-stream/target/rocketmq-spring-cloud-stream-1.0.0.jar

启动成功,返回信息如下。

  1. 单击当前页面右上角,再开启一个会话窗口,进行响应测试。

4. 访问并测试执行效果

本步骤指导您如何进行测试,并查看其相应效果。

  1. 测试同步消息。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "127.0.0.1:28082/messageSend/sendSync?id=1&name=xiaoming&action=go"

再观察会话1中的日志,会观察到如下的输出。

  1. 测试异步消息。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "127.0.0.1:28082/messageSend/sendAsync?id=2&name=xiaohua&action=to"

再观察会话1中的日志,会观察到如下的输出。

  1. 测试事务消息。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

  • ags =1 是回调提交
  • ags =2 是回滚事务消息
  • ags =3 是直接提交
wget -q -O- "127.0.0.1:28082/messageSend/sendTransactional?id=1&name=xiaoming&action=go&ags=1"
wget -q -O- "127.0.0.1:28082/messageSend/sendTransactional?id=1&name=xiaoming&action=go&ags=2"
wget -q -O- "127.0.0.1:28082/sendTransactional?id=1&name=xiaoming&action=go&ags=3"

再观察会话1中的日志,会观察到如下的输出。

5. 开发流程

本步骤讲述了核心配置。

  1. maven依赖。

  1. 配置讲解。
${固定前缀}.${识别标记}.${consumer or producer or null}.${变量}
  1. 固定标记。

spring固定前缀是:spring.cloud.stream.bindings。

RocketMQ固定前最是: spring.cloud.stream.rocketmq。

  1. 识别标记。

识别标记是配置中最要的,配置里面同样识别为一组。这组数据为一组配置,实例化一个生产者或者消费者。比如识别标记为 topic。注解Output("topic"),Input("topic"),StreamListener("topic"),就会识别对应的标记。

  1. spring-bind配置。

配置名

类型

默认值

说明

destination

String

无(必须)

topic名

content-type

String

无(必须)

数据序列化方式。如果传递是对象建议使用application/json。如果传递是string建议使用text/plain

配置演示,如下所示。

6. 生产者

本步骤将描述生产者主要配置、发送配置、发送流程演示。

  1. 配置说明。
  1. 主要配置。

配置名

类型

默认值

说明

group

String

无(必须)

消费组

maxMessageSize

int

1024 * 1024 * 4

消息最大大小

transactional

boolean

false

是否是事务消息

sync

boolean

false

是否以同步方式发送

vipChannelEnabled

boolean

true

是否使用vip channel

sendMessageTimeout

int

3000

发送消息超时时间

compressMessageBodyThreshold

int

1024 * 4

消息压缩门限值, 超过该值会自动压缩

retryTimesWhenSendFailed

int

2

同步发送消息失败前内部重试发送的最大次数

retryTimesWhenSendAsyncFailed

int

2

异步发送消息失败前内部重试发送的最大次数

关于output1详细讲解下。

  1. 发送配置。

在message构建时写入到setHeader里面,如下所示。

配置名

类型

默认值

说明

MessageConst.PROPERTY_TAGS

String

标签

MessageConst.PROPERTYDELAYTIME_LEVEL

int

0

延迟消费

BinderHeaders.PARTITION_HEADER

boolean

false

是否为分区顺序消息

RocketMQBinderConstants.ROCKETTRANSACTIONALARG

Object

发送事务消息时用于传入自定义参数

  1. 发送流程演示。
  1. 第一步:配置RocketMQ-binder 核心配置。

  1. 第二步:配置发送对象。

  1. 第三步:创建发送对象。

另外一种方式:

对发送对象进行封装,优势是代码简单,有模块层次感。如果方法多个地方调用可以提高代码可维护性。

细节:

  • 在封装类上必须标记@Component注解或者子注解。
  • 引入有Output注解的类。

  1. 第四步:配置发送对象。

建议在spring-boot的启动类上使用EnableBinding注解。

  1. 第五步:调用发送对象。

  1. 消息事务。

7. 消费者

本步骤将描述消费者主要配置、消费流程演示。

  1. 配置说明。
  1. 主要配置。

配置名

类型

默认值

说明

orderly

boolean

false

是否为顺序消费

broadcasting

boolean

false

是否使用广播模式

sql

String

null

SelectorType 模式为 sql

tags

String

null

SelectorType 模式为 tag

  1. Spring-bind配置。

配置名

类型

默认值

说明

group

String

无(必须)

消费组

concurrency

int

1

消费线程数

max-attempts

int

3

消费失败,spring重试次数。不建议使用, rocketmq默认有重试机制

back-off-initial-interval

int

100(毫秒)

第一次重试间隔

back-off-multiplier

int

2

重试时间因子。重试时间=back-off-initial-interval*back-off-multiplier

  1. 消费流程演示。
  1. 第一步:配置RocketMQ-binder 核心配置。

  1. 第二步:配置发送对象。

  1. 第三步:创建消费对象。

创建消费接口:必须创建消费接口,@StreamListener只读取@Input的配置,比如@StreamListener("xxxx") 里面的xxxx不存在Input里面,直接无效。

建议命名方式 {业务作用}StreamBindingConsumer。

Push模式:在方法上使用StreamListener注解表示消费。

建议命名方式 {业务作用}PushModeConsumer。

  1. MessageSource 支持。
  1. 创建pull对象。
  • 返回对象一定是PollableMessageSource。
  • 返回对象一定是PollableMessageSource。
  • 返回对象一定是PollableMessageSource。
  • 建议一个线程执行。
  • 建议在无限循环(while(true) or for(;;))调用pull方法。
  • 建议命名方式 {业务作用}PushModeConsumer。

实验地址:https://developer.aliyun.com/adc/scenario/5d7eb51b5b2348f2a30197ce7bc3ae4b

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
21天前
|
Java 开发者 Spring
深入理解 Spring Boot 中的 @EnableAutoConfiguration 注解:概念与实践
【4月更文挑战第21天】在 Spring Boot 项目中,@EnableAutoConfiguration 注解是实现自动配置的核心,它可以根据项目的依赖和配置,自动地配置 Spring 应用程序中的 Bean
40 3
|
21天前
|
Java API 网络架构
深入理解 Spring Boot 中的 @RestController 注解:概念与实践
【4月更文挑战第20天】在现代Web开发中,创建RESTful服务已成为常态。Spring Boot通过提供@RestController注解,极大简化了REST API的开发过程。本篇博客旨在详细介绍@RestController的概念、优势以及在Spring Boot项目中的具体应用方法。
43 8
|
21天前
|
XML Java 数据格式
Spring框架入门:IoC与DI
【5月更文挑战第15天】本文介绍了Spring框架的核心特性——IoC(控制反转)和DI(依赖注入)。IoC通过将对象的创建和依赖关系管理交给容器,实现解耦。DI作为IoC的实现方式,允许外部注入依赖对象。文章讨论了过度依赖容器、配置复杂度等常见问题,并提出通过合理划分配置、使用注解简化管理等解决策略。同时,提醒开发者注意过度依赖注入和循环依赖,建议适度使用构造器注入和避免循环引用。通过代码示例展示了注解实现DI和配置类的使用。掌握IoC和DI能提升应用的灵活性和可维护性,实践中的反思和优化至关重要。
34 4
|
1天前
|
JSON 前端开发 Java
Spring Web MVC入门(3)——响应
Spring Web MVC入门(3)——响应
6 1
|
1天前
|
JSON 前端开发 Java
Spring Web MVC入门(2)——请求(上)
Spring Web MVC入门(2)——请求
8 0
|
1天前
|
设计模式 前端开发 Java
Spring Web MVC入门(1)
Spring Web MVC入门(1)
3 0
|
10天前
|
Java API 网络架构
利用Java Spring Boot构建微服务架构的实践探索
随着业务复杂性的增长和互联网技术的飞速发展,微服务架构已成为现代软件开发中不可或缺的一部分。本文旨在探讨如何利用Java Spring Boot框架构建微服务架构,包括微服务的定义、优势,以及通过实际案例展示如何设计、开发和部署微服务。我们将关注服务拆分、服务间通信、数据一致性、服务治理等核心问题,并探讨如何结合Spring Cloud生态中的组件来实现高效、可靠的微服务架构。
|
21天前
|
NoSQL Java MongoDB
【MongoDB 专栏】MongoDB 与 Spring Boot 的集成实践
【5月更文挑战第11天】本文介绍了如何将非关系型数据库MongoDB与Spring Boot框架集成,以实现高效灵活的数据管理。Spring Boot简化了Spring应用的构建和部署,MongoDB则以其对灵活数据结构的处理能力受到青睐。集成步骤包括:添加MongoDB依赖、配置连接信息、创建数据访问对象(DAO)以及进行数据操作。通过这种方式,开发者可以充分利用两者优势,应对各种数据需求。在实际应用中,结合微服务架构等技术,可以构建高性能、可扩展的系统。掌握MongoDB与Spring Boot集成对于提升开发效率和项目质量至关重要,未来有望在更多领域得到广泛应用。
【MongoDB 专栏】MongoDB 与 Spring Boot 的集成实践
|
21天前
|
安全 Java 数据库连接
在IntelliJ IDEA中通过Spring Boot集成达梦数据库:从入门到精通
在IntelliJ IDEA中通过Spring Boot集成达梦数据库:从入门到精通
|
21天前
|
消息中间件 安全 Java
探索|Spring并行初始化加速的思路和实践
作者通过看过的两篇文章发现实现Spring初始化加速的思路和方案有很多类似之处,通过本文记录一下当时的思考和实践。

热门文章

最新文章

相关产品

  • 云消息队列 MQ