kafka-Java-SpringBoot-listener API开发

简介: listener开发过程是独立的,你也可以不开发,使用@KafkaListener注解来监听kafka的消息,我的方式是实现一个唯一方法的接口,然后在该方法里面进行消费,无需关心kafka的具体实现,只需要添加一个topics到配置值文件即可.

listener开发过程是独立的,你也可以不开发,使用@KafkaListener注解来监听kafka的消息,我的方式是实现一个唯一方法的接口,然后在该方法里面进行消费,无需关心kafka的具体实现,只需要添加一个topics到配置值文件即可.
项目git地址:

git@github.com:wudonghua/Java-Kafka-SpringBoot-API.git
AI 代码解读

接口:

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * @Author dw07-Riven770[wudonghua@gznb.com]
 * @Date 2017/12/1414:52
 */
public interface IKafkaListener {
    void listener(ConsumerRecord<?, ?> record);
}
AI 代码解读

为实现IKafkaListener接口方注解.其中的TOPICS属性在ConsumerConfiguration中添加,即 private List topics;属性.

import javassist.CannotCompileException;
import javassist.ClassPool;
import javassist.CtClass;
import javassist.NotFoundException;
import javassist.bytecode.AnnotationsAttribute;
import javassist.bytecode.ConstPool;
import javassist.bytecode.annotation.Annotation;
import javassist.bytecode.annotation.ArrayMemberValue;
import javassist.bytecode.annotation.MemberValue;
import javassist.bytecode.annotation.StringMemberValue;
import org.apache.commons.lang3.ArrayUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.kafka.annotation.KafkaListener;
import riven.kafka.api.configuration.ConsumerConfiguration;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

/**
 * @Author dw07-Riven770[wudonghua@gznb.com]
 * @Date 2017/12/1415:18
 * 为实现了IKafkaListener接口方法listener追加注解
 */
public class KafkaListenerInitConfig implements BeanPostProcessor, ApplicationContextAware {

