RocketMQ-Spring学习

简介: 们可以它的一系列动作操作,会首先将相关配置MessageConverterConfiguration、ListenerContainerConfiguration、ExtProducerResetConfiguration、ExtConsumerResetConfiguration、RocketMQTransactionConfiguration、RocketMQListenerConfiguration进行导入。在自动注入前处理事务配置RocketMQTransactionConfiguration,在自动注入后处理消息转换器配置MessageConverterConfiguration。

RocketMQ整合Spring后,可以很方便的使用。那么这里rocketmq-spring做了哪些事情,可以使得我们可以很方便使用呢?

一、rocketmq-spring对消息的处理过程

  rocketmq-spring-boot整合后的入口:org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration,通过这个类,我们可以它的一系列动作操作,会首先将相关配置MessageConverterConfiguration、ListenerContainerConfiguration、ExtProducerResetConfiguration、ExtConsumerResetConfiguration、RocketMQTransactionConfiguration、RocketMQListenerConfiguration进行导入。在自动注入前处理事务配置RocketMQTransactionConfiguration,在自动注入后处理消息转换器配置MessageConverterConfiguration。

完成之后会填充两个bean:

defaultMQProducer

defaultLitePullConsumer

根据当前启动的生产和消费者,进行填充。完成后,就可以根据注册的bean容器,此时会执行bean后置处理器操作,完成对带RocketMQMessageListener的注解进行增强,然后注册。注册完成后,便可以对默认rocketmq监听器容器DefaultRocketMQListenerContainer进行启动,启动的过程其实是启动DefaultMQPushConsumer的过程,也即此时会调用rocketmq的this.defaultMQPushConsumerImpl.start();完成对消费者的启动。启动消费者完成后,等待消费。根据消息是否需要顺序消费,分为顺序消费和并发消费。此时都会调用同一个 handleMessage(messageExt)来处理消息,此时可以看到分为两种:

rocketMQListener

rocketMQReplyListener

此时根据当前选择的监听器来做对应业务实现处理,执行消费。

这里可以看到rocketmq-spring对litepull模式的消息没有demo。需要去rocketmq项目找例子看具体的过程。

1.首先我们可以从rocketmq-spring-boot-sample中看到整合还是很方便的。

下面我们来看一下example:

二、生产者

image-20230206204310051.png

生产者demo:

@SpringBootApplication

publicclassProducerACLApplicationimplementsCommandLineRunner {

   @Override

   publicvoidrun(String... args) throwsException {

       // 发送消息

       SendResultsendResult=rocketMQTemplate.syncSend(springTopic+":acl", "Hello, ACL Msg!");

       System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);

       // 使用spring message发送消息

       sendResult=rocketMQTemplate.syncSend(springTopic, MessageBuilder.withPayload("Hello, World! I'm from spring message & ACL Msg").build());

       System.out.printf("syncSend2 to topic %s sendResult=%s %n", springTopic, sendResult);

        //发送事务消息

       testTransaction();

   }

从demo中,我们可以看到通常分为三类:

使用rocketmq模板发送消费

使用springMessage发送消息

发送事务消息

      同时spring自动注入后,其实是可以通过RocketMQProperties拿到的topic信息有限,因此这里采用了值注入的方式,方便放入多个topic,从而实现对多个业务系统对接消息。

三、消费者

image-20230206210112132.png

同样,也是配置好需要消费的信息后,执行监听器或者使用定时任务

@SpringBootApplication

publicclassConsumerACLApplicationimplementsCommandLineRunner {

   @Resource

   privateRocketMQTemplaterocketMQTemplate;

   @Override

   publicvoidrun(String... args) throwsException {

       List<String>messages=rocketMQTemplate.receive(String.class);

       System.out.printf("receive from rocketMQTemplate, messages=%s %n", messages);

   }

}

或者采用监听的方式:

@Service

@RocketMQMessageListener(topic="normal_topic_define_in_cloud_MQ",consumerGroup="group_define_in_cloud_MQ",accessKey="AK", secretKey="SK")

