Spring Boot集成Kafka动态创建消费者与动态删除主题(实现多消费者的发布订阅模型)

简介: Spring Boot集成Kafka动态创建消费者与动态删除主题(实现多消费者的发布订阅模型)

在Spring Boot集成Kafka时,大家都知道可以使用@KafkaListener注解创建消费者。但是@KafkaListener注解是静态的,意味着在编译时就已经确定了消费者,无法动态地创建消费者。

不过事实上,使用Kafka提供的Java API,使用KafkaConsumer类就可以完成消费者的动态创建。

我们也知道在一个消费者组中,同一条消息只会被消费一次。而动态创建消费者的情景也通常是满足动态的发布订阅模型(一个发布者,但是可能有不定量的消费者),所以在这里我们使每个动态创建的消费者的消费者组也不一样即可。

今天我就来分享一下Spring Boot集成Kafka时动态地创建消费者以及动态删除Topic的实现。

1,动态创建消费者

(1) 创建消费者对象

我们可以定义一个“消费者工厂”类,专门用于创建Kafka消费者对象,如下:

package com.gitee.swsk33.kafkadynamicconsumer.factory;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.Properties;

@Component
public class KafkaDynamicConsumerFactory {
   

    @Autowired
    private KafkaProperties kafkaProperties;

    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeSerializerClassName;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeSerializerClassName;

    /**
     * 创建一个Kafka消费者
     *
     * @param topic   消费者订阅的话题
     * @param groupId 消费者组名
     * @return 消费者对象
     */
    public <K, V> KafkaConsumer<K, V> createConsumer(String topic, String groupId) throws ClassNotFoundException {
   
        Properties consumerProperties = new Properties();
        // 设定一些关于新的消费者的配置信息
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        // 设定新的消费者的组名
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // 设定反序列化方式
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Class.forName(keyDeSerializerClassName));
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Class.forName(valueDeSerializerClassName));
        // 设定信任所有类型以反序列化
        consumerProperties.put("spring.json.trusted.packages", "*");
        // 新建一个消费者
        KafkaConsumer<K, V> consumer = new KafkaConsumer<>(consumerProperties);
        // 使这个消费者订阅对应话题
        consumer.subscribe(Collections.singleton(topic));
        return consumer;
    }

}

可见这里我们注入了配置文件中反序列化的配置,并用于新创建的消费者对象。

(2) 使用定时任务实现消费者实时订阅

上面仅仅是创建了消费者,但是消费者接收消息以及处理消息的操作,也是需要我们手动定义的。

如何让创建的消费者都去不停的接收并处理我们的消息呢?大致思路如下:

  • 使用定时任务,在定时任务中使消费者不停地接收并处理消息
  • 与此同时,将每个定时任务和消费者都存起来,后面在消费者不需要的时候可以移除它们并关闭定时任务

这里,我们编写一个上下文类,用于存放所有的消费者定时任务,并编写增加和移除定时任务的方法:

package com.gitee.swsk33.kafkadynamicconsumer.context;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.*;

/**
 * Kafka消费者任务上下文
 */
public class KafkaConsumerContext {
   

    /**
     * 存放所有自己创建的Kafka消费者任务
     * key: groupId
     * value: kafka消费者任务
     */
    private static final Map<String, KafkaConsumer<?, ?>> consumerMap = new ConcurrentHashMap<>();

    /**
     * 存放所有定时任务的哈希表
     * key: groupId
     * value: 定时任务对象,用于定时执行kafka消费者的消息消费任务
     */
    private static final Map<String, ScheduledFuture<?>> scheduleMap = new ConcurrentHashMap<>();

    /**
     * 任务调度器,用于定时任务
     */
    private static final ScheduledExecutorService executor = Executors.newScheduledThreadPool(24);

