Spring-cloud-stream-binder-rocketmq入门与实践

简介: 本场景带您体验如何在 Spring 生态中优雅地使用 Apache RocketMQ,感受最受欢迎业务开发框架与最受欢迎消息平台结合的魅力。

Spring-cloud-stream-binder-rocketmq入门与实践


1. 创建资源

开始实验之前,您需要先创建ECS实例资源。

  1. 在实验室页面,单击创建资源
  2. (可选)在实验室页面左侧导航栏中,单击云产品资源列表,可查看本次实验资源相关信息(例如IP地址、用户信息等)。

说明 资源创建过程需要1~3分钟。

2. 启动名称服务器和代理

  1. 本步骤指导您如何在已搭建好RocketMQ实例中启动名称服务器和代理。
  2. 执行如下命令,切换目录至rocketmq-4.9.3/bin下。
cd /root/rocketmq-4.9.3/bin
  1. 执行如下命令,启动服务器。

说明 按CTRL+C可结束当前进程。

nohup sh mqnamesrv &

tail -f ~/logs/rocketmqlogs/namesrv.log

若启动代理失败,解决方案如下。

  1. 执行如下命令,查看 mqnamesrv 进程。
ps -ef |grep mqnamesrv

  1. 执行如下命令,清除掉该进程(该命令进程编号以上图示例)。
kill -9 6987
  1. 执行如下命令,清除Java进程。
sh mqshutdown namesrv
sh mqshutdown broker

  1. 执行如下命令,启动代理。

说明 按CTRL+C可结束当前进程。

nohup sh mqbroker -n localhost:9876 &

tail -f ~/logs/rocketmqlogs/broker.log

  1. 执行如下命令,创建topic。
cd /root/rocketmq-4.9.3/bin
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendDataType
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendOperationType
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t syncSendOrderly
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendAndReceive
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t broadcastConsumer
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t transactionMessage

3. 启动 java 工程

本步骤指导您如何启动java工程。

  1. 执行如下命令,启动Java工程。

说明 首次运行可能要花费3分钟左右的编译时间。

cd /root/rocketmq-handson-apply
mvn install -Dmaven.test.skip
export local=$RANDOM
java -jar  rocketmq-spring-cloud-stream/target/rocketmq-spring-cloud-stream-1.0.0.jar

启动成功,返回信息如下。

  1. 单击当前页面右上角,再开启一个会话窗口,进行响应测试。

4. 访问并测试执行效果

本步骤指导您如何进行测试,并查看其相应效果。

  1. 测试同步消息。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "127.0.0.1:28082/messageSend/sendSync?id=1&name=xiaoming&action=go"

再观察会话1中的日志,会观察到如下的输出。

  1. 测试异步消息。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

wget -q -O- "127.0.0.1:28082/messageSend/sendAsync?id=2&name=xiaohua&action=to"

再观察会话1中的日志,会观察到如下的输出。

  1. 测试事务消息。

执行如下命令,向刚才启动的程序发送一条简单的请求,以观察响应。

  • ags =1 是回调提交
  • ags =2 是回滚事务消息
  • ags =3 是直接提交
wget -q -O- "127.0.0.1:28082/messageSend/sendTransactional?id=1&name=xiaoming&action=go&ags=1"
wget -q -O- "127.0.0.1:28082/messageSend/sendTransactional?id=1&name=xiaoming&action=go&ags=2"
wget -q -O- "127.0.0.1:28082/sendTransactional?id=1&name=xiaoming&action=go&ags=3"

再观察会话1中的日志,会观察到如下的输出。

5. 开发流程

本步骤讲述了核心配置。

  1. maven依赖。

  1. 配置讲解。
${固定前缀}.${识别标记}.${consumer or producer or null}.${变量}
  1. 固定标记。

spring固定前缀是:spring.cloud.stream.bindings。

RocketMQ固定前最是: spring.cloud.stream.rocketmq。

  1. 识别标记。

识别标记是配置中最要的,配置里面同样识别为一组。这组数据为一组配置,实例化一个生产者或者消费者。比如识别标记为 topic。注解Output("topic"),Input("topic"),StreamListener("topic"),就会识别对应的标记。

  1. spring-bind配置。

配置名

类型

默认值

说明

destination

String

无(必须)

topic名

content-type

String

