RocketMQ-Spring学习

简介: 们可以它的一系列动作操作,会首先将相关配置MessageConverterConfiguration、ListenerContainerConfiguration、ExtProducerResetConfiguration、ExtConsumerResetConfiguration、RocketMQTransactionConfiguration、RocketMQListenerConfiguration进行导入。在自动注入前处理事务配置RocketMQTransactionConfiguration,在自动注入后处理消息转换器配置MessageConverterConfiguration。

RocketMQ整合Spring后,可以很方便的使用。那么这里rocketmq-spring做了哪些事情,可以使得我们可以很方便使用呢?

一、rocketmq-spring对消息的处理过程

  rocketmq-spring-boot整合后的入口:org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration,通过这个类,我们可以它的一系列动作操作,会首先将相关配置MessageConverterConfiguration、ListenerContainerConfiguration、ExtProducerResetConfiguration、ExtConsumerResetConfiguration、RocketMQTransactionConfiguration、RocketMQListenerConfiguration进行导入。在自动注入前处理事务配置RocketMQTransactionConfiguration,在自动注入后处理消息转换器配置MessageConverterConfiguration。

完成之后会填充两个bean:

defaultMQProducer

defaultLitePullConsumer

根据当前启动的生产和消费者,进行填充。完成后,就可以根据注册的bean容器,此时会执行bean后置处理器操作,完成对带RocketMQMessageListener的注解进行增强,然后注册。注册完成后,便可以对默认rocketmq监听器容器DefaultRocketMQListenerContainer进行启动,启动的过程其实是启动DefaultMQPushConsumer的过程,也即此时会调用rocketmq的this.defaultMQPushConsumerImpl.start();完成对消费者的启动。启动消费者完成后,等待消费。根据消息是否需要顺序消费,分为顺序消费和并发消费。此时都会调用同一个 handleMessage(messageExt)来处理消息,此时可以看到分为两种:

rocketMQListener

rocketMQReplyListener

此时根据当前选择的监听器来做对应业务实现处理,执行消费。

这里可以看到rocketmq-spring对litepull模式的消息没有demo。需要去rocketmq项目找例子看具体的过程。

1.首先我们可以从rocketmq-spring-boot-sample中看到整合还是很方便的。

下面我们来看一下example:

二、生产者

image-20230206204310051.png

生产者demo:

@SpringBootApplication

publicclassProducerACLApplicationimplementsCommandLineRunner {

   @Override

   publicvoidrun(String... args) throwsException {

       // 发送消息

       SendResultsendResult=rocketMQTemplate.syncSend(springTopic+":acl", "Hello, ACL Msg!");

       System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);

       // 使用spring message发送消息

       sendResult=rocketMQTemplate.syncSend(springTopic, MessageBuilder.withPayload("Hello, World! I'm from spring message & ACL Msg").build());

       System.out.printf("syncSend2 to topic %s sendResult=%s %n", springTopic, sendResult);

        //发送事务消息

       testTransaction();

   }

从demo中,我们可以看到通常分为三类:

使用rocketmq模板发送消费

使用springMessage发送消息

发送事务消息

      同时spring自动注入后,其实是可以通过RocketMQProperties拿到的topic信息有限,因此这里采用了值注入的方式,方便放入多个topic,从而实现对多个业务系统对接消息。

三、消费者

image-20230206210112132.png

同样,也是配置好需要消费的信息后,执行监听器或者使用定时任务

@SpringBootApplication

publicclassConsumerACLApplicationimplementsCommandLineRunner {

   @Resource

   privateRocketMQTemplaterocketMQTemplate;

   @Override

   publicvoidrun(String... args) throwsException {

       List<String>messages=rocketMQTemplate.receive(String.class);

       System.out.printf("receive from rocketMQTemplate, messages=%s %n", messages);

   }

}

或者采用监听的方式:

@Service

@RocketMQMessageListener(topic="normal_topic_define_in_cloud_MQ",consumerGroup="group_define_in_cloud_MQ",accessKey="AK", secretKey="SK")

publicclassACLStringConsumerimplementsRocketMQListener<String> {

   

   @Override

   publicvoidonMessage(Stringmessage) {

       System.out.printf("------- ACL StringConsumer received: %s \n", message);

   }

}

      配置和RocketMQProperties里面的配置名称一样,它就可以帮你读到对应的配置,并进行相关bean的注入,可以在RocketMQAutoConfiguration可以看到对应的配置信息。比如此时业务系统是消费端的时候,此时会将对应的配置信息注入到defaultLitePullConsumer,相关参数,来源于rocketMQProperties和consumerConfig中:

   pullConsumer拉取消费者

   nameServernameServer地址

   group消费组

   topic主题

   accessChannel通道

   messageModel消息模式,分为广播、集群

   selectorType筛选器类型,分为sql92、tag

   accessKeykey

   secretKey密钥

   pullBatchSize拉取批次大小

   tlsEnable是否开启tls

   enableMsgTrace是否开启msgTrace

   customizedTraceTopic自定义链路主题

   namespace命名空间

   instanceName实例名称

       RocketMQListenerConfiguration会注册相关的监听器后置处理器。可以看到后置处理器RocketMQMessageListenerBeanPostProcessor中会执行后置处理方法postProcessAfterInitialization对带RocketMQMessageListener注解的class进行增强,同时在监听容器配置中注册容器:

@Override

   publicObjectpostProcessAfterInitialization(Objectbean, StringbeanName) throwsBeansException {

       Class<?>targetClass=AopUtils.getTargetClass(bean);

       RocketMQMessageListenerann=targetClass.getAnnotation(RocketMQMessageListener.class);

       if (ann!=null) {

           // 执行注解增强

           RocketMQMessageListenerenhance=enhance(targetClass, ann);

           //如果监听容器配置不为空,则注册容器

           if (listenerContainerConfiguration!=null) {

               listenerContainerConfiguration.registerContainer(beanName, bean, enhance);

           }

       }

       returnbean;

   }

注册完容器后,就可以启动容器了,因此可以在org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration#registerContainer方法中看到容器启动:

container.start();

也即此时我们可以看到容器是默认rocketmq监听器容器DefaultRocketMQListenerContainer启动的是默认推模式的消费DefaultMQPushConsumer,而启动的时候其实就是调用rocketmq中的消费者方法:

this.defaultMQPushConsumerImpl.start();

启动消费者,等待消费。

推消费分为两种,可以从消费模式种看到:

     在默认mq监听容器DefaultRocketMQListenerContainer中可以看到在初始化rocketmq推模式消费 initRocketMQPushConsumer,这个方法很重要,会通过onMessage执行我们写的监听方法

 switch (consumeMode) {

           caseORDERLY:

               consumer.setMessageListener(newDefaultMessageListenerOrderly());

               break;

           caseCONCURRENTLY:

               consumer.setMessageListener(newDefaultMessageListenerConcurrently());

               break;

           default:

               thrownewIllegalArgumentException("Property 'consumeMode' was wrong.");

}

根据消费模式分为两种,一种是按照顺序消费,一种是并发消费,可以看到里面有一个重要方法:

  handleMessage(messageExt);

我们可以看到这个方法:

privatevoidhandleMessage(

       MessageExtmessageExt) throwsMQClientException, RemotingException, InterruptedException {

       //不带响应

       if (rocketMQListener!=null) {

           //执行rocketmq监听 处理消息

           rocketMQListener.onMessage(doConvertMessage(messageExt));

       } elseif (rocketMQReplyListener!=null) {

           //有响应,消费消息

           ObjectreplyContent=rocketMQReplyListener.onMessage(doConvertMessage(messageExt));

           Message<?>message=MessageBuilder.withPayload(replyContent).build();

           org.apache.rocketmq.common.message.MessagereplyMessage=MessageUtil.createReplyMessage(messageExt, convertToBytes(message));

           //获取生产者,执行生产者发送回调

           DefaultMQProducerproducer=consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer();

           producer.setSendMsgTimeout(replyTimeout);

           producer.send(replyMessage, newSendCallback() {

               @Override

               publicvoidonSuccess(SendResultsendResult) {

                   if (sendResult.getSendStatus() !=SendStatus.SEND_OK) {

                       log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());

                   } else {

                       log.debug("Consumer replies message success.");

                   }

               }

               @Override

               publicvoidonException(Throwablee) {

                   log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());

               }

           });

       }

   }

如果监听器rocketMQListener存在,则执行消费消息,也即onMessage的操作,执行业务系统的onMessage接口的实现。

如果响应监听器rocketMQReplyListener存在,则获取响应信息,MessageBuilder构建Message对象信息,此时的消息类型是Spring Message类型的,需要转成Rocketmq自己的类型的。从默认mq推消费者实现中获取默认的mq生产者,执行生产者发送回调。

从这个接口的注释可以看到:

/**

* The consumer supporting request-reply should implement this interface.

*

* @param <T> the type of data received by the listener

* @param <R> the type of data replying to producer

*/

publicinterfaceRocketMQReplyListener<T, R> {

   /**

    * @param message data received by the listener

    * @return data replying to producer

    */

   RonMessage(Tmessage);

}

四、ExtRocketMQTemplate

在提供的demo中,出现了ExtRocketMQTemplate这个类,用原来的RocketMQTemplate不行吗,为啥会有这个类呢?

@ExtRocketMQConsumerConfiguration(topic = "${demo.rocketmq.topic}", group = "string_consumer", tlsEnable = "${demo.ext.consumer.tlsEnable}")

public class ExtRocketMQTemplate extends RocketMQTemplate {

}

