概述
【依赖】
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
【配置】
#kafka spring.kafka.bootstrap-servers=10.11.114.247:9092 spring.kafka.producer.acks=1 spring.kafka.producer.retries=3 spring.kafka.producer.batch-size=16384 spring.kafka.producer.buffer-memory=33554432 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer spring.kafka.consumer.group-id=zfprocessor_group spring.kafka.consumer.enable-auto-commit=false spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties.spring.json.trusted.packages=com.artisan.common.entity.messages spring.kafka.consumer.max-poll-records=500 spring.kafka.consumer.fetch-min-size=10 spring.kafka.consumer.fetch-max-wait=10000ms spring.kafka.listener.missing-topics-fatal=false spring.kafka.listener.type=batch spring.kafka.listener.ack-mode=manual logging.level.org.springframework.kafka=ERROR logging.level.org.apache.kafka=ERROR
Spring-kafka生产者源码流程
ListenableFuture<SendResult<Object, Object>> result = kafkaTemplate.send(TOPICA.TOPIC, messageMock);
主要的源码流程如下
Spring-kafka消费者源码流程(@EnableKafka
和@KafkaListener
)
消费的话,比较复杂
@KafkaListener(topics = TOPICA.TOPIC ,groupId = CONSUMER_GROUP_PREFIX + TOPICA.TOPIC) public void onMessage(MessageMock messageMock){ logger.info("【接受到消息][线程:{} 消息内容:{}]", Thread.currentThread().getName(), messageMock); }
划重点,主要关注
Flow
作为引子,我们继续来梳理下源码
继续
继续
KafkaBootstrapConfiguration的主要功能是创建两个bean
KafkaListenerAnnotationBeanPostProcessor
实现了如下接口
implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton
主要功能就是监听@KafkaListener注解 。 bean的后置处理器 需要重写 postProcessAfterInitialization
@Override public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException { if (!this.nonAnnotatedClasses.contains(bean.getClass())) { // 获取对应的class Class<?> targetClass = AopUtils.getTargetClass(bean); // 查找类是否有@KafkaListener注解 Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass); final boolean hasClassLevelListeners = classLevelListeners.size() > 0; final List<Method> multiMethods = new ArrayList<>(); // 查找类中方法上是否有对应的@KafkaListener注解, Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, (MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> { Set<KafkaListener> listenerMethods = findListenerAnnotations(method); return (!listenerMethods.isEmpty() ? listenerMethods : null); }); if (hasClassLevelListeners) { Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass, (ReflectionUtils.MethodFilter) method -> AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null); multiMethods.addAll(methodsWithHandler); } if (annotatedMethods.isEmpty()) { this.nonAnnotatedClasses.add(bean.getClass()); this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass()); } else { // Non-empty set of methods for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) { Method method = entry.getKey(); for (KafkaListener listener : entry.getValue()) { // 处理@KafkaListener注解 重点看 processKafkaListener(listener, method, bean, beanName); } } this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '" + beanName + "': " + annotatedMethods); } if (hasClassLevelListeners) { processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName); } } return bean; }
重点方法
protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) { Method methodToUse = checkProxy(method, bean); MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>(); endpoint.setMethod(methodToUse); processListener(endpoint, kafkaListener, bean, methodToUse, beanName); }
继续 processListener
protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener, Object bean, Object adminTarget, String beanName) { String beanRef = kafkaListener.beanRef(); if (StringUtils.hasText(beanRef)) { this.listenerScope.addListener(beanRef, bean); } // 构建 endpoint endpoint.setBean(bean); endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory); endpoint.setId(getEndpointId(kafkaListener)); endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId())); endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener)); endpoint.setTopics(resolveTopics(kafkaListener)); endpoint.setTopicPattern(resolvePattern(kafkaListener)); endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix")); String group = kafkaListener.containerGroup(); if (StringUtils.hasText(group)) { Object resolvedGroup = resolveExpression(group); if (resolvedGroup instanceof String) { endpoint.setGroup((String) resolvedGroup); } } String concurrency = kafkaListener.concurrency(); if (StringUtils.hasText(concurrency)) { endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency")); } String autoStartup = kafkaListener.autoStartup(); if (StringUtils.hasText(autoStartup)) { endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup")); } resolveKafkaProperties(endpoint, kafkaListener.properties()); endpoint.setSplitIterables(kafkaListener.splitIterables()); KafkaListenerContainerFactory<?> factory = null; String containerFactoryBeanName = resolve(kafkaListener.containerFactory()); if (StringUtils.hasText(containerFactoryBeanName)) { Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name"); try { factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class); } catch (NoSuchBeanDefinitionException ex) { throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget + "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName() + " with id '" + containerFactoryBeanName + "' was found in the application context", ex); } } endpoint.setBeanFactory(this.beanFactory); String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler"); if (StringUtils.hasText(errorHandlerBeanName)) { endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class)); } // 将endpoint注册到registrar this.registrar.registerEndpoint(endpoint, factory); if (StringUtils.hasText(beanRef)) { this.listenerScope.removeListener(beanRef); } }
继续看 registerEndpoint
public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) { Assert.notNull(endpoint, "Endpoint must be set"); Assert.hasText(endpoint.getId(), "Endpoint id must be set"); // Factory may be null, we defer the resolution right before actually creating the container // 把endpoint封装为KafkaListenerEndpointDescriptor KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory); synchronized (this.endpointDescriptors) { if (this.startImmediately) { // Register and start immediately this.endpointRegistry.registerListenerContainer(descriptor.endpoint, resolveContainerFactory(descriptor), true); } else { // 将descriptor添加到endpointDescriptors this.endpointDescriptors.add(descriptor); } } }
总的来看: 得到一个含有KafkaListener基本信息的Endpoint,将Endpoint被封装到KafkaListenerEndpointDescriptor,KafkaListenerEndpointDescriptor被添加到KafkaListenerEndpointRegistrar.endpointDescriptors中,至此这部分的流程结束了,感觉没有下文呀。
KafkaListenerEndpointRegistrar.endpointDescriptors 这个List中的数据怎么用呢?
public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {}
KafkaListenerEndpointRegistrar 实现了 InitializingBean 接口,重写 afterPropertiesSet,该方法会在bean实例化完成后执行
@Override public void afterPropertiesSet() { registerAllEndpoints(); }
继续 registerAllEndpoints();
protected void registerAllEndpoints() { synchronized (this.endpointDescriptors) { // 遍历KafkaListenerEndpointDescriptor for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) { // 注册 this.endpointRegistry.registerListenerContainer( descriptor.endpoint, resolveContainerFactory(descriptor)); } this.startImmediately = true; // trigger immediate startup } }
继续
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) { registerListenerContainer(endpoint, factory, false); }
go
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory, boolean startImmediately) { Assert.notNull(endpoint, "Endpoint must not be null"); Assert.notNull(factory, "Factory must not be null"); String id = endpoint.getId(); Assert.hasText(id, "Endpoint id must not be empty"); synchronized (this.listenerContainers) { Assert.state(!this.listenerContainers.containsKey(id), "Another endpoint is already registered with id '" + id + "'"); // 创建Endpoint对应的MessageListenerContainer,将创建好的MessageListenerContainer放入listenerContainers MessageListenerContainer container = createListenerContainer(endpoint, factory); this.listenerContainers.put(id, container); // 如果KafkaListener注解中有对应的group信息,则将container添加到对应的group中 if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) { List<MessageListenerContainer> containerGroup; if (this.applicationContext.containsBean(endpoint.getGroup())) { containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class); } else { containerGroup = new ArrayList<MessageListenerContainer>(); this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup); } containerGroup.add(container); } if (startImmediately) { startIfNecessary(container); } } }