说明:这是一个使用 Spring 3 的老项目,需要对接 RocketMQ。配置完成之后,在消费者监听方法中发现业务处理 service 注入不进来。最后检查发现,原因是消费者监听工具类没有被 Spring 正确地初始化,所以它里边的业务 service 注入之后是 null。于是各种折腾,特此记录一下。
方式一:
解决:让需要初始化的类实现 InitializingBean 接口,实现 afterPropertiesSet() 方法,并在 afterPropertiesSet 方法中调用需要被初始化的方法。Spring 会在完成依赖注入之后回调 afterPropertiesSet(),此时 @Autowired 的 service 已经可用。
代码如下:
import xx.xxx.component.BaseServiceMqConsumer;
import xx.xxx.service.VideoConsumerService;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
import java.util.List;
@DependsOn("RocketMqConfig")
@Component
public class RocketMqConsumerUtil implements InitializingBean //代码效果参考:http://www.zidongmutanji.com/zsjx/304231.html
{private static Logger log = LoggerFactory.getLogger(RocketMqConsumerUtil.class);
@Autowired
private VideoConsumerService videoConsumerService;
/
接收消息
/
public void listener(){
// 获取消息生产者
DefaultMQPushConsumer consumer = BaseServiceMqConsumer.getDefaultMQPushConsumer();
// 订阅主体
try {
consumer.subscribe(RocketMqUtil.topic, "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
/
默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
/
public ConsumeConcurrentlyStatus consumeMessage(
List[/span>MessageExt
MessageExt messageExt = msgs.get(0);
String msg = null;
try {
msg = new String(messageExt.getBody(),"utf-8");
} catch (UnsupportedEncodingException e) {
log.error("消息编码失败,MsgBody:{}",new String(messageExt.getBody()));
e.printStackTrace();
}
log.info("消费开始-MsgBody:{}",msg);
// String msg = new String(messageExt.getBody());
// log.info("MsgBody:{}",new String(messageExt.getBody()));
if (messageExt.getTopic().equals(RocketMqUtil.topic)) {
// topic的消费逻辑
if (messageExt.getTags() != null && messageExt.getTags().equals(RocketMqUtil.tag)) {
// 根据Tag消费消息,具体消费消息的业务方法
videoConsumerService.dealVideoMsg(msg);
}
} else if (messageExt.getTopic().equals("TopicTest2")) {
// 执行TopicTest2的消费逻辑
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/**
Consumer对象在使用之前必须要调用start初始化,初始化一次即可[/span>br
/
consumer.start();
log.info("rocketmq-consumer 启动成功---");
} catch (MQClientException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void afterPropertiesSet() throws Exception {
listener();//调用需要被初始化的方法
}
}
方式二:
使用注解 @PostConstruct 指定需要在初始化时执行的方法
package net.greatsoft.xxx.utils;
import xxx.xxx.component.BaseServiceMqConsumer;
import net.greatsoft.xxx.service.VideoConsumerService;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.UnsupportedEncodingException;
import java.util.List;
@DependsOn("RocketMqConfig")
@Component
public class RocketMqConsumerUtil {
private static Logger log = LoggerFactory.getLogger(RocketMqConsumerUtil.class);
@Autowired
private VideoConsumerService videoConsumerService;
/**
接收消息8
/
@PostConstruct
public void listener(){
// 获取消息生产者
DefaultMQPushConsumer consumer = BaseServiceMqConsumer.getDefaultMQPushConsumer();
// 订阅主体
try {
consumer.subscribe(RocketMqUtil.topic, "");
consumer.registerMessageListener(new MessageListenerConcurrently() {
/
默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
* /
public ConsumeConcurrentlyStatus consumeMessage(
List[/span>MessageExt
MessageExt messageExt = msgs.get(0);
String msg = null;
try {
msg = new String(messageExt.getBody(),"utf-8");
} catch (UnsupportedEncodingException e) {
log.error("消息编码失败,MsgBody:{}",new String(messageExt.getBody()));
e.printStackTrace();
}
log.info("消费开始-MsgBody:{}",msg);
if (messageExt.getTopic().equals(RocketMqUtil.topic)) {
// topic的消费逻辑
if (messageExt.getTags() != null && messageExt.getTags().equals(RocketMqUtil.tag)) {
// 根据Tag消费消息,具体消费消息的业务方法
videoConsumerService.dealVideoMsg(msg);
}
} else if (messageExt.getTopic().equals("TopicTest2")) {
// 执行TopicTest2的消费逻辑
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/
Consumer对象在使用之前必须要调用start初始化,初始化一次即可[/span>br
/
consumer.start();
log.info("rocketmq-consumer 启动成功---");
} catch (MQClientException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
方式三:
在 Spring 的 xml 配置文件中使用 <bean> 的 init-method 属性来指定 Bean 初始化时要执行的方法
<bean id="rocketMqConsumerUtil" class="xx.xxx.utils.RocketMqConsumerUtil"
scope="singleton" init-method="listener"/>
package net.greatsoft.jinNanHealth.utils;
import net.greatsoft.jinNanHealth.component.BaseServiceMqConsumer;
import net.greatsoft.jinNanHealth.service.VideoConsumerService;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.UnsupportedEncodingException;
import java.util.List;
/
@author xc
@date 2020-07-23
* /
@DependsOn("RocketMqUtil")
@Component
public class RocketMqConsumerUtil {
private static Logger log = LoggerFactory.getLogger(RocketMqConsumerUtil.class);
@Autowired
private VideoConsumerService videoConsumerService;
/
接收消息8
/
public void listener(){
// 获取消息生产者
DefaultMQPushConsumer consumer = BaseServiceMqConsumer.getDefaultMQPushConsumer();
// 订阅主体
try {
consumer.subscribe(RocketMqUtil.topic, "");
consumer.registerMessageListener(new MessageListenerConcurrently() {
/**
默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
/
public ConsumeConcurrentlyStatus consumeMessage(
List[/span>MessageExt
MessageExt messageExt = msgs.get(0);
String msg = null;
try {
msg = new String(messageExt.getBody(),"utf-8");
} catch (UnsupportedEncodingException e) {
log.error("消息编码失败,MsgBody:{}",new String(messageExt.getBody()));
e.printStackTrace();
}
log.info("消费开始-MsgBody:{}",msg);
if (messageExt.getTopic().equals(RocketMqUtil.topic)) {
// topic的消费逻辑
if (messageExt.getTags() != null && messageExt.getTags().equals(RocketMqUtil.tag)) {
// 根据Tag消费消息,具体消费消息的业务方法
videoConsumerService.dealVideoMsg(msg);
}
} else if (messageExt.getTopic().equals("TopicTest2")) {
// 执行TopicTest2的消费逻辑
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/*
Consumer对象在使用之前必须要调用start初始化,初始化一次即可[/span>br
* /
consumer.start();
log.info("rocketmq-consumer 启动成功---");
} catch (MQClientException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}