前言
目前正在出一个Kafka专题
系列教程, 篇幅会较多, 喜欢的话,给个关注❤️ ~
本节给大家讲一下Kafka中偏移量(offset)的概念
并结合经典面试题来看下它的实际应用场景~
好了, 废话不多说直接开整吧~
什么是分区 & Partition
在讲之前呢,先理一下什么是分区
,在第一节的时候有给大家提到过
在Kafka
中,一个主题(topic)
可以分成多个分区
。每个分区都是一个有序
的消息队列
,它们可以在不同的服务器上进行复制,以提高可靠性
和可扩展性
。每个分区都有一个唯一的标识符(partition ID)
,用于标识该分区。
什么是偏移量 & Offset
偏移量
是Kafka
中用于标识消息在分区中位置
的一个数字
。每个消息都有一个唯一
的偏移量,它是由Kafka
分配的,并且在分区
中是递增
的。偏移量
可以用于回溯
分区中的消息,也可以用于跟踪已经消费
的消息。
+---------+ +---------+ | | | | |Topic | |Topic | | | | | +---------+ +---------+ | | | | |Partition 1 Partition 2 | | | | +---------+ +---------+ | | | | |Offset 1 | |Offset 1 | | | | | +---------+ +---------+ | | | | |Offset 2 | |Offset 2 | | | | | +---------+ +---------+ | | | | | ... | | ... | | | | | +---------+ +---------+
在这个示意图中,Kafka
中有两个主题(Topic)
,每个主题都有两个分区(Partition)
。每个分区都有一个唯一的分区ID
,并且包含一系列有序
的消息。每个消息都有一个唯一
的偏移量(Offset)
,用于标识它在分区中的位置。
实际应用 & offset
Kafka
中的偏移量(Offset)
是一个非常重要的概念,它指的是消费者在一个特定分区的消息中的位置
。Kafka
使用偏移量
来保证消费者可以从上次离开的地方继续
消费,从而保证消息的顺序性
和可靠性
。
以下是一些实际应用场景,演示了如何使用偏移量来处理不同的情况:
重新消费消息
:假设一个消费者在处理消息时发生了故障或错误
,导致它无法处理后续的消息。在这种情况下,我们可以将消费者的偏移量重置
为较早的位置,以重新消费
之前未能处理的消息。手动提交偏移量
:Kafka消费者API
支持手动
提交偏移量
的方式,这可以用于优化消费者的性能
和控制偏移量的提交
。在手动提交偏移量时,消费者可以根据自己的需要决定何时提交偏移量,并且可以根据消息的处理情况进行批量提交或单独提交。消费者组协调器
:Kafka消费者API
中的消费者组协调器负责管理消费者组中的偏移量。当消费者加入或离开消费者组时,协调器将重新分配偏移量,以确保消费者可以从正确
的位置开始消费。并发消费
:在某些情况下,我们可能需要使用多个消费者来并发地消费同一个主题的消息。在这种情况下,每个消费者都可以独立地管理自己的偏移量,并根据自己的需要进行提交,以确保每个消费者都能够独立地处理消息。
总之,偏移量在Kafka中具有重要的作用,它可以帮助我们实现消息的顺序性
和可靠性
,并提供了一些方便的方式来处理不同的应用场景。
Kafka 消息丢失
Kafka
一定能保证消息不丢失吗?答案是否定的。前面几节在讲消费者消费消息的时候都是自动提交偏移量
,这里说一下自动提交
的概念,
默认情况下,Kafka
会使用自动提交偏移量
的方式来管理偏移量
。这意味着,每当消费者
从Kafka
中拉取一批消息并消费完毕
后,它将自动
提交偏移量,以便下一次消费者拉取消息时能够从上次提交
的偏移量处开始消费消息。
接下来,我们就简单的模拟一下消息丢失
的场景~
新建一个OffsetController
,实现一个小需求,首先发送10
条消息到topic1
,消费成功后将结果发送到topic2
@Slf4j @RestController public class OffsetController { @Autowired private KafkaTemplate<Object, Object> kafkaTemplate; @GetMapping("/hello") public String hello() throws Exception { // 发送消息 for (int i = 0; i < 10; i++) { String message = "Message " + i; kafkaTemplate.send("topic1", message); log.info("Sent message: {}", message); Thread.sleep(1000); } return "hello"; } @KafkaListener(topics = "topic1", id = "to1") public void listen1(String message) { log.info("listen1 Received message >>> {}", message); kafkaTemplate.send("topic2", message); } @KafkaListener(topics = "topic2", id = "to2") public void listen2(String message) { log.info("listen2 Received message >>> {}", message); } }
看下正常情况下的消息消费:
2023-03-21 14:47:14.161 INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 0 2023-03-21 14:47:14.177 INFO 117020 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 0 2023-03-21 14:47:14.211 INFO 117020 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 0 2023-03-21 14:47:15.168 INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 1 2023-03-21 14:47:15.173 INFO 117020 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 1 2023-03-21 14:47:15.178 INFO 117020 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 1 2023-03-21 14:47:16.177 INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 2 2023-03-21 14:47:16.184 INFO 117020 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 2 2023-03-21 14:47:16.187 INFO 117020 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 2 2023-03-21 14:47:17.189 INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 3 2023-03-21 14:47:17.193 INFO 117020 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 3 2023-03-21 14:47:17.198 INFO 117020 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 3 2023-03-21 14:47:18.202 INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 4 2023-03-21 14:47:18.205 INFO 117020 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 4 2023-03-21 14:47:18.209 INFO 117020 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 4 2023-03-21 14:47:19.205 INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 5 2023-03-21 14:47:19.210 INFO 117020 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 5 2023-03-21 14:47:19.218 INFO 117020 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 5 2023-03-21 14:47:20.206 INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 6 2023-03-21 14:47:20.210 INFO 117020 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 6 2023-03-21 14:47:20.213 INFO 117020 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 6 2023-03-21 14:47:21.221 INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 7 2023-03-21 14:47:21.226 INFO 117020 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 7 2023-03-21 14:47:21.232 INFO 117020 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 7 2023-03-21 14:47:22.228 INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 8 2023-03-21 14:47:22.232 INFO 117020 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 8 2023-03-21 14:47:22.241 INFO 117020 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 8 2023-03-21 14:47:23.230 INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 9 2023-03-21 14:47:23.234 INFO 117020 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 9 2023-03-21 14:47:23.238 INFO 117020 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 9
可以看到消息是正常消费, 并且是顺序消费
下面来改造一下消费者,假设消费者to1
在某些情况下发生了异常或者宕机了
@KafkaListener(topics = "topic1", id = "to1") public void listen1(String message) { if(message.contains("6")) throw new RuntimeException("系统异常"); log.info("listen1 Received message >>> {}", message); kafkaTemplate.send("topic2", message); } @KafkaListener(topics = "topic2", id = "to2") public void listen2(String message) { if(String.format("Message %d", 6 + 1).equals(message)) { log.error("消息丢失, 消息为 >>> {}", 6); } log.info("listen2 Received message >>> {}", message); }
看下异常情况下的输出
2023-03-21 15:21:28.405 INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 0 2023-03-21 15:21:28.421 INFO 130336 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 0 2023-03-21 15:21:28.430 INFO 130336 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 0 2023-03-21 15:21:29.409 INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 1 2023-03-21 15:21:29.413 INFO 130336 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 1 2023-03-21 15:21:29.417 INFO 130336 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 1 2023-03-21 15:21:30.415 INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 2 2023-03-21 15:21:30.420 INFO 130336 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 2 2023-03-21 15:21:30.424 INFO 130336 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 2 2023-03-21 15:21:31.416 INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 3 2023-03-21 15:21:31.420 INFO 130336 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 3 2023-03-21 15:21:31.424 INFO 130336 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 3 2023-03-21 15:21:32.430 INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 4 2023-03-21 15:21:32.434 INFO 130336 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 4 2023-03-21 15:21:32.438 INFO 130336 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 4 2023-03-21 15:21:33.432 INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 5 2023-03-21 15:21:33.436 INFO 130336 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 5 2023-03-21 15:21:33.441 INFO 130336 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 5 2023-03-21 15:21:34.435 INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 6 2023-03-21 15:21:34.445 ERROR 130336 --- [ to1-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = topic1, partition = 0, offset = 73, CreateTime = 1679383294435, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 6) org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.kafka.study.controller.OffsetController.listen1(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException: 系统异常; nested exception is java.lang.RuntimeException: 系统异常 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1272) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1261) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1188) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1159) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1099) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:934) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:750) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_191] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_191] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191] Caused by: java.lang.RuntimeException: 系统异常 at com.kafka.study.controller.OffsetController.listen1(OffsetController.java:36) ~[classes/:na] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191] at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:170) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE] at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE] at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE] at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE] at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1224) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1217) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1178) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE] ... 8 common frames omitted 2023-03-21 15:21:35.439 INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 7 2023-03-21 15:21:35.442 INFO 130336 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 7 2023-03-21 15:21:35.445 ERROR 130336 --- [ to2-0-C-1] c.k.study.controller.OffsetController : 消息丢失, 消息为 >>> 6 2023-03-21 15:21:35.445 INFO 130336 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 7 2023-03-21 15:21:36.448 INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 8 2023-03-21 15:21:36.451 INFO 130336 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 8 2023-03-21 15:21:36.455 INFO 130336 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 8 2023-03-21 15:21:37.449 INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 9 2023-03-21 15:21:37.455 INFO 130336 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 9 2023-03-21 15:21:37.460 INFO 130336 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 9
从结果来看当消费者to1
消费到Message 6
这条消息的时候报了异常导致消息没有被消费成功,但是还是正常提交了offset
,接着继续消费,这就导致了消息的丢失
。理论上讲没有消费成功的消息应当重新消费,然后提交offset
那么如何不自动提交offset
呢? 这是一道比较经典的面试题
Kafka 手动提交 offset
由于我们使用了自动提交偏移量
的方式,而这个新的消息是在消费者提交偏移量之后
发送的,因此消费者不会收到这条新的消息,这就导致了消息丢失的情况。为了避免这种情况,我们应该使用手动提交偏移量
的方式,以便在消费者完成所有消息的消费后手动提交偏移量。这样一来,即使在消费者消费消息的过程中出现异常或者消费者应用程序被关闭,我们也能够确保消息的完整性和可靠性。
接下来,我们就去解决上述的问题。
首先更改一下KafkaConfig
, 上节给大家讲过,直接在这上边改, 在kafkaListenerContainerFactory()
中添加如下代码
// 开启手动提交偏移量 factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
接着修改consumerConfigs()
, 添加如下代码, 将自动提交关闭
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
然后更改下我们的controller
代码
/** * 手动提交偏移量 */ @GetMapping("/hello1") public String hello1() throws Exception { // 发送消息 for (int i = 0; i < 10; i++) { String message = "Message " + i; kafkaTemplate.send("topic3", message); log.info("Sent message: {}", message); Thread.sleep(1000); } return "hello1"; } @KafkaListener(topics = "topic3", groupId = "my-group", containerFactory = "kafkaListenerContainerFactory") public void onMessage(String message, Acknowledgment acknowledgment) { log.info("listen1 Received message >>> {}", message); }
这个时候我并没有在监听器里手动提交offset
, 运行之后请求下,看下控制台信息
2023-03-21 16:45:38.408 INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 1 2023-03-21 16:45:38.411 INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 1 2023-03-21 16:45:39.415 INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 2 2023-03-21 16:45:39.418 INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 2 2023-03-21 16:45:40.419 INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 3 2023-03-21 16:45:40.423 INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 3 2023-03-21 16:45:41.434 INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 4 2023-03-21 16:45:41.438 INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 4 2023-03-21 16:45:42.447 INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 5 2023-03-21 16:45:42.452 INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 5 2023-03-21 16:45:43.460 INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 6 2023-03-21 16:45:43.463 INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 6 2023-03-21 16:45:44.462 INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 7 2023-03-21 16:45:44.465 INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 7 2023-03-21 16:45:45.474 INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 8 2023-03-21 16:45:45.478 INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 8 2023-03-21 16:45:46.484 INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 9 2023-03-21 16:45:46.488 INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 9
如果符合预期的话,消费者再次启动的时候,应该从上次消费的位置开始消费,下面我们重启应用,过几秒后看下控制台信息
2023-03-21 16:47:57.286 INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 0 2023-03-21 16:47:57.286 INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 1 2023-03-21 16:47:57.286 INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 2 2023-03-21 16:47:57.286 INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 3 2023-03-21 16:47:57.286 INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 4 2023-03-21 16:47:57.286 INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 5 2023-03-21 16:47:57.286 INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 6 2023-03-21 16:47:57.286 INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 7 2023-03-21 16:47:57.286 INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 8 2023-03-21 16:47:57.286 INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 9
从结果来看,符合我们的预期,因为Message 0
这条消息并没有手动提交offset
所以下次进来的时候还是从这个位置开始消费消息
那怎么手动提交呢?其实很简单,添加如下代码即可
acknowledgment.acknowledge();
这样,在消息被成功消费后,由应用本身手动提交offset
这样可以保证我们的消息不会被丢失
结束语
本节提到了ack
的概念,它是kafka
的一种确认机制,它用于确定消息是否已经被成功处理,下节就结合实际应用场景给大家顺一下这个概念~
本着把自己知道的都告诉大家,如果本文对您有所帮助,点赞+关注
鼓励一下呗~
相关文章
项目源码(源码已更新 欢迎star⭐️)
ElasticSearch 专题学习
- 利用docker搭建es集群
- 一起来学ElasticSearch(一)
- 一起来学ElasticSearch(二)
- 一起来学ElasticSearch(三)
- 一起来学ElasticSearch(四)
- 一起来学ElasticSearch(五)
- 一起来学ElasticSearch(六)
- 一起来学ElasticSearch(七)
- 一起来学ElasticSearch(八)
- 一起来学ElasticSearch(九)
- 一起来学ElasticSearch(十)
- 一起来学ElasticSearch之整合SpringBoot(一)
- 一起来学ElasticSearch之整合SpringBoot(二)
- 一起来学ElasticSearch之整合SpringBoot(三)
项目源码(源码已更新 欢迎star⭐️)
往期并发编程内容推荐
- Java多线程专题之线程与进程概述
- Java多线程专题之线程类和接口入门
- Java多线程专题之进阶学习Thread(含源码分析)
- Java多线程专题之Callable、Future与FutureTask(含源码分析)
- 面试官: 有了解过线程组和线程优先级吗
- 面试官: 说一下线程的生命周期过程
- 面试官: 说一下线程间的通信
- 面试官: 说一下Java的共享内存模型
- 面试官: 有了解过指令重排吗,什么是happens-before
- 面试官: 有了解过volatile关键字吗 说说看
- 面试官: 有了解过Synchronized吗 说说看
- Java多线程专题之Lock锁的使用
- 面试官: 有了解过ReentrantLock的底层实现吗?说说看
- 面试官: 有了解过CAS和原子操作吗?说说看
- Java多线程专题之线程池的基本使用
- 面试官: 有了解过线程池的工作原理吗?说说看
- 面试官: 线程池是如何做到线程复用的?有了解过吗,说说看
- 面试官: 阻塞队列有了解过吗?说说看
- 面试官: 阻塞队列的底层实现有了解过吗? 说说看
- 面试官: 同步容器和并发容器有用过吗? 说说看
- 面试官: CopyOnWrite容器有了解过吗? 说说看
- 面试官: Semaphore在项目中有使用过吗?说说看(源码剖析)
- 面试官: Exchanger在项目中有使用过吗?说说看(源码剖析)
- 面试官: CountDownLatch有了解过吗?说说看(源码剖析)
- 面试官: CyclicBarrier有了解过吗?说说看(源码剖析)
- 面试官: Phaser有了解过吗?说说看
- 面试官: Fork/Join 有了解过吗?说说看(含源码分析)
- 面试官: Stream并行流有了解过吗?说说看
推荐 SpringBoot & SpringCloud (源码已更新 欢迎star⭐️)
- springboot-all
地址
: github.com/qiuChenglei…- SpringBoot系列教程合集
- 一起来学SpringCloud合集
- SpringCloud整合 Oauth2+Gateway+Jwt+Nacos 实现授权码模式的服务认证(一)
- SpringCloud整合 Oauth2+Gateway+Jwt+Nacos 实现授权码模式的服务认证(二)