Apache Kafka-Spring Kafka生产消费@KafkaListener源码解析

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: Apache Kafka-Spring Kafka生产消费@KafkaListener源码解析

20191116123525638.png


概述


【依赖】

  <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
  </dependency>


【配置】

#kafka
spring.kafka.bootstrap-servers=10.11.114.247:9092
spring.kafka.producer.acks=1
spring.kafka.producer.retries=3
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.group-id=zfprocessor_group
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.artisan.common.entity.messages
spring.kafka.consumer.max-poll-records=500
spring.kafka.consumer.fetch-min-size=10
spring.kafka.consumer.fetch-max-wait=10000ms
spring.kafka.listener.missing-topics-fatal=false
spring.kafka.listener.type=batch
spring.kafka.listener.ack-mode=manual
logging.level.org.springframework.kafka=ERROR
logging.level.org.apache.kafka=ERROR


Spring-kafka生产者源码流程


ListenableFuture<SendResult<Object, Object>> result = kafkaTemplate.send(TOPICA.TOPIC, messageMock);


主要的源码流程如下


20210401231308744.png

Spring-kafka消费者源码流程(@EnableKafka@KafkaListener

消费的话,比较复杂

 @KafkaListener(topics = TOPICA.TOPIC ,groupId = CONSUMER_GROUP_PREFIX + TOPICA.TOPIC)
    public void onMessage(MessageMock messageMock){
        logger.info("【接受到消息][线程:{} 消息内容:{}]", Thread.currentThread().getName(), messageMock);
    }


划重点,主要关注


Flow


20210402001046269.png


作为引子,我们继续来梳理下源码


20210401232847522.png

继续

20210401232957759.png

继续


20210401233101276.png

KafkaBootstrapConfiguration的主要功能是创建两个bean


KafkaListenerAnnotationBeanPostProcessor


实现了如下接口

implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton


主要功能就是监听@KafkaListener注解 。 bean的后置处理器 需要重写 postProcessAfterInitialization

@Override
  public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
    if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
       // 获取对应的class
      Class<?> targetClass = AopUtils.getTargetClass(bean);
      // 查找类是否有@KafkaListener注解
      Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
      final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
      final List<Method> multiMethods = new ArrayList<>();
      // 查找类中方法上是否有对应的@KafkaListener注解,
      Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
          (MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
            Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
            return (!listenerMethods.isEmpty() ? listenerMethods : null);
          });
      if (hasClassLevelListeners) {
        Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
            (ReflectionUtils.MethodFilter) method ->
                AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
        multiMethods.addAll(methodsWithHandler);
      }
      if (annotatedMethods.isEmpty()) {
        this.nonAnnotatedClasses.add(bean.getClass());
        this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
      }
      else {
        // Non-empty set of methods
        for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
          Method method = entry.getKey();
          for (KafkaListener listener : entry.getValue()) {
            // 处理@KafkaListener注解   重点看 
            processKafkaListener(listener, method, bean, beanName);
          }
        }
        this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
              + beanName + "': " + annotatedMethods);
      }
      if (hasClassLevelListeners) {
        processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
      }
    }
    return bean;
  }

重点方法

  protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
    Method methodToUse = checkProxy(method, bean);
    MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
    endpoint.setMethod(methodToUse);
    processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
  }


继续 processListener

protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
      Object bean, Object adminTarget, String beanName) {
    String beanRef = kafkaListener.beanRef();
    if (StringUtils.hasText(beanRef)) {
      this.listenerScope.addListener(beanRef, bean);
    }
    // 构建 endpoint
    endpoint.setBean(bean);
    endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
    endpoint.setId(getEndpointId(kafkaListener));
    endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
    endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
    endpoint.setTopics(resolveTopics(kafkaListener));
    endpoint.setTopicPattern(resolvePattern(kafkaListener));
    endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
    String group = kafkaListener.containerGroup();
    if (StringUtils.hasText(group)) {
      Object resolvedGroup = resolveExpression(group);
      if (resolvedGroup instanceof String) {
        endpoint.setGroup((String) resolvedGroup);
      }
    }
    String concurrency = kafkaListener.concurrency();
    if (StringUtils.hasText(concurrency)) {
      endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));
    }
    String autoStartup = kafkaListener.autoStartup();
    if (StringUtils.hasText(autoStartup)) {
      endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
    }
    resolveKafkaProperties(endpoint, kafkaListener.properties());
    endpoint.setSplitIterables(kafkaListener.splitIterables());
    KafkaListenerContainerFactory<?> factory = null;
    String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
    if (StringUtils.hasText(containerFactoryBeanName)) {
      Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
      try {
        factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
      }
      catch (NoSuchBeanDefinitionException ex) {
        throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget
            + "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()
            + " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
      }
    }
    endpoint.setBeanFactory(this.beanFactory);
    String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
    if (StringUtils.hasText(errorHandlerBeanName)) {
      endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
    }
    // 将endpoint注册到registrar
    this.registrar.registerEndpoint(endpoint, factory);
    if (StringUtils.hasText(beanRef)) {
      this.listenerScope.removeListener(beanRef);
    }
  }


继续看 registerEndpoint

  public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
    Assert.notNull(endpoint, "Endpoint must be set");
    Assert.hasText(endpoint.getId(), "Endpoint id must be set");
    // Factory may be null, we defer the resolution right before actually creating the container
    // 把endpoint封装为KafkaListenerEndpointDescriptor
    KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
    synchronized (this.endpointDescriptors) {
      if (this.startImmediately) { // Register and start immediately
        this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
            resolveContainerFactory(descriptor), true);
      }
      else {
         // 将descriptor添加到endpointDescriptors
        this.endpointDescriptors.add(descriptor);
      }
    }
  }



