Apache Kafka - 灵活控制Kafka消费_动态开启/关闭监听实现

简介: Apache Kafka - 灵活控制Kafka消费_动态开启/关闭监听实现

概述


在实际应用中,往往需要根据业务需求动态开启/关闭Kafka消费者监听。例如,在某些时间段内,可能需要暂停对某个Topic的消费,或者在某些条件下才开启对某个Topic的消费。


在Spring Boot中,要实现动态的控制或关闭消费以及动态开启或关闭监听,可以使用Spring Kafka提供的一些功能。




思路

首先,需要配置Kafka消费者的相关属性。在Spring Boot中,可以通过在application.properties或application.yml文件中添加相应的配置来实现。


以下是一个示例配置:

spring.kafka.consumer.bootstrap-servers=<Kafka服务器地址>
spring.kafka.consumer.group-id=<消费者组ID>


接下来,可以创建一个Kafka消费者,使用@KafkaListener注解来指定要监听的Kafka主题,并编写相应的消息处理方法。例如:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
    @KafkaListener(topics = "<Kafka主题>")
    public void receive(String message) {
        // 处理接收到的消息
    }
}


现在,你可以使用以下两种方法来控制或关闭消费以及动态开启或关闭监听:


方法1:使用@KafkaListener注解的autoStartup属性

@KafkaListener注解具有一个名为autoStartup的属性,可以用于控制是否自动启动消费者。默认情况下,它的值为true,表示自动启动。如果将其设置为false,则消费者将不会自动启动。


@KafkaListener(topics = "<Kafka主题>", autoStartup = "false")
public void receive(String message) {
    // 处理接收到的消息
}


要在运行时动态启动消费者,你可以通过KafkaListenerEndpointRegistry bean来手动启动:

@Autowired
private KafkaListenerEndpointRegistry endpointRegistry;
// 启动消费者
endpointRegistry.getListenerContainer("<KafkaListener的bean名称>").start();


同样,你也可以使用stop()方法来停止消费者:

// 停止消费者
endpointRegistry.getListenerContainer("<KafkaListener的bean名称>").stop();


方法2:使用KafkaListenerEndpointRegistry bean的pause()resume()方法

KafkaListenerEndpointRegistry bean提供了pause()resume()方法,用于暂停和恢复消费者的监听。

@Autowired
private KafkaListenerEndpointRegistry endpointRegistry;
// 暂停消费者监听
endpointRegistry.getListenerContainer("<KafkaListener的bean名称>").pause();
// 恢复消费者监听
endpointRegistry.getListenerContainer("<KafkaListener的bean名称>").resume();

使用这些方法,可以在运行时动态地控制或关闭消费,以及动态地开启或关闭监听。


Code

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.ContainerProperties;
import java.util.HashMap;
import java.util.Map;
/**
 * @author artisan
 */
@Slf4j
@Configuration
public class KafkaConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServer;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String enableAutoCommit;
    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;
    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;
    @Value("${spring.kafka.consumer.group-id}")
    private String group_id;
    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;
    @Value("${spring.kafka.consumer.max-poll-interval-ms}")
    private String maxPollIntervalMs;
    @Value("${spring.kafka.listener.concurrency}")
    private Integer concurrency;
    private final String consumerInterceptor = "net.zf.module.system.kafka.interceptor.FailureRateInterceptor";
    /**
     * 消费者配置信息
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>(32);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServer);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,enableAutoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,valueDeserializer);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,maxPollRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,maxPollIntervalMs);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,group_id);
        props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,consumerInterceptor );
        return props;
    }
    /**
     * 消费者批量工厂
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.setBatchListener(true);
        factory.setConcurrency(concurrency);
        return factory;
    }
    /**
     * 异常处理器
     *
     * @return
     */
    @Bean
    public ConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler() {
        return (message, exception, consumer) -> {
//            log.error("消息{} , 异常原因{}", message, exception.getMessage());
            log.error("consumerAwareListenerErrorHandler called");
            return null;
        };
    }
}


使用

   @KafkaListener(topicPattern = KafkaTopicConstant.ATTACK_MESSAGE + ".*",
            containerFactory = "batchFactory",
            errorHandler = "consumerAwareListenerErrorHandler",
            id = "attackConsumer")
    public void processMessage(List<String> records, Acknowledgment ack)  {
        log.info("AttackKafkaConsumer 当前线程 {} , 本次拉取的数据总量:{} ", Thread.currentThread().getId(), records.size());
        try {
            List<AttackMessage> attackMessages = new ArrayList();
            records.stream().forEach(record -> {
                messageExecutorFactory.process(KafkaTopicConstant.ATTACK_MESSAGE).execute(record, attackMessages);
            });
            if (!attackMessages.isEmpty()) {
                attackMessageESService.addDocuments(attackMessages, false);
            }
        } finally {
            ack.acknowledge();
        }
    }

