一起来学kafka之整合SpringBoot深入使用(二)

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 前言目前正在出一个Kafka专题系列教程, 篇幅会较多, 喜欢的话,给个关注❤️ ~本节给大家讲一下Kafka中偏移量(offset)的概念并结合经典面试题来看下它的实际应用场景~好了, 废话不多说直接开整吧~什么是分区 & Partition在讲之前呢,先理一下什么是分区,在第一节的时候有给大家提到过在Kafka中,一个主题(topic)可以分成多个分区。每个分区都是一个有序的消息队列,它们可以在不同的服务器上进行复制,以提高可靠性和可扩展性。每个分区都有一个唯一的标识符(partition ID),用于标识该分区。

前言

目前正在出一个Kafka专题系列教程, 篇幅会较多, 喜欢的话,给个关注❤️ ~

本节给大家讲一下Kafka中偏移量(offset)的概念并结合经典面试题来看下它的实际应用场景~

好了, 废话不多说直接开整吧~

什么是分区 & Partition

在讲之前呢,先理一下什么是分区,在第一节的时候有给大家提到过

Kafka中,一个主题(topic)可以分成多个分区。每个分区都是一个有序消息队列,它们可以在不同的服务器上进行复制,以提高可靠性可扩展性。每个分区都有一个唯一的标识符(partition ID),用于标识该分区。

什么是偏移量 & Offset

偏移量Kafka中用于标识消息在分区中位置的一个数字。每个消息都有一个唯一的偏移量,它是由Kafka分配的,并且在分区中是递增的。偏移量可以用于回溯分区中的消息,也可以用于跟踪已经消费的消息。

   +---------+          +---------+
   |         |          |         |
   |Topic    |          |Topic    |
   |         |          |         |
   +---------+          +---------+
   |         |          |         |
   |Partition 1          Partition 2
   |         |          |         |
   +---------+          +---------+
   |         |          |         |
   |Offset 1 |          |Offset 1 |
   |         |          |         |
   +---------+          +---------+
   |         |          |         |
   |Offset 2 |          |Offset 2 |
   |         |          |         |
   +---------+          +---------+
   |         |          |         |
   |   ...   |          |   ...   |
   |         |          |         |
   +---------+          +---------+

在这个示意图中,Kafka中有两个主题(Topic),每个主题都有两个分区(Partition)。每个分区都有一个唯一的分区ID,并且包含一系列有序的消息。每个消息都有一个唯一偏移量(Offset),用于标识它在分区中的位置。

实际应用 & offset

Kafka中的偏移量(Offset)是一个非常重要的概念,它指的是消费者在一个特定分区的消息中的位置Kafka使用偏移量来保证消费者可以从上次离开的地方继续消费,从而保证消息的顺序性可靠性

以下是一些实际应用场景,演示了如何使用偏移量来处理不同的情况:

  • 重新消费消息:假设一个消费者在处理消息时发生了故障或错误,导致它无法处理后续的消息。在这种情况下,我们可以将消费者的偏移量重置为较早的位置,以重新消费之前未能处理的消息。
  • 手动提交偏移量Kafka消费者API支持手动提交偏移量的方式,这可以用于优化消费者的性能控制偏移量的提交。在手动提交偏移量时,消费者可以根据自己的需要决定何时提交偏移量,并且可以根据消息的处理情况进行批量提交或单独提交。
  • 消费者组协调器Kafka消费者API中的消费者组协调器负责管理消费者组中的偏移量。当消费者加入或离开消费者组时,协调器将重新分配偏移量,以确保消费者可以从正确的位置开始消费。
  • 并发消费:在某些情况下,我们可能需要使用多个消费者来并发地消费同一个主题的消息。在这种情况下,每个消费者都可以独立地管理自己的偏移量,并根据自己的需要进行提交,以确保每个消费者都能够独立地处理消息。

总之,偏移量在Kafka中具有重要的作用,它可以帮助我们实现消息的顺序性可靠性,并提供了一些方便的方式来处理不同的应用场景。

Kafka 消息丢失

