Spring-cloud-stream-binder-rocketmq入门与实践

简介: 本场景带您体验如何在 Spring 生态中优雅地使用 Apache RocketMQ,感受最受欢迎业务开发框架与最受欢迎消息平台结合的魅力。

Spring-cloud-stream-binder-rocketmq入门与实践


1. 创建资源

开始实验之前,您需要先创建ECS实例资源。

  1. 在实验室页面,单击创建资源
  2. (可选)在实验室页面左侧导航栏中,单击云产品资源列表,可查看本次实验资源相关信息(例如IP地址、用户信息等)。

说明 资源创建过程需要1~3分钟。

2. 启动名称服务器和代理

  1. 本步骤指导您如何在已搭建好RocketMQ实例中启动名称服务器和代理。
  2. 执行如下命令,切换目录至rocketmq-4.9.3/bin下。
cd /root/rocketmq-4.9.3/bin
  1. 执行如下命令,启动服务器。

说明 按CTRL+C可结束当前进程。

nohup sh mqnamesrv &

tail -f ~/logs/rocketmqlogs/namesrv.log

若启动代理失败,解决方案如下。

  1. 执行如下命令,查看 mqnamesrv 进程。
ps -ef |grep mqnamesrv

  1. 执行如下命令,清除掉该进程(该命令进程编号以上图示例)。
kill -9 6987
  1. 执行如下命令,清除Java进程。
sh mqshutdown namesrv
sh mqshutdown broker

  1. 执行如下命令,启动代理。

说明 按CTRL+C可结束当前进程。

nohup sh mqbroker -n localhost:9876 &

tail -f ~/logs/rocketmqlogs/broker.log

  1. 执行如下命令,创建topic。
cd /root/rocketmq-4.9.3/bin
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendDataType
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendOperationType
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t syncSendOrderly
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendAndReceive
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t broadcastConsumer
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t transactionMessage

3. 启动 java 工程

本步骤指导您如何启动java工程。

  1. 执行如下命令,启动Java工程。

说明 首次运行可能要花费3分钟左右的编译时间。

cd /root/rocketmq-handson-apply
mvn install -Dmaven.test.skip
export local=$RANDOM
java -jar  rocketmq-spring-cloud-stream/target/rocketmq-spring-cloud-stream-1.0.0.jar

启动成功,返回信息如下。

  1. 单击当前页面右上角,再开启一个会话窗口,进行响应测试。

4. 访问并测试执行效果

本步骤指导您如何进行测试,并查看其相应效果。

  1. 测试同步消息。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "127.0.0.1:28082/messageSend/sendSync?id=1&name=xiaoming&action=go"

再观察会话1中的日志,会观察到如下的输出。

  1. 测试异步消息。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "127.0.0.1:28082/messageSend/sendAsync?id=2&name=xiaohua&action=to"

再观察会话1中的日志,会观察到如下的输出。

  1. 测试事务消息。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

  • ags =1 是回调提交
  • ags =2 是回滚事务消息
  • ags =3 是直接提交
wget -q -O- "127.0.0.1:28082/messageSend/sendTransactional?id=1&name=xiaoming&action=go&ags=1"
wget -q -O- "127.0.0.1:28082/messageSend/sendTransactional?id=1&name=xiaoming&action=go&ags=2"
wget -q -O- "127.0.0.1:28082/sendTransactional?id=1&name=xiaoming&action=go&ags=3"

再观察会话1中的日志,会观察到如下的输出。

5. 开发流程

本步骤讲述了核心配置。

  1. maven依赖。

  1. 配置讲解。
${固定前缀}.${识别标记}.${consumer or producer or null}.${变量}
  1. 固定标记。

spring固定前缀是:spring.cloud.stream.bindings。

RocketMQ固定前最是: spring.cloud.stream.rocketmq。

  1. 识别标记。

识别标记是配置中最要的,配置里面同样识别为一组。这组数据为一组配置,实例化一个生产者或者消费者。比如识别标记为 topic。注解Output("topic"),Input("topic"),StreamListener("topic"),就会识别对应的标记。

  1. spring-bind配置。

配置名

类型

默认值

说明

destination

String

无(必须)

topic名

content-type

String

无(必须)

数据序列化方式。如果传递是对象建议使用application/json。如果传递是string建议使用text/plain

配置演示,如下所示。

6. 生产者

本步骤将描述生产者主要配置、发送配置、发送流程演示。

  1. 配置说明。
  1. 主要配置。

配置名

类型

默认值

说明

group

String

无(必须)

消费组

maxMessageSize

int

1024 * 1024 * 4

消息最大大小

transactional

boolean

