listener的开发是独立的,你也可以不使用它,而是直接使用@KafkaListener注解来监听kafka的消息.我的方式是实现一个只有唯一方法的接口,然后在该方法里面进行消费,无需关心kafka的具体实现,只需要把topics添加到配置文件即可.
项目git地址:
git@github.com:wudonghua/Java-Kafka-SpringBoot-API.git
接口:
import org.apache.kafka.clients.consumer.ConsumerRecord;
/**
 * Callback contract for consuming Kafka records.
 * <p>
 * It declares a single method, {@code listener}, which receives one
 * {@link ConsumerRecord} per call. Elsewhere in this project,
 * {@code KafkaListenerInitConfig} looks this method up by the name
 * {@code "listener"} on beans implementing this interface, so the method name
 * is part of the contract.
 * <p>
 * Created 2017/12/14 14:52.
 *
 * @author dw07-Riven770[wudonghua@gznb.com]
 */
public interface IKafkaListener {
/**
 * Handles one consumed record.
 *
 * @param record the record to process; key and value types are unconstrained
 */
void listener(ConsumerRecord<?, ?> record);
}
为实现了IKafkaListener接口的类的listener方法追加@KafkaListener注解.其中的TOPICS属性的值取自ConsumerConfiguration中添加的 private List<String> topics; 属性.
import javassist.CannotCompileException;
import javassist.ClassPool;
import javassist.CtClass;
import javassist.NotFoundException;
import javassist.bytecode.AnnotationsAttribute;
import javassist.bytecode.ConstPool;
import javassist.bytecode.annotation.Annotation;
import javassist.bytecode.annotation.ArrayMemberValue;
import javassist.bytecode.annotation.MemberValue;
import javassist.bytecode.annotation.StringMemberValue;
import org.apache.commons.lang3.ArrayUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.kafka.annotation.KafkaListener;
import riven.kafka.api.configuration.ConsumerConfiguration;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
/**
 * Bean post-processor that attaches a {@link KafkaListener} annotation to the
 * {@code listener} method of every bean implementing {@link IKafkaListener}.
 * <p>
 * The annotation's {@code topics} attribute is filled from
 * {@link ConsumerConfiguration#getTopics()}. Because annotations cannot be
 * added to an already loaded class, the bean's class file is rewritten with
 * Javassist, defined again in a fresh class loader, and a <em>new</em>
 * instance of that class replaces the original bean. Consequences:
 * <ul>
 *   <li>the bean class needs an accessible no-argument constructor;</li>
 *   <li>state held by the original instance is not carried over; only fields
 *       annotated with {@link Autowired} are re-injected.</li>
 * </ul>
 * Created 2017/12/14 15:18.
 *
 * @author dw07-Riven770[wudonghua@gznb.com]
 */
public class KafkaListenerInitConfig implements BeanPostProcessor, ApplicationContextAware {

    /** Name of the {@code @KafkaListener} attribute that carries the topic names. */
    private static final String TOPICS = "topics";

    /** Name of the {@link IKafkaListener} method that receives the annotation. */
    private static final String LISTENER = "listener";

    private ApplicationContext applicationContext;

    @Autowired
    private ConsumerConfiguration consumerConfiguration;