    /**
     * 添加一个Kafka消费者任务
     *
     * @param groupId  消费者的组名
     * @param consumer 消费者对象
     * @param <K>      消息键类型
     * @param <V>      消息值类型
     */
    public static <K, V> void addConsumerTask(String groupId, KafkaConsumer<K, V> consumer) {
   
        // 先存入消费者以便于后续管理
        consumerMap.put(groupId, consumer);
        // 创建定时任务,每隔1s拉取消息并处理
        ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
   
            // 每次执行拉取消息之前,先检查订阅者是否已被取消(如果订阅者不存在于订阅者列表中说明被取消了)
            // 因为Kafka消费者对象是非线程安全的,因此在这里把取消订阅的逻辑和拉取并处理消息的逻辑写在一起并放入定时器中,判断列表中是否存在消费者对象来确定是否取消任务
            if (!consumerMap.containsKey(groupId)) {
   
                // 取消订阅并关闭消费者
                consumer.unsubscribe();
                consumer.close();
                // 关闭定时任务
                scheduleMap.remove(groupId).cancel(true);
                return;
            }
            // 拉取消息
            ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<K, V> record : records) {
   
                // 自定义处理每次拉取的消息逻辑
                System.out.println(record.value());
            }
        }, 0, 1, TimeUnit.SECONDS);
        // 将任务存入对应的列表以后续管理
        scheduleMap.put(groupId, future);
    }

    /**
     * 移除Kafka消费者定时任务并关闭消费者订阅
     *
     * @param groupId 消费者的组名
     */
    public static void removeConsumerTask(String groupId) {
   
        if (!consumerMap.containsKey(groupId)) {
   
            return;
        }
        // 从列表中移除消费者
        consumerMap.remove(groupId);
    }

}

在增加消费者定时任务的方法中,调用消费者对象的poll方法能够拉取一次消息,一次通常可能拉取到多条消息,遍历并处理即可。这样在定时任务中,我们每隔一段时间就拉取一次消息并处理,就实现了消费者实时订阅消息的效果。

除此之外,在使用定时任务时,即ScheduledExecutorService对象的scheduleAtFixedRate方法,可以实现每隔一定的时间执行一次任务,上述第一个参数传入Runnable接口的实现类,这里使用匿名内部类传入,即自定义的任务,第二个参数是启动延迟时间,第三个参数是每隔多长时间重复执行任务,第四个参数是时间单位。该方法返回一个任务对象,通过这个对象的cancel方法可以取消掉任务。

可见这里,在定时任务中,每次拉取消息之前先判断消费者是否还存在于列表中,以确定消费者是否被取消。为什么要这么操作呢?

因为Kafka的消费者对象是非线程安全的,而ScheduledExecutorService底层使用的是线程池来完成定时任务,如果说我们把取消消费者订阅的逻辑写在另一个方法中,就会导致有两个线程同时操作Kafka消费者,从而抛出异常(定时器线程一直在操作消费者拉取消息,取消订阅又是从定时器之外的线程操作的,这就有两个线程),使得我们不能正常地关闭消费者。(异常内容:kafkaconsumer is not safe for multi-threaded access

所以这里,我把拉取消息逻辑和取消订阅逻辑都写在了一起放在一个定时任务中,使得拉取消息和取消订阅者的操作都是在同一线程(即定时器中线程)执行,而判断是否要取消订阅者的依据就是检查该订阅者是否从订阅者列表中被移除。

(3) 编写个API测试

现在编写一个API测试一下效果:

package com.gitee.swsk33.kafkadynamicconsumer.api;

import com.gitee.swsk33.kafkadynamicconsumer.context.KafkaConsumerContext;
import com.gitee.swsk33.kafkadynamicconsumer.factory.KafkaDynamicConsumerFactory;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 消息测试api
 */
@RestController
@RequestMapping("/api/kafka")
public class KafkaTestAPI {
   

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private KafkaDynamicConsumerFactory factory;

    @GetMapping("/send")
    public String send() {
   
        kafkaTemplate.send("my-topic", "hello!");
        return "发送完成!";
    }

    @GetMapping("/create/{groupId}")
    public String create(@PathVariable String groupId) throws ClassNotFoundException {
   
        // 这里统一使用一个topic
        KafkaConsumer<String, String> consumer = factory.createConsumer("my-topic", groupId);
        KafkaConsumerContext.addConsumerTask(groupId, consumer);
        return "创建成功!";
    }

    @GetMapping("/remove/{groupId}")
    public String remove(@PathVariable String groupId) {
   
        KafkaConsumerContext.removeConsumerTask(groupId);
        return "移除成功!";
    }

}

现在依次访问/api/kafka/create/a/api/kafka/create/b,就创建了两个消费者,然后访问/api/kafka/send发送消息,结果如下:

image.png

可见,两个消费者都接收到了消息。

2,动态删除Topic

在Spring Boot集成Kafka时,默认情况下向一个Topic发送了消息,若这个Topic不存在则会自动创建。不过如果创建的Topic多了,并且后续不再使用,那会占用服务器资源。

不过,我们也可以通过Kafka库中的AdminClient类实现对Topic的删除。

(1) 配置AdminClient的Bean

创建一个配置类,并在其中配置一个AdminClient类型的Bean,如下:

package com.gitee.swsk33.kafkadynamicconsumer.config;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;

@Configuration
public class KafkaAdminConfig {
   

    /**
     * 读取kafka地址配置
     */
    @Value("${spring.kafka.bootstrap-servers}")
    private String kafkaServerURL;

    /**
     * 注入一个kafka管理实例
     *
     * @return kafka管理对象
     */
    @Bean
    public AdminClient adminClient() {
   
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerURL);
        return AdminClient.create(properties);
    }

}