无(必须)

数据序列化方式。如果传递是对象建议使用application/json。如果传递是string建议使用text/plain

配置演示,如下所示。

6. 生产者

本步骤将描述生产者主要配置、发送配置、发送流程演示。

  1. 配置说明。
  1. 主要配置。

配置名

类型

默认值

说明

group

String

无(必须)

消费组

maxMessageSize

int

1024 * 1024 * 4

消息最大大小

transactional

boolean

false

是否是事务消息

sync

boolean

false

是否以同步方式发送

vipChannelEnabled

boolean

true

是否使用vip channel

sendMessageTimeout

int

3000

发送消息超时时间

compressMessageBodyThreshold

int

1024 * 4

消息压缩门限值, 超过该值会自动压缩

retryTimesWhenSendFailed

int

2

同步发送消息失败前内部重试发送的最大次数

retryTimesWhenSendAsyncFailed

int

2

异步发送消息失败前内部重试发送的最大次数

关于output1详细讲解下。

  1. 发送配置。

在message构建时写入到setHeader里面,如下所示。

配置名

类型

默认值

说明

MessageConst.PROPERTY_TAGS

String

标签

MessageConst.PROPERTYDELAYTIME_LEVEL

int

0

延迟消费

BinderHeaders.PARTITION_HEADER

boolean

false

是否为分区顺序消息

RocketMQBinderConstants.ROCKETTRANSACTIONALARG

Object

发送事务消息时用于传入自定义参数

  1. 发送流程演示。
  1. 第一步:配置RocketMQ-binder 核心配置。

  1. 第二步:配置发送对象。

  1. 第三步:创建发送对象。

另外一种方式:

对发送对象进行封装,优势是代码简单,有模块层次感。如果方法多个地方调用可以提高代码可维护性。

细节:

  • 在封装类上必须标记@Component注解或者子注解。
  • 引入有Output注解的类。

  1. 第四步:配置发送对象。

建议在spring-boot的启动类上使用EnableBinding注解。

  1. 第五步:调用发送对象。

  1. 消息事务。

7. 消费者

本步骤将描述消费者主要配置、消费流程演示。

  1. 配置说明。
  1. 主要配置。

配置名

类型

默认值

说明

orderly

boolean

false

是否为顺序消费

broadcasting

boolean

false

是否使用广播模式

sql

String

null

SelectorType 模式为 sql

tags

String

null

SelectorType 模式为 tag

  1. Spring-bind配置。

配置名

类型

默认值

说明

group

String

无(必须)

消费组

concurrency

int

1

消费线程数

max-attempts

int

3

消费失败,spring重试次数。不建议使用, rocketmq默认有重试机制

back-off-initial-interval

int

100(毫秒)

第一次重试间隔

back-off-multiplier

int

2

重试时间因子。重试时间=back-off-initial-interval*back-off-multiplier

  1. 消费流程演示。
  1. 第一步:配置RocketMQ-binder 核心配置。

  1. 第二步:配置发送对象。

  1. 第三步:创建消费对象。

创建消费接口:必须创建消费接口,@StreamListener只读取@Input的配置,比如@StreamListener("xxxx") 里面的xxxx不存在Input里面,直接无效。

建议命名方式 {业务作用}StreamBindingConsumer。

Push模式:在方法上使用StreamListener注解表示消费。

建议命名方式 {业务作用}PushModeConsumer。

  1. MessageSource 支持。
  1. 创建pull对象。
  • 返回对象一定是PollableMessageSource。
  • 返回对象一定是PollableMessageSource。
  • 返回对象一定是PollableMessageSource。
  • 建议一个线程执行。
  • 建议在无限循环(while(true) or for(;;))调用pull方法。
  • 建议命名方式 {业务作用}PushModeConsumer。