Kafka一定能保证消息不丢失吗?答案是否定的。前面几节在讲消费者消费消息的时候都是自动提交偏移量,这里说一下自动提交的概念,

默认情况下,Kafka会使用自动提交偏移量的方式来管理偏移量。这意味着,每当消费者Kafka中拉取一批消息并消费完毕后,它将自动提交偏移量,以便下一次消费者拉取消息时能够从上次提交的偏移量处开始消费消息。

接下来,我们就简单的模拟一下消息丢失的场景~

新建一个OffsetController,实现一个小需求,首先发送10条消息到topic1,消费成功后将结果发送到topic2

@Slf4j
@RestController
public class OffsetController {
    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;
    @GetMapping("/hello")
    public String hello() throws Exception {
        // 发送消息
        for (int i = 0; i < 10; i++) {
            String message = "Message " + i;
            kafkaTemplate.send("topic1", message);
            log.info("Sent message: {}", message);
            Thread.sleep(1000);
        }
        return "hello";
    }
    @KafkaListener(topics = "topic1", id = "to1")
    public void listen1(String message) {
        log.info("listen1 Received message >>> {}", message);
        kafkaTemplate.send("topic2", message);
    }
    @KafkaListener(topics = "topic2", id = "to2")
    public void listen2(String message) {
        log.info("listen2 Received message >>> {}", message);
    }
}

看下正常情况下的消息消费:

2023-03-21 14:47:14.161  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 0
2023-03-21 14:47:14.177  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 0
2023-03-21 14:47:14.211  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 0
2023-03-21 14:47:15.168  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 1
2023-03-21 14:47:15.173  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 1
2023-03-21 14:47:15.178  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 1
2023-03-21 14:47:16.177  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 2
2023-03-21 14:47:16.184  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 2
2023-03-21 14:47:16.187  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 2
2023-03-21 14:47:17.189  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 3
2023-03-21 14:47:17.193  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 3
2023-03-21 14:47:17.198  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 3
2023-03-21 14:47:18.202  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 4
2023-03-21 14:47:18.205  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 4
2023-03-21 14:47:18.209  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 4
2023-03-21 14:47:19.205  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 5
2023-03-21 14:47:19.210  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 5
2023-03-21 14:47:19.218  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 5
2023-03-21 14:47:20.206  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 6
2023-03-21 14:47:20.210  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 6
2023-03-21 14:47:20.213  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 6
2023-03-21 14:47:21.221  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 7
2023-03-21 14:47:21.226  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 7
2023-03-21 14:47:21.232  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 7
2023-03-21 14:47:22.228  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 8
2023-03-21 14:47:22.232  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 8
2023-03-21 14:47:22.241  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 8
2023-03-21 14:47:23.230  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 9
2023-03-21 14:47:23.234  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 9
2023-03-21 14:47:23.238  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 9

可以看到消息是正常消费, 并且是顺序消费

下面来改造一下消费者,假设消费者to1在某些情况下发生了异常或者宕机了

 @KafkaListener(topics = "topic1", id = "to1")
    public void listen1(String message) {
        if(message.contains("6"))
            throw new RuntimeException("系统异常");
        log.info("listen1 Received message >>> {}", message);
        kafkaTemplate.send("topic2", message);
    }
    @KafkaListener(topics = "topic2", id = "to2")
    public void listen2(String message) {
        if(String.format("Message %d", 6 + 1).equals(message)) {
            log.error("消息丢失, 消息为 >>> {}", 6);
        }
        log.info("listen2 Received message >>> {}", message);
    }

看下异常情况下的输出