    private ApplicationContext applicationContext;
    private static final String TOPICS = "topics";
    private static final String LISTENER = "listener";
    @Autowired
    private ConsumerConfiguration consumerConfiguration;

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) {
        try {
            if (bean instanceof IKafkaListener)
                return injectBean(setAnnotation(bean));
            //设置参数注解:
            return bean;
        } catch (NotFoundException | CannotCompileException | IllegalAccessException | InstantiationException e) {
            e.printStackTrace();
        }
        return bean;
    }

    /**
     * 添加Autowired注入的内容
     *
     * @param iKafkaListener
     * @return
     * @throws IllegalAccessException
     */
    private IKafkaListener injectBean(IKafkaListener iKafkaListener) throws IllegalAccessException {
        Field[] fields = iKafkaListener.getClass().getFields();
        Field[] declaredFields = iKafkaListener.getClass().getDeclaredFields();
        Field[] allFiles = ArrayUtils.addAll(fields, declaredFields);
        //判断是否有@Autowired注解
        for (Field field : allFiles) {
            if (field.getAnnotation(Autowired.class) == null)
                continue;
            field.setAccessible(true);
            field.set(iKafkaListener, this.applicationContext.getBean(field.getName()));
        }
        return iKafkaListener;
    }

    /**
     * 添加注解及其属性
     *
     * @param bean
     * @return
     * @throws NotFoundException
     * @throws CannotCompileException
     * @throws IllegalAccessException
     * @throws InstantiationException
     */
    private IKafkaListener setAnnotation(Object bean) throws NotFoundException, CannotCompileException, IllegalAccessException, InstantiationException {
        ClassPool classPool = ClassPool.getDefault();
        //获取当前Bean的常量池
        CtClass ctClass = classPool.getCtClass(bean.getClass().getName());
        ConstPool constPool = ctClass.getClassFile().getConstPool();
        //获取对应的注解内容
        List<StringMemberValue> list = new ArrayList<>(consumerConfiguration.getTopics().size());
        //创建注解属性
        consumerConfiguration.getTopics().forEach(topic -> list.add(new StringMemberValue(topic, constPool)));
        MemberValue[] memberValues = new MemberValue[list.size()];
        ArrayMemberValue arrayMemberValue = new ArrayMemberValue(constPool);
        arrayMemberValue.setValue(list.toArray(memberValues));
        //创建注解
        Annotation topics = new Annotation(KafkaListener.class.getName(), constPool);
        //为注解属性赋值
        topics.addMemberValue(TOPICS, arrayMemberValue);
        //创建注解容器
        AnnotationsAttribute annotationsAttribute = new AnnotationsAttribute(constPool, AnnotationsAttribute.visibleTag);
        annotationsAttribute.setAnnotation(topics);
        //把注解放到目标方法
        ctClass.getDeclaredMethod(LISTENER).getMethodInfo().addAttribute(annotationsAttribute);

        //生成一个全新的对象
        Class aClass = ctClass.toClass(new ClassLoader() {
            @Override
            public Class<?> loadClass(String name) throws ClassNotFoundException {
                return super.loadClass(name);
            }
        }, null);
        return (IKafkaListener) aClass.newInstance();
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    /**
     * 获取SpringIOC
     *
     * @param applicationContext
     * @throws BeansException
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}
AI 代码解读

把KafkaConsumerListener注册到SpringIOC之中:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @Author dw07-Riven770[wudonghua@gznb.com]
* @Date 2017/12/1416:04
* 把KafkaConsumerListener注册到SpringIOC之中
*/
@Configuration
@ConditionalOnProperty(name = {"Riven.kafka.consumer.bootstrapServers","Riven.kafka.consumer.groupId"})
public class CreateKafkaListener {
   private Logger logger = LoggerFactory.getLogger(this.getClass());

   @Bean
   public KafkaListenerInitConfig init() {
       return new KafkaListenerInitConfig();
   }
}
AI 代码解读

同样的,在配置文件根目录下创建Spring监听器:
spring.factories文件
并添加需要Spring监听初始化的类路径(多个使用,逗号隔开):

org.springframework.boot.autoconfigure.EnableAutoConfiguration=riven.kafka.api.producer.ProducerInitialize,riven.kafka.api.consumer.ConsumerInitialize,riven.kafka.api.listener.CreateKafkaListener
AI 代码解读
相关文章
1688商品数据实战:API搜索接口开发与供应链分析应用
本文详细介绍了如何通过1688开放API实现商品数据的获取与应用,涵盖接入准备、签名流程、数据解析存储及商业化场景。开发者可完成智能选品、价格监控和供应商评级等功能,同时提供代码示例与问题解决方案,确保法律合规与数据安全。适合企业开发者快速构建供应链管理系统。
|
11天前
|
如何在苹果内购开发中获取App Store Connect API密钥-共享密钥理解内购安全-优雅草卓伊凡
如何在苹果内购开发中获取App Store Connect API密钥-共享密钥理解内购安全-优雅草卓伊凡
78 15
如何在苹果内购开发中获取App Store Connect API密钥-共享密钥理解内购安全-优雅草卓伊凡
JAVA接入DeepSeek大模型接口开发---阿里云的百炼模型
随着大模型的越来越盛行,现在很多企业开始接入大模型的接口,今天我从java开发角度来写一个demo的示例,用于接入DeepSeek大模型,国内的大模型有很多的接入渠道,今天主要介绍下阿里云的百炼模型,因为这个模型是免费的,只要注册一个账户,就会免费送百万的token进行学习,今天就从一个简单的可以执行的示例开始进行介绍,希望可以分享给各位正在学习的同学们。
138 3
JAVA接入DeepSeek大模型接口开发---阿里云的百炼模型
鸿蒙相机开发实战:从设备适配到性能调优 —— 我的 ArkTS 录像功能落地手记(API 15)
本文分享鸿蒙相机开发经验,从环境准备到核心逻辑实现,涵盖权限声明、模块导入、Surface关联与分辨率匹配,再到录制控制及设备适配法则。通过实战案例解析,如旋转补偿、动态帧率调节和编解码优化,帮助开发者掌握功能实现、设备适配与体验设计三大要点,减少开发坑点。适合鸿蒙新手及希望深化硬件交互能力的工程师参考收藏。
56 2
|
22天前
|
《从头开始学java,一天一个知识点》之:字符串处理:String类的核心API
🌱 **《字符串处理:String类的核心API》一分钟速通!** 本文快速介绍Java中String类的3个高频API:`substring`、`indexOf`和`split`,并通过代码示例展示其用法。重点提示:`substring`的结束索引不包含该位置,`split`支持正则表达式。进一步探讨了String不可变性的高效设计原理及企业级编码规范,如避免使用`new String()`、拼接时使用`StringBuilder`等。最后通过互动解密游戏帮助读者巩固知识。 (上一篇:《多维数组与常见操作》 | 下一篇预告:《输入与输出:Scanner与System类》)
47 11
Java||Springboot读取本地目录的文件和文件结构,读取服务器文档目录数据供前端渲染的API实现
博客不应该只有代码和解决方案,重点应该在于给出解决方案的同时分享思维模式,只有思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
Java||Springboot读取本地目录的文件和文件结构,读取服务器文档目录数据供前端渲染的API实现
微店API开发全攻略:解锁电商数据与业务自动化的核心能力
微店开放平台提供覆盖商品、订单、用户、营销、物流五大核心模块的API接口,支持企业快速构建电商中台系统。其API体系具备模块化设计、双重认证机制、高并发支持和数据隔离等特性。文档详细解析了商品管理、订单处理、营销工具等核心接口功能,并提供实战代码示例。同时,介绍了企业级整合方案设计,如订单全链路自动化和商品数据中台架构,以及性能优化与稳定性保障措施。最后,针对高频问题提供了排查指南,帮助开发者高效利用API实现电商数智化转型。适合中高级开发者阅读。
前后端分离开发:如何高效调试API?有工具 vs 无工具全解析
在前后端分离的开发模式中,API 调试的效率直接影响项目的质量和交付速度。通过本文的对比分析,我们可以看到无工具调试模式虽具备灵活性和代码复用能力,但在操作便利性和团队协作上稍显不足。而传统的外部调试工具带来了可视化、高效协作与扩展性,却可能存在工具切换带来的开发链路断层问题。Apipost-Hepler 融合了两者的优势,让开发者无需离开熟悉的 IDEA 环境,就能享受可视化调试工具的强大功能。
47 5
云原生应用实战:基于阿里云Serverless的API服务开发与部署
随着云计算的发展,Serverless架构日益流行。阿里云函数计算(Function Compute)作为Serverless服务,让开发者无需管理服务器即可运行代码,按需付费,简化开发运维流程。本文从零开始,介绍如何使用阿里云函数计算开发简单的API服务,并探讨其核心优势与最佳实践。通过Python示例,演示创建、部署及优化API的过程,涵盖环境准备、代码实现、性能优化和安全管理等内容,帮助读者快速上手Serverless开发。
课时146:使用JDT开发Java程序
在 Eclipse 之中提供有 JDT环境可以实现java 程序的开发,下面就通过一些功能进行演示。 项目开发流程

热门文章

最新文章