一起来学kafka之整合SpringBoot深入使用(三)

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 前言目前正在出一个Kafka专题系列教程, 篇幅会较多, 喜欢的话,给个关注❤️ ~本节给大家讲一下Kafka中ACK的概念并结合代码一起实践一下~好了, 废话不多说直接开整吧~ACK 是什么首先,我们得知道ack到底是啥,上节给大家讲手动提交offset的时候提到了ack,但没有展开讲,下面就给大家好好理一下。

前言

目前正在出一个Kafka专题系列教程, 篇幅会较多, 喜欢的话,给个关注❤️ ~

本节给大家讲一下Kafka中ACK的概念并结合代码一起实践一下~

好了, 废话不多说直接开整吧~

ACK 是什么

首先,我们得知道ack到底是啥,上节给大家讲手动提交offset的时候提到了ack,但没有展开讲,下面就给大家好好理一下。

Kafka中,ack(即Acknowledgement)是指消费者在成功消费一条消息后,向Kafka集群确认消息已经被成功消费的方式。Kafka提供了三种ACK级别

  • acks=0:生产者发送消息后,不等待任何确认,直接发送下一条消息。
  • acks=1:生产者发送消息后,等待leader节点成功写入消息后返回确认,然后发送下一条消息。
  • acks=all:生产者发送消息后,等待所有的follower节点和leader节点都成功写入消息后返回确认,然后发送下一条消息。

其中,acks=all级别提供了最高的消息可靠性,但会有较高的延迟。

大致流程如下:

生产者Kafka集群发送一条消息,并指定acks=allKafka集群中有一个leader节点两个follower节点。当leader节点收到消息后,它会将消息写入本地磁盘,并向follower节点发送写入请求。follower节点收到请求后,也会将消息写入本地磁盘并向leader节点返回确认。只有当所有节点都成功写入消息后,leader节点才会向生产者返回确认,表示消息已经被成功写入Kafka集群。生产者收到确认后,才会发送下一条消息。

这种ack机制保证了消息被成功写入Kafka集群后才会被认为是已经被消费,从而保证了消息的可靠性。

代码实践

下面我们就结合springboot代码给大家实际讲解一下,配置参数方式有很多,你可以在yml配置文件里添加,这里给大家介绍在配置类中进行配置的方式

新建一个KafkaProducerConfig

@Configuration
public class KafkaProducerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "1"); // 设置ack参数为1
        return props;
    }
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

在上面的代码中,我们创建了一个名为producerConfigsbean,其中包含了bootstrap.servers、key.serializer、value.serializeracks等配置项。然后,我们创建了一个名为producerFactorybean,并将producerConfigs传递给DefaultKafkaProducerFactory的构造方法。最后,我们创建了一个名为kafkaTemplatebean,并将producerFactory传递给它的构造方法。

当我们向Kafka发送消息时,KafkaTemplate将使用producerFactory创建一个KafkaProducer对象,并使用我们在producerConfigs中设置的配置项来配置KafkaProducer。我们将acks参数设置为1,表示生产者只需等待主题的分区副本收到消息即可确认发送成功。

消费者端,我们可以使用enable.auto.commit配置项来控制自动提交消费位移的行为,从而实现对消费消息的确认。

enable.auto.commit配置项为true时,消费者会自动提交消费位移,这意味着消费者无法控制何时确认消费消息。如果消费者在处理消息时发生错误,那么消费位移也会被提交,导致这些消息无法重新消费。因此,建议在需要保证消息可靠性的场景下关闭enable.auto.commit配置项。

enable.auto.commit配置项为false时,消费者需要手动提交消费位移,这样,消费者就可以在成功处理消息后再提交消费位移,从而实现对消费消息的确认。上节有带大家实现过

接着,看消费端的代码,新建一个AckController

@Slf4j
@RestController
public class AckController {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    /**
     * 手动提交偏移量
     */
    @GetMapping("/hello")
    public String hello() throws Exception {
        // 发送消息
        for (int i = 0; i < 10; i++) {
            String message = "Message " + i;
            kafkaTemplate.send("topic1", message);
            log.info("Sent message: {}", message);
            Thread.sleep(1000);
        }
        return "hello";
    }
    @KafkaListener(topics = "topic1", groupId = "my-group", containerFactory = "kafkaListenerContainerFactory")
    public void onMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        log.info("listen Received message >>> {}", record);
        acknowledgment.acknowledge();
    }
}

2023-03-24 10:22:03.240  INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController   : Sent message: Message 0
2023-03-24 10:22:03.250  INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController   : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 87, CreateTime = 1679624523236, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 0)
2023-03-24 10:22:04.241  INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController   : Sent message: Message 1
2023-03-24 10:22:04.255  INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController   : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 88, CreateTime = 1679624524241, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 1)
2023-03-24 10:22:05.254  INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController   : Sent message: Message 2
2023-03-24 10:22:05.258  INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController   : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 89, CreateTime = 1679624525254, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 2)
2023-03-24 10:22:06.269  INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController   : Sent message: Message 3
2023-03-24 10:22:06.274  INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController   : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 90, CreateTime = 1679624526269, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 3)
2023-03-24 10:22:07.277  INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController   : Sent message: Message 4
2023-03-24 10:22:07.285  INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController   : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 91, CreateTime = 1679624527277, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 4)
2023-03-24 10:22:08.292  INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController   : Sent message: Message 5
2023-03-24 10:22:08.296  INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController   : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 92, CreateTime = 1679624528292, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 5)
2023-03-24 10:22:09.304  INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController   : Sent message: Message 6
2023-03-24 10:22:09.308  INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController   : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 93, CreateTime = 1679624529304, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 6)
2023-03-24 10:22:10.317  INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController   : Sent message: Message 7
2023-03-24 10:22:10.321  INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController   : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 94, CreateTime = 1679624530317, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 7)
2023-03-24 10:22:11.331  INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController   : Sent message: Message 8
2023-03-24 10:22:11.335  INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController   : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 95, CreateTime = 1679624531331, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 8)
2023-03-24 10:22:12.346  INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController   : Sent message: Message 9
2023-03-24 10:22:12.349  INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController   : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 96, CreateTime = 1679624532345, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 9)

需要注意的是,手动提交消费位移会降低消费者的处理速度,因为每次处理完一条消息后都需要提交位移。因此,在需要高吞吐量的场景下,建议使用批量提交消费位移的方式来提高消费者的处理速度。

批量提交消费位移

其实很简单,首先需要改下KafkaConfig,在kafkaListenerContainerFactory添加如下代码:

//设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
factory.setBatchListener(true);
// 设置消息转换器
factory.setMessageConverter(new StringJsonMessageConverter());

在上述代码中,设置了一个消息转换器StringJsonMessageConverter,用于将JSON格式的消息转换为Java对象。这里我们使用了Spring Kafka提供的默认消息转换器

更改消费端:


/**
     * 批量提交消费位移
     * @param records
     * @param acknowledgment
     */
    @KafkaListener(topics = "topic1", groupId = "my-group", containerFactory = "kafkaListenerContainerFactory")
    public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment) {
        for (ConsumerRecord<String, String> record : records) {
            String message = record.value();
            // 处理消息
            log.info("onMessage: {}", message);
        }
        acknowledgment.acknowledge();
    }

需要注意的是,使用批量处理消息时,需要确保在处理完所有消息后再调用acknowledge方法提交消费位移,否则可能会出现一些消息重复消费丢失的情况。

结束语

Kafka专题讲解到这里就结束了,还有一些细节性的东西留给大家自己去探索。下期给大家讲Java设计模式,很多同学写代码都是大量的if/else要么就是switch/case都没有想过运用设计模式来解决问题,导致代码后期难以维护和扩展设计模式还有一个好处就是代码重构,前期你可以说业务敢,急着上线,等业务稳定下来了,回过头来可以考虑重构部分的代码实现,如果你一直抱着能不动就不动`的心里,成长的将会很慢,因为你思考的少~

本着把自己知道的都告诉大家,如果本文对您有所帮助,点赞+关注鼓励一下呗~

相关文章

项目源码(源码已更新 欢迎star⭐️)

ElasticSearch 专题学习

项目源码(源码已更新 欢迎star⭐️)

往期并发编程内容推荐

推荐 SpringBoot & SpringCloud (源码已更新 欢迎star⭐️)

博客(阅读体验较佳)






















相关文章
|
17天前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
48 5
|
19天前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
32 1
|
2月前
|
消息中间件 Java 大数据
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
74 2
|
4月前
|
消息中间件 开发框架 Java
掌握这一招,Spring Boot与Kafka完美融合,顺序消费不再是难题,让你轻松应对业务挑战!
【8月更文挑战第29天】Spring Boot与Kafka集成广泛用于处理分布式消息队列。本文探讨了在Spring Boot中实现Kafka顺序消费的方法,包括使用单个Partition或消息Key确保消息路由到同一Partition,并设置Consumer并发数为1以保证顺序消费。通过示例代码展示了如何配置Kafka Producer和Consumer,并自定义Partitioner。为确保数据正确性,还建议在业务逻辑中增加顺序校验机制。
162 3
|
4月前
|
消息中间件 Java Kafka
|
4月前
|
消息中间件 Java Kafka
|
4月前
|
消息中间件 安全 Java
Spring Boot 基于 SCRAM 认证集成 Kafka 的详解
【8月更文挑战第4天】本文详解Spring Boot结合SCRAM认证集成Kafka的过程。SCRAM为Kafka提供安全身份验证。首先确认Kafka服务已启用SCRAM,并准备认证凭据。接着,在`pom.xml`添加`spring-kafka`依赖,并在`application.properties`中配置Kafka属性,包括SASL_SSL协议与SCRAM-SHA-256机制。创建生产者与消费者类以实现消息的发送与接收功能。最后,通过实际消息传递测试集成效果与认证机制的有效性。
172 4
|
4月前
|
消息中间件 Kafka Java
Spring 框架与 Kafka 联姻,竟引发软件世界的革命风暴!事件驱动架构震撼登场!
【8月更文挑战第31天】《Spring 框架与 Kafka 集成:实现事件驱动架构》介绍如何利用 Spring 框架的强大功能与 Kafka 分布式流平台结合,构建灵活且可扩展的事件驱动系统。通过添加 Spring Kafka 依赖并配置 Kafka 连接信息,可以轻松实现消息的生产和消费。文中详细展示了如何设置 `KafkaTemplate`、`ProducerFactory` 和 `ConsumerFactory`,并通过示例代码说明了生产者发送消息及消费者接收消息的具体实现。这一组合为构建高效可靠的分布式应用程序提供了有力支持。
116 0
|
5月前
|
消息中间件 Java Kafka
spring boot 整合kafka
spring boot 整合kafka
64 8
|
4月前
|
消息中间件 Java Kafka
SpringBoot Kafka SSL接入点PLAIN机制收发消息
SpringBoot Kafka SSL接入点PLAIN机制收发消息
40 0