一起来学kafka之整合SpringBoot深入使用(三)

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 前言目前正在出一个Kafka专题系列教程, 篇幅会较多, 喜欢的话,给个关注❤️ ~本节给大家讲一下Kafka中ACK的概念并结合代码一起实践一下~好了, 废话不多说直接开整吧~ACK 是什么首先,我们得知道ack到底是啥,上节给大家讲手动提交offset的时候提到了ack,但没有展开讲,下面就给大家好好理一下。

前言

目前正在出一个Kafka专题系列教程, 篇幅会较多, 喜欢的话,给个关注❤️ ~

本节给大家讲一下Kafka中ACK的概念并结合代码一起实践一下~

好了, 废话不多说直接开整吧~

ACK 是什么

首先,我们得知道ack到底是啥,上节给大家讲手动提交offset的时候提到了ack,但没有展开讲,下面就给大家好好理一下。

Kafka中,ack(即Acknowledgement)是指消费者在成功消费一条消息后,向Kafka集群确认消息已经被成功消费的方式。Kafka提供了三种ACK级别

  • acks=0:生产者发送消息后,不等待任何确认,直接发送下一条消息。
  • acks=1:生产者发送消息后,等待leader节点成功写入消息后返回确认,然后发送下一条消息。
  • acks=all:生产者发送消息后,等待所有的follower节点和leader节点都成功写入消息后返回确认,然后发送下一条消息。

其中,acks=all级别提供了最高的消息可靠性,但会有较高的延迟。

大致流程如下:

生产者Kafka集群发送一条消息,并指定acks=allKafka集群中有一个leader节点两个follower节点。当leader节点收到消息后,它会将消息写入本地磁盘,并向follower节点发送写入请求。follower节点收到请求后,也会将消息写入本地磁盘并向leader节点返回确认。只有当所有节点都成功写入消息后,leader节点才会向生产者返回确认,表示消息已经被成功写入Kafka集群。生产者收到确认后,才会发送下一条消息。

这种ack机制保证了消息被成功写入Kafka集群后才会被认为是已经被消费,从而保证了消息的可靠性。

代码实践

下面我们就结合springboot代码给大家实际讲解一下,配置参数方式有很多,你可以在yml配置文件里添加,这里给大家介绍在配置类中进行配置的方式

新建一个KafkaProducerConfig

@Configuration
public class KafkaProducerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "1"); // 设置ack参数为1
        return props;
    }
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

在上面的代码中,我们创建了一个名为producerConfigsbean,其中包含了bootstrap.servers、key.serializer、value.serializeracks等配置项。然后,我们创建了一个名为producerFactorybean,并将producerConfigs传递给DefaultKafkaProducerFactory的构造方法。最后,我们创建了一个名为kafkaTemplatebean,并将producerFactory传递给它的构造方法。

当我们向Kafka发送消息时,KafkaTemplate将使用producerFactory创建一个KafkaProducer对象,并使用我们在producerConfigs中设置的配置项来配置KafkaProducer。我们将acks参数设置为1,表示生产者只需等待主题的分区副本收到消息即可确认发送成功。

消费者端,我们可以使用enable.auto.commit配置项来控制自动提交消费位移的行为,从而实现对消费消息的确认。

enable.auto.commit配置项为true时,消费者会自动提交消费位移,这意味着消费者无法控制何时确认消费消息。如果消费者在处理消息时发生错误,那么消费位移也会被提交,导致这些消息无法重新消费。因此,建议在需要保证消息可靠性的场景下关闭enable.auto.commit配置项。

enable.auto.commit配置项为false时,消费者需要手动提交消费位移,这样,消费者就可以在成功处理消息后再提交消费位移,从而实现对消费消息的确认。上节有带大家实现过

接着,看消费端的代码,新建一个AckController

@Slf4j
@RestController
public class AckController {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    /**
     * 手动提交偏移量
     */
    @GetMapping("/hello")
    public String hello() throws Exception {
        // 发送消息
        for (int i = 0; i < 10; i++) {
            String message = "Message " + i;
            kafkaTemplate.send("topic1", message);
            log.info("Sent message: {}", message);
            Thread.sleep(1000);
        }
        return "hello";
    }
    @KafkaListener(topics = "topic1", groupId = "my-group", containerFactory = "kafkaListenerContainerFactory")
    public void onMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        log.info("listen Received message >>> {}", record);
        acknowledgment.acknowledge();
    }
}