2023-03-21 15:21:28.405  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 0
2023-03-21 15:21:28.421  INFO 130336 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 0
2023-03-21 15:21:28.430  INFO 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 0
2023-03-21 15:21:29.409  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 1
2023-03-21 15:21:29.413  INFO 130336 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 1
2023-03-21 15:21:29.417  INFO 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 1
2023-03-21 15:21:30.415  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 2
2023-03-21 15:21:30.420  INFO 130336 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 2
2023-03-21 15:21:30.424  INFO 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 2
2023-03-21 15:21:31.416  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 3
2023-03-21 15:21:31.420  INFO 130336 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 3
2023-03-21 15:21:31.424  INFO 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 3
2023-03-21 15:21:32.430  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 4
2023-03-21 15:21:32.434  INFO 130336 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 4
2023-03-21 15:21:32.438  INFO 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 4
2023-03-21 15:21:33.432  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 5
2023-03-21 15:21:33.436  INFO 130336 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 5
2023-03-21 15:21:33.441  INFO 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 5
2023-03-21 15:21:34.435  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 6
2023-03-21 15:21:34.445 ERROR 130336 --- [      to1-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = topic1, partition = 0, offset = 73, CreateTime = 1679383294435, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 6)
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.kafka.study.controller.OffsetController.listen1(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException: 系统异常; nested exception is java.lang.RuntimeException: 系统异常
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1272) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1261) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1188) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1159) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1099) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:934) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:750) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_191]
  at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_191]
  at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]
Caused by: java.lang.RuntimeException: 系统异常
  at com.kafka.study.controller.OffsetController.listen1(OffsetController.java:36) ~[classes/:na]
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
  at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
  at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:170) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
  at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
  at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1224) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1217) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1178) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  ... 8 common frames omitted
2023-03-21 15:21:35.439  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 7
2023-03-21 15:21:35.442  INFO 130336 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 7
2023-03-21 15:21:35.445 ERROR 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : 消息丢失, 消息为 >>> 6
2023-03-21 15:21:35.445  INFO 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 7
2023-03-21 15:21:36.448  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 8
2023-03-21 15:21:36.451  INFO 130336 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 8
2023-03-21 15:21:36.455  INFO 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 8
2023-03-21 15:21:37.449  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 9
2023-03-21 15:21:37.455  INFO 130336 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 9
2023-03-21 15:21:37.460  INFO 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 9

从结果来看当消费者to1消费到Message 6这条消息的时候报了异常导致消息没有被消费成功,但是还是正常提交了offset,接着继续消费,这就导致了消息的丢失。理论上讲没有消费成功的消息应当重新消费,然后提交offset

那么如何不自动提交offset呢? 这是一道比较经典的面试题

Kafka 手动提交 offset

由于我们使用了自动提交偏移量的方式,而这个新的消息是在消费者提交偏移量之后发送的,因此消费者不会收到这条新的消息,这就导致了消息丢失的情况。为了避免这种情况,我们应该使用手动提交偏移量的方式,以便在消费者完成所有消息的消费后手动提交偏移量。这样一来,即使在消费者消费消息的过程中出现异常或者消费者应用程序被关闭,我们也能够确保消息的完整性和可靠性。

接下来,我们就去解决上述的问题。

首先更改一下KafkaConfig, 上节给大家讲过,直接在这上边改, 在kafkaListenerContainerFactory()中添加如下代码

// 开启手动提交偏移量
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

接着修改consumerConfigs(), 添加如下代码, 将自动提交关闭

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

然后更改下我们的controller代码

 /**
    * 手动提交偏移量
    */
@GetMapping("/hello1")
public String hello1() throws Exception {
    // 发送消息
    for (int i = 0; i < 10; i++) {
        String message = "Message " + i;
        kafkaTemplate.send("topic3", message);
        log.info("Sent message: {}", message);
        Thread.sleep(1000);
    }
    return "hello1";
}
@KafkaListener(topics = "topic3", groupId = "my-group", containerFactory = "kafkaListenerContainerFactory")
public void onMessage(String message, Acknowledgment acknowledgment) {
    log.info("listen1 Received message >>> {}", message);
}

这个时候我并没有在监听器里手动提交offset, 运行之后请求下,看下控制台信息

