springboot整合kafka消费者最佳实践

简介: springboot整合kafka消费者最佳实践

前言

在当今信息爆炸的时代,实时数据处理已经成为许多应用的核心需求。而Kafka作为一个高性能的消息队列系统,为我们提供了一个实现实时数据处理的利器。而Spring Boot,则是一个强大的框架,可以帮助我们快速搭建应用。本文将带你进入这个充满挑战的领域,探索如何利用Spring Boot整合Kafka实现消费者,实现实时数据的处理和分析。

基础maven包引入

<spring.boot.version>2.7.8</spring.boot.version>
<kafka.version>2.8.2</kafka.version>
<netty.version>4.1.73.Final</netty.version>
<lombok.version>1.18.24</lombok.version>
<!-- 版本号自己添加 -->
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
</dependency>
<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-all</artifactId>
</dependency>

基于监听实现

KafkaConsumerConfig

package fun.acowbo.config;
import lombok.Data;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
/**
 * @author xiaobo
 */
@Data
@Configuration
@EnableKafka
@ConfigurationProperties(prefix = "spring.kafka.consumer")
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    private String groupId;
    private String autoCommitInterval;
    private String autoOffsetReset;
    private String enableAutoCommit;
    private String keyDeserializer;
    private String valueDeserializer;
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        return new DefaultKafkaConsumerFactory<>(props);
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // 启用批量消费
        factory.setBatchListener(true);
        // 设置并发消费者数量
        factory.setConcurrency(3);
        return factory;
    }
}

现在让我解释一下这个方法的主要内容:

  1. ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 创建了一个 ConcurrentKafkaListenerContainerFactory 的实例,用于配置 Kafka 消息监听容器的工厂。
  2. factory.setConsumerFactory(consumerFactory()); 设置了消费者工厂,即将上一步定义的 consumerFactory() 方法返回的消费者工厂设置给消息监听容器工厂,以便创建 Kafka 消费者实例。
  3. factory.setBatchListener(true); 启用了批量消费模式。设置为 true 表示消息监听器将以批量的方式处理接收到的消息,从而提高消费效率。
  4. factory.setConcurrency(3); 设置了并发消费者数量为 3。这表示每个监听容器会创建 3 个消费者实例来并发地处理消息,以提高消息处理的吞吐量。这里要看你的分区数是多少,如果是3那么刚好,如果是2,就有多余,浪费。
  5. return factory; 返回了配置完成的消息监听容器工厂实例。

总的来说,这个方法的作用是创建并配置一个 Kafka 消息监听容器工厂,设置了消费者工厂、启用了批量消费模式,并设置了并发消费者数量,以便提高消息处理的效率和吞吐量。

service实现

package fun.acowbo.service;
import fun.acowbo.utils.BoCommonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@Slf4j
public class BatchKafkaConsumer {
    public static final String CONSUMER_FILE_PATH = "/Users/xiaobo/Downloads/con-group.log";
    @KafkaListener(topics = "acowbo", groupId = "consumer-two-group", containerFactory = "kafkaListenerContainerFactory")
    // , Acknowledgment ack
    public void listen(List<ConsumerRecord<String, String>> records) {
        log.info("Received {} records" , records.size());
        for (ConsumerRecord<String, String> record : records) {
            try {
                // 消息消费处理
                BoCommonUtil.writeFile(record.value(), CONSUMER_FILE_PATH);
            } catch (Exception e) {
                log.error("消费失败!{}",e.getMessage());
            }
        }
        // ack.acknowledge(); // 手动提交位移
    }
}

基于主动提交实现

ConsumerTwoConfig

package fun.acowbo.config;
import lombok.Data;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
/**
 * @author todoitbo
 * @date 2024/3/15
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "spring.kafka.consumer")
public class ConsumerTwoConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    private String groupId;
    private String autoOffsetReset;
    private String enableAutoCommit;
    private String keyDeserializer;
    private String valueDeserializer;
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}

service

package fun.acowbo.service;
import fun.acowbo.utils.BoCommonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.Collections;
/**
 * @author todoitbo
 * @date 2024/3/15
 */