总的来看: 得到一个含有KafkaListener基本信息的Endpoint,将Endpoint被封装到KafkaListenerEndpointDescriptor,KafkaListenerEndpointDescriptor被添加到KafkaListenerEndpointRegistrar.endpointDescriptors中,至此这部分的流程结束了,感觉没有下文呀。

20210402223326415.png


KafkaListenerEndpointRegistrar.endpointDescriptors 这个List中的数据怎么用呢?

public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {}


KafkaListenerEndpointRegistrar 实现了 InitializingBean 接口,重写 afterPropertiesSet,该方法会在bean实例化完成后执行

  @Override
  public void afterPropertiesSet() {
    registerAllEndpoints();
  }


继续 registerAllEndpoints();

  protected void registerAllEndpoints() {
    synchronized (this.endpointDescriptors) {
    // 遍历KafkaListenerEndpointDescriptor 
      for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
          // 注册 
        this.endpointRegistry.registerListenerContainer(
            descriptor.endpoint, resolveContainerFactory(descriptor));
      }
      this.startImmediately = true;  // trigger immediate startup
    }
  }


继续

  public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
    registerListenerContainer(endpoint, factory, false);
  }


go

public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
      boolean startImmediately) {
    Assert.notNull(endpoint, "Endpoint must not be null");
    Assert.notNull(factory, "Factory must not be null");
    String id = endpoint.getId();
    Assert.hasText(id, "Endpoint id must not be empty");
    synchronized (this.listenerContainers) {
      Assert.state(!this.listenerContainers.containsKey(id),
          "Another endpoint is already registered with id '" + id + "'");   
        // 创建Endpoint对应的MessageListenerContainer,将创建好的MessageListenerContainer放入listenerContainers
      MessageListenerContainer container = createListenerContainer(endpoint, factory);
      this.listenerContainers.put(id, container);
      // 如果KafkaListener注解中有对应的group信息,则将container添加到对应的group中
      if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
        List<MessageListenerContainer> containerGroup;
        if (this.applicationContext.containsBean(endpoint.getGroup())) {
          containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
        }
        else {
          containerGroup = new ArrayList<MessageListenerContainer>();
          this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
        }
        containerGroup.add(container);
      }
      if (startImmediately) {
        startIfNecessary(container);
      }
    }
  }


相关文章
|
9天前
|
移动开发 前端开发 JavaScript
从入门到精通:H5游戏源码开发技术全解析与未来趋势洞察
H5游戏凭借其跨平台、易传播和开发成本低的优势,近年来发展迅猛。接下来,让我们深入了解 H5 游戏源码开发的技术教程以及未来的发展趋势。
|
7天前
|
存储 前端开发 JavaScript
在线教育网课系统源码开发指南:功能设计与技术实现深度解析
在线教育网课系统是近年来发展迅猛的教育形式的核心载体,具备用户管理、课程管理、教学互动、学习评估等功能。本文从功能和技术两方面解析其源码开发,涵盖前端(HTML5、CSS3、JavaScript等)、后端(Java、Python等)、流媒体及云计算技术,并强调安全性、稳定性和用户体验的重要性。
|
16天前
|
机器学习/深度学习 自然语言处理 算法
生成式 AI 大语言模型(LLMs)核心算法及源码解析:预训练篇
生成式 AI 大语言模型(LLMs)核心算法及源码解析:预训练篇
114 0
|
3月前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
3月前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
3月前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
2月前
|
自然语言处理 数据处理 索引
mindspeed-llm源码解析(一)preprocess_data
mindspeed-llm是昇腾模型套件代码仓,原来叫"modelLink"。这篇文章带大家阅读一下数据处理脚本preprocess_data.py(基于1.0.0分支),数据处理是模型训练的第一步,经常会用到。
75 0
|
20天前
|
Java 数据库 开发者
详细介绍SpringBoot启动流程及配置类解析原理
通过对 Spring Boot 启动流程及配置类解析原理的深入分析,我们可以看到 Spring Boot 在启动时的灵活性和可扩展性。理解这些机制不仅有助于开发者更好地使用 Spring Boot 进行应用开发,还能够在面对问题时,迅速定位和解决问题。希望本文能为您在 Spring Boot 开发过程中提供有效的指导和帮助。
69 12
|
23天前
|
Java 应用服务中间件 Maven
SpringBoot项目打包成war包
通过上述步骤,我们成功地将一个Spring Boot应用打包成WAR文件,并部署到外部的Tomcat服务器中。这种方式适用于需要与传统Servlet容器集成的场景。
41 8
|
2月前
|
XML Java 应用服务中间件
Spring Boot 两种部署到服务器的方式
本文介绍了Spring Boot项目的两种部署方式:jar包和war包。Jar包方式使用内置Tomcat,只需配置JDK 1.8及以上环境,通过`nohup java -jar`命令后台运行,并开放服务器端口即可访问。War包则需将项目打包后放入外部Tomcat的webapps目录,修改启动类继承`SpringBootServletInitializer`并调整pom.xml中的打包类型为war,最后启动Tomcat访问应用。两者各有优劣,jar包更简单便捷,而war包适合传统部署场景。需要注意的是,war包部署时,内置Tomcat的端口配置不会生效。
507 17
Spring Boot 两种部署到服务器的方式

热门文章

最新文章

推荐镜像

更多