可以看到这个类上面的配置@ExtRocketMQConsumerConfiguration

   StringNAME_SERVER_PLACEHOLDER="${rocketmq.name-server:}";

   StringGROUP_PLACEHOLDER="${rocketmq.pull-consumer.group:}";

   StringTOPIC_PLACEHOLDER="${rocketmq.pull-consumer.topic:}";

   StringACCESS_CHANNEL_PLACEHOLDER="${rocketmq.access-channel:}";

   StringACCESS_KEY_PLACEHOLDER="${rocketmq.pull-consumer.access-key:}";

   StringSECRET_KEY_PLACEHOLDER="${rocketmq.pull-consumer.secret-key:}";

   StringTRACE_TOPIC_PLACEHOLDER="${rocketmq.pull-consumer.customized-trace-topic:}";

可以看到如果有多个消息集群的时候,这里可以方便我们替换rocketmq的地址信息,从而实现对多服务的消费。也即业务系统可能出现对接多个消息,其消息的topic可能不同,方便对接,因此其注解上面可以可以方便替换成想要的rocketmq的地址信息。

可以看到三个重置方法:

ExtProducerResetConfiguration

ExtConsumerResetConfiguration

都实现了SmartInitializingSingleton,其会在bean加载完成后执行。

五、RocketMQTemplate

在RocketMQTemplate中,我们看到的是大量的发送方法,主要包括这几类方法:

同步发送

异步发送

只发送不管消费

延迟消息 发送

事务消息发送

根据顺序、延迟程度、是否批量进一步分为下面这些方法:

send

sendAndReceive

syncSend

syncSendDelayTimeSeconds

syncSendDelayTimeMills

syncSendDeliverTimeMills

syncSendOrderly

asyncSend

asyncSendOrderly

sendOneWay

sendOneWayOrderly

sendMessageInTransaction


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
前端开发 Java 开发者
Spring生态学习路径与源码深度探讨
【11月更文挑战第13天】Spring框架作为Java企业级开发中的核心框架,其丰富的生态系统和强大的功能吸引了无数开发者的关注。学习Spring生态不仅仅是掌握Spring Framework本身,更需要深入理解其周边组件和工具,以及源码的底层实现逻辑。本文将从Spring生态的学习路径入手,详细探讨如何系统地学习Spring,并深入解析各个重点的底层实现逻辑。
72 9
|
3月前
|
前端开发 Java 数据库
SpringBoot学习
【10月更文挑战第7天】Spring学习
46 9
|
2月前
|
Java Kotlin 索引
学习Spring框架特性及jiar包下载
Spring 5作为最新版本,更新了JDK基线至8,修订了核心框架,增强了反射和接口功能,支持响应式编程及Kotlin语言,引入了函数式Web框架,并提升了测试功能。Spring框架可在其官网下载,包括文档、jar包和XML Schema文档,适用于Java SE和Java EE项目。
36 0
|
3月前
|
XML Java 数据格式
Spring学习
【10月更文挑战第6天】Spring学习
29 1
|
3月前
|
Java 测试技术 开发者
springboot学习四:Spring Boot profile多环境配置、devtools热部署
这篇文章主要介绍了如何在Spring Boot中进行多环境配置以及如何整合DevTools实现热部署,以提高开发效率。
119 2
|
3月前
|
前端开发 Java 程序员
springboot 学习十五:Spring Boot 优雅的集成Swagger2、Knife4j
这篇文章是关于如何在Spring Boot项目中集成Swagger2和Knife4j来生成和美化API接口文档的详细教程。
304 1
|
3月前
|
Java API Spring
springboot学习七:Spring Boot2.x 拦截器基础入门&实战项目场景实现
这篇文章是关于Spring Boot 2.x中拦截器的入门教程和实战项目场景实现的详细指南。
42 0
springboot学习七:Spring Boot2.x 拦截器基础入门&实战项目场景实现
|
3月前
|
Java API Spring
springboot学习六:Spring Boot2.x 过滤器基础入门&实战项目场景实现
这篇文章是关于Spring Boot 2.x中过滤器的基础知识和实战项目应用的教程。
47 0
springboot学习六:Spring Boot2.x 过滤器基础入门&实战项目场景实现
|
3月前
|
Java 关系型数据库 MySQL
springboot学习五:springboot整合Mybatis 连接 mysql数据库
这篇文章是关于如何使用Spring Boot整合MyBatis来连接MySQL数据库,并进行基本的增删改查操作的教程。
358 0
springboot学习五:springboot整合Mybatis 连接 mysql数据库
|
3月前
|
Java 关系型数据库 MySQL
springboot学习四:springboot链接mysql数据库,使用JdbcTemplate 操作mysql
这篇文章是关于如何使用Spring Boot框架通过JdbcTemplate操作MySQL数据库的教程。
133 0
springboot学习四:springboot链接mysql数据库,使用JdbcTemplate 操作mysql