经验大分享:spring项目在启动的时候执行方法初始化

简介: 经验大分享:spring项目在启动的时候执行方法初始化

说明:老项目,使用的是spring 3项目,需要对接RocketMQ,配置完之后,在消费者监听方法中,发现业务处理service注入不进来,最后检查发现是因为消费者监听工具类没有被正确的初始化,所以它里边的业务service注入之后是个null,于是各种折腾,特此记录一下

方式一:

解决:对需要初始化的类实现InitializingBean接口,重写afterPropertiesSet()方法,在afterPropertiesSet方法中调用需要被初始化的方法

代码如下:

import xx.xxx.component.BaseServiceMqConsumer;

import xx.xxx.service.VideoConsumerService;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import //代码效果参考:http://www.zidongmutanji.com/zsjx/546609.html

org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import org.apache.rocketmq.client.exception.MQClientException;

import org.apache.rocketmq.common.message.MessageExt;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.InitializingBean;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.DependsOn;

import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

import java.util.List;

@DependsOn("RocketMqConfig")

@Component

public class RocketMqConsumerUtil implements InitializingBean //代码效果参考:http://www.zidongmutanji.com/zsjx/304231.html

{

private static Logger log = LoggerFactory.getLogger(RocketMqConsumerUtil.class);

@Autowired

private VideoConsumerService videoConsumerService;

/

接收消息

/

public void listener(){

// 获取消息生产者

DefaultMQPushConsumer consumer = BaseServiceMqConsumer.getDefaultMQPushConsumer();

// 订阅主体

try {

consumer.subscribe(RocketMqUtil.topic, "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {

/

默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息

/

public ConsumeConcurrentlyStatus consumeMessage(

List[/span>MessageExt

MessageExt messageExt = msgs.get(0);

String msg = null;

try {

msg = new String(messageExt.getBody(),"utf-8");

} catch (UnsupportedEncodingException e) {

log.error("消息编码失败,MsgBody:{}",new String(messageExt.getBody()));

e.printStackTrace();

}

log.info("消费开始-MsgBody:{}",msg);

// String msg = new String(messageExt.getBody());

// log.info("MsgBody:{}",new String(messageExt.getBody()));

if (messageExt.getTopic().equals(RocketMqUtil.topic)) {

// topic的消费逻辑

if (messageExt.getTags() != null && messageExt.getTags().equals(RocketMqUtil.tag)) {

// 根据Tag消费消息,具体消费消息的业务方法

videoConsumerService.dealVideoMsg(msg);

}

} else if (messageExt.getTopic().equals("TopicTest2")) {

// 执行TopicTest2的消费逻辑

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

/**

Consumer对象在使用之前必须要调用start初始化,初始化一次即可[/span>br

/

consumer.start();

log.info("rocketmq-consumer 启动成功---");

} catch (MQClientException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

@Override

public void afterPropertiesSet() throws Exception {

listener();//调用需要被初始化的方法

}

}

方式二:

使用注解@PostContruct 指定需要被初始化执行的方法

package net.greatsoft.xxx.utils;

import xxx.xxx.component.BaseServiceMqConsumer;

import net.greatsoft.xxx.service.VideoConsumerService;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import org.apache.rocketmq.client.exception.MQClientException;

import org.apache.rocketmq.common.message.MessageExt;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.InitializingBean;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.DependsOn;

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

import java.io.UnsupportedEncodingException;

import java.util.List;

@DependsOn("RocketMqConfig")

@Component

public class RocketMqConsumerUtil {

private static Logger log = LoggerFactory.getLogger(RocketMqConsumerUtil.class);

@Autowired

private VideoConsumerService videoConsumerService;

/**

接收消息8

/

@PostConstruct

public void listener(){

// 获取消息生产者

DefaultMQPushConsumer consumer = BaseServiceMqConsumer.getDefaultMQPushConsumer();

// 订阅主体

try {

consumer.subscribe(RocketMqUtil.topic, "");

consumer.registerMessageListener(new MessageListenerConcurrently() {

/

默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息

* /

public ConsumeConcurrentlyStatus consumeMessage(

List[/span>MessageExt

MessageExt messageExt = msgs.get(0);

String msg = null;

try {

msg = new String(messageExt.getBody(),"utf-8");

} catch (UnsupportedEncodingException e) {

log.error("消息编码失败,MsgBody:{}",new String(messageExt.getBody()));

e.printStackTrace();

}

log.info("消费开始-MsgBody:{}",msg);

if (messageExt.getTopic().equals(RocketMqUtil.topic)) {

// topic的消费逻辑

if (messageExt.getTags() != null && messageExt.getTags().equals(RocketMqUtil.tag)) {

// 根据Tag消费消息,具体消费消息的业务方法

videoConsumerService.dealVideoMsg(msg);

}

} else if (messageExt.getTopic().equals("TopicTest2")) {

// 执行TopicTest2的消费逻辑

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

/

Consumer对象在使用之前必须要调用start初始化,初始化一次即可[/span>br

/

consumer.start();

log.info("rocketmq-consumer 启动成功---");

} catch (MQClientException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

方式三:

在spring的xml配置文件中使用 的init 属性来执行初始化的Bean

[/span>bean id="rocketMqConsumerUtil" class="xx.xxx.utils.RocketMqConsumerUtil"

scope="singleton" init-method="listener"/>

package net.greatsoft.jinNanHealth.utils;

import net.greatsoft.jinNanHealth.component.BaseServiceMqConsumer;

import net.greatsoft.jinNanHealth.service.VideoConsumerService;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import org.apache.rocketmq.client.exception.MQClientException;

import org.apache.rocketmq.common.message.MessageExt;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.InitializingBean;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.DependsOn;

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

import java.io.UnsupportedEncodingException;

import java.util.List;

/

@author xc

@date 2020-07-23

* /

@DependsOn("RocketMqUtil")

@Component

public class RocketMqConsumerUtil {

private static Logger log = LoggerFactory.getLogger(RocketMqConsumerUtil.class);

@Autowired

private VideoConsumerService videoConsumerService;

/

接收消息8

/

public void listener(){

// 获取消息生产者

DefaultMQPushConsumer consumer = BaseServiceMqConsumer.getDefaultMQPushConsumer();

// 订阅主体

try {

consumer.subscribe(RocketMqUtil.topic, "");

consumer.registerMessageListener(new MessageListenerConcurrently() {

/**

默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息

/

public ConsumeConcurrentlyStatus consumeMessage(

List[/span>MessageExt

MessageExt messageExt = msgs.get(0);

String msg = null;

try {

msg = new String(messageExt.getBody(),"utf-8");

} catch (UnsupportedEncodingException e) {

log.error("消息编码失败,MsgBody:{}",new String(messageExt.getBody()));

e.printStackTrace();

}

            log.info("消费开始-MsgBody:{}",msg);

if (messageExt.getTopic().equals(RocketMqUtil.topic)) {

// topic的消费逻辑

if (messageExt.getTags() != null && messageExt.getTags().equals(RocketMqUtil.tag)) {

// 根据Tag消费消息,具体消费消息的业务方法

videoConsumerService.dealVideoMsg(msg);

}

} else if (messageExt.getTopic().equals("TopicTest2")) {

// 执行TopicTest2的消费逻辑

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

/*

Consumer对象在使用之前必须要调用start初始化,初始化一次即可[/span>br

* /

consumer.start();

log.info("rocketmq-consumer 启动成功---");

} catch (MQClientException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1天前
|
IDE Java 机器人
Spring Boot中的多模块项目构建
Spring Boot中的多模块项目构建
|
1天前
|
消息中间件 Java 机器人
Spring Boot与NATS消息系统的集成方法
Spring Boot与NATS消息系统的集成方法
|
1天前
|
Java Spring 容器
spring如何进行依赖注入,通过set方法把Dao注入到serves
spring如何进行依赖注入,通过set方法把Dao注入到serves
|
1天前
|
XML Java API
经验大分享:Spring实现AOP的三种方式
经验大分享:Spring实现AOP的三种方式
|
1天前
|
XML Java API
经验大分享:Spring实现AOP的三种方式
经验大分享:Spring实现AOP的三种方式
|
1天前
|
Java API 容器
spring8-getBean()方法使用
spring8-getBean()方法使用
|
2天前
|
XML Java 数据格式
经验大分享:Spring基础篇——Spring容器和应用上下文理解
经验大分享:Spring基础篇——Spring容器和应用上下文理解
|
2天前
|
XML 安全 Java
经验大分享:Spring之旅(2)
经验大分享:Spring之旅(2)
|
18小时前
|
Java
SpringBoot之内部配置加载顺序和外部配置加载顺序
SpringBoot之内部配置加载顺序和外部配置加载顺序
|
1天前
|
NoSQL 搜索推荐 Java
使用Spring Boot实现与Neo4j图数据库的集成
使用Spring Boot实现与Neo4j图数据库的集成