kafka-Java-SpringBoot-listener API开发

简介: listener开发过程是独立的,你也可以不开发,使用@KafkaListener注解来监听kafka的消息,我的方式是实现一个唯一方法的接口,然后在该方法里面进行消费,无需关心kafka的具体实现,只需要添加一个topics到配置值文件即可.

listener开发过程是独立的,你也可以不开发,使用@KafkaListener注解来监听kafka的消息,我的方式是实现一个唯一方法的接口,然后在该方法里面进行消费,无需关心kafka的具体实现,只需要添加一个topics到配置值文件即可.
项目git地址:

git@github.com:wudonghua/Java-Kafka-SpringBoot-API.git

接口:

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * @Author dw07-Riven770[wudonghua@gznb.com]
 * @Date 2017/12/1414:52
 */
public interface IKafkaListener {
    void listener(ConsumerRecord<?, ?> record);
}

为实现IKafkaListener接口方注解.其中的TOPICS属性在ConsumerConfiguration中添加,即 private List topics;属性.

import javassist.CannotCompileException;
import javassist.ClassPool;
import javassist.CtClass;
import javassist.NotFoundException;
import javassist.bytecode.AnnotationsAttribute;
import javassist.bytecode.ConstPool;
import javassist.bytecode.annotation.Annotation;
import javassist.bytecode.annotation.ArrayMemberValue;
import javassist.bytecode.annotation.MemberValue;
import javassist.bytecode.annotation.StringMemberValue;
import org.apache.commons.lang3.ArrayUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.kafka.annotation.KafkaListener;
import riven.kafka.api.configuration.ConsumerConfiguration;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

/**
 * @Author dw07-Riven770[wudonghua@gznb.com]
 * @Date 2017/12/1415:18
 * 为实现了IKafkaListener接口方法listener追加注解
 */
public class KafkaListenerInitConfig implements BeanPostProcessor, ApplicationContextAware {

    private ApplicationContext applicationContext;
    private static final String TOPICS = "topics";
    private static final String LISTENER = "listener";
    @Autowired
    private ConsumerConfiguration consumerConfiguration;

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) {
        try {
            if (bean instanceof IKafkaListener)
                return injectBean(setAnnotation(bean));
            //设置参数注解:
            return bean;
        } catch (NotFoundException | CannotCompileException | IllegalAccessException | InstantiationException e) {
            e.printStackTrace();
        }
        return bean;
    }

    /**
     * 添加Autowired注入的内容
     *
     * @param iKafkaListener
     * @return
     * @throws IllegalAccessException
     */
    private IKafkaListener injectBean(IKafkaListener iKafkaListener) throws IllegalAccessException {
        Field[] fields = iKafkaListener.getClass().getFields();
        Field[] declaredFields = iKafkaListener.getClass().getDeclaredFields();
        Field[] allFiles = ArrayUtils.addAll(fields, declaredFields);
        //判断是否有@Autowired注解
        for (Field field : allFiles) {
            if (field.getAnnotation(Autowired.class) == null)
                continue;
            field.setAccessible(true);
            field.set(iKafkaListener, this.applicationContext.getBean(field.getName()));
        }
        return iKafkaListener;
    }

    /**
     * 添加注解及其属性
     *
     * @param bean
     * @return
     * @throws NotFoundException
     * @throws CannotCompileException
     * @throws IllegalAccessException
     * @throws InstantiationException
     */
    private IKafkaListener setAnnotation(Object bean) throws NotFoundException, CannotCompileException, IllegalAccessException, InstantiationException {
        ClassPool classPool = ClassPool.getDefault();
        //获取当前Bean的常量池
        CtClass ctClass = classPool.getCtClass(bean.getClass().getName());
        ConstPool constPool = ctClass.getClassFile().getConstPool();
        //获取对应的注解内容
        List<StringMemberValue> list = new ArrayList<>(consumerConfiguration.getTopics().size());
        //创建注解属性
        consumerConfiguration.getTopics().forEach(topic -> list.add(new StringMemberValue(topic, constPool)));
        MemberValue[] memberValues = new MemberValue[list.size()];
        ArrayMemberValue arrayMemberValue = new ArrayMemberValue(constPool);
        arrayMemberValue.setValue(list.toArray(memberValues));
        //创建注解
        Annotation topics = new Annotation(KafkaListener.class.getName(), constPool);
        //为注解属性赋值
        topics.addMemberValue(TOPICS, arrayMemberValue);
        //创建注解容器
        AnnotationsAttribute annotationsAttribute = new AnnotationsAttribute(constPool, AnnotationsAttribute.visibleTag);
        annotationsAttribute.setAnnotation(topics);
        //把注解放到目标方法
        ctClass.getDeclaredMethod(LISTENER).getMethodInfo().addAttribute(annotationsAttribute);

        //生成一个全新的对象
        Class aClass = ctClass.toClass(new ClassLoader() {
            @Override
            public Class<?> loadClass(String name) throws ClassNotFoundException {
                return super.loadClass(name);
            }
        }, null);
        return (IKafkaListener) aClass.newInstance();
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    /**
     * 获取SpringIOC
     *
     * @param applicationContext
     * @throws BeansException
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

把KafkaConsumerListener注册到SpringIOC之中:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @Author dw07-Riven770[wudonghua@gznb.com]
* @Date 2017/12/1416:04
* 把KafkaConsumerListener注册到SpringIOC之中
*/
@Configuration
@ConditionalOnProperty(name = {"Riven.kafka.consumer.bootstrapServers","Riven.kafka.consumer.groupId"})
public class CreateKafkaListener {
   private Logger logger = LoggerFactory.getLogger(this.getClass());

   @Bean
   public KafkaListenerInitConfig init() {
       return new KafkaListenerInitConfig();
   }
}

同样的,在配置文件根目录下创建Spring监听器:
spring.factories文件
并添加需要Spring监听初始化的类路径(多个使用,逗号隔开):

org.springframework.boot.autoconfigure.EnableAutoConfiguration=riven.kafka.api.producer.ProducerInitialize,riven.kafka.api.consumer.ConsumerInitialize,riven.kafka.api.listener.CreateKafkaListener
目录
相关文章
|
1天前
|
API 数据安全/隐私保护 UED
探索鸿蒙的蓝牙A2DP与访问API:从学习到实现的开发之旅
在掌握了鸿蒙系统的开发基础后,我挑战了蓝牙功能的开发。通过Bluetooth A2DP和Access API,实现了蓝牙音频流传输、设备连接和权限管理。具体步骤包括:理解API作用、配置环境与权限、扫描并连接设备、实现音频流控制及动态切换设备。最终,我构建了一个简单的蓝牙音频播放器,具备设备扫描、连接、音频播放与停止、切换输出设备等功能。这次开发让我对蓝牙技术有了更深的理解,也为未来的复杂项目打下了坚实的基础。
81 58
探索鸿蒙的蓝牙A2DP与访问API:从学习到实现的开发之旅
|
3天前
|
移动开发 前端开发 Java
Java最新图形化界面开发技术——JavaFx教程(含UI控件用法介绍、属性绑定、事件监听、FXML)
JavaFX是Java的下一代图形用户界面工具包。JavaFX是一组图形和媒体API,我们可以用它们来创建和部署富客户端应用程序。 JavaFX允许开发人员快速构建丰富的跨平台应用程序,允许开发人员在单个编程接口中组合图形,动画和UI控件。本文详细介绍了JavaFx的常见用法,相信读完本教程你一定有所收获!
Java最新图形化界面开发技术——JavaFx教程(含UI控件用法介绍、属性绑定、事件监听、FXML)
|
4天前
|
JSON Java Apache
Java基础-常用API-Object类
继承是面向对象编程的重要特性,允许从已有类派生新类。Java采用单继承机制,默认所有类继承自Object类。Object类提供了多个常用方法,如`clone()`用于复制对象,`equals()`判断对象是否相等,`hashCode()`计算哈希码,`toString()`返回对象的字符串表示,`wait()`、`notify()`和`notifyAll()`用于线程同步,`finalize()`在对象被垃圾回收时调用。掌握这些方法有助于更好地理解和使用Java中的对象行为。
|
1天前
|
供应链 搜索推荐 API
1688榜单商品详细信息API接口的开发、应用与收益
1688作为全球知名的B2B电商平台,为企业提供丰富的商品信息和交易机会。为满足企业对数据的需求,1688开发了榜单商品详细信息API接口,帮助企业批量获取商品详情,应用于信息采集、校验、同步与数据分析等领域,提升运营效率、优化库存管理、精准推荐、制定市场策略、降低采购成本并提高客户满意度。该接口通过HTTP请求调用,支持多种应用场景,助力企业在电商领域实现可持续发展。
22 4
|
8天前
|
存储 安全 Java
Spring Boot 编写 API 的 10条最佳实践
本文总结了 10 个编写 Spring Boot API 的最佳实践,包括 RESTful API 设计原则、注解使用、依赖注入、异常处理、数据传输对象(DTO)建模、安全措施、版本控制、文档生成、测试策略以及监控和日志记录。每个实践都配有详细的编码示例和解释,帮助开发者像专业人士一样构建高质量的 API。
|
8天前
|
监控 供应链 搜索推荐
阿里妈妈商品详情API接口:开发、应用与收益的深度剖析
阿里妈妈是阿里巴巴旗下的数字营销平台,其商品详情API接口为开发者提供了获取淘宝、天猫等电商平台商品详细信息的工具。本文介绍了该接口的开发流程、应用场景及带来的收益,揭示了其在电商生态中的重要地位。
59 6
|
8天前
|
供应链 搜索推荐 API
1688APP原数据API接口的开发、应用与收益(一篇文章全明白)
1688作为全球知名的B2B电商平台,通过开放的原数据API接口,为开发者提供了丰富的数据资源,涵盖商品信息、交易数据、店铺信息、物流信息和用户信息等。本文将深入探讨1688 APP原数据API接口的开发、应用及其带来的商业收益,包括提升流量、优化库存管理、增强用户体验等方面。
50 6
|
10天前
|
监控 搜索推荐 API
京东商品详情API接口的开发、应用与收益探索
在数字化和互联网高速发展的时代,京东通过开放商品详情API接口,为开发者、企业和商家提供了丰富的数据源和创新空间。本文将探讨该API接口的开发背景、流程、应用场景及带来的多重收益,包括促进生态系统建设、提升数据利用效率和推动数字化转型等。
35 3
|
11天前
|
前端开发 Java 测试技术
java日常开发中如何写出优雅的好维护的代码
代码可读性太差,实际是给团队后续开发中埋坑,优化在平时,没有那个团队会说我专门给你一个月来优化之前的代码,所以在日常开发中就要多注意可读性问题,不要写出几天之后自己都看不懂的代码。
49 2
|
4天前
|
存储 搜索推荐 API
小红书笔记详情API接口的开发、应用与收益
小红书笔记详情API接口为开发者、企业和内容创作者提供了获取平台丰富资源的通道。通过该接口,用户可以提取笔记的详细信息(如标题、正文、标签等),并应用于市场调研、竞品分析、内容创作、电商推荐等多个领域。这不仅有助于提升品牌影响力和优化用户体验,还能挖掘商业机会,促进内容创新,增强用户互动与社群凝聚力。总之,小红书笔记详情API接口为企业和个人在社交媒体领域探索新增长点提供了重要工具。
32 0