kafka-Java-SpringBoot-listener API开发

简介: listener开发过程是独立的,你也可以不开发,使用@KafkaListener注解来监听kafka的消息,我的方式是实现一个唯一方法的接口,然后在该方法里面进行消费,无需关心kafka的具体实现,只需要添加一个topics到配置值文件即可.

listener开发过程是独立的,你也可以不开发,使用@KafkaListener注解来监听kafka的消息,我的方式是实现一个唯一方法的接口,然后在该方法里面进行消费,无需关心kafka的具体实现,只需要添加一个topics到配置值文件即可.
项目git地址:

git@github.com:wudonghua/Java-Kafka-SpringBoot-API.git

接口:

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * @Author dw07-Riven770[wudonghua@gznb.com]
 * @Date 2017/12/1414:52
 */
public interface IKafkaListener {
    void listener(ConsumerRecord<?, ?> record);
}

为实现IKafkaListener接口方注解.其中的TOPICS属性在ConsumerConfiguration中添加,即 private List topics;属性.

import javassist.CannotCompileException;
import javassist.ClassPool;
import javassist.CtClass;
import javassist.NotFoundException;
import javassist.bytecode.AnnotationsAttribute;
import javassist.bytecode.ConstPool;
import javassist.bytecode.annotation.Annotation;
import javassist.bytecode.annotation.ArrayMemberValue;
import javassist.bytecode.annotation.MemberValue;
import javassist.bytecode.annotation.StringMemberValue;
import org.apache.commons.lang3.ArrayUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.kafka.annotation.KafkaListener;
import riven.kafka.api.configuration.ConsumerConfiguration;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

/**
 * @Author dw07-Riven770[wudonghua@gznb.com]
 * @Date 2017/12/1415:18
 * 为实现了IKafkaListener接口方法listener追加注解
 */
public class KafkaListenerInitConfig implements BeanPostProcessor, ApplicationContextAware {