@Slf4j
@Service
public class ConsumerTwoService {
    @Resource
    private ConsumerFactory<String, String> consumerFactory;
    public static final String CONSUMER_FILE_PATH = "/Users/xiaobo/Downloads/con-two.log";
    public void pollMessages() {
        Consumer<String, String> consumer = consumerFactory.createConsumer();
        new Thread(() -> {
            try {
                consumer.subscribe(Collections.singletonList("acowbo"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    // 处理拉取到的消息
                    records.forEach(record ->
                    {
                        try {
                            BoCommonUtil.writeFile(record.key() + record.value(), CONSUMER_FILE_PATH);
                        } catch (Exception e) {
                            log.error("Received message: key = {}, value = {}", record.key(), record.value());
                        }
                    });
                    // 使用异步提交规避阻塞
                    consumer.commitAsync();
                }
            } catch (Exception e) {
                log.error("consumer error", e);
            } finally {
                try {
                    // 最后一次提交使用同步阻塞式提交
                    consumer.commitSync();
                } finally {
                    consumer.close();
                }
            }
        }).start();
    }
}

在 Kafka 消费者中,使用 consumer.commitAsync()consumer.commitSync() 来手动提交消费位移(offset)的方法。让我逐步解释它们的作用以及在生产环境中的好处:

consumer.commitAsync() 方法:
  • consumer.commitAsync() 是异步提交消费位移的方法。它会在后台线程中提交消费位移,而不会阻塞当前线程的执行。
  • 异步提交消费位移可以提高消费者的性能,因为它不会在每次消费之后都等待确认提交,而是直接继续处理后续的消息。
  • 使用异步提交可以避免因提交位移而导致的长时间阻塞,从而提高系统的响应速度。
consumer.commitSync() 方法:
  • consumer.commitSync() 是同步阻塞式提交消费位移的方法。它会阻塞当前线程,直到消费位移提交成功或失败,并返回提交结果。
  • 同步阻塞式提交可以确保消费位移提交的可靠性,因为它会等待确认提交后再继续执行后续的操作。
  • 在应用程序即将关闭时,使用同步阻塞式提交可以确保所有的消费位移都已经成功提交,避免因程序异常退出而导致的消息重复消费或丢失。
使用 commitAsync()commitSync() 的好处:
  1. 性能优化: 异步提交避免了每次消费都等待确认提交的性能开销,提高了消费者的吞吐量和处理效率。
  2. 可靠性保证: 同步阻塞式提交确保了消费位移提交的可靠性,避免了因程序异常退出而导致的消息丢失或重复消费的问题。
  3. 灵活性和控制: 可以根据具体的业务场景和需求选择适合的提交方式,平衡系统的性能和可靠性。

在生产环境中,使用 commitAsync()commitSync() 可以避免因消费位移未提交或提交失败而导致的消息丢失或重复消费的问题,同时确保消费者的性能和可靠性。

一起用

在这段代码中,同时使用了 consumer.commitAsync()consumer.commitSync() 是为了兼顾性能和可靠性:

  1. 使用 consumer.commitAsync()
  • 在消息处理过程中,使用异步提交可以提高消费者的性能,因为它不会阻塞当前线程,而是在后台线程中提交消费位移,继续处理后续的消息。
  • 异步提交适用于普通的消息处理场景,可以在一定程度上降低提交位移的延迟,提高系统的吞吐量。
  1. 使用 consumer.commitSync()
  • 在消费者即将关闭时,需要确保所有的消费位移都已经成功提交,避免因程序异常退出而导致的消息丢失或重复消费的问题。
  • 同步阻塞式提交可以确保消费位移提交的可靠性,它会等待确认提交后再继续执行后续的操作,确保消费位移的提交不会因为异常而中断。

在最后一次提交消费位移时,使用了 consumer.commitSync(),是为了确保程序即将关闭时的消费位移提交的可靠性。虽然同步阻塞式提交会阻塞当前线程,但在应用即将关闭的情况下,可以接受这种阻塞以保证数据的完整性和一致性。

相关文章
|
17天前
|
消息中间件 监控 大数据
优化Apache Kafka性能:最佳实践与调优策略
【10月更文挑战第24天】作为一名已经对Apache Kafka有所了解并有实际使用经验的开发者,我深知在大数据处理和实时数据流传输中,Kafka的重要性不言而喻。然而,在面对日益增长的数据量和业务需求时,如何保证系统的高性能和稳定性成为了摆在我们面前的一个挑战。本文将从我的个人视角出发,分享一些关于如何通过合理的配置和调优来提高Kafka性能的经验和建议。
49 4
|
17天前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
49 2
|
3月前
|
消息中间件 负载均衡 大数据
揭秘Kafka背后的秘密!再均衡如何上演一场消费者组的‘权力游戏’,让消息处理秒变高能剧情?
【8月更文挑战第24天】Kafka是一款在大数据处理领域备受推崇的产品,以其出色的性能和可扩展性著称。本文通过一个具体案例介绍其核心机制之一——再均衡(Rebalancing)。案例中,“user_activity”主题下10个分区被3个消费者均衡消费。当新消费者加入或原有消费者离开时,Kafka将自动触发再均衡过程,确保所有消费者能有效处理分配给它们的分区。
136 62
|
1月前
|
JSON 缓存 Java
优雅至极!Spring Boot 3.3 中 ObjectMapper 的最佳实践
【10月更文挑战第5天】在Spring Boot的开发中,ObjectMapper作为Jackson框架的核心组件,扮演着处理JSON格式数据的核心角色。它不仅能够将Java对象与JSON字符串进行相互转换,还支持复杂的Java类型,如泛型、嵌套对象、集合等。在Spring Boot 3.3中,通过优雅地配置和使用ObjectMapper,我们可以更加高效地处理JSON数据,提升开发效率和代码质量。本文将从ObjectMapper的基本功能、配置方法、最佳实践以及性能优化等方面进行详细探讨。
63 2
|
1月前
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
32 1
|
1月前
|
消息中间件 Java 大数据
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
65 2
|
23天前
|
消息中间件 监控 Java
Spring Boot 3.3 后台任务处理:最佳实践与高效策略
【10月更文挑战第10天】 在现代应用程序中,后台任务处理对于提高应用程序的响应性和吞吐量至关重要。Spring Boot 3.3提供了多种机制来实现高效的后台任务处理,包括异步方法、任务调度和使用消息队列等。本文将探讨这些机制的最佳实践和高效策略。
66 0
|
3月前
|
消息中间件 开发框架 Java
掌握这一招,Spring Boot与Kafka完美融合,顺序消费不再是难题,让你轻松应对业务挑战!
【8月更文挑战第29天】Spring Boot与Kafka集成广泛用于处理分布式消息队列。本文探讨了在Spring Boot中实现Kafka顺序消费的方法,包括使用单个Partition或消息Key确保消息路由到同一Partition,并设置Consumer并发数为1以保证顺序消费。通过示例代码展示了如何配置Kafka Producer和Consumer,并自定义Partitioner。为确保数据正确性,还建议在业务逻辑中增加顺序校验机制。
90 3
|
3月前
|
消息中间件 负载均衡 Kafka
【Kafka消费秘籍】深入了解消费者组与独立模式,掌握消息消费的两种超能力!
【8月更文挑战第24天】Apache Kafka是一款高性能的分布式消息系统,支持灵活多样的消费模型以适应不同的应用场景。消息按主题组织,每个主题可划分为多个分区,确保消息顺序性。本文深入探讨了Kafka中的两大核心消费模式:消费者组(Consumer Group)和独立消费者(Standalone Consumer)。消费者组允许多个消费者协同工作,实现负载均衡及故障恢复,是最常用的消费模式。独立消费者模式则适用于需要高度定制化处理逻辑的场景,如消息重放等。通过对比这两种模式的特点和提供的示例代码,开发者可以根据具体需求选择最合适的消费策略,从而更好地利用Kafka构建高效的数据流应用程序。
84 3
|
3月前
|
图形学 C# 开发者
全面掌握Unity游戏开发核心技术:C#脚本编程从入门到精通——详解生命周期方法、事件处理与面向对象设计,助你打造高效稳定的互动娱乐体验
【8月更文挑战第31天】Unity 是一款强大的游戏开发平台,支持多种编程语言,其中 C# 最为常用。本文介绍 C# 在 Unity 中的应用,涵盖脚本生命周期、常用函数、事件处理及面向对象编程等核心概念。通过具体示例,展示如何编写有效的 C# 脚本,包括 Start、Update 和 LateUpdate 等生命周期方法,以及碰撞检测和类继承等高级技巧,帮助开发者掌握 Unity 脚本编程基础,提升游戏开发效率。
76 0