Apache Kafka - 灵活控制Kafka消费_动态开启/关闭监听实现

简介: Apache Kafka - 灵活控制Kafka消费_动态开启/关闭监听实现

概述


在实际应用中,往往需要根据业务需求动态开启/关闭Kafka消费者监听。例如,在某些时间段内,可能需要暂停对某个Topic的消费,或者在某些条件下才开启对某个Topic的消费。


在Spring Boot中,要实现动态的控制或关闭消费以及动态开启或关闭监听,可以使用Spring Kafka提供的一些功能。




思路

首先,需要配置Kafka消费者的相关属性。在Spring Boot中,可以通过在application.properties或application.yml文件中添加相应的配置来实现。


以下是一个示例配置:

spring.kafka.consumer.bootstrap-servers=<Kafka服务器地址>
spring.kafka.consumer.group-id=<消费者组ID>


接下来,可以创建一个Kafka消费者,使用@KafkaListener注解来指定要监听的Kafka主题,并编写相应的消息处理方法。例如:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
    @KafkaListener(topics = "<Kafka主题>")
    public void receive(String message) {
        // 处理接收到的消息
    }
}


现在,你可以使用以下两种方法来控制或关闭消费以及动态开启或关闭监听:


方法1:使用@KafkaListener注解的autoStartup属性

@KafkaListener注解具有一个名为autoStartup的属性,可以用于控制是否自动启动消费者。默认情况下,它的值为true,表示自动启动。如果将其设置为false,则消费者将不会自动启动。


@KafkaListener(topics = "<Kafka主题>", autoStartup = "false")
public void receive(String message) {
    // 处理接收到的消息
}


要在运行时动态启动消费者,你可以通过KafkaListenerEndpointRegistry bean来手动启动:

@Autowired
private KafkaListenerEndpointRegistry endpointRegistry;
// 启动消费者
endpointRegistry.getListenerContainer("<KafkaListener的bean名称>").start();


同样,你也可以使用stop()方法来停止消费者:

// 停止消费者
endpointRegistry.getListenerContainer("<KafkaListener的bean名称>").stop();


方法2:使用KafkaListenerEndpointRegistry bean的pause()resume()方法

KafkaListenerEndpointRegistry bean提供了pause()resume()方法,用于暂停和恢复消费者的监听。

@Autowired
private KafkaListenerEndpointRegistry endpointRegistry;
// 暂停消费者监听
endpointRegistry.getListenerContainer("<KafkaListener的bean名称>").pause();
// 恢复消费者监听
endpointRegistry.getListenerContainer("<KafkaListener的bean名称>").resume();

使用这些方法,可以在运行时动态地控制或关闭消费,以及动态地开启或关闭监听。


Code

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.ContainerProperties;
import java.util.HashMap;
import java.util.Map;
/**
 * @author artisan
 */
@Slf4j
@Configuration
public class KafkaConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServer;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String enableAutoCommit;
    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;
    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;
    @Value("${spring.kafka.consumer.group-id}")
    private String group_id;
    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;
    @Value("${spring.kafka.consumer.max-poll-interval-ms}")
    private String maxPollIntervalMs;
    @Value("${spring.kafka.listener.concurrency}")
    private Integer concurrency;
    private final String consumerInterceptor = "net.zf.module.system.kafka.interceptor.FailureRateInterceptor";
    /**
     * 消费者配置信息
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>(32);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServer);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,enableAutoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,valueDeserializer);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,maxPollRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,maxPollIntervalMs);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,group_id);
        props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,consumerInterceptor );
        return props;
    }
    /**
     * 消费者批量工厂
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.setBatchListener(true);
        factory.setConcurrency(concurrency);
        return factory;
    }
    /**
     * 异常处理器
     *
     * @return
     */
    @Bean
    public ConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler() {
        return (message, exception, consumer) -> {
//            log.error("消息{} , 异常原因{}", message, exception.getMessage());
            log.error("consumerAwareListenerErrorHandler called");
            return null;
        };
    }
}


