springboot整合kafka消费者最佳实践

简介: springboot整合kafka消费者最佳实践

前言

在当今信息爆炸的时代,实时数据处理已经成为许多应用的核心需求。而Kafka作为一个高性能的消息队列系统,为我们提供了一个实现实时数据处理的利器。而Spring Boot,则是一个强大的框架,可以帮助我们快速搭建应用。本文将带你进入这个充满挑战的领域,探索如何利用Spring Boot整合Kafka实现消费者,实现实时数据的处理和分析。

基础maven包引入

<spring.boot.version>2.7.8</spring.boot.version>
<kafka.version>2.8.2</kafka.version>
<netty.version>4.1.73.Final</netty.version>
<lombok.version>1.18.24</lombok.version>
<!-- 版本号自己添加 -->
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
</dependency>
<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-all</artifactId>
</dependency>

基于监听实现

KafkaConsumerConfig

package fun.acowbo.config;
import lombok.Data;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
/**
 * @author xiaobo
 */
@Data
@Configuration
@EnableKafka
@ConfigurationProperties(prefix = "spring.kafka.consumer")
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    private String groupId;
    private String autoCommitInterval;
    private String autoOffsetReset;
    private String enableAutoCommit;
    private String keyDeserializer;
    private String valueDeserializer;
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        return new DefaultKafkaConsumerFactory<>(props);
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // 启用批量消费
        factory.setBatchListener(true);
        // 设置并发消费者数量
        factory.setConcurrency(3);
        return factory;
    }
}

现在让我解释一下这个方法的主要内容:

  1. ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 创建了一个 ConcurrentKafkaListenerContainerFactory 的实例,用于配置 Kafka 消息监听容器的工厂。
  2. factory.setConsumerFactory(consumerFactory()); 设置了消费者工厂,即将上一步定义的 consumerFactory() 方法返回的消费者工厂设置给消息监听容器工厂,以便创建 Kafka 消费者实例。
  3. factory.setBatchListener(true); 启用了批量消费模式。设置为 true 表示消息监听器将以批量的方式处理接收到的消息,从而提高消费效率。
  4. factory.setConcurrency(3); 设置了并发消费者数量为 3。这表示每个监听容器会创建 3 个消费者实例来并发地处理消息,以提高消息处理的吞吐量。这里要看你的分区数是多少,如果是3那么刚好,如果是2,就有多余,浪费。
  5. return factory; 返回了配置完成的消息监听容器工厂实例。

总的来说,这个方法的作用是创建并配置一个 Kafka 消息监听容器工厂,设置了消费者工厂、启用了批量消费模式,并设置了并发消费者数量,以便提高消息处理的效率和吞吐量。

service实现

package fun.acowbo.service;
import fun.acowbo.utils.BoCommonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@Slf4j
public class BatchKafkaConsumer {
    public static final String CONSUMER_FILE_PATH = "/Users/xiaobo/Downloads/con-group.log";
    @KafkaListener(topics = "acowbo", groupId = "consumer-two-group", containerFactory = "kafkaListenerContainerFactory")
    // , Acknowledgment ack
    public void listen(List<ConsumerRecord<String, String>> records) {
        log.info("Received {} records" , records.size());
        for (ConsumerRecord<String, String> record : records) {
            try {
                // 消息消费处理
                BoCommonUtil.writeFile(record.value(), CONSUMER_FILE_PATH);
            } catch (Exception e) {
                log.error("消费失败!{}",e.getMessage());
            }
        }
        // ack.acknowledge(); // 手动提交位移
    }
}

基于主动提交实现

ConsumerTwoConfig

package fun.acowbo.config;
import lombok.Data;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
/**
 * @author todoitbo
 * @date 2024/3/15
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "spring.kafka.consumer")
public class ConsumerTwoConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    private String groupId;
    private String autoOffsetReset;
    private String enableAutoCommit;
    private String keyDeserializer;
    private String valueDeserializer;
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}

service

package fun.acowbo.service;
import fun.acowbo.utils.BoCommonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.Collections;
/**
 * @author todoitbo
 * @date 2024/3/15
 */