    private ApplicationContext applicationContext;
    private static final String TOPICS = "topics";
    private static final String LISTENER = "listener";
    @Autowired
    private ConsumerConfiguration consumerConfiguration;

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) {
        try {
            if (bean instanceof IKafkaListener)
                return injectBean(setAnnotation(bean));
            //设置参数注解:
            return bean;
        } catch (NotFoundException | CannotCompileException | IllegalAccessException | InstantiationException e) {
            e.printStackTrace();
        }
        return bean;
    }

    /**
     * 添加Autowired注入的内容
     *
     * @param iKafkaListener
     * @return
     * @throws IllegalAccessException
     */
    private IKafkaListener injectBean(IKafkaListener iKafkaListener) throws IllegalAccessException {
        Field[] fields = iKafkaListener.getClass().getFields();
        Field[] declaredFields = iKafkaListener.getClass().getDeclaredFields();
        Field[] allFiles = ArrayUtils.addAll(fields, declaredFields);
        //判断是否有@Autowired注解
        for (Field field : allFiles) {
            if (field.getAnnotation(Autowired.class) == null)
                continue;
            field.setAccessible(true);
            field.set(iKafkaListener, this.applicationContext.getBean(field.getName()));
        }
        return iKafkaListener;
    }

    /**
     * 添加注解及其属性
     *
     * @param bean
     * @return
     * @throws NotFoundException
     * @throws CannotCompileException
     * @throws IllegalAccessException
     * @throws InstantiationException
     */
    private IKafkaListener setAnnotation(Object bean) throws NotFoundException, CannotCompileException, IllegalAccessException, InstantiationException {
        ClassPool classPool = ClassPool.getDefault();
        //获取当前Bean的常量池
        CtClass ctClass = classPool.getCtClass(bean.getClass().getName());
        ConstPool constPool = ctClass.getClassFile().getConstPool();
        //获取对应的注解内容
        List<StringMemberValue> list = new ArrayList<>(consumerConfiguration.getTopics().size());
        //创建注解属性
        consumerConfiguration.getTopics().forEach(topic -> list.add(new StringMemberValue(topic, constPool)));
        MemberValue[] memberValues = new MemberValue[list.size()];
        ArrayMemberValue arrayMemberValue = new ArrayMemberValue(constPool);
        arrayMemberValue.setValue(list.toArray(memberValues));
        //创建注解
        Annotation topics = new Annotation(KafkaListener.class.getName(), constPool);
        //为注解属性赋值
        topics.addMemberValue(TOPICS, arrayMemberValue);
        //创建注解容器
        AnnotationsAttribute annotationsAttribute = new AnnotationsAttribute(constPool, AnnotationsAttribute.visibleTag);
        annotationsAttribute.setAnnotation(topics);
        //把注解放到目标方法
        ctClass.getDeclaredMethod(LISTENER).getMethodInfo().addAttribute(annotationsAttribute);

        //生成一个全新的对象
        Class aClass = ctClass.toClass(new ClassLoader() {
            @Override
            public Class<?> loadClass(String name) throws ClassNotFoundException {
                return super.loadClass(name);
            }
        }, null);
        return (IKafkaListener) aClass.newInstance();
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    /**
     * 获取SpringIOC
     *
     * @param applicationContext
     * @throws BeansException
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

把KafkaConsumerListener注册到SpringIOC之中:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @Author dw07-Riven770[wudonghua@gznb.com]
* @Date 2017/12/1416:04
* 把KafkaConsumerListener注册到SpringIOC之中
*/
@Configuration
@ConditionalOnProperty(name = {"Riven.kafka.consumer.bootstrapServers","Riven.kafka.consumer.groupId"})
public class CreateKafkaListener {
   private Logger logger = LoggerFactory.getLogger(this.getClass());

   @Bean
   public KafkaListenerInitConfig init() {
       return new KafkaListenerInitConfig();
   }
}

同样的,在配置文件根目录下创建Spring监听器:
spring.factories文件
并添加需要Spring监听初始化的类路径(多个使用,逗号隔开):

org.springframework.boot.autoconfigure.EnableAutoConfiguration=riven.kafka.api.producer.ProducerInitialize,riven.kafka.api.consumer.ConsumerInitialize,riven.kafka.api.listener.CreateKafkaListener
目录
相关文章
|
1月前
|
缓存 监控 前端开发
顺企网 API 开发实战:搜索 / 详情接口从 0 到 1 落地(附 Elasticsearch 优化 + 错误速查)
企业API开发常陷参数、缓存、错误处理三大坑?本指南拆解顺企网双接口全流程,涵盖搜索优化、签名验证、限流应对,附可复用代码与错误速查表,助你2小时高效搞定开发,提升响应速度与稳定性。
|
2月前
|
数据可视化 测试技术 API
从接口性能到稳定性:这些API调试工具,让你的开发过程事半功倍
在软件开发中,接口调试与测试对接口性能、稳定性、准确性及团队协作至关重要。随着开发节奏加快,传统方式已难满足需求,专业API工具成为首选。本文介绍了Apifox、Postman、YApi、SoapUI、JMeter、Swagger等主流工具,对比其功能与适用场景,并推荐Apifox作为集成度高、支持中文、可视化强的一体化解决方案,助力提升API开发与测试效率。
|
1月前
|
安全 前端开发 Java
《深入理解Spring》:现代Java开发的核心框架
Spring自2003年诞生以来,已成为Java企业级开发的基石,凭借IoC、AOP、声明式编程等核心特性,极大简化了开发复杂度。本系列将深入解析Spring框架核心原理及Spring Boot、Cloud、Security等生态组件,助力开发者构建高效、可扩展的应用体系。(238字)
|
1月前
|
API 开发者 数据采集
高效获取淘宝商品详情:API 开发实现链接解析的完整技术方案
2025反向海淘新机遇:依托代购系统,聚焦小众垂直品类,结合Pandabay数据选品,降本增效。系统实现智能翻译、支付风控、物流优化,助力中式养生茶等品类利润翻倍,新手也能快速入局全球市场。
高效获取淘宝商品详情:API 开发实现链接解析的完整技术方案
|
2月前
|
数据采集 缓存 API
小红书笔记详情 API 实战指南:从开发对接、场景落地到收益挖掘(附避坑技巧)
本文详解小红书笔记详情API的开发对接、实战场景与收益模式,涵盖注册避坑、签名生成、数据解析全流程,并分享品牌营销、内容创作、SAAS工具等落地应用,助力开发者高效掘金“种草经济”。
小红书笔记详情 API 实战指南:从开发对接、场景落地到收益挖掘(附避坑技巧)
|
2月前
|
人工智能 Java 机器人
基于Spring AI Alibaba + Spring Boot + Ollama搭建本地AI对话机器人API
Spring AI Alibaba集成Ollama,基于Java构建本地大模型应用,支持流式对话、knife4j接口可视化,实现高隐私、免API密钥的离线AI服务。
1870 1
基于Spring AI Alibaba + Spring Boot + Ollama搭建本地AI对话机器人API
|
1月前
|
存储 缓存 算法
淘宝买家秀 API 深度开发:多模态内容解析与合规推荐技术拆解
本文详解淘宝买家秀接口(taobao.reviews.get)的合规调用、数据标准化与智能推荐全链路方案。涵盖权限申请、多模态数据清洗、情感分析、混合推荐模型及缓存优化,助力开发者提升审核效率60%、商品转化率增长28%,实现UGC数据高效变现。
|
1月前
|
存储 缓存 算法
亚马逊 SP-API 深度开发:关键字搜索接口的购物意图挖掘与合规竞品分析
本文深度解析亚马逊SP-API关键字搜索接口的合规调用与商业应用,涵盖意图识别、竞品分析、性能优化全链路。通过COSMO算法解析用户购物意图,结合合规技术方案提升关键词转化率,助力卖家实现数据驱动决策,安全高效优化运营。
|
2月前
|
消息中间件 人工智能 Java
抖音微信爆款小游戏大全:免费休闲/竞技/益智/PHP+Java全筏开源开发
本文基于2025年最新行业数据,深入解析抖音/微信爆款小游戏的开发逻辑,重点讲解PHP+Java双引擎架构实战,涵盖技术选型、架构设计、性能优化与开源生态,提供完整开源工具链,助力开发者从理论到落地打造高留存、高并发的小游戏产品。
|
2月前
|
存储 Java 关系型数据库
Java 项目实战基于面向对象思想的汽车租赁系统开发实例 汽车租赁系统 Java 面向对象项目实战
本文介绍基于Java面向对象编程的汽车租赁系统技术方案与应用实例,涵盖系统功能需求分析、类设计、数据库设计及具体代码实现,帮助开发者掌握Java在实际项目中的应用。
117 0