使用

   @KafkaListener(topicPattern = KafkaTopicConstant.ATTACK_MESSAGE + ".*",
            containerFactory = "batchFactory",
            errorHandler = "consumerAwareListenerErrorHandler",
            id = "attackConsumer")
    public void processMessage(List<String> records, Acknowledgment ack)  {
        log.info("AttackKafkaConsumer 当前线程 {} , 本次拉取的数据总量:{} ", Thread.currentThread().getId(), records.size());
        try {
            List<AttackMessage> attackMessages = new ArrayList();
            records.stream().forEach(record -> {
                messageExecutorFactory.process(KafkaTopicConstant.ATTACK_MESSAGE).execute(record, attackMessages);
            });
            if (!attackMessages.isEmpty()) {
                attackMessageESService.addDocuments(attackMessages, false);
            }
        } finally {
            ack.acknowledge();
        }
    }

在这段代码中,@KafkaListener注解表示这是一个Kafka消费者,


   topicPattern参数指定了该消费者要监听的主题的模式,即以 KafkaTopicConstant.ATTACK_MESSAGE开头的所有主题。


   containerFactory参数指定了用于创建Kafka监听器容器的工厂类别名。


   errorHandler参数指定了用于处理监听器抛出异常的错误处理器。id参数指定了该消费者的ID。


在该消费者的方法中,当有消息到达时,records参数将包含一组消息记录,ack参数用于手动确认已经消费了这些消息。


在方法中,首先记录了当前线程ID和拉取的数据总量。将消息记录逐一处理,并将处理结果存储在一个名为attackMessages的列表中。如果列表不为空,则将其添加到ES搜索引擎中。


最后,手动确认已经消费了这些消息。


【控制】

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
public class KafkaConsumerController {
    @Autowired
    private KafkaListenerEndpointRegistry registry;
    /**
     * 开启监听
     */
    @GetMapping("/start")
    public void start() {
        // 判断监听容器是否启动,未启动则将其启动
        if (!registry.getListenerContainer("attackConsumer").isRunning()) {
            log.info("start  ");
            registry.getListenerContainer("attackConsumer").start();
        }
        // 将其恢复
        registry.getListenerContainer("attackConsumer").resume();
        log.info("resume over ");
    }
    /**
     * 关闭监听
     */
    @GetMapping("/pause")
    public void pause() {
        // 暂停监听
        registry.getListenerContainer("attackConsumer").pause();
        log.info("pause");
    }
}



扩展


KafkaListenerEndpointRegistry


KafkaListenerEndpointRegistry是 Spring Kafka 提供的一个组件,用于管理 Kafka 消费者监听器的注册和启动。它是一个接口,提供了管理 Kafka 监听器容器的方法,如注册和启动监听器容器,以及暂停和恢复监听器容器等。


在 Spring Boot 应用程序中使用 @KafkaListener 注解时,Spring Kafka 会自动创建一个 KafkaListenerEndpointRegistry 实例,并使用它来管理所有的 Kafka 监听器容器。 它是 Spring Kafka 中的一个核心组件,用于实现 Kafka 消费者的监听和控制。


相关文章
|
2月前
|
消息中间件 安全 Kafka
Apache Kafka安全加固指南:保护你的消息传递系统
【10月更文挑战第24天】在现代企业环境中,数据的安全性和隐私保护至关重要。Apache Kafka作为一款广泛使用的分布式流处理平台,其安全性直接影响着业务的稳定性和用户数据的安全。作为一名资深的Kafka使用者,我深知加强Kafka安全性的重要性。本文将从个人角度出发,分享我在实践中积累的经验,帮助读者了解如何有效地保护Kafka消息传递系统的安全性。
133 7
|
2月前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
102 5
|
2月前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
103 4
|
2月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
72 5
|
2月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
56 1
|
2月前
|
消息中间件 Ubuntu Java
Ubuntu系统上安装Apache Kafka
Ubuntu系统上安装Apache Kafka
|
2月前
|
消息中间件 监控 Kafka
Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面
随着大数据技术的发展,Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面,方便管理和监控 Kafka 集群。本文详细介绍了 Kafka Manager 的部署步骤和基本使用方法,包括配置文件的修改、启动命令、API 示例代码等,帮助你快速上手并有效管理 Kafka 集群。
55 0
|
3月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
111 1
|
3月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
63 1
|
5月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
364 9

推荐镜像

更多