publicclassACLStringConsumerimplementsRocketMQListener<String> {

   

   @Override

   publicvoidonMessage(Stringmessage) {

       System.out.printf("------- ACL StringConsumer received: %s \n", message);

   }

}

      配置和RocketMQProperties里面的配置名称一样,它就可以帮你读到对应的配置,并进行相关bean的注入,可以在RocketMQAutoConfiguration可以看到对应的配置信息。比如此时业务系统是消费端的时候,此时会将对应的配置信息注入到defaultLitePullConsumer,相关参数,来源于rocketMQProperties和consumerConfig中:

   pullConsumer拉取消费者

   nameServernameServer地址

   group消费组

   topic主题

   accessChannel通道

   messageModel消息模式,分为广播、集群

   selectorType筛选器类型,分为sql92、tag

   accessKeykey

   secretKey密钥

   pullBatchSize拉取批次大小

   tlsEnable是否开启tls

   enableMsgTrace是否开启msgTrace

   customizedTraceTopic自定义链路主题

   namespace命名空间

   instanceName实例名称

       RocketMQListenerConfiguration会注册相关的监听器后置处理器。可以看到后置处理器RocketMQMessageListenerBeanPostProcessor中会执行后置处理方法postProcessAfterInitialization对带RocketMQMessageListener注解的class进行增强,同时在监听容器配置中注册容器:

@Override

   publicObjectpostProcessAfterInitialization(Objectbean, StringbeanName) throwsBeansException {

       Class<?>targetClass=AopUtils.getTargetClass(bean);

       RocketMQMessageListenerann=targetClass.getAnnotation(RocketMQMessageListener.class);

       if (ann!=null) {

           // 执行注解增强

           RocketMQMessageListenerenhance=enhance(targetClass, ann);

           //如果监听容器配置不为空,则注册容器

           if (listenerContainerConfiguration!=null) {

               listenerContainerConfiguration.registerContainer(beanName, bean, enhance);

           }

       }

       returnbean;

   }

注册完容器后,就可以启动容器了,因此可以在org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration#registerContainer方法中看到容器启动:

container.start();

也即此时我们可以看到容器是默认rocketmq监听器容器DefaultRocketMQListenerContainer启动的是默认推模式的消费DefaultMQPushConsumer,而启动的时候其实就是调用rocketmq中的消费者方法:

this.defaultMQPushConsumerImpl.start();

启动消费者,等待消费。

推消费分为两种,可以从消费模式种看到:

     在默认mq监听容器DefaultRocketMQListenerContainer中可以看到在初始化rocketmq推模式消费 initRocketMQPushConsumer,这个方法很重要,会通过onMessage执行我们写的监听方法

 switch (consumeMode) {

           caseORDERLY:

               consumer.setMessageListener(newDefaultMessageListenerOrderly());

               break;

           caseCONCURRENTLY:

               consumer.setMessageListener(newDefaultMessageListenerConcurrently());

               break;

           default:

               thrownewIllegalArgumentException("Property 'consumeMode' was wrong.");

}

根据消费模式分为两种,一种是按照顺序消费,一种是并发消费,可以看到里面有一个重要方法:

  handleMessage(messageExt);

我们可以看到这个方法:

privatevoidhandleMessage(

       MessageExtmessageExt) throwsMQClientException, RemotingException, InterruptedException {

       //不带响应

       if (rocketMQListener!=null) {

           //执行rocketmq监听 处理消息

           rocketMQListener.onMessage(doConvertMessage(messageExt));

       } elseif (rocketMQReplyListener!=null) {

           //有响应,消费消息

           ObjectreplyContent=rocketMQReplyListener.onMessage(doConvertMessage(messageExt));

           Message<?>message=MessageBuilder.withPayload(replyContent).build();

           org.apache.rocketmq.common.message.MessagereplyMessage=MessageUtil.createReplyMessage(messageExt, convertToBytes(message));

           //获取生产者,执行生产者发送回调

           DefaultMQProducerproducer=consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer();

           producer.setSendMsgTimeout(replyTimeout);

           producer.send(replyMessage, newSendCallback() {

               @Override

               publicvoidonSuccess(SendResultsendResult) {

                   if (sendResult.getSendStatus() !=SendStatus.SEND_OK) {

                       log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());

                   } else {

                       log.debug("Consumer replies message success.");

                   }

               }

               @Override

               publicvoidonException(Throwablee) {

                   log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());

               }

           });

       }

   }

如果监听器rocketMQListener存在,则执行消费消息,也即onMessage的操作,执行业务系统的onMessage接口的实现。

如果响应监听器rocketMQReplyListener存在,则获取响应信息,MessageBuilder构建Message对象信息,此时的消息类型是Spring Message类型的,需要转成Rocketmq自己的类型的。从默认mq推消费者实现中获取默认的mq生产者,执行生产者发送回调。

从这个接口的注释可以看到:

/**

* The consumer supporting request-reply should implement this interface.

*

* @param <T> the type of data received by the listener

* @param <R> the type of data replying to producer

*/

publicinterfaceRocketMQReplyListener<T, R> {

   /**

    * @param message data received by the listener

    * @return data replying to producer

    */

   RonMessage(Tmessage);

}

四、ExtRocketMQTemplate

在提供的demo中,出现了ExtRocketMQTemplate这个类,用原来的RocketMQTemplate不行吗,为啥会有这个类呢?

@ExtRocketMQConsumerConfiguration(topic = "${demo.rocketmq.topic}", group = "string_consumer", tlsEnable = "${demo.ext.consumer.tlsEnable}")

public class ExtRocketMQTemplate extends RocketMQTemplate {

}

可以看到这个类上面的配置@ExtRocketMQConsumerConfiguration

   StringNAME_SERVER_PLACEHOLDER="${rocketmq.name-server:}";

   StringGROUP_PLACEHOLDER="${rocketmq.pull-consumer.group:}";

   StringTOPIC_PLACEHOLDER="${rocketmq.pull-consumer.topic:}";

   StringACCESS_CHANNEL_PLACEHOLDER="${rocketmq.access-channel:}";

   StringACCESS_KEY_PLACEHOLDER="${rocketmq.pull-consumer.access-key:}";

   StringSECRET_KEY_PLACEHOLDER="${rocketmq.pull-consumer.secret-key:}";

   StringTRACE_TOPIC_PLACEHOLDER="${rocketmq.pull-consumer.customized-trace-topic:}";

可以看到如果有多个消息集群的时候,这里可以方便我们替换rocketmq的地址信息,从而实现对多服务的消费。也即业务系统可能出现对接多个消息,其消息的topic可能不同,方便对接,因此其注解上面可以可以方便替换成想要的rocketmq的地址信息。

可以看到三个重置方法:

ExtProducerResetConfiguration

ExtConsumerResetConfiguration

都实现了SmartInitializingSingleton,其会在bean加载完成后执行。

五、RocketMQTemplate

在RocketMQTemplate中,我们看到的是大量的发送方法,主要包括这几类方法:

同步发送

异步发送

只发送不管消费

延迟消息 发送

事务消息发送

根据顺序、延迟程度、是否批量进一步分为下面这些方法:

send

sendAndReceive

syncSend

syncSendDelayTimeSeconds

syncSendDelayTimeMills

syncSendDeliverTimeMills

syncSendOrderly

asyncSend

asyncSendOrderly

sendOneWay

sendOneWayOrderly

sendMessageInTransaction


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件 Java API
RocketMQ事务消息, 图文、源码学习探究~
介绍 RocketMQ是阿里巴巴开源的分布式消息中间件,它是一个高性能、低延迟、可靠的消息队列系统,用于在分布式系统中进行异步通信。 从4.3.0版本开始正式支持分布式事务消息~ RocketMq事务消息支持最终一致性:在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 原理、流程 本质上RocketMq的事务能力是基于二阶段提交来实现的 在消息发送上,将二阶段提交与本地事务绑定 本地事务执行成功,则事务消息成功,可以交由Consumer消费 本地事务执行失败,则事务消息失败,Consumer无法消费 但是,RocketMq只能保证本地事务
|
3月前
|
消息中间件 JSON 缓存
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(SpringBoot整合)
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(SpringBoot整合)
|
3月前
|
消息中间件 存储 数据安全/隐私保护
深入学习RabbitMQ五种模式(一)
深入学习RabbitMQ五种模式(一)
39 0
|
19天前
|
消息中间件 存储 负载均衡
消息队列学习之RabbitMQ
【4月更文挑战第3天】消息队列学习之RabbitMQ,一种基于erlang语言开发的流行的开源消息中间件。
15 0
|
2月前
|
消息中间件 RocketMQ Docker
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
53 0
|
3月前
|
消息中间件 RocketMQ Docker
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)(下)
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
30 0
|
消息中间件 RocketMQ Docker
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)(上)
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
69 0
|
3月前
|
消息中间件 存储 Java
RabbitMQ之延迟队列(手把手教你学习延迟队列)
【1月更文挑战第12天】延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列的。
345 1
|
3月前
|
消息中间件 存储
深入学习RabbitMQ五种模式(二)
深入学习RabbitMQ五种模式(二)
26 0
|
3月前
|
消息中间件 JSON 缓存
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(基于SpringBoot)
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(基于SpringBoot)

热门文章

最新文章