springboot整合kafka消费者最佳实践

简介: springboot整合kafka消费者最佳实践

前言

在当今信息爆炸的时代,实时数据处理已经成为许多应用的核心需求。而Kafka作为一个高性能的消息队列系统,为我们提供了一个实现实时数据处理的利器。而Spring Boot,则是一个强大的框架,可以帮助我们快速搭建应用。本文将带你进入这个充满挑战的领域,探索如何利用Spring Boot整合Kafka实现消费者,实现实时数据的处理和分析。

基础maven包引入

<spring.boot.version>2.7.8</spring.boot.version>
<kafka.version>2.8.2</kafka.version>
<netty.version>4.1.73.Final</netty.version>
<lombok.version>1.18.24</lombok.version>
<!-- 版本号自己添加 -->
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
</dependency>
<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-all</artifactId>
</dependency>

基于监听实现

KafkaConsumerConfig

package fun.acowbo.config;
import lombok.Data;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
/**
 * @author xiaobo
 */
@Data
@Configuration
@EnableKafka
@ConfigurationProperties(prefix = "spring.kafka.consumer")
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    private String groupId;
    private String autoCommitInterval;
    private String autoOffsetReset;
    private String enableAutoCommit;
    private String keyDeserializer;
    private String valueDeserializer;
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        return new DefaultKafkaConsumerFactory<>(props);
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // 启用批量消费
        factory.setBatchListener(true);
        // 设置并发消费者数量
        factory.setConcurrency(3);
        return factory;
    }
}

现在让我解释一下这个方法的主要内容:

  1. ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 创建了一个 ConcurrentKafkaListenerContainerFactory 的实例,用于配置 Kafka 消息监听容器的工厂。
  2. factory.setConsumerFactory(consumerFactory()); 设置了消费者工厂,即将上一步定义的 consumerFactory() 方法返回的消费者工厂设置给消息监听容器工厂,以便创建 Kafka 消费者实例。
  3. factory.setBatchListener(true); 启用了批量消费模式。设置为 true 表示消息监听器将以批量的方式处理接收到的消息,从而提高消费效率。
  4. factory.setConcurrency(3); 设置了并发消费者数量为 3。这表示每个监听容器会创建 3 个消费者实例来并发地处理消息,以提高消息处理的吞吐量。这里要看你的分区数是多少,如果是3那么刚好,如果是2,就有多余,浪费。
  5. return factory; 返回了配置完成的消息监听容器工厂实例。

总的来说,这个方法的作用是创建并配置一个 Kafka 消息监听容器工厂,设置了消费者工厂、启用了批量消费模式,并设置了并发消费者数量,以便提高消息处理的效率和吞吐量。

service实现

package fun.acowbo.service;
import fun.acowbo.utils.BoCommonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@Slf4j
public class BatchKafkaConsumer {
    public static final String CONSUMER_FILE_PATH = "/Users/xiaobo/Downloads/con-group.log";
    @KafkaListener(topics = "acowbo", groupId = "consumer-two-group", containerFactory = "kafkaListenerContainerFactory")
    // , Acknowledgment ack
    public void listen(List<ConsumerRecord<String, String>> records) {
        log.info("Received {} records" , records.size());
        for (ConsumerRecord<String, String> record : records) {
            try {
                // 消息消费处理
                BoCommonUtil.writeFile(record.value(), CONSUMER_FILE_PATH);
            } catch (Exception e) {
                log.error("消费失败!{}",e.getMessage());
            }
        }
        // ack.acknowledge(); // 手动提交位移
    }
}

基于主动提交实现

ConsumerTwoConfig

package fun.acowbo.config;
import lombok.Data;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
/**
 * @author todoitbo
 * @date 2024/3/15
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "spring.kafka.consumer")
public class ConsumerTwoConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    private String groupId;
    private String autoOffsetReset;
    private String enableAutoCommit;
    private String keyDeserializer;
    private String valueDeserializer;
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}

service

package fun.acowbo.service;
import fun.acowbo.utils.BoCommonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.Collections;
/**
 * @author todoitbo
 * @date 2024/3/15
 */
