经验大分享:spring项目在启动的时候执行方法初始化

简介: 经验大分享:spring项目在启动的时候执行方法初始化

说明:老项目,使用的是spring 3项目,需要对接RocketMQ,配置完之后,在消费者监听方法中,发现业务处理service注入不进来,最后检查发现是因为消费者监听工具类没有被正确的初始化,所以它里边的业务service注入之后是个null,于是各种折腾,特此记录一下

方式一:

解决:对需要初始化的类实现InitializingBean接口,重写afterPropertiesSet()方法,在afterPropertiesSet方法中调用需要被初始化的方法

代码如下:

import xx.xxx.component.BaseServiceMqConsumer;

import xx.xxx.service.VideoConsumerService;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import //代码效果参考:http://www.zidongmutanji.com/zsjx/546609.html

org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import org.apache.rocketmq.client.exception.MQClientException;

import org.apache.rocketmq.common.message.MessageExt;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.InitializingBean;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.DependsOn;

import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

import java.util.List;

@DependsOn("RocketMqConfig")

@Component

public class RocketMqConsumerUtil implements InitializingBean //代码效果参考:http://www.zidongmutanji.com/zsjx/304231.html

{

private static Logger log = LoggerFactory.getLogger(RocketMqConsumerUtil.class);

@Autowired

private VideoConsumerService videoConsumerService;

/

接收消息

/

public void listener(){

// 获取消息生产者

DefaultMQPushConsumer consumer = BaseServiceMqConsumer.getDefaultMQPushConsumer();

// 订阅主体

try {

consumer.subscribe(RocketMqUtil.topic, "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {

/

默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息

/

public ConsumeConcurrentlyStatus consumeMessage(

List[/span>MessageExt

MessageExt messageExt = msgs.get(0);

String msg = null;

try {

msg = new String(messageExt.getBody(),"utf-8");

} catch (UnsupportedEncodingException e) {

log.error("消息编码失败,MsgBody:{}",new String(messageExt.getBody()));

e.printStackTrace();

}

log.info("消费开始-MsgBody:{}",msg);

// String msg = new String(messageExt.getBody());

// log.info("MsgBody:{}",new String(messageExt.getBody()));

if (messageExt.getTopic().equals(RocketMqUtil.topic)) {

// topic的消费逻辑

if (messageExt.getTags() != null && messageExt.getTags().equals(RocketMqUtil.tag)) {

// 根据Tag消费消息,具体消费消息的业务方法

videoConsumerService.dealVideoMsg(msg);

}

} else if (messageExt.getTopic().equals("TopicTest2")) {

// 执行TopicTest2的消费逻辑

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

/**

Consumer对象在使用之前必须要调用start初始化,初始化一次即可[/span>br

/

consumer.start();

log.info("rocketmq-consumer 启动成功---");

} catch (MQClientException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

@Override

public void afterPropertiesSet() throws Exception {

listener();//调用需要被初始化的方法

}

}

方式二:

使用注解@PostContruct 指定需要被初始化执行的方法

package net.greatsoft.xxx.utils;

import xxx.xxx.component.BaseServiceMqConsumer;

import net.greatsoft.xxx.service.VideoConsumerService;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import org.apache.rocketmq.client.exception.MQClientException;

import org.apache.rocketmq.common.message.MessageExt;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.InitializingBean;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.DependsOn;

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

import java.io.UnsupportedEncodingException;

import java.util.List;

@DependsOn("RocketMqConfig")

@Component

public class RocketMqConsumerUtil {

private static Logger log = LoggerFactory.getLogger(RocketMqConsumerUtil.class);

@Autowired

private VideoConsumerService videoConsumerService;

/**

接收消息8

/

@PostConstruct

public void listener(){

// 获取消息生产者

DefaultMQPushConsumer consumer = BaseServiceMqConsumer.getDefaultMQPushConsumer();

// 订阅主体

try {

consumer.subscribe(RocketMqUtil.topic, "");

consumer.registerMessageListener(new MessageListenerConcurrently() {

/

默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息

* /

public ConsumeConcurrentlyStatus consumeMessage(

List[/span>MessageExt

MessageExt messageExt = msgs.get(0);

String msg = null;

try {

msg = new String(messageExt.getBody(),"utf-8");

} catch (UnsupportedEncodingException e) {

log.error("消息编码失败,MsgBody:{}",new String(messageExt.getBody()));

e.printStackTrace();

}

log.info("消费开始-MsgBody:{}",msg);

if (messageExt.getTopic().equals(RocketMqUtil.topic)) {

// topic的消费逻辑

if (messageExt.getTags() != null && messageExt.getTags().equals(RocketMqUtil.tag)) {

// 根据Tag消费消息,具体消费消息的业务方法

videoConsumerService.dealVideoMsg(msg);

}

} else if (messageExt.getTopic().equals("TopicTest2")) {

// 执行TopicTest2的消费逻辑

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

/

Consumer对象在使用之前必须要调用start初始化,初始化一次即可[/span>br

/

consumer.start();

log.info("rocketmq-consumer 启动成功---");

} catch (MQClientException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

方式三:

在spring的xml配置文件中使用 的init 属性来执行初始化的Bean

[/span>bean id="rocketMqConsumerUtil" class="xx.xxx.utils.RocketMqConsumerUtil"

scope="singleton" init-method="listener"/>

package net.greatsoft.jinNanHealth.utils;

import net.greatsoft.jinNanHealth.component.BaseServiceMqConsumer;

import net.greatsoft.jinNanHealth.service.VideoConsumerService;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import org.apache.rocketmq.client.exception.MQClientException;

import org.apache.rocketmq.common.message.MessageExt;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.InitializingBean;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.DependsOn;

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

import java.io.UnsupportedEncodingException;

import java.util.List;

/

@author xc

@date 2020-07-23

* /

@DependsOn("RocketMqUtil")

@Component

public class RocketMqConsumerUtil {

private static Logger log = LoggerFactory.getLogger(RocketMqConsumerUtil.class);

@Autowired

private VideoConsumerService videoConsumerService;

/

接收消息8

/

public void listener(){

// 获取消息生产者

DefaultMQPushConsumer consumer = BaseServiceMqConsumer.getDefaultMQPushConsumer();

// 订阅主体

try {

consumer.subscribe(RocketMqUtil.topic, "");

consumer.registerMessageListener(new MessageListenerConcurrently() {

/**

默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息

/

public ConsumeConcurrentlyStatus consumeMessage(

List[/span>MessageExt

MessageExt messageExt = msgs.get(0);

String msg = null;

try {

msg = new String(messageExt.getBody(),"utf-8");

} catch (UnsupportedEncodingException e) {

log.error("消息编码失败,MsgBody:{}",new String(messageExt.getBody()));

e.printStackTrace();

}

            log.info("消费开始-MsgBody:{}",msg);

if (messageExt.getTopic().equals(RocketMqUtil.topic)) {

// topic的消费逻辑

if (messageExt.getTags() != null && messageExt.getTags().equals(RocketMqUtil.tag)) {

// 根据Tag消费消息,具体消费消息的业务方法

videoConsumerService.dealVideoMsg(msg);

}

} else if (messageExt.getTopic().equals("TopicTest2")) {

// 执行TopicTest2的消费逻辑

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

/*

Consumer对象在使用之前必须要调用start初始化,初始化一次即可[/span>br

* /

consumer.start();

log.info("rocketmq-consumer 启动成功---");

} catch (MQClientException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2天前
|
Java 数据库连接 API
Spring事务管理嵌套事务详解 : 同一个类中,一个方法调用另外一个有事务的方法
Spring事务管理嵌套事务详解 : 同一个类中,一个方法调用另外一个有事务的方法
|
6天前
|
SQL XML Java
解决Spring Boot项目中的数据库迁移问题
解决Spring Boot项目中的数据库迁移问题
|
7天前
|
负载均衡 Java 开发者
如何在Spring Boot项目中实现微服务架构?
如何在Spring Boot项目中实现微服务架构?
|
12天前
|
消息中间件 负载均衡 Java
最容易学会的springboot gralde spring cloud 多模块微服务项目
最容易学会的springboot gralde spring cloud 多模块微服务项目
|
7天前
|
SQL XML Java
解决Spring Boot项目中的数据库迁移问题
解决Spring Boot项目中的数据库迁移问题
|
7天前
|
Java BI Spring
在Spring Boot项目中集成异步任务处理
在Spring Boot项目中集成异步任务处理
|
7天前
|
Java 测试技术 数据库
在Spring Boot项目中集成单元测试的策略
在Spring Boot项目中集成单元测试的策略
|
4天前
|
Java 应用服务中间件 开发者
Java面试题:解释Spring Boot的优势及其自动配置原理
Java面试题:解释Spring Boot的优势及其自动配置原理
28 0
|
13天前
|
Java 开发者 Spring
深入理解Spring Boot中的自动配置原理
深入理解Spring Boot中的自动配置原理
|
14天前
|
前端开发 Java 微服务
Spring Boot与微前端架构的集成开发
Spring Boot与微前端架构的集成开发