kafka-Java-SpringBoot-listener API开发-阿里云开发者社区

开发者社区> ritit> 正文

kafka-Java-SpringBoot-listener API开发

简介: listener开发过程是独立的,你也可以不开发,使用@KafkaListener注解来监听kafka的消息,我的方式是实现一个唯一方法的接口,然后在该方法里面进行消费,无需关心kafka的具体实现,只需要添加一个topics到配置值文件即可.
+关注继续查看

listener开发过程是独立的,你也可以不开发,使用@KafkaListener注解来监听kafka的消息,我的方式是实现一个唯一方法的接口,然后在该方法里面进行消费,无需关心kafka的具体实现,只需要添加一个topics到配置值文件即可.
项目git地址:

git@github.com:wudonghua/Java-Kafka-SpringBoot-API.git

接口:

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * @Author dw07-Riven770[wudonghua@gznb.com]
 * @Date 2017/12/1414:52
 */
public interface IKafkaListener {
    void listener(ConsumerRecord<?, ?> record);
}

为实现IKafkaListener接口方注解.其中的TOPICS属性在ConsumerConfiguration中添加,即 private List topics;属性.

import javassist.CannotCompileException;
import javassist.ClassPool;
import javassist.CtClass;
import javassist.NotFoundException;
import javassist.bytecode.AnnotationsAttribute;
import javassist.bytecode.ConstPool;
import javassist.bytecode.annotation.Annotation;
import javassist.bytecode.annotation.ArrayMemberValue;
import javassist.bytecode.annotation.MemberValue;
import javassist.bytecode.annotation.StringMemberValue;
import org.apache.commons.lang3.ArrayUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.kafka.annotation.KafkaListener;
import riven.kafka.api.configuration.ConsumerConfiguration;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

/**
 * @Author dw07-Riven770[wudonghua@gznb.com]
 * @Date 2017/12/1415:18
 * 为实现了IKafkaListener接口方法listener追加注解
 */
public class KafkaListenerInitConfig implements BeanPostProcessor, ApplicationContextAware {