false

是否是事务消息

sync

boolean

false

是否以同步方式发送

vipChannelEnabled

boolean

true

是否使用vip channel

sendMessageTimeout

int

3000

发送消息超时时间

compressMessageBodyThreshold

int

1024 * 4

消息压缩门限值, 超过该值会自动压缩

retryTimesWhenSendFailed

int

2

同步发送消息失败前内部重试发送的最大次数

retryTimesWhenSendAsyncFailed

int

2

异步发送消息失败前内部重试发送的最大次数

关于output1详细讲解下。

  1. 发送配置。

在message构建时写入到setHeader里面,如下所示。

配置名

类型

默认值

说明

MessageConst.PROPERTY_TAGS

String

标签

MessageConst.PROPERTYDELAYTIME_LEVEL

int

0

延迟消费

BinderHeaders.PARTITION_HEADER

boolean

false

是否为分区顺序消息

RocketMQBinderConstants.ROCKETTRANSACTIONALARG

Object

发送事务消息时用于传入自定义参数

  1. 发送流程演示。
  1. 第一步:配置RocketMQ-binder 核心配置。

  1. 第二步:配置发送对象。

  1. 第三步:创建发送对象。

另外一种方式:

对发送对象进行封装,优势是代码简单,有模块层次感。如果方法多个地方调用可以提高代码可维护性。

细节:

  • 在封装类上必须标记@Component注解或者子注解。
  • 引入有Output注解的类。

  1. 第四步:配置发送对象。

建议在spring-boot的启动类上使用EnableBinding注解。

  1. 第五步:调用发送对象。

  1. 消息事务。

7. 消费者

本步骤将描述消费者主要配置、消费流程演示。

  1. 配置说明。
  1. 主要配置。

配置名

类型

默认值

说明

orderly

boolean

false

是否为顺序消费

broadcasting

boolean

false

是否使用广播模式

sql

String

null

SelectorType 模式为 sql

tags

String

null

SelectorType 模式为 tag

  1. Spring-bind配置。

配置名

类型

默认值

说明

group

String

无(必须)

消费组

concurrency

int

1

消费线程数

max-attempts

int

3

消费失败,spring重试次数。不建议使用, rocketmq默认有重试机制

back-off-initial-interval

int

100(毫秒)

第一次重试间隔

back-off-multiplier

int

2

重试时间因子。重试时间=back-off-initial-interval*back-off-multiplier

  1. 消费流程演示。
  1. 第一步:配置RocketMQ-binder 核心配置。

  1. 第二步:配置发送对象。

  1. 第三步:创建消费对象。

创建消费接口:必须创建消费接口,@StreamListener只读取@Input的配置,比如@StreamListener("xxxx") 里面的xxxx不存在Input里面,直接无效。

建议命名方式 {业务作用}StreamBindingConsumer。

Push模式:在方法上使用StreamListener注解表示消费。

建议命名方式 {业务作用}PushModeConsumer。

  1. MessageSource 支持。
  1. 创建pull对象。
  • 返回对象一定是PollableMessageSource。
  • 返回对象一定是PollableMessageSource。
  • 返回对象一定是PollableMessageSource。
  • 建议一个线程执行。
  • 建议在无限循环(while(true) or for(;;))调用pull方法。
  • 建议命名方式 {业务作用}PushModeConsumer。

实验地址:https://developer.aliyun.com/adc/scenario/5d7eb51b5b2348f2a30197ce7bc3ae4b

