前言
目前正在出一个Kafka专题
系列教程, 篇幅会较多, 喜欢的话,给个关注❤️ ~
本节给大家讲一下Kafka中ACK的概念
并结合代码一起实践一下~
好了, 废话不多说直接开整吧~
ACK 是什么
首先,我们得知道ack
到底是啥,上节给大家讲手动提交offset
的时候提到了ack
,但没有展开讲,下面就给大家好好理一下。
在Kafka
中,ack(即Acknowledgement)
是指消费者在成功消费一条消息后,向Kafka集群确认消息
已经被成功消费的方式。Kafka
提供了三种ACK级别
:
acks=0
:生产者发送消息后,不等待任何确认,直接发送下一条消息。acks=1
:生产者发送消息后,等待leader
节点成功写入消息后返回确认,然后发送下一条消息。acks=all
:生产者发送消息后,等待所有的follower
节点和leader
节点都成功写入消息后返回确认,然后发送下一条消息。
其中,acks=all级别提供了最高的消息可靠性,但会有较高的延迟。
大致流程如下:
生产者
向Kafka集群
发送一条消息,并指定acks=all
。Kafka集群
中有一个leader节点
和两个follower节点
。当leader
节点收到消息后,它会将消息写入本地磁盘,并向follower
节点发送写入请求。follower
节点收到请求后,也会将消息写入本地磁盘并向leader
节点返回确认。只有当所有节点
都成功写入消息后,leader
节点才会向生产者返回确认,表示消息已经被成功写入Kafka集群
。生产者收到确认后,才会发送下一条消息。
这种ack
机制保证了消息被成功写入Kafka集群
后才会被认为是已经被消费,从而保证了消息的可靠性。
代码实践
下面我们就结合springboot
代码给大家实际讲解一下,配置参数方式有很多,你可以在yml
配置文件里添加,这里给大家介绍在配置类中进行配置的方式
新建一个KafkaProducerConfig
类
@Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.ACKS_CONFIG, "1"); // 设置ack参数为1 return props; } @Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }
在上面的代码中,我们创建了一个名为producerConfigs
的bean
,其中包含了bootstrap.servers、key.serializer、value.serializer
和acks
等配置项。然后,我们创建了一个名为producerFactory
的bean
,并将producerConfigs
传递给DefaultKafkaProducerFactory
的构造方法。最后,我们创建了一个名为kafkaTemplate
的bean
,并将producerFactory
传递给它的构造方法。
当我们向Kafka
发送消息时,KafkaTemplate
将使用producerFactory
创建一个KafkaProducer
对象,并使用我们在producerConfigs
中设置的配置项来配置KafkaProducer
。我们将acks
参数设置为1
,表示生产者只需等待主题的分区副本收到消息即可确认发送成功。
在消费者端
,我们可以使用enable.auto.commit
配置项来控制自动提交
消费位移的行为,从而实现对消费消息的确认。
当enable.auto.commit
配置项为true
时,消费者会自动提交
消费位移,这意味着消费者无法控制
何时确认消费消息。如果消费者在处理消息时发生错误
,那么消费位移也会被提交
,导致这些消息无法重新消费。因此,建议在需要保证消息可靠性的场景下关闭enable.auto.commit
配置项。
当enable.auto.commit
配置项为false
时,消费者需要手动提交
消费位移,这样,消费者就可以在成功处理消息后再提交消费位移,从而实现对消费消息的确认。上节有带大家实现过
接着,看消费端的代码,新建一个AckController
类
@Slf4j @RestController public class AckController { @Autowired private KafkaTemplate<String, String> kafkaTemplate; /** * 手动提交偏移量 */ @GetMapping("/hello") public String hello() throws Exception { // 发送消息 for (int i = 0; i < 10; i++) { String message = "Message " + i; kafkaTemplate.send("topic1", message); log.info("Sent message: {}", message); Thread.sleep(1000); } return "hello"; } @KafkaListener(topics = "topic1", groupId = "my-group", containerFactory = "kafkaListenerContainerFactory") public void onMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) { log.info("listen Received message >>> {}", record); acknowledgment.acknowledge(); } }
2023-03-24 10:22:03.240 INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController : Sent message: Message 0 2023-03-24 10:22:03.250 INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 87, CreateTime = 1679624523236, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 0) 2023-03-24 10:22:04.241 INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController : Sent message: Message 1 2023-03-24 10:22:04.255 INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 88, CreateTime = 1679624524241, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 1) 2023-03-24 10:22:05.254 INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController : Sent message: Message 2 2023-03-24 10:22:05.258 INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 89, CreateTime = 1679624525254, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 2) 2023-03-24 10:22:06.269 INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController : Sent message: Message 3 2023-03-24 10:22:06.274 INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 90, CreateTime = 1679624526269, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 3) 2023-03-24 10:22:07.277 INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController : Sent message: Message 4 2023-03-24 10:22:07.285 INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 91, CreateTime = 1679624527277, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 4) 2023-03-24 10:22:08.292 INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController : Sent message: Message 5 2023-03-24 10:22:08.296 INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 92, CreateTime = 1679624528292, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 5) 2023-03-24 10:22:09.304 INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController : Sent message: Message 6 2023-03-24 10:22:09.308 INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 93, CreateTime = 1679624529304, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 6) 2023-03-24 10:22:10.317 INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController : Sent message: Message 7 2023-03-24 10:22:10.321 INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 94, CreateTime = 1679624530317, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 7) 2023-03-24 10:22:11.331 INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController : Sent message: Message 8 2023-03-24 10:22:11.335 INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 95, CreateTime = 1679624531331, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 8) 2023-03-24 10:22:12.346 INFO 24584 --- [nio-8081-exec-1] c.kafka.study.controller.AckController : Sent message: Message 9 2023-03-24 10:22:12.349 INFO 24584 --- [ntainer#0-0-C-1] c.kafka.study.controller.AckController : listen Received message >>> ConsumerRecord(topic = topic1, partition = 0, offset = 96, CreateTime = 1679624532345, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 9)
需要注意的是,手动提交
消费位移会降低消费者的处理速度,因为每次处理完一条消息后都需要提交位移。因此,在需要高吞吐量的场景下,建议使用批量提交消费位移
的方式来提高消费者的处理速度。
批量提交消费位移
其实很简单,首先需要改下KafkaConfig
,在kafkaListenerContainerFactory
添加如下代码:
//设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG factory.setBatchListener(true); // 设置消息转换器 factory.setMessageConverter(new StringJsonMessageConverter());
在上述代码中,设置了一个消息转换器StringJsonMessageConverter
,用于将JSON格式
的消息转换为Java对象
。这里我们使用了Spring Kafka
提供的默认消息转换器
。
更改消费端:
/** * 批量提交消费位移 * @param records * @param acknowledgment */ @KafkaListener(topics = "topic1", groupId = "my-group", containerFactory = "kafkaListenerContainerFactory") public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment) { for (ConsumerRecord<String, String> record : records) { String message = record.value(); // 处理消息 log.info("onMessage: {}", message); } acknowledgment.acknowledge(); }
需要注意的是,使用批量处理消息时,需要确保在处理完所有消息后再调用acknowledge
方法提交消费位移,否则可能会出现一些消息重复消费
或丢失
的情况。
结束语
Kafka
专题讲解到这里就结束了,还有一些细节性的东西留给大家自己去探索。下期给大家讲Java设计模式
,很多同学写代码都是大量的if/else
要么就是switch/case
都没有想过运用设计模式来解决问题,导致代码后期难以维护和扩展,
设计模式还有一个好处就是
代码重构,前期你可以说业务敢,急着上线,等业务稳定下来了,回过头来可以考虑重构部分的代码实现,如果你一直抱着
能不动就不动`的心里,成长的将会很慢,因为你思考的少~
本着把自己知道的都告诉大家,如果本文对您有所帮助,点赞+关注
鼓励一下呗~
相关文章
- 一起来学kafka之Kafka集群搭建
- 一起来学kafka之整合SpringBoot基本使用
- 一起来学kafka之整合SpringBoot深入使用(一)
- 一起来学kafka之整合SpringBoot深入使用(二)
项目源码(源码已更新 欢迎star⭐️)
ElasticSearch 专题学习
- 利用docker搭建es集群
- 一起来学ElasticSearch(一)
- 一起来学ElasticSearch(二)
- 一起来学ElasticSearch(三)
- 一起来学ElasticSearch(四)
- 一起来学ElasticSearch(五)
- 一起来学ElasticSearch(六)
- 一起来学ElasticSearch(七)
- 一起来学ElasticSearch(八)
- 一起来学ElasticSearch(九)
- 一起来学ElasticSearch(十)
- 一起来学ElasticSearch之整合SpringBoot(一)
- 一起来学ElasticSearch之整合SpringBoot(二)
- 一起来学ElasticSearch之整合SpringBoot(三)
项目源码(源码已更新 欢迎star⭐️)
往期并发编程内容推荐
- Java多线程专题之线程与进程概述
- Java多线程专题之线程类和接口入门
- Java多线程专题之进阶学习Thread(含源码分析)
- Java多线程专题之Callable、Future与FutureTask(含源码分析)
- 面试官: 有了解过线程组和线程优先级吗
- 面试官: 说一下线程的生命周期过程
- 面试官: 说一下线程间的通信
- 面试官: 说一下Java的共享内存模型
- 面试官: 有了解过指令重排吗,什么是happens-before
- 面试官: 有了解过volatile关键字吗 说说看
- 面试官: 有了解过Synchronized吗 说说看
- Java多线程专题之Lock锁的使用
- 面试官: 有了解过ReentrantLock的底层实现吗?说说看
- 面试官: 有了解过CAS和原子操作吗?说说看
- Java多线程专题之线程池的基本使用
- 面试官: 有了解过线程池的工作原理吗?说说看
- 面试官: 线程池是如何做到线程复用的?有了解过吗,说说看
- 面试官: 阻塞队列有了解过吗?说说看
- 面试官: 阻塞队列的底层实现有了解过吗? 说说看
- 面试官: 同步容器和并发容器有用过吗? 说说看
- 面试官: CopyOnWrite容器有了解过吗? 说说看
- 面试官: Semaphore在项目中有使用过吗?说说看(源码剖析)
- 面试官: Exchanger在项目中有使用过吗?说说看(源码剖析)
- 面试官: CountDownLatch有了解过吗?说说看(源码剖析)
- 面试官: CyclicBarrier有了解过吗?说说看(源码剖析)
- 面试官: Phaser有了解过吗?说说看
- 面试官: Fork/Join 有了解过吗?说说看(含源码分析)
- 面试官: Stream并行流有了解过吗?说说看
推荐 SpringBoot & SpringCloud (源码已更新 欢迎star⭐️)
- springboot-all
地址
: github.com/qiuChenglei…- SpringBoot系列教程合集
- 一起来学SpringCloud合集
- SpringCloud整合 Oauth2+Gateway+Jwt+Nacos 实现授权码模式的服务认证(一)
- SpringCloud整合 Oauth2+Gateway+Jwt+Nacos 实现授权码模式的服务认证(二)