可见这里先是从配置文件中读取了配置的Kafka服务器地址,然后通过这个地址配置,创建一个AdminClient对象作为Bean即可。

(2) 使用AdminClient删除

删除也很简单,在需要删除的地方自动装配一个AdminClient对象即可,这里创建一个API试试:

package com.gitee.swsk33.kafkadynamicconsumer.api;

import org.apache.kafka.clients.admin.AdminClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Collections;

@RestController
@RequestMapping("/api/kafka-topic")
public class KafkaTopicAPI {
   

    /**
     * 在需要删除Topic的地方自动装配AdminClient对象
     */
    @Autowired
    private AdminClient adminClient;

    @GetMapping("/delete/{topicId}")
    public String deleteTopic(@PathVariable String topicId) {
   
        adminClient.deleteTopics(Collections.singleton(topicId));
        return "删除Topic完成!";
    }

}

调用AdminClient对象的deleteTopics方法,即可完成删除Topic操作,参数是传入Topic(主题)名称列表,这里只传入一个Topic名,因此使用Collections.singleton方法将这一个名称变成列表形式。

在上述“动态创建消费者”过程中,我们对my-topic主题中发送了消息,因此现在kafka中,有一个my-topic的主题:

image.png

使用kafka自带的kafka-topics.sh脚本就可以查看kafka中的所有Topic,这里kafka是使用Docker部署的,因此上述命令使用docker exec调用的其中的脚本。

好的,现在启动程序后访问接口/api/kafka-topic/delete/my-topic,然后再次查看kafka中的主题:

image.png

可见我们成功地删除了my-topic这个主题。

需要注意的是,如果某个Topic还正在被至少一个消费者订阅着,这个Topic将无法被删除! 所以要删除一个Topic之前请先确保其现在没有被任何消费者订阅。

3,总结

可见要动态地创建Kafka消费者,只需创建并设置好Kafka消费者对象,并使用定时任务使它们一直拉取消息,就可以实现发布订阅的效果。当然,我们要管理好创建的所有的消费者和定时任务,防止资源浪费。

上述示例仓库地址:传送门

