RocketMQ整合Spring后,可以很方便的使用。那么这里rocketmq-spring做了哪些事情,可以使得我们可以很方便使用呢?
一、rocketmq-spring对消息的处理过程
rocketmq-spring-boot整合后的入口:org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration,通过这个类,我们可以它的一系列动作操作,会首先将相关配置MessageConverterConfiguration、ListenerContainerConfiguration、ExtProducerResetConfiguration、ExtConsumerResetConfiguration、RocketMQTransactionConfiguration、RocketMQListenerConfiguration进行导入。在自动注入前处理事务配置RocketMQTransactionConfiguration,在自动注入后处理消息转换器配置MessageConverterConfiguration。
完成之后会填充两个bean:
defaultMQProducer
defaultLitePullConsumer
根据当前启动的生产和消费者,进行填充。完成后,就可以根据注册的bean容器,此时会执行bean后置处理器操作,完成对带RocketMQMessageListener的注解进行增强,然后注册。注册完成后,便可以对默认rocketmq监听器容器DefaultRocketMQListenerContainer进行启动,启动的过程其实是启动DefaultMQPushConsumer的过程,也即此时会调用rocketmq的this.defaultMQPushConsumerImpl.start();完成对消费者的启动。启动消费者完成后,等待消费。根据消息是否需要顺序消费,分为顺序消费和并发消费。此时都会调用同一个 handleMessage(messageExt)来处理消息,此时可以看到分为两种:
rocketMQListener
rocketMQReplyListener
此时根据当前选择的监听器来做对应业务实现处理,执行消费。
这里可以看到rocketmq-spring对litepull模式的消息没有demo。需要去rocketmq项目找例子看具体的过程。
1.首先我们可以从rocketmq-spring-boot-sample中看到整合还是很方便的。
下面我们来看一下example:
二、生产者
生产者demo:
@SpringBootApplication
publicclassProducerACLApplicationimplementsCommandLineRunner {
@Override
publicvoidrun(String... args) throwsException {
// 发送消息
SendResultsendResult=rocketMQTemplate.syncSend(springTopic+":acl", "Hello, ACL Msg!");
System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);
// 使用spring message发送消息
sendResult=rocketMQTemplate.syncSend(springTopic, MessageBuilder.withPayload("Hello, World! I'm from spring message & ACL Msg").build());
System.out.printf("syncSend2 to topic %s sendResult=%s %n", springTopic, sendResult);
//发送事务消息
testTransaction();
}
从demo中,我们可以看到通常分为三类:
使用rocketmq模板发送消费
使用springMessage发送消息
发送事务消息
同时spring自动注入后,其实是可以通过RocketMQProperties拿到的topic信息有限,因此这里采用了值注入的方式,方便放入多个topic,从而实现对多个业务系统对接消息。
三、消费者
同样,也是配置好需要消费的信息后,执行监听器或者使用定时任务
@SpringBootApplication
publicclassConsumerACLApplicationimplementsCommandLineRunner {
@Resource
privateRocketMQTemplaterocketMQTemplate;
@Override
publicvoidrun(String... args) throwsException {
List<String>messages=rocketMQTemplate.receive(String.class);
System.out.printf("receive from rocketMQTemplate, messages=%s %n", messages);
}
}
或者采用监听的方式:
@Service
@RocketMQMessageListener(topic="normal_topic_define_in_cloud_MQ",consumerGroup="group_define_in_cloud_MQ",accessKey="AK", secretKey="SK")
publicclassACLStringConsumerimplementsRocketMQListener<String> {
@Override
publicvoidonMessage(Stringmessage) {
System.out.printf("------- ACL StringConsumer received: %s \n", message);
}
}
配置和RocketMQProperties里面的配置名称一样,它就可以帮你读到对应的配置,并进行相关bean的注入,可以在RocketMQAutoConfiguration可以看到对应的配置信息。比如此时业务系统是消费端的时候,此时会将对应的配置信息注入到defaultLitePullConsumer,相关参数,来源于rocketMQProperties和consumerConfig中:
pullConsumer拉取消费者
nameServernameServer地址
group消费组
topic主题
accessChannel通道
messageModel消息模式,分为广播、集群
selectorType筛选器类型,分为sql92、tag
accessKeykey
secretKey密钥
pullBatchSize拉取批次大小
tlsEnable是否开启tls
enableMsgTrace是否开启msgTrace
customizedTraceTopic自定义链路主题
namespace命名空间
instanceName实例名称
RocketMQListenerConfiguration会注册相关的监听器后置处理器。可以看到后置处理器RocketMQMessageListenerBeanPostProcessor中会执行后置处理方法postProcessAfterInitialization对带RocketMQMessageListener注解的class进行增强,同时在监听容器配置中注册容器:
@Override
publicObjectpostProcessAfterInitialization(Objectbean, StringbeanName) throwsBeansException {
Class<?>targetClass=AopUtils.getTargetClass(bean);
RocketMQMessageListenerann=targetClass.getAnnotation(RocketMQMessageListener.class);
if (ann!=null) {
// 执行注解增强
RocketMQMessageListenerenhance=enhance(targetClass, ann);
//如果监听容器配置不为空,则注册容器
if (listenerContainerConfiguration!=null) {
listenerContainerConfiguration.registerContainer(beanName, bean, enhance);
}
}
returnbean;
}
注册完容器后,就可以启动容器了,因此可以在org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration#registerContainer方法中看到容器启动:
container.start();
也即此时我们可以看到容器是默认rocketmq监听器容器DefaultRocketMQListenerContainer启动的是默认推模式的消费DefaultMQPushConsumer,而启动的时候其实就是调用rocketmq中的消费者方法:
this.defaultMQPushConsumerImpl.start();
启动消费者,等待消费。
推消费分为两种,可以从消费模式种看到:
在默认mq监听容器DefaultRocketMQListenerContainer中可以看到在初始化rocketmq推模式消费 initRocketMQPushConsumer,这个方法很重要,会通过onMessage执行我们写的监听方法
switch (consumeMode) {
caseORDERLY:
consumer.setMessageListener(newDefaultMessageListenerOrderly());
break;
caseCONCURRENTLY:
consumer.setMessageListener(newDefaultMessageListenerConcurrently());
break;
default:
thrownewIllegalArgumentException("Property 'consumeMode' was wrong.");
}
根据消费模式分为两种,一种是按照顺序消费,一种是并发消费,可以看到里面有一个重要方法:
handleMessage(messageExt);
我们可以看到这个方法:
privatevoidhandleMessage(
MessageExtmessageExt) throwsMQClientException, RemotingException, InterruptedException {
//不带响应
if (rocketMQListener!=null) {
//执行rocketmq监听 处理消息
rocketMQListener.onMessage(doConvertMessage(messageExt));
} elseif (rocketMQReplyListener!=null) {
//有响应,消费消息
ObjectreplyContent=rocketMQReplyListener.onMessage(doConvertMessage(messageExt));
Message<?>message=MessageBuilder.withPayload(replyContent).build();
org.apache.rocketmq.common.message.MessagereplyMessage=MessageUtil.createReplyMessage(messageExt, convertToBytes(message));
//获取生产者,执行生产者发送回调
DefaultMQProducerproducer=consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer();
producer.setSendMsgTimeout(replyTimeout);
producer.send(replyMessage, newSendCallback() {
@Override
publicvoidonSuccess(SendResultsendResult) {
if (sendResult.getSendStatus() !=SendStatus.SEND_OK) {
log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
} else {
log.debug("Consumer replies message success.");
}
}
@Override
publicvoidonException(Throwablee) {
log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());
}
});
}
}
如果监听器rocketMQListener存在,则执行消费消息,也即onMessage的操作,执行业务系统的onMessage接口的实现。
如果响应监听器rocketMQReplyListener存在,则获取响应信息,MessageBuilder构建Message对象信息,此时的消息类型是Spring Message类型的,需要转成Rocketmq自己的类型的。从默认mq推消费者实现中获取默认的mq生产者,执行生产者发送回调。
从这个接口的注释可以看到:
/**
* The consumer supporting request-reply should implement this interface.
*
* @param <T> the type of data received by the listener
* @param <R> the type of data replying to producer
*/
publicinterfaceRocketMQReplyListener<T, R> {
/**
* @param message data received by the listener
* @return data replying to producer
*/
RonMessage(Tmessage);
}
四、ExtRocketMQTemplate
在提供的demo中,出现了ExtRocketMQTemplate这个类,用原来的RocketMQTemplate不行吗,为啥会有这个类呢?
@ExtRocketMQConsumerConfiguration(topic = "${demo.rocketmq.topic}", group = "string_consumer", tlsEnable = "${demo.ext.consumer.tlsEnable}")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
可以看到这个类上面的配置@ExtRocketMQConsumerConfiguration
StringNAME_SERVER_PLACEHOLDER="${rocketmq.name-server:}";
StringGROUP_PLACEHOLDER="${rocketmq.pull-consumer.group:}";
StringTOPIC_PLACEHOLDER="${rocketmq.pull-consumer.topic:}";
StringACCESS_CHANNEL_PLACEHOLDER="${rocketmq.access-channel:}";
StringACCESS_KEY_PLACEHOLDER="${rocketmq.pull-consumer.access-key:}";
StringSECRET_KEY_PLACEHOLDER="${rocketmq.pull-consumer.secret-key:}";
StringTRACE_TOPIC_PLACEHOLDER="${rocketmq.pull-consumer.customized-trace-topic:}";
可以看到如果有多个消息集群的时候,这里可以方便我们替换rocketmq的地址信息,从而实现对多服务的消费。也即业务系统可能出现对接多个消息,其消息的topic可能不同,方便对接,因此其注解上面可以可以方便替换成想要的rocketmq的地址信息。
可以看到三个重置方法:
ExtProducerResetConfiguration
ExtConsumerResetConfiguration
都实现了SmartInitializingSingleton,其会在bean加载完成后执行。
五、RocketMQTemplate
在RocketMQTemplate中,我们看到的是大量的发送方法,主要包括这几类方法:
同步发送
异步发送
只发送不管消费
延迟消息 发送
事务消息发送
根据顺序、延迟程度、是否批量进一步分为下面这些方法:
send
sendAndReceive
syncSend
syncSendDelayTimeSeconds
syncSendDelayTimeMills
syncSendDeliverTimeMills
syncSendOrderly
asyncSend
asyncSendOrderly
sendOneWay
sendOneWayOrderly
sendMessageInTransaction