相关实践学习
5分钟轻松打造应对流量洪峰的稳定商城交易系统
本实验通过SAE极速部署一个微服务电商商城,同时结合RocketMQ异步解耦、削峰填谷的能力,带大家体验面对流量洪峰仍旧稳定可靠的商城交易系统!
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
14天前
|
人工智能 Java API
Spring AI 实战|Spring AI入门之DeepSeek调用
本文介绍了Spring AI框架如何帮助Java开发者轻松集成和使用大模型API。文章从Spring AI的初探开始,探讨了其核心能力及应用场景,包括手动与自动发起请求、流式响应实现打字机效果,以及兼容不同AI服务(如DeepSeek、通义千问)的方法。同时,还详细讲解了如何在生产环境中添加监控以优化性能和成本管理。通过Spring AI,开发者可以简化大模型调用流程,降低复杂度,为企业智能应用开发提供强大支持。最后,文章展望了Spring AI在未来AI时代的重要作用,鼓励开发者积极拥抱这一技术变革。
480 71
Spring AI 实战|Spring AI入门之DeepSeek调用
|
2月前
|
JSON 前端开发 Java
深入理解 Spring Boot 中日期时间格式化:@DateTimeFormat 与 @JsonFormat 完整实践
在 Spring Boot 开发中,日期时间格式化是前后端交互的常见痛点。本文详细解析了 **@DateTimeFormat** 和 **@JsonFormat** 两个注解的用法,分别用于将前端传入的字符串解析为 Java 时间对象,以及将时间对象序列化为指定格式返回给前端。通过完整示例代码,展示了从数据接收、业务处理到结果返回的全流程,并总结了解决时区问题和全局配置的最佳实践,助你高效处理日期时间需求。
228 0
|
2月前
|
安全 Java 数据库
Spring Security 实战指南:从入门到精通
本文详细介绍了Spring Security在Java Web项目中的应用,涵盖登录、权限控制与安全防护等功能。通过Filter Chain过滤器链实现请求拦截与认证授权,核心组件包括AuthenticationProvider和UserDetailsService,负责用户信息加载与密码验证。文章还解析了项目结构,如SecurityConfig配置类、User实体类及自定义登录逻辑,并探讨了Method-Level Security、CSRF防护、Remember-Me等进阶功能。最后总结了Spring Security的核心机制与常见配置,帮助开发者构建健壮的安全系统。
170 0
|
2月前
|
存储 Java 数据库
Spring Boot 注册登录系统:问题总结与优化实践
在Spring Boot开发中,注册登录模块常面临数据库设计、密码加密、权限配置及用户体验等问题。本文以便利店销售系统为例,详细解析四大类问题:数据库字段约束(如默认值缺失)、密码加密(明文存储风险)、Spring Security配置(路径权限不当)以及表单交互(数据丢失与提示不足)。通过优化数据库结构、引入BCrypt加密、完善安全配置和改进用户交互,提供了一套全面的解决方案,助力开发者构建更 robust 的系统。
83 0
|
6天前
|
消息中间件 存储 Kafka
一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。
170 6
 一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
|
7天前
|
人工智能 Java 决策智能
Spring AI Alibaba Graph:多智能体框架实践
Spring AI Alibaba 是一个面向 Java 开发者的开源人工智能框架,旨在简化 AI 应用开发。本文重点介绍其 Graph 组件,用于解决工作流与多智能体协作问题。Graph 组件通过声明式编程接口,提供统一的上下文管理、消息记忆、人工确认节点等功能,支持复杂 AI 应用的构建。
|
6天前
|
Java 关系型数据库 MySQL
【Spring】【事务】初学者直呼学会了的Spring事务入门
本文深入解析了Spring事务的核心概念与使用方法。Spring事务是一种数据库事务管理机制,通过确保操作的原子性、一致性、隔离性和持久性(ACID),维护数据完整性。文章详细讲解了声明式事务(@Transactional注解)和编程式事务(TransactionTemplate、PlatformTransactionManager)的区别与用法,并探讨了事务传播行为(如REQUIRED、REQUIRES_NEW等)及隔离级别(如READ_COMMITTED、REPEATABLE_READ)。
61 1
|
2月前
|
Java 开发者 微服务
Spring Cloud OpenFeign详解与实践
总结起来说,Spring Cloud OpenFeign提供了一种简单易懂且高效的方式去实现微服务之间通信.它隐藏了许多复杂性,并且允许开发者以声明式方式编写HTTP客户端代码.如果你正在开发基于Spring Cloud 的微服务架构系统,Spring Cloud Open Feign是一个非常好用且强大工具.
170 33
|
2月前
|
JSON 前端开发 Java
深入理解 Spring Boot 中日期时间格式化:@DateTimeFormat 与 @JsonFormat 完整实践
在 Spring Boot 开发中,处理前后端日期交互是一个常见问题。本文通过 **@DateTimeFormat** 和 **@JsonFormat** 两个注解,详细讲解了如何解析前端传来的日期字符串以及以指定格式返回日期数据。文章从实际案例出发,结合代码演示两者的使用场景与注意事项,解决解析失败、时区偏差等问题,并提供全局配置与局部注解的实践经验。帮助开发者高效应对日期时间格式化需求,提升开发效率。
383 2
|
2月前
|
存储 安全 Java
Spring Security 入门与详解
Spring Security 是 Spring 框架中的核心安全模块,提供认证、授权及防护功能。本文详解其核心概念,包括认证(Authentication)、授权(Authorization)和过滤器链(Security Filter Chain)。同时,通过代码示例介绍基本配置,如 PasswordEncoder、UserDetailsService 和自定义登录页面等。最后总结常见问题与解决方法,助你快速掌握 Spring Security 的使用与优化。
271 0

相关产品

  • 云消息队列 MQ