@Slf4j
@Service
public class ConsumerTwoService {
    @Resource
    private ConsumerFactory<String, String> consumerFactory;
    public static final String CONSUMER_FILE_PATH = "/Users/xiaobo/Downloads/con-two.log";
    public void pollMessages() {
        Consumer<String, String> consumer = consumerFactory.createConsumer();
        new Thread(() -> {
            try {
                consumer.subscribe(Collections.singletonList("acowbo"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    // 处理拉取到的消息
                    records.forEach(record ->
                    {
                        try {
                            BoCommonUtil.writeFile(record.key() + record.value(), CONSUMER_FILE_PATH);
                        } catch (Exception e) {
                            log.error("Received message: key = {}, value = {}", record.key(), record.value());
                        }
                    });
                    // 使用异步提交规避阻塞
                    consumer.commitAsync();
                }
            } catch (Exception e) {
                log.error("consumer error", e);
            } finally {
                try {
                    // 最后一次提交使用同步阻塞式提交
                    consumer.commitSync();
                } finally {
                    consumer.close();
                }
            }
        }).start();
    }
}

在 Kafka 消费者中,使用 consumer.commitAsync()consumer.commitSync() 来手动提交消费位移(offset)的方法。让我逐步解释它们的作用以及在生产环境中的好处:

consumer.commitAsync() 方法:
  • consumer.commitAsync() 是异步提交消费位移的方法。它会在后台线程中提交消费位移,而不会阻塞当前线程的执行。
  • 异步提交消费位移可以提高消费者的性能,因为它不会在每次消费之后都等待确认提交,而是直接继续处理后续的消息。
  • 使用异步提交可以避免因提交位移而导致的长时间阻塞,从而提高系统的响应速度。
consumer.commitSync() 方法:
  • consumer.commitSync() 是同步阻塞式提交消费位移的方法。它会阻塞当前线程,直到消费位移提交成功或失败,并返回提交结果。
  • 同步阻塞式提交可以确保消费位移提交的可靠性,因为它会等待确认提交后再继续执行后续的操作。
  • 在应用程序即将关闭时,使用同步阻塞式提交可以确保所有的消费位移都已经成功提交,避免因程序异常退出而导致的消息重复消费或丢失。
使用 commitAsync()commitSync() 的好处:
  1. 性能优化: 异步提交避免了每次消费都等待确认提交的性能开销,提高了消费者的吞吐量和处理效率。
  2. 可靠性保证: 同步阻塞式提交确保了消费位移提交的可靠性,避免了因程序异常退出而导致的消息丢失或重复消费的问题。
  3. 灵活性和控制: 可以根据具体的业务场景和需求选择适合的提交方式,平衡系统的性能和可靠性。

在生产环境中,使用 commitAsync()commitSync() 可以避免因消费位移未提交或提交失败而导致的消息丢失或重复消费的问题,同时确保消费者的性能和可靠性。

一起用

在这段代码中,同时使用了 consumer.commitAsync()consumer.commitSync() 是为了兼顾性能和可靠性:

  1. 使用 consumer.commitAsync()
  • 在消息处理过程中,使用异步提交可以提高消费者的性能,因为它不会阻塞当前线程,而是在后台线程中提交消费位移,继续处理后续的消息。
  • 异步提交适用于普通的消息处理场景,可以在一定程度上降低提交位移的延迟,提高系统的吞吐量。
  1. 使用 consumer.commitSync()
  • 在消费者即将关闭时,需要确保所有的消费位移都已经成功提交,避免因程序异常退出而导致的消息丢失或重复消费的问题。
  • 同步阻塞式提交可以确保消费位移提交的可靠性,它会等待确认提交后再继续执行后续的操作,确保消费位移的提交不会因为异常而中断。

在最后一次提交消费位移时,使用了 consumer.commitSync(),是为了确保程序即将关闭时的消费位移提交的可靠性。虽然同步阻塞式提交会阻塞当前线程,但在应用即将关闭的情况下,可以接受这种阻塞以保证数据的完整性和一致性。

相关文章
|
1月前
|
消息中间件 监控 大数据
优化Apache Kafka性能:最佳实践与调优策略
【10月更文挑战第24天】作为一名已经对Apache Kafka有所了解并有实际使用经验的开发者,我深知在大数据处理和实时数据流传输中,Kafka的重要性不言而喻。然而,在面对日益增长的数据量和业务需求时,如何保证系统的高性能和稳定性成为了摆在我们面前的一个挑战。本文将从我的个人视角出发,分享一些关于如何通过合理的配置和调优来提高Kafka性能的经验和建议。
90 4
|
9天前
|
存储 安全 Java
Spring Boot 编写 API 的 10条最佳实践
本文总结了 10 个编写 Spring Boot API 的最佳实践,包括 RESTful API 设计原则、注解使用、依赖注入、异常处理、数据传输对象(DTO)建模、安全措施、版本控制、文档生成、测试策略以及监控和日志记录。每个实践都配有详细的编码示例和解释,帮助开发者像专业人士一样构建高质量的 API。
|
28天前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
66 5
|
1月前
|
安全 JavaScript Java
SpringBoot解决跨域最佳实践
本文介绍了跨域问题的起因及最佳实践,重点讲解了SpringBoot中如何利用`CorsFilter`解决跨域问题。首先解释了由于浏览器的同源策略限制导致的跨域现象,然后提出了在服务端入口处解决跨域问题的建议,最后详细展示了三种SpringBoot中配置跨域的方法:使用默认配置、自定义配置规则以及通过配置文件管理跨域设置,以适应不同的应用场景。
|
1月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
44 1
|
1月前
|
Java 测试技术 数据库连接
使用Spring Boot编写测试用例:实践与最佳实践
使用Spring Boot编写测试用例:实践与最佳实践
70 0
|
1月前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
85 2
|
2月前
|
JSON 缓存 Java
优雅至极!Spring Boot 3.3 中 ObjectMapper 的最佳实践
【10月更文挑战第5天】在Spring Boot的开发中,ObjectMapper作为Jackson框架的核心组件,扮演着处理JSON格式数据的核心角色。它不仅能够将Java对象与JSON字符串进行相互转换,还支持复杂的Java类型,如泛型、嵌套对象、集合等。在Spring Boot 3.3中,通过优雅地配置和使用ObjectMapper,我们可以更加高效地处理JSON数据,提升开发效率和代码质量。本文将从ObjectMapper的基本功能、配置方法、最佳实践以及性能优化等方面进行详细探讨。
177 2
|
2月前
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
47 1
|
2月前
|
消息中间件 Java 大数据
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
79 2