在这段代码中,@KafkaListener注解表示这是一个Kafka消费者,


   topicPattern参数指定了该消费者要监听的主题的模式,即以 KafkaTopicConstant.ATTACK_MESSAGE开头的所有主题。


   containerFactory参数指定了用于创建Kafka监听器容器的工厂类别名。


   errorHandler参数指定了用于处理监听器抛出异常的错误处理器。id参数指定了该消费者的ID。


在该消费者的方法中,当有消息到达时,records参数将包含一组消息记录,ack参数用于手动确认已经消费了这些消息。


在方法中,首先记录了当前线程ID和拉取的数据总量。将消息记录逐一处理,并将处理结果存储在一个名为attackMessages的列表中。如果列表不为空,则将其添加到ES搜索引擎中。


最后,手动确认已经消费了这些消息。


【控制】

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
public class KafkaConsumerController {
    @Autowired
    private KafkaListenerEndpointRegistry registry;
    /**
     * 开启监听
     */
    @GetMapping("/start")
    public void start() {
        // 判断监听容器是否启动,未启动则将其启动
        if (!registry.getListenerContainer("attackConsumer").isRunning()) {
            log.info("start  ");
            registry.getListenerContainer("attackConsumer").start();
        }
        // 将其恢复
        registry.getListenerContainer("attackConsumer").resume();
        log.info("resume over ");
    }
    /**
     * 关闭监听
     */
    @GetMapping("/pause")
    public void pause() {
        // 暂停监听
        registry.getListenerContainer("attackConsumer").pause();
        log.info("pause");
    }
}



扩展


KafkaListenerEndpointRegistry


KafkaListenerEndpointRegistry是 Spring Kafka 提供的一个组件,用于管理 Kafka 消费者监听器的注册和启动。它是一个接口,提供了管理 Kafka 监听器容器的方法,如注册和启动监听器容器,以及暂停和恢复监听器容器等。


在 Spring Boot 应用程序中使用 @KafkaListener 注解时,Spring Kafka 会自动创建一个 KafkaListenerEndpointRegistry 实例,并使用它来管理所有的 Kafka 监听器容器。 它是 Spring Kafka 中的一个核心组件,用于实现 Kafka 消费者的监听和控制。


相关文章
|
5月前
|
消息中间件 监控 Java
Apache Kafka 分布式流处理平台技术详解与实践指南
本文档全面介绍 Apache Kafka 分布式流处理平台的核心概念、架构设计和实践应用。作为高吞吐量、低延迟的分布式消息系统,Kafka 已成为现代数据管道和流处理应用的事实标准。本文将深入探讨其生产者-消费者模型、主题分区机制、副本复制、流处理API等核心机制,帮助开发者构建可靠、可扩展的实时数据流处理系统。
526 4
|
消息中间件 安全 Kafka
Apache Kafka安全加固指南:保护你的消息传递系统
【10月更文挑战第24天】在现代企业环境中,数据的安全性和隐私保护至关重要。Apache Kafka作为一款广泛使用的分布式流处理平台,其安全性直接影响着业务的稳定性和用户数据的安全。作为一名资深的Kafka使用者,我深知加强Kafka安全性的重要性。本文将从个人角度出发,分享我在实践中积累的经验,帮助读者了解如何有效地保护Kafka消息传递系统的安全性。
996 7
|
7月前
|
消息中间件 存储 监控
Apache Kafka 3.0与KRaft模式的革新解读
在该架构中,Kafka集群依旧包含多个broker节点,但已不再依赖ZooKeeper集群。被选中的Kafka集群Controller将从KRaft Quorum中加载其状态,并在必要时通知其他Broker节点关于元数据的变更。这种设计支持更多分区与快速Controller切换,并有效避免了因数据不一致导致的问题。
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
693 5
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
581 1
|
消息中间件 Ubuntu Java
Ubuntu系统上安装Apache Kafka
Ubuntu系统上安装Apache Kafka
|
消息中间件 监控 Kafka
Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面
随着大数据技术的发展,Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面,方便管理和监控 Kafka 集群。本文详细介绍了 Kafka Manager 的部署步骤和基本使用方法,包括配置文件的修改、启动命令、API 示例代码等,帮助你快速上手并有效管理 Kafka 集群。
257 0
|
4月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
758 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
|
存储 Cloud Native 数据处理
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
本文整理自阿里云资深技术专家、Apache Flink PMC 成员梅源在 Flink Forward Asia 新加坡 2025上的分享,深入解析 Flink 状态管理系统的发展历程,从核心设计到 Flink 2.0 存算分离架构,并展望未来基于流批一体的通用增量计算方向。
448 0
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
|
6月前
|
SQL 人工智能 数据挖掘
Apache Flink:从实时数据分析到实时AI
Apache Flink 是实时数据处理领域的核心技术,历经十年发展,已从学术项目成长为实时计算的事实标准。它在现代数据架构中发挥着关键作用,支持实时数据分析、湖仓集成及实时 AI 应用。随着 Flink 2.0 的发布,其在流式湖仓、AI 驱动决策等方面展现出强大潜力,正推动企业迈向智能化、实时化的新阶段。
775 9
Apache Flink:从实时数据分析到实时AI

推荐镜像

更多