@Slf4j
@Service
public class ConsumerTwoService {
    @Resource
    private ConsumerFactory<String, String> consumerFactory;
    public static final String CONSUMER_FILE_PATH = "/Users/xiaobo/Downloads/con-two.log";
    public void pollMessages() {
        Consumer<String, String> consumer = consumerFactory.createConsumer();
        new Thread(() -> {
            try {
                consumer.subscribe(Collections.singletonList("acowbo"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    // 处理拉取到的消息
                    records.forEach(record ->
                    {
                        try {
                            BoCommonUtil.writeFile(record.key() + record.value(), CONSUMER_FILE_PATH);
                        } catch (Exception e) {
                            log.error("Received message: key = {}, value = {}", record.key(), record.value());
                        }
                    });
                    // 使用异步提交规避阻塞
                    consumer.commitAsync();
                }
            } catch (Exception e) {
                log.error("consumer error", e);
            } finally {
                try {
                    // 最后一次提交使用同步阻塞式提交
                    consumer.commitSync();
                } finally {
                    consumer.close();
                }
            }
        }).start();
    }
}

在 Kafka 消费者中,使用 consumer.commitAsync()consumer.commitSync() 来手动提交消费位移(offset)的方法。让我逐步解释它们的作用以及在生产环境中的好处:

consumer.commitAsync() 方法:
  • consumer.commitAsync() 是异步提交消费位移的方法。它会在后台线程中提交消费位移,而不会阻塞当前线程的执行。
  • 异步提交消费位移可以提高消费者的性能,因为它不会在每次消费之后都等待确认提交,而是直接继续处理后续的消息。
  • 使用异步提交可以避免因提交位移而导致的长时间阻塞,从而提高系统的响应速度。
consumer.commitSync() 方法:
  • consumer.commitSync() 是同步阻塞式提交消费位移的方法。它会阻塞当前线程,直到消费位移提交成功或失败,并返回提交结果。
  • 同步阻塞式提交可以确保消费位移提交的可靠性,因为它会等待确认提交后再继续执行后续的操作。
  • 在应用程序即将关闭时,使用同步阻塞式提交可以确保所有的消费位移都已经成功提交,避免因程序异常退出而导致的消息重复消费或丢失。
使用 commitAsync()commitSync() 的好处:
  1. 性能优化: 异步提交避免了每次消费都等待确认提交的性能开销,提高了消费者的吞吐量和处理效率。
  2. 可靠性保证: 同步阻塞式提交确保了消费位移提交的可靠性,避免了因程序异常退出而导致的消息丢失或重复消费的问题。
  3. 灵活性和控制: 可以根据具体的业务场景和需求选择适合的提交方式,平衡系统的性能和可靠性。

在生产环境中,使用 commitAsync()commitSync() 可以避免因消费位移未提交或提交失败而导致的消息丢失或重复消费的问题,同时确保消费者的性能和可靠性。

一起用

在这段代码中,同时使用了 consumer.commitAsync()consumer.commitSync() 是为了兼顾性能和可靠性:

  1. 使用 consumer.commitAsync()
  • 在消息处理过程中,使用异步提交可以提高消费者的性能,因为它不会阻塞当前线程,而是在后台线程中提交消费位移,继续处理后续的消息。
  • 异步提交适用于普通的消息处理场景,可以在一定程度上降低提交位移的延迟,提高系统的吞吐量。
  1. 使用 consumer.commitSync()
  • 在消费者即将关闭时,需要确保所有的消费位移都已经成功提交,避免因程序异常退出而导致的消息丢失或重复消费的问题。
  • 同步阻塞式提交可以确保消费位移提交的可靠性,它会等待确认提交后再继续执行后续的操作,确保消费位移的提交不会因为异常而中断。

在最后一次提交消费位移时,使用了 consumer.commitSync(),是为了确保程序即将关闭时的消费位移提交的可靠性。虽然同步阻塞式提交会阻塞当前线程,但在应用即将关闭的情况下,可以接受这种阻塞以保证数据的完整性和一致性。

相关文章
|
1天前
|
消息中间件 Java Kafka
springboot集成kafka
springboot集成kafka
9 2
|
19小时前
|
消息中间件 Java Kafka
Spring Boot与Kafka的集成应用
Spring Boot与Kafka的集成应用
|
20小时前
|
存储 Java 数据库
Spring Boot与分布式事务的最佳实践
Spring Boot与分布式事务的最佳实践
|
20小时前
|
消息中间件 Java Kafka
使用Spring Boot和Kafka实现高效消息队列
使用Spring Boot和Kafka实现高效消息队列
|
20小时前
|
缓存 NoSQL Java
Spring Boot整合Redis缓存的最佳实践
Spring Boot整合Redis缓存的最佳实践
|
20小时前
|
消息中间件 Java Kafka
Spring Boot与Apache Kafka集成的深度指南
Spring Boot与Apache Kafka集成的深度指南
|
20小时前
|
Java 机器人 程序员
Spring Boot中集成ZooKeeper的最佳实践
Spring Boot中集成ZooKeeper的最佳实践
|
20小时前
|
消息中间件 Java Kafka
教程:Spring Boot集成Kafka Streams流处理框架
教程:Spring Boot集成Kafka Streams流处理框架
|
4天前
|
消息中间件 存储 Java
后端开发Spring框架之消息介绍 同步异步 JMS AMQP MQTT Kafka介绍
后端开发Spring框架之消息介绍 同步异步 JMS AMQP MQTT Kafka介绍
6 0
|
12天前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之通过flink同步kafka数据进到doris,decimal数值类型的在kafka是正常显示数值,但是同步到doris表之后数据就变成了整数,该如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

热门文章

最新文章