实验地址:https://developer.aliyun.com/adc/scenario/5d7eb51b5b2348f2a30197ce7bc3ae4b

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1天前
|
存储 安全 Java
Spring Security 入门
Spring Security 是 Spring 框架中的安全模块,提供强大的认证和授权功能,支持防止常见攻击(如 CSRF 和会话固定攻击)。它通过过滤器链拦截请求,核心概念包括认证、授权和自定义过滤器。配置方面,涉及密码加密、用户信息服务、认证提供者及过滤器链设置。示例代码展示了如何配置登录、注销、CSRF防护等。常见问题包括循环重定向、静态资源被拦截和登录失败未返回错误信息,解决方法需确保路径正确和添加错误提示逻辑。
Spring Security 入门
|
6天前
|
消息中间件 Java 开发工具
【实践】快速学会使用云消息队列RabbitMQ版
本次分享的主题是快速学会使用云消息队列RabbitMQ版的实践。内容包括:如何创建和配置RabbitMQ实例,如Vhost、Exchange、Queue等;如何通过阿里云控制台管理静态用户名密码和AccessKey;以及如何使用RabbitMQ开源客户端进行消息生产和消费测试。最后介绍了实验资源的回收步骤,确保资源合理利用。通过详细的操作指南,帮助用户快速上手并掌握RabbitMQ的使用方法。
52 10
|
25天前
|
Java 开发者 微服务
Spring Boot 入门:简化 Java Web 开发的强大工具
Spring Boot 是一个开源的 Java 基础框架,用于创建独立、生产级别的基于Spring框架的应用程序。它旨在简化Spring应用的初始搭建以及开发过程。
47 6
Spring Boot 入门:简化 Java Web 开发的强大工具
|
9天前
|
XML Java 数据格式
🌱 深入Spring的心脏:Bean配置的艺术与实践 🌟
本文深入探讨了Spring框架中Bean配置的奥秘,从基本概念到XML配置文件的使用,再到静态工厂方式实例化Bean的详细步骤,通过实际代码示例帮助读者更好地理解和应用Spring的Bean配置。希望对你的Spring开发之旅有所助益。
56 3
|
24天前
|
XML Java 数据格式
Spring Core核心类库的功能与应用实践分析
【12月更文挑战第1天】大家好,今天我们来聊聊Spring Core这个强大的核心类库。Spring Core作为Spring框架的基础,提供了控制反转(IOC)和依赖注入(DI)等核心功能,以及企业级功能,如JNDI和定时任务等。通过本文,我们将从概述、功能点、背景、业务点、底层原理等多个方面深入剖析Spring Core,并通过多个Java示例展示其应用实践,同时指出对应实践的优缺点。
52 14
|
20天前
|
缓存 Java 数据库连接
Spring框架中的事件机制:深入理解与实践
Spring框架是一个广泛使用的Java企业级应用框架,提供了依赖注入、面向切面编程(AOP)、事务管理、Web应用程序开发等一系列功能。在Spring框架中,事件机制是一种重要的通信方式,它允许不同组件之间进行松耦合的通信,提高了应用程序的可维护性和可扩展性。本文将深入探讨Spring框架中的事件机制,包括不同类型的事件、底层原理、应用实践以及优缺点。
49 8
|
25天前
|
负载均衡 Java 开发者
深入探索Spring Cloud与Spring Boot:构建微服务架构的实践经验
深入探索Spring Cloud与Spring Boot:构建微服务架构的实践经验
87 5
|
22天前
|
XML 前端开发 安全
Spring MVC:深入理解与应用实践
Spring MVC是Spring框架提供的一个用于构建Web应用程序的Model-View-Controller(MVC)实现。它通过分离业务逻辑、数据、显示来组织代码,使得Web应用程序的开发变得更加简洁和高效。本文将从概述、功能点、背景、业务点、底层原理等多个方面深入剖析Spring MVC,并通过多个Java示例展示其应用实践,同时指出对应实践的优缺点。
53 2
|
22天前
|
Java 数据库连接 数据库
从入门到精通---深入剖析Spring DAO
在Java企业级开发中,Spring框架以其强大的功能和灵活性,成为众多开发者的首选。Spring DAO(Data Access Object)作为Spring框架中处理数据访问的重要模块,对JDBC进行了抽象封装,极大地简化了数据访问异常的处理,并能统一管理JDBC事务。本文将从概述、功能点、背景、业务点、底层原理等多个方面深入剖析Spring DAO,并通过多个Java示例展示其应用实践,同时指出对应实践的优缺点。
22 1
|
26天前
|
安全 Java 数据安全/隐私保护
如何使用Spring Boot进行表单登录身份验证:从基础到实践
如何使用Spring Boot进行表单登录身份验证:从基础到实践
44 5

相关产品

  • 云消息队列 MQ