    private ApplicationContext applicationContext;
    private static final String TOPICS = "topics";
    private static final String LISTENER = "listener";
    @Autowired
    private ConsumerConfiguration consumerConfiguration;

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) {
        try {
            if (bean instanceof IKafkaListener)
                return injectBean(setAnnotation(bean));
            //设置参数注解:
            return bean;
        } catch (NotFoundException | CannotCompileException | IllegalAccessException | InstantiationException e) {
            e.printStackTrace();
        }
        return bean;
    }

    /**
     * 添加Autowired注入的内容
     *
     * @param iKafkaListener
     * @return
     * @throws IllegalAccessException
     */
    private IKafkaListener injectBean(IKafkaListener iKafkaListener) throws IllegalAccessException {
        Field[] fields = iKafkaListener.getClass().getFields();
        Field[] declaredFields = iKafkaListener.getClass().getDeclaredFields();
        Field[] allFiles = ArrayUtils.addAll(fields, declaredFields);
        //判断是否有@Autowired注解
        for (Field field : allFiles) {
            if (field.getAnnotation(Autowired.class) == null)
                continue;
            field.setAccessible(true);
            field.set(iKafkaListener, this.applicationContext.getBean(field.getName()));
        }
        return iKafkaListener;
    }

    /**
     * 添加注解及其属性
     *
     * @param bean
     * @return
     * @throws NotFoundException
     * @throws CannotCompileException
     * @throws IllegalAccessException
     * @throws InstantiationException
     */
    private IKafkaListener setAnnotation(Object bean) throws NotFoundException, CannotCompileException, IllegalAccessException, InstantiationException {
        ClassPool classPool = ClassPool.getDefault();
        //获取当前Bean的常量池
        CtClass ctClass = classPool.getCtClass(bean.getClass().getName());
        ConstPool constPool = ctClass.getClassFile().getConstPool();
        //获取对应的注解内容
        List<StringMemberValue> list = new ArrayList<>(consumerConfiguration.getTopics().size());
        //创建注解属性
        consumerConfiguration.getTopics().forEach(topic -> list.add(new StringMemberValue(topic, constPool)));
        MemberValue[] memberValues = new MemberValue[list.size()];
        ArrayMemberValue arrayMemberValue = new ArrayMemberValue(constPool);
        arrayMemberValue.setValue(list.toArray(memberValues));
        //创建注解
        Annotation topics = new Annotation(KafkaListener.class.getName(), constPool);
        //为注解属性赋值
        topics.addMemberValue(TOPICS, arrayMemberValue);
        //创建注解容器
        AnnotationsAttribute annotationsAttribute = new AnnotationsAttribute(constPool, AnnotationsAttribute.visibleTag);
        annotationsAttribute.setAnnotation(topics);
        //把注解放到目标方法
        ctClass.getDeclaredMethod(LISTENER).getMethodInfo().addAttribute(annotationsAttribute);

        //生成一个全新的对象
        Class aClass = ctClass.toClass(new ClassLoader() {
            @Override
            public Class<?> loadClass(String name) throws ClassNotFoundException {
                return super.loadClass(name);
            }
        }, null);
        return (IKafkaListener) aClass.newInstance();
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    /**
     * 获取SpringIOC
     *
     * @param applicationContext
     * @throws BeansException
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

把KafkaConsumerListener注册到SpringIOC之中:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @Author dw07-Riven770[wudonghua@gznb.com]
* @Date 2017/12/1416:04
* 把KafkaConsumerListener注册到SpringIOC之中
*/
@Configuration
@ConditionalOnProperty(name = {"Riven.kafka.consumer.bootstrapServers","Riven.kafka.consumer.groupId"})
public class CreateKafkaListener {
   private Logger logger = LoggerFactory.getLogger(this.getClass());

   @Bean
   public KafkaListenerInitConfig init() {
       return new KafkaListenerInitConfig();
   }
}

同样的,在配置文件根目录下创建Spring监听器:
spring.factories文件
并添加需要Spring监听初始化的类路径(多个使用,逗号隔开):

org.springframework.boot.autoconfigure.EnableAutoConfiguration=riven.kafka.api.producer.ProducerInitialize,riven.kafka.api.consumer.ConsumerInitialize,riven.kafka.api.listener.CreateKafkaListener

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
OAF_开发系列16_实现OAF与XML Publisher整合
http://wenku.baidu.com/link?url=y2SFKHP5qqn4bl_iNeqLGjXsTvhyFuhkMraIbWZdTXbzcv0vTefrZFFBDWie0cAAKuTwgwhrMAVvTjAo_f1mH1CdAPhhl_JCLeb3BuVTYaSERP技术讨论群: 2...
530 0
OAF_开发系列15_实现OAF组件重用和继承(案例)
20150717 Created By BaoXinjian 一、摘要 组件的重用和继承 如果你想包含共享对象到你的页面中,你可以简单继承它们。 比如,在OAFToolBox Sample Library/Tutorial中,我们创建了一个通用区域(名为OrganizationRN),因此同样的内容不需要编码就可以包含在多个页面中。
719 0
OAF_开发系列14_实现OAF代码动态新增控件
dERP技术讨论群: 288307890 技术交流,技术讨论,欢迎加入 Technology Blog Created By Oracle ERP - 鲍新建
664 0
你的开发利器Spring自定义注解
自定义注解在开发中是一把利器,经常会被使用到。但是对于自定义注解,只是停留在表面的使用,没有做到知其然,而知其所以然。所以这篇文章就是来了解自定义注解这把开发利器的
1932 0
PHP开发API接口注意事项
1、单文件实现多接口的形式有很多种,例如:if..elseif.. 或 switch 或 动态方法 (也就是TP的这种访问函数体的形式) 2、对于数据的输出最好用json,json具有相当强大的跨平台性,市场上各大主流编程语言都支持json解析,json正在逐步取代xml,成为网络数据的通用格式 阿里云代金券1000元免费领取地址:https://promotion.
1172 0
OAF_开发系列07_实现OAF下拉菜单的上下联动Poplist Synchor(案例)
20150706 Created By BaoXinjian 一、摘要 下拉列表的级联显示是非常常用的一种界面显示效果,在FORMS中我常作,作法也很简单,可OAF中显然有点麻烦了 现假定有张表,里面有两个字段,一个是Province(省),一个是City(市) 现需要在页面上放置两个下拉列表字段,一个选择省,一个选择市,当然,选择市的下拉列表值需要根据省的下拉列表的选择进行筛选。
1438 0
OAF_开发系列03_实现OAF如何在保存前判断数据是否存在变更(案例)
2014-06-26 Created By BaoXinjian 一、摘要 在OAF的开发中,可能有这样的需求,在选择保存按钮时,如果存在改动的数据,则提交事务,保存到数据库中; 如果不存在改动的数据,就提示用户当前没有数据可更改; 解决时需要判断页面中所使用的视图对象是否发生过改动,存在多种方法   1. 调用OADBTransaction.isDirty()方法 此方法用于判断当前事务中,视图对象是否发生过变更。
932 0
安卓开发ScrollView嵌套ListView只显示一行
在用列表控件做一个“更多功能”的界面的时候 1 2 7 8 12 13 17 18 23 27 28 33 37 38 39 原本用ScrollView嵌套...
941 0
+关注
ritit
活到老学到老
52
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
《2021云上架构与运维峰会演讲合集》
立即下载
《零基础CSS入门教程》
立即下载
《零基础HTML入门教程》
立即下载