相关文章
|
1月前
|
消息中间件 负载均衡 大数据
揭秘Kafka背后的秘密!再均衡如何上演一场消费者组的‘权力游戏’,让消息处理秒变高能剧情?
【8月更文挑战第24天】Kafka是一款在大数据处理领域备受推崇的产品,以其出色的性能和可扩展性著称。本文通过一个具体案例介绍其核心机制之一——再均衡(Rebalancing)。案例中,“user_activity”主题下10个分区被3个消费者均衡消费。当新消费者加入或原有消费者离开时,Kafka将自动触发再均衡过程,确保所有消费者能有效处理分配给它们的分区。
124 62
|
1月前
|
消息中间件 Kafka API
【Kafka消费新风潮】告别复杂,迎接简洁之美——深度解析Kafka新旧消费者API大比拼!
【8月更文挑战第24天】Apache Kafka作为一个领先的分布式流处理平台,广泛用于实时数据管道和流式应用的构建。随着其发展,消费者API经历了重大更新。旧消费者API(包括“低级”和“高级”API)虽提供灵活性但在消息顺序处理上存在挑战。2017年引入的新消费者API简化了接口,自动管理偏移量,支持更强大的消费组功能,显著降低了开发复杂度。通过对比新旧消费者API的代码示例可以看出,新API极大提高了开发效率和系统可维护性。
113 58
|
1天前
|
XML Java 关系型数据库
springboot 集成 mybatis-plus 代码生成器
本文介绍了如何在Spring Boot项目中集成MyBatis-Plus代码生成器,包括导入相关依赖坐标、配置快速代码生成器以及自定义代码生成器模板的步骤和代码示例,旨在提高开发效率,快速生成Entity、Mapper、Mapper XML、Service、Controller等代码。
springboot 集成 mybatis-plus 代码生成器
|
1天前
|
Java Spring
springboot 集成 swagger 2.x 和 3.0 以及 Failed to start bean ‘documentationPluginsBootstrapper‘问题的解决
本文介绍了如何在Spring Boot项目中集成Swagger 2.x和3.0版本,并提供了解决Swagger在Spring Boot中启动失败问题“Failed to start bean ‘documentationPluginsBootstrapper’; nested exception is java.lang.NullPointerEx”的方法,包括配置yml文件和Spring Boot版本的降级。
springboot 集成 swagger 2.x 和 3.0 以及 Failed to start bean ‘documentationPluginsBootstrapper‘问题的解决
|
30天前
|
消息中间件 开发框架 Java
掌握这一招,Spring Boot与Kafka完美融合,顺序消费不再是难题,让你轻松应对业务挑战!
【8月更文挑战第29天】Spring Boot与Kafka集成广泛用于处理分布式消息队列。本文探讨了在Spring Boot中实现Kafka顺序消费的方法,包括使用单个Partition或消息Key确保消息路由到同一Partition,并设置Consumer并发数为1以保证顺序消费。通过示例代码展示了如何配置Kafka Producer和Consumer,并自定义Partitioner。为确保数据正确性,还建议在业务逻辑中增加顺序校验机制。
39 3
|
1月前
|
消息中间件 负载均衡 Kafka
【Kafka消费秘籍】深入了解消费者组与独立模式,掌握消息消费的两种超能力!
【8月更文挑战第24天】Apache Kafka是一款高性能的分布式消息系统,支持灵活多样的消费模型以适应不同的应用场景。消息按主题组织,每个主题可划分为多个分区,确保消息顺序性。本文深入探讨了Kafka中的两大核心消费模式:消费者组(Consumer Group)和独立消费者(Standalone Consumer)。消费者组允许多个消费者协同工作,实现负载均衡及故障恢复,是最常用的消费模式。独立消费者模式则适用于需要高度定制化处理逻辑的场景,如消息重放等。通过对比这两种模式的特点和提供的示例代码,开发者可以根据具体需求选择最合适的消费策略,从而更好地利用Kafka构建高效的数据流应用程序。
36 3
|
1月前
|
消息中间件 Java Kafka
|
1月前
|
NoSQL 关系型数据库 MySQL
SpringBoot 集成 SpringSecurity + MySQL + JWT 附源码,废话不多直接盘
SpringBoot 集成 SpringSecurity + MySQL + JWT 附源码,废话不多直接盘
88 2
|
28天前
|
图形学 C# 开发者
全面掌握Unity游戏开发核心技术:C#脚本编程从入门到精通——详解生命周期方法、事件处理与面向对象设计,助你打造高效稳定的互动娱乐体验
【8月更文挑战第31天】Unity 是一款强大的游戏开发平台,支持多种编程语言,其中 C# 最为常用。本文介绍 C# 在 Unity 中的应用,涵盖脚本生命周期、常用函数、事件处理及面向对象编程等核心概念。通过具体示例,展示如何编写有效的 C# 脚本,包括 Start、Update 和 LateUpdate 等生命周期方法,以及碰撞检测和类继承等高级技巧,帮助开发者掌握 Unity 脚本编程基础,提升游戏开发效率。
38 0
|
28天前
|
消息中间件 Kafka Java
Spring 框架与 Kafka 联姻,竟引发软件世界的革命风暴!事件驱动架构震撼登场!
【8月更文挑战第31天】《Spring 框架与 Kafka 集成:实现事件驱动架构》介绍如何利用 Spring 框架的强大功能与 Kafka 分布式流平台结合,构建灵活且可扩展的事件驱动系统。通过添加 Spring Kafka 依赖并配置 Kafka 连接信息,可以轻松实现消息的生产和消费。文中详细展示了如何设置 `KafkaTemplate`、`ProducerFactory` 和 `ConsumerFactory`,并通过示例代码说明了生产者发送消息及消费者接收消息的具体实现。这一组合为构建高效可靠的分布式应用程序提供了有力支持。
85 0