    /**
     * Replaces every {@link IKafkaListener} bean with an annotated re-creation of itself;
     * all other beans pass through untouched.
     *
     * @param bean     the bean being initialized
     * @param beanName the bean's name in the context
     * @return the annotated replacement for listener beans, otherwise {@code bean}
     * @throws IllegalStateException if the class cannot be rewritten or instantiated; failing
     *                               here is preferred over silently returning a listener that
     *                               carries no {@code @KafkaListener} annotation
     */
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) {
        if (!(bean instanceof IKafkaListener)) {
            return bean;
        }
        try {
            return injectBean(setAnnotation(bean));
        } catch (NotFoundException | CannotCompileException | ReflectiveOperationException e) {
            throw new IllegalStateException("Failed to attach @KafkaListener to bean '" + beanName
                    + "' of type " + bean.getClass().getName(), e);
        }
    }

    /**
     * Populates the {@link Autowired} fields of a freshly created listener.
     * <p>
     * Fields are collected from the listener's class and all of its superclasses, so
     * non-public inherited fields are covered and no field is processed twice.
     *
     * @param iKafkaListener the new listener instance to populate
     * @return the same instance, for chaining
     * @throws IllegalAccessException if a field cannot be written
     */
    private IKafkaListener injectBean(IKafkaListener iKafkaListener) throws IllegalAccessException {
        for (Class<?> type = iKafkaListener.getClass();
                type != null && type != Object.class;
                type = type.getSuperclass()) {
            for (Field field : type.getDeclaredFields()) {
                if (field.getAnnotation(Autowired.class) == null) {
                    continue;
                }
                field.setAccessible(true);
                field.set(iKafkaListener, resolveDependency(field));
            }
        }
        return iKafkaListener;
    }

    /**
     * Looks up the bean to inject into {@code field}.
     * <p>
     * A bean whose name equals the field name is used when it exists and has a compatible
     * type (the original lookup strategy); otherwise the lookup falls back to the field's type.
     *
     * @param field the field to be injected
     * @return the bean to assign to the field
     */
    private Object resolveDependency(Field field) {
        String name = field.getName();
        if (applicationContext.containsBean(name)
                && applicationContext.isTypeMatch(name, field.getType())) {
            return applicationContext.getBean(name);
        }
        return applicationContext.getBean(field.getType());
    }

    /**
     * Rewrites the bean's class so that its {@code listener} method carries
     * {@code @KafkaListener(topics = ...)} and returns a new instance of the rewritten class.
     *
     * @param bean the original listener bean; only its class is used
     * @return a new instance of the annotated class
     * @throws NotFoundException            if Javassist cannot find the class or the method
     * @throws CannotCompileException       if the rewritten class cannot be defined
     * @throws ReflectiveOperationException if the rewritten class cannot be instantiated
     */
    private IKafkaListener setAnnotation(Object bean)
            throws NotFoundException, CannotCompileException, ReflectiveOperationException {
        Class<?> beanClass = bean.getClass();
        // NOTE(review): the default pool is documented to search the JVM's system class path;
        // classes only visible to another loader may need an extra class path entry - confirm.
        ClassPool classPool = ClassPool.getDefault();
        CtClass ctClass = classPool.getCtClass(beanClass.getName());
        // toClass() freezes the CtClass; defrost so that a second bean of the same class
        // can be processed instead of failing with "class is frozen".
        if (ctClass.isFrozen()) {
            ctClass.defrost();
        }
        ConstPool constPool = ctClass.getClassFile().getConstPool();

        // Build the value of the "topics" attribute from the configured topic names.
        List<MemberValue> topicValues = new ArrayList<>();
        for (String topic : consumerConfiguration.getTopics()) {
            topicValues.add(new StringMemberValue(topic, constPool));
        }
        ArrayMemberValue topicsValue = new ArrayMemberValue(constPool);
        topicsValue.setValue(topicValues.toArray(new MemberValue[0]));

        Annotation kafkaListener = new Annotation(KafkaListener.class.getName(), constPool);
        kafkaListener.addMemberValue(TOPICS, topicsValue);

        // Reuse the method's existing runtime-visible annotations, if any, so they are kept
        // rather than being replaced by an attribute holding only @KafkaListener.
        AnnotationsAttribute existing = (AnnotationsAttribute) ctClass.getDeclaredMethod(LISTENER)
                .getMethodInfo().getAttribute(AnnotationsAttribute.visibleTag);
        AnnotationsAttribute annotations = existing != null
                ? existing
                : new AnnotationsAttribute(constPool, AnnotationsAttribute.visibleTag);
        annotations.addAnnotation(kafkaListener);
        ctClass.getDeclaredMethod(LISTENER).getMethodInfo().addAttribute(annotations);

        // Define the rewritten class in a throw-away loader whose parent is the bean's own
        // loader, so every type it references (IKafkaListener included) resolves to the
        // classes the application already uses.
        ClassLoader definingLoader = new ClassLoader(beanClass.getClassLoader()) { };
        Class<?> annotatedClass = ctClass.toClass(definingLoader, null);
        return (IKafkaListener) annotatedClass.getDeclaredConstructor().newInstance();
    }

    /**
     * No-op: returns the bean unchanged.
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    /**
     * Stores the application context used to resolve {@link Autowired} fields.
     *
     * @param applicationContext the context this post-processor runs in
     * @throws BeansException never thrown by this implementation
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}
把KafkaListenerInitConfig注册到SpringIOC之中:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Configuration that registers {@link KafkaListenerInitConfig} in the Spring context.
 * <p>
 * It is guarded by {@link ConditionalOnProperty} on
 * {@code Riven.kafka.consumer.bootstrapServers} and
 * {@code Riven.kafka.consumer.groupId}.
 * <p>
 * Created 2017/12/14 16:04.
 *
 * @author dw07-Riven770[wudonghua@gznb.com]
 */
@Configuration
@ConditionalOnProperty(name = {"Riven.kafka.consumer.bootstrapServers", "Riven.kafka.consumer.groupId"})
public class CreateKafkaListener {

    // Class literal instead of this.getClass(): a @Configuration class is normally enhanced
    // by Spring at runtime, which would otherwise name the logger after the generated subclass.
    private static final Logger LOGGER = LoggerFactory.getLogger(CreateKafkaListener.class);

    /**
     * Creates the post-processor that annotates {@code IKafkaListener} beans.
     * <p>
     * The method is static because it returns a {@code BeanPostProcessor}: the container can
     * then create it without first instantiating this configuration class.
     *
     * @return a new {@link KafkaListenerInitConfig}
     */
    @Bean
    public static KafkaListenerInitConfig init() {
        LOGGER.debug("Registering KafkaListenerInitConfig bean post-processor");
        return new KafkaListenerInitConfig();
    }
}
同样的,在资源目录(resources)的META-INF目录下创建Spring Boot自动配置文件:
META-INF/spring.factories文件
并添加需要Spring Boot自动配置加载的类的全限定名(多个使用逗号隔开):
org.springframework.boot.autoconfigure.EnableAutoConfiguration=riven.kafka.api.producer.ProducerInitialize,riven.kafka.api.consumer.ConsumerInitialize,riven.kafka.api.listener.CreateKafkaListener