2023-03-24 10:22:03.240  INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController   : Sent message: Message 0
2023-03-24 10:22:03.250  INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController   : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 87, CreateTime = 1679624523236, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 0)
2023-03-24 10:22:04.241  INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController   : Sent message: Message 1
2023-03-24 10:22:04.255  INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController   : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 88, CreateTime = 1679624524241, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 1)
2023-03-24 10:22:05.254  INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController   : Sent message: Message 2
2023-03-24 10:22:05.258  INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController   : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 89, CreateTime = 1679624525254, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 2)
2023-03-24 10:22:06.269  INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController   : Sent message: Message 3
2023-03-24 10:22:06.274  INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController   : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 90, CreateTime = 1679624526269, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 3)
2023-03-24 10:22:07.277  INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController   : Sent message: Message 4
2023-03-24 10:22:07.285  INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController   : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 91, CreateTime = 1679624527277, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 4)
2023-03-24 10:22:08.292  INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController   : Sent message: Message 5
2023-03-24 10:22:08.296  INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController   : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 92, CreateTime = 1679624528292, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 5)
2023-03-24 10:22:09.304  INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController   : Sent message: Message 6
2023-03-24 10:22:09.308  INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController   : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 93, CreateTime = 1679624529304, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 6)
2023-03-24 10:22:10.317  INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController   : Sent message: Message 7
2023-03-24 10:22:10.321  INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController   : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 94, CreateTime = 1679624530317, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 7)
2023-03-24 10:22:11.331  INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController   : Sent message: Message 8
2023-03-24 10:22:11.335  INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController   : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 95, CreateTime = 1679624531331, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 8)
2023-03-24 10:22:12.346  INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController   : Sent message: Message 9
2023-03-24 10:22:12.349  INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController   : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 96, CreateTime = 1679624532345, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 9)

需要注意的是,手动提交消费位移会降低消费者的处理速度,因为每次处理完一条消息后都需要提交位移。因此,在需要高吞吐量的场景下,建议使用批量提交消费位移的方式来提高消费者的处理速度。

批量提交消费位移

其实很简单,首先需要改下KafkaConfig,在kafkaListenerContainerFactory添加如下代码:

//设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
factory.setBatchListener(true);
// 设置消息转换器
factory.setMessageConverter(new StringJsonMessageConverter());

在上述代码中,设置了一个消息转换器StringJsonMessageConverter,用于将JSON格式的消息转换为Java对象。这里我们使用了Spring Kafka提供的默认消息转换器

更改消费端:


/**
     * 批量提交消费位移
     * @param records
     * @param acknowledgment
     */
    @KafkaListener(topics = "topic1", groupId = "my-group", containerFactory = "kafkaListenerContainerFactory")
    public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment) {
        for (ConsumerRecord<String, String> record : records) {
            String message = record.value();
            // 处理消息
            log.info("onMessage: {}", message);
        }
        acknowledgment.acknowledge();
    }

需要注意的是,使用批量处理消息时,需要确保在处理完所有消息后再调用acknowledge方法提交消费位移,否则可能会出现一些消息重复消费丢失的情况。

结束语

Kafka专题讲解到这里就结束了,还有一些细节性的东西留给大家自己去探索。下期给大家讲Java设计模式,很多同学写代码都是大量的if/else要么就是switch/case都没有想过运用设计模式来解决问题,导致代码后期难以维护和扩展设计模式还有一个好处就是代码重构,前期你可以说业务敢,急着上线,等业务稳定下来了,回过头来可以考虑重构部分的代码实现,如果你一直抱着能不动就不动`的心里,成长的将会很慢,因为你思考的少~

本着把自己知道的都告诉大家,如果本文对您有所帮助,点赞+关注鼓励一下呗~

相关文章

项目源码(源码已更新 欢迎star⭐️)

ElasticSearch 专题学习

项目源码(源码已更新 欢迎star⭐️)

往期并发编程内容推荐

推荐 SpringBoot & SpringCloud (源码已更新 欢迎star⭐️)

博客(阅读体验较佳)






















相关文章
|
4天前
|
消息中间件 Java Kafka
Springboot集成高低版本kafka
Springboot集成高低版本kafka
|
8月前
|
消息中间件 负载均衡 Java
Kafka与SpringBoot的整合使用
Kafka与SpringBoot的整合使用
174 0
|
9月前
|
消息中间件 存储 NoSQL
kafka整合springboot以及核心参数的使用
kafka整合springboot以及核心参数的使用
304 0
|
10月前
|
消息中间件 缓存 NoSQL
手把手教你云相册项目简易开发 day1 Kafka+IDEA+Springboot+Redis+MySQL+libvips 简单运行和使用
手把手教你云相册项目简易开发 day1 Kafka+IDEA+Springboot+Redis+MySQL+libvips 简单运行和使用
145 0
|
6月前
|
消息中间件 数据可视化 Java
消息中间件系列教程(22) -Kafka- SpringBoot集成Kafka
消息中间件系列教程(22) -Kafka- SpringBoot集成Kafka
72 0
|
4天前
|
消息中间件 SQL druid
最新版 springboot集成kafka
最新版 springboot集成kafka
36 0
|
9月前
|
消息中间件 数据可视化 Java
SpringBoot3集成Kafka
SpringBoot3集成KafkaKafka是一个开源的分布式事件流平台,常被用于高性能数据管道、流分析、数据集成和关键任务应用,基于Zookeeper协调的处理平台,也是一种消息系统,具有更好的吞吐量、内置分区、复制和容错。
216 0
|
4天前
|
消息中间件 Java Kafka
spring boot 集成kafka
spring boot 集成kafka
56 0
spring boot 集成kafka
|
4天前
|
消息中间件 Java 关系型数据库
【Spring Boot+Kafka+Mysql+HBase】实现分布式优惠券后台应用系统(附源码)
【Spring Boot+Kafka+Mysql+HBase】实现分布式优惠券后台应用系统(附源码)
102 2
|
8月前
|
消息中间件 Java Kafka
SpringBoot整合Kafka(SASL认证配置、处理毒丸消息)
SpringBoot整合Kafka(SASL认证配置、处理毒丸消息)
935 0