2023-03-21 16:45:38.408  INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 1
2023-03-21 16:45:38.411  INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 1
2023-03-21 16:45:39.415  INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 2
2023-03-21 16:45:39.418  INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 2
2023-03-21 16:45:40.419  INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 3
2023-03-21 16:45:40.423  INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 3
2023-03-21 16:45:41.434  INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 4
2023-03-21 16:45:41.438  INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 4
2023-03-21 16:45:42.447  INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 5
2023-03-21 16:45:42.452  INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 5
2023-03-21 16:45:43.460  INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 6
2023-03-21 16:45:43.463  INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 6
2023-03-21 16:45:44.462  INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 7
2023-03-21 16:45:44.465  INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 7
2023-03-21 16:45:45.474  INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 8
2023-03-21 16:45:45.478  INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 8
2023-03-21 16:45:46.484  INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 9
2023-03-21 16:45:46.488  INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 9

如果符合预期的话,消费者再次启动的时候,应该从上次消费的位置开始消费,下面我们重启应用,过几秒后看下控制台信息

2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 0
2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 1
2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 2
2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 3
2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 4
2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 5
2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 6
2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 7
2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 8
2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 9

从结果来看,符合我们的预期,因为Message 0这条消息并没有手动提交offset所以下次进来的时候还是从这个位置开始消费消息

那怎么手动提交呢?其实很简单,添加如下代码即可

acknowledgment.acknowledge();

这样,在消息被成功消费后,由应用本身手动提交offset这样可以保证我们的消息不会被丢失

结束语

本节提到了ack的概念,它是kafka的一种确认机制,它用于确定消息是否已经被成功处理,下节就结合实际应用场景给大家顺一下这个概念~

本着把自己知道的都告诉大家,如果本文对您有所帮助,点赞+关注鼓励一下呗~

相关文章

项目源码(源码已更新 欢迎star⭐️)

ElasticSearch 专题学习

项目源码(源码已更新 欢迎star⭐️)

往期并发编程内容推荐

推荐 SpringBoot & SpringCloud (源码已更新 欢迎star⭐️)

博客(阅读体验较佳)




相关文章
|
3天前
|
消息中间件 Java Kafka
Springboot集成高低版本kafka
Springboot集成高低版本kafka
|
8月前
|
消息中间件 负载均衡 Java
Kafka与SpringBoot的整合使用
Kafka与SpringBoot的整合使用
174 0
|
9月前
|
消息中间件 存储 NoSQL
kafka整合springboot以及核心参数的使用
kafka整合springboot以及核心参数的使用
304 0
|
10月前
|
消息中间件 缓存 NoSQL
手把手教你云相册项目简易开发 day1 Kafka+IDEA+Springboot+Redis+MySQL+libvips 简单运行和使用
手把手教你云相册项目简易开发 day1 Kafka+IDEA+Springboot+Redis+MySQL+libvips 简单运行和使用
145 0
|
6月前
|
消息中间件 数据可视化 Java
消息中间件系列教程(22) -Kafka- SpringBoot集成Kafka
消息中间件系列教程(22) -Kafka- SpringBoot集成Kafka
72 0
|
3天前
|
消息中间件 SQL druid
最新版 springboot集成kafka
最新版 springboot集成kafka
34 0
|
9月前
|
消息中间件 数据可视化 Java
SpringBoot3集成Kafka
SpringBoot3集成KafkaKafka是一个开源的分布式事件流平台,常被用于高性能数据管道、流分析、数据集成和关键任务应用,基于Zookeeper协调的处理平台,也是一种消息系统,具有更好的吞吐量、内置分区、复制和容错。
216 0
|
3天前
|
消息中间件 Java Kafka
spring boot 集成kafka
spring boot 集成kafka
56 0
spring boot 集成kafka
|
3天前
|
消息中间件 Java 关系型数据库
【Spring Boot+Kafka+Mysql+HBase】实现分布式优惠券后台应用系统(附源码)
【Spring Boot+Kafka+Mysql+HBase】实现分布式优惠券后台应用系统(附源码)
102 2
|
8月前
|
消息中间件 Java Kafka
SpringBoot整合Kafka(SASL认证配置、处理毒丸消息)
SpringBoot整合Kafka(SASL认证配置、处理毒丸消息)
935 0