kafka-Java-SpringBoot-listener API开发

简介: listener开发过程是独立的,你也可以不开发,使用@KafkaListener注解来监听kafka的消息,我的方式是实现一个唯一方法的接口,然后在该方法里面进行消费,无需关心kafka的具体实现,只需要添加一个topics到配置值文件即可.

listener开发过程是独立的,你也可以不开发,使用@KafkaListener注解来监听kafka的消息,我的方式是实现一个唯一方法的接口,然后在该方法里面进行消费,无需关心kafka的具体实现,只需要添加一个topics到配置值文件即可.
项目git地址:

git@github.com:wudonghua/Java-Kafka-SpringBoot-API.git

接口:

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * @Author dw07-Riven770[wudonghua@gznb.com]
 * @Date 2017/12/1414:52
 */
public interface IKafkaListener {
    void listener(ConsumerRecord<?, ?> record);
}

为实现IKafkaListener接口方注解.其中的TOPICS属性在ConsumerConfiguration中添加,即 private List topics;属性.

import javassist.CannotCompileException;
import javassist.ClassPool;
import javassist.CtClass;
import javassist.NotFoundException;
import javassist.bytecode.AnnotationsAttribute;
import javassist.bytecode.ConstPool;
import javassist.bytecode.annotation.Annotation;
import javassist.bytecode.annotation.ArrayMemberValue;
import javassist.bytecode.annotation.MemberValue;
import javassist.bytecode.annotation.StringMemberValue;
import org.apache.commons.lang3.ArrayUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.kafka.annotation.KafkaListener;
import riven.kafka.api.configuration.ConsumerConfiguration;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

/**
 * @Author dw07-Riven770[wudonghua@gznb.com]
 * @Date 2017/12/1415:18
 * 为实现了IKafkaListener接口方法listener追加注解
 */
public class KafkaListenerInitConfig implements BeanPostProcessor, ApplicationContextAware {

    private ApplicationContext applicationContext;
    private static final String TOPICS = "topics";
    private static final String LISTENER = "listener";
    @Autowired
    private ConsumerConfiguration consumerConfiguration;

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) {
        try {
            if (bean instanceof IKafkaListener)
                return injectBean(setAnnotation(bean));
            //设置参数注解:
            return bean;
        } catch (NotFoundException | CannotCompileException | IllegalAccessException | InstantiationException e) {
            e.printStackTrace();
        }
        return bean;
    }

    /**
     * 添加Autowired注入的内容
     *
     * @param iKafkaListener
     * @return
     * @throws IllegalAccessException
     */
    private IKafkaListener injectBean(IKafkaListener iKafkaListener) throws IllegalAccessException {
        Field[] fields = iKafkaListener.getClass().getFields();
        Field[] declaredFields = iKafkaListener.getClass().getDeclaredFields();
        Field[] allFiles = ArrayUtils.addAll(fields, declaredFields);
        //判断是否有@Autowired注解
        for (Field field : allFiles) {
            if (field.getAnnotation(Autowired.class) == null)
                continue;
            field.setAccessible(true);
            field.set(iKafkaListener, this.applicationContext.getBean(field.getName()));
        }
        return iKafkaListener;
    }

    /**
     * 添加注解及其属性
     *
     * @param bean
     * @return
     * @throws NotFoundException
     * @throws CannotCompileException
     * @throws IllegalAccessException
     * @throws InstantiationException
     */
    private IKafkaListener setAnnotation(Object bean) throws NotFoundException, CannotCompileException, IllegalAccessException, InstantiationException {
        ClassPool classPool = ClassPool.getDefault();
        //获取当前Bean的常量池
        CtClass ctClass = classPool.getCtClass(bean.getClass().getName());
        ConstPool constPool = ctClass.getClassFile().getConstPool();
        //获取对应的注解内容
        List<StringMemberValue> list = new ArrayList<>(consumerConfiguration.getTopics().size());
        //创建注解属性
        consumerConfiguration.getTopics().forEach(topic -> list.add(new StringMemberValue(topic, constPool)));
        MemberValue[] memberValues = new MemberValue[list.size()];
        ArrayMemberValue arrayMemberValue = new ArrayMemberValue(constPool);
        arrayMemberValue.setValue(list.toArray(memberValues));
        //创建注解
        Annotation topics = new Annotation(KafkaListener.class.getName(), constPool);
        //为注解属性赋值
        topics.addMemberValue(TOPICS, arrayMemberValue);
        //创建注解容器
        AnnotationsAttribute annotationsAttribute = new AnnotationsAttribute(constPool, AnnotationsAttribute.visibleTag);
        annotationsAttribute.setAnnotation(topics);
        //把注解放到目标方法
        ctClass.getDeclaredMethod(LISTENER).getMethodInfo().addAttribute(annotationsAttribute);

        //生成一个全新的对象
        Class aClass = ctClass.toClass(new ClassLoader() {
            @Override
            public Class<?> loadClass(String name) throws ClassNotFoundException {
                return super.loadClass(name);
            }
        }, null);
        return (IKafkaListener) aClass.newInstance();
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    /**
     * 获取SpringIOC
     *
     * @param applicationContext
     * @throws BeansException
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

把KafkaConsumerListener注册到SpringIOC之中:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @Author dw07-Riven770[wudonghua@gznb.com]
* @Date 2017/12/1416:04
* 把KafkaConsumerListener注册到SpringIOC之中
*/
@Configuration
@ConditionalOnProperty(name = {"Riven.kafka.consumer.bootstrapServers","Riven.kafka.consumer.groupId"})
public class CreateKafkaListener {
   private Logger logger = LoggerFactory.getLogger(this.getClass());

   @Bean
   public KafkaListenerInitConfig init() {
       return new KafkaListenerInitConfig();
   }
}

同样的,在配置文件根目录下创建Spring监听器:
spring.factories文件
并添加需要Spring监听初始化的类路径(多个使用,逗号隔开):

org.springframework.boot.autoconfigure.EnableAutoConfiguration=riven.kafka.api.producer.ProducerInitialize,riven.kafka.api.consumer.ConsumerInitialize,riven.kafka.api.listener.CreateKafkaListener
目录
相关文章
|
15天前
|
Java 开发者 微服务
Spring Boot 入门:简化 Java Web 开发的强大工具
Spring Boot 是一个开源的 Java 基础框架,用于创建独立、生产级别的基于Spring框架的应用程序。它旨在简化Spring应用的初始搭建以及开发过程。
32 6
Spring Boot 入门:简化 Java Web 开发的强大工具
|
2天前
|
存储 JavaScript 前端开发
基于 SpringBoot 和 Vue 开发校园点餐订餐外卖跑腿Java源码
一个非常实用的校园外卖系统,基于 SpringBoot 和 Vue 的开发。这一系统源于黑马的外卖案例项目 经过站长的进一步改进和优化,提供了更丰富的功能和更高的可用性。 这个项目的架构设计非常有趣。虽然它采用了SpringBoot和Vue的组合,但并不是一个完全分离的项目。 前端视图通过JS的方式引入了Vue和Element UI,既能利用Vue的快速开发优势,
31 13
|
4天前
|
供应链 搜索推荐 API
探索1688榜单商品详细信息API接口:开发、应用与收益
本文深入探讨了1688榜单商品详细信息API接口的开发与应用,涵盖接口概述、开发条件、调用方法及数据处理等内容。该API帮助企业高效获取1688平台商品信息,应用于商品信息采集、校验、同步与数据分析等领域,有效提升了企业的运营效率、库存管理、销售转化率及市场策略制定能力,降低了采购成本,提升了客户满意度。
23 9
|
7天前
|
算法 Java API
如何使用Java开发获得淘宝商品描述API接口?
本文详细介绍如何使用Java开发调用淘宝商品描述API接口,涵盖从注册淘宝开放平台账号、阅读平台规则、创建应用并申请接口权限,到安装开发工具、配置开发环境、获取访问令牌,以及具体的Java代码实现和注意事项。通过遵循这些步骤,开发者可以高效地获取商品详情、描述及图片等信息,为项目和业务增添价值。
38 10
|
7天前
|
存储 API 数据库
使用Python开发获取商品销量详情API接口
本文介绍了使用Python开发获取商品销量详情的API接口方法,涵盖API接口概述、技术选型(Flask与FastAPI)、环境准备、API接口创建及调用淘宝开放平台API等内容。通过示例代码,详细说明了如何构建和调用API,以及开发过程中需要注意的事项,如数据库连接、API权限、错误处理、安全性和性能优化等。
40 5
|
6天前
|
数据可视化 搜索推荐 API
速卖通获得aliexpress商品详情API接口的开发、应用与收益。
速卖通(AliExpress)作为阿里巴巴旗下的跨境电商平台,为全球消费者提供丰富商品。其开放平台提供的API接口支持开发者获取商品详情等信息,本文探讨了速卖通商品详情API的开发流程、应用场景及潜在收益,包括提高运营效率、降低成本、增加收入和提升竞争力等方面。
20 1
|
10天前
|
JavaScript 安全 Java
java版药品不良反应智能监测系统源码,采用SpringBoot、Vue、MySQL技术开发
基于B/S架构,采用Java、SpringBoot、Vue、MySQL等技术自主研发的ADR智能监测系统,适用于三甲医院,支持二次开发。该系统能自动监测全院患者药物不良反应,通过移动端和PC端实时反馈,提升用药安全。系统涵盖规则管理、监测报告、系统管理三大模块,确保精准、高效地处理ADR事件。
|
18天前
|
Java 测试技术 API
详解Swagger:Spring Boot中的API文档生成与测试工具
详解Swagger:Spring Boot中的API文档生成与测试工具
32 4
|
18天前
|
安全 测试技术 API
构建高效RESTful API:后端开发的艺术与实践####
在现代软件开发的浩瀚星空中,RESTful API如同一座桥梁,连接着前端世界的绚丽多彩与后端逻辑的深邃复杂。本文旨在探讨如何精心打造一款既高效又易于维护的RESTful API,通过深入浅出的方式,剖析其设计原则、实现技巧及最佳实践,为后端开发者提供一份实用的指南。我们不深入晦涩的理论,只聚焦于那些能够即刻提升API品质与开发效率的关键点,让你的API在众多服务中脱颖而出。 ####
26 0
|
消息中间件 存储 安全
干货|SpringBoot JMS(ActiveMQ)API实践应用详解
Active是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。AC-tiveMQ使用Apache提供的...
314 0