一起来学kafka之整合SpringBoot深入使用(二)

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 前言目前正在出一个Kafka专题系列教程, 篇幅会较多, 喜欢的话,给个关注❤️ ~本节给大家讲一下Kafka中偏移量(offset)的概念并结合经典面试题来看下它的实际应用场景~好了, 废话不多说直接开整吧~什么是分区 & Partition在讲之前呢,先理一下什么是分区,在第一节的时候有给大家提到过在Kafka中,一个主题(topic)可以分成多个分区。每个分区都是一个有序的消息队列,它们可以在不同的服务器上进行复制,以提高可靠性和可扩展性。每个分区都有一个唯一的标识符(partition ID),用于标识该分区。

前言

目前正在出一个Kafka专题系列教程, 篇幅会较多, 喜欢的话,给个关注❤️ ~

本节给大家讲一下Kafka中偏移量(offset)的概念并结合经典面试题来看下它的实际应用场景~

好了, 废话不多说直接开整吧~

什么是分区 & Partition

在讲之前呢,先理一下什么是分区,在第一节的时候有给大家提到过

Kafka中,一个主题(topic)可以分成多个分区。每个分区都是一个有序消息队列,它们可以在不同的服务器上进行复制,以提高可靠性可扩展性。每个分区都有一个唯一的标识符(partition ID),用于标识该分区。

什么是偏移量 & Offset

偏移量Kafka中用于标识消息在分区中位置的一个数字。每个消息都有一个唯一的偏移量,它是由Kafka分配的,并且在分区中是递增的。偏移量可以用于回溯分区中的消息,也可以用于跟踪已经消费的消息。

   +---------+          +---------+
   |         |          |         |
   |Topic    |          |Topic    |
   |         |          |         |
   +---------+          +---------+
   |         |          |         |
   |Partition 1          Partition 2
   |         |          |         |
   +---------+          +---------+
   |         |          |         |
   |Offset 1 |          |Offset 1 |
   |         |          |         |
   +---------+          +---------+
   |         |          |         |
   |Offset 2 |          |Offset 2 |
   |         |          |         |
   +---------+          +---------+
   |         |          |         |
   |   ...   |          |   ...   |
   |         |          |         |
   +---------+          +---------+

在这个示意图中,Kafka中有两个主题(Topic),每个主题都有两个分区(Partition)。每个分区都有一个唯一的分区ID,并且包含一系列有序的消息。每个消息都有一个唯一偏移量(Offset),用于标识它在分区中的位置。

实际应用 & offset

Kafka中的偏移量(Offset)是一个非常重要的概念,它指的是消费者在一个特定分区的消息中的位置Kafka使用偏移量来保证消费者可以从上次离开的地方继续消费,从而保证消息的顺序性可靠性

以下是一些实际应用场景,演示了如何使用偏移量来处理不同的情况:

  • 重新消费消息:假设一个消费者在处理消息时发生了故障或错误,导致它无法处理后续的消息。在这种情况下,我们可以将消费者的偏移量重置为较早的位置,以重新消费之前未能处理的消息。
  • 手动提交偏移量Kafka消费者API支持手动提交偏移量的方式,这可以用于优化消费者的性能控制偏移量的提交。在手动提交偏移量时,消费者可以根据自己的需要决定何时提交偏移量,并且可以根据消息的处理情况进行批量提交或单独提交。
  • 消费者组协调器Kafka消费者API中的消费者组协调器负责管理消费者组中的偏移量。当消费者加入或离开消费者组时,协调器将重新分配偏移量,以确保消费者可以从正确的位置开始消费。
  • 并发消费:在某些情况下,我们可能需要使用多个消费者来并发地消费同一个主题的消息。在这种情况下,每个消费者都可以独立地管理自己的偏移量,并根据自己的需要进行提交,以确保每个消费者都能够独立地处理消息。

总之,偏移量在Kafka中具有重要的作用,它可以帮助我们实现消息的顺序性可靠性,并提供了一些方便的方式来处理不同的应用场景。

Kafka 消息丢失

Kafka一定能保证消息不丢失吗?答案是否定的。前面几节在讲消费者消费消息的时候都是自动提交偏移量,这里说一下自动提交的概念,

默认情况下,Kafka会使用自动提交偏移量的方式来管理偏移量。这意味着,每当消费者Kafka中拉取一批消息并消费完毕后,它将自动提交偏移量,以便下一次消费者拉取消息时能够从上次提交的偏移量处开始消费消息。

接下来,我们就简单的模拟一下消息丢失的场景~

新建一个OffsetController,实现一个小需求,首先发送10条消息到topic1,消费成功后将结果发送到topic2

@Slf4j
@RestController
public class OffsetController {
    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;
    @GetMapping("/hello")
    public String hello() throws Exception {
        // 发送消息
        for (int i = 0; i < 10; i++) {
            String message = "Message " + i;
            kafkaTemplate.send("topic1", message);
            log.info("Sent message: {}", message);
            Thread.sleep(1000);
        }
        return "hello";
    }
    @KafkaListener(topics = "topic1", id = "to1")
    public void listen1(String message) {
        log.info("listen1 Received message >>> {}", message);
        kafkaTemplate.send("topic2", message);
    }
    @KafkaListener(topics = "topic2", id = "to2")
    public void listen2(String message) {
        log.info("listen2 Received message >>> {}", message);
    }
}

看下正常情况下的消息消费:

2023-03-21 14:47:14.161  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 0
2023-03-21 14:47:14.177  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 0
2023-03-21 14:47:14.211  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 0
2023-03-21 14:47:15.168  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 1
2023-03-21 14:47:15.173  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 1
2023-03-21 14:47:15.178  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 1
2023-03-21 14:47:16.177  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 2
2023-03-21 14:47:16.184  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 2
2023-03-21 14:47:16.187  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 2
2023-03-21 14:47:17.189  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 3
2023-03-21 14:47:17.193  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 3
2023-03-21 14:47:17.198  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 3
2023-03-21 14:47:18.202  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 4
2023-03-21 14:47:18.205  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 4
2023-03-21 14:47:18.209  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 4
2023-03-21 14:47:19.205  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 5
2023-03-21 14:47:19.210  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 5
2023-03-21 14:47:19.218  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 5
2023-03-21 14:47:20.206  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 6
2023-03-21 14:47:20.210  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 6
2023-03-21 14:47:20.213  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 6
2023-03-21 14:47:21.221  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 7
2023-03-21 14:47:21.226  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 7
2023-03-21 14:47:21.232  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 7
2023-03-21 14:47:22.228  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 8
2023-03-21 14:47:22.232  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 8
2023-03-21 14:47:22.241  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 8
2023-03-21 14:47:23.230  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 9
2023-03-21 14:47:23.234  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 9
2023-03-21 14:47:23.238  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 9

可以看到消息是正常消费, 并且是顺序消费

下面来改造一下消费者,假设消费者to1在某些情况下发生了异常或者宕机了

 @KafkaListener(topics = "topic1", id = "to1")
    public void listen1(String message) {
        if(message.contains("6"))
            throw new RuntimeException("系统异常");
        log.info("listen1 Received message >>> {}", message);
        kafkaTemplate.send("topic2", message);
    }
    @KafkaListener(topics = "topic2", id = "to2")
    public void listen2(String message) {
        if(String.format("Message %d", 6 + 1).equals(message)) {
            log.error("消息丢失, 消息为 >>> {}", 6);
        }
        log.info("listen2 Received message >>> {}", message);
    }

看下异常情况下的输出

2023-03-21 15:21:28.405  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 0
2023-03-21 15:21:28.421  INFO 130336 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 0
2023-03-21 15:21:28.430  INFO 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 0
2023-03-21 15:21:29.409  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 1
2023-03-21 15:21:29.413  INFO 130336 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 1
2023-03-21 15:21:29.417  INFO 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 1
2023-03-21 15:21:30.415  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 2
2023-03-21 15:21:30.420  INFO 130336 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 2
2023-03-21 15:21:30.424  INFO 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 2
2023-03-21 15:21:31.416  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 3
2023-03-21 15:21:31.420  INFO 130336 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 3
2023-03-21 15:21:31.424  INFO 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 3
2023-03-21 15:21:32.430  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 4
2023-03-21 15:21:32.434  INFO 130336 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 4
2023-03-21 15:21:32.438  INFO 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 4
2023-03-21 15:21:33.432  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 5
2023-03-21 15:21:33.436  INFO 130336 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 5
2023-03-21 15:21:33.441  INFO 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 5
2023-03-21 15:21:34.435  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 6
2023-03-21 15:21:34.445 ERROR 130336 --- [      to1-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = topic1, partition = 0, offset = 73, CreateTime = 1679383294435, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 6)
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.kafka.study.controller.OffsetController.listen1(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException: 系统异常; nested exception is java.lang.RuntimeException: 系统异常
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1272) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1261) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1188) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1159) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1099) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:934) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:750) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_191]
  at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_191]
  at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]
Caused by: java.lang.RuntimeException: 系统异常
  at com.kafka.study.controller.OffsetController.listen1(OffsetController.java:36) ~[classes/:na]
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
  at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
  at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:170) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
  at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
  at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1224) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1217) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1178) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
  ... 8 common frames omitted
2023-03-21 15:21:35.439  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 7
2023-03-21 15:21:35.442  INFO 130336 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 7
2023-03-21 15:21:35.445 ERROR 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : 消息丢失, 消息为 >>> 6
2023-03-21 15:21:35.445  INFO 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 7
2023-03-21 15:21:36.448  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 8
2023-03-21 15:21:36.451  INFO 130336 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 8
2023-03-21 15:21:36.455  INFO 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 8
2023-03-21 15:21:37.449  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 9
2023-03-21 15:21:37.455  INFO 130336 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 9
2023-03-21 15:21:37.460  INFO 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 9

从结果来看当消费者to1消费到Message 6这条消息的时候报了异常导致消息没有被消费成功,但是还是正常提交了offset,接着继续消费,这就导致了消息的丢失。理论上讲没有消费成功的消息应当重新消费,然后提交offset

那么如何不自动提交offset呢? 这是一道比较经典的面试题

Kafka 手动提交 offset

由于我们使用了自动提交偏移量的方式,而这个新的消息是在消费者提交偏移量之后发送的,因此消费者不会收到这条新的消息,这就导致了消息丢失的情况。为了避免这种情况,我们应该使用手动提交偏移量的方式,以便在消费者完成所有消息的消费后手动提交偏移量。这样一来,即使在消费者消费消息的过程中出现异常或者消费者应用程序被关闭,我们也能够确保消息的完整性和可靠性。

接下来,我们就去解决上述的问题。

首先更改一下KafkaConfig, 上节给大家讲过,直接在这上边改, 在kafkaListenerContainerFactory()中添加如下代码

// 开启手动提交偏移量
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

接着修改consumerConfigs(), 添加如下代码, 将自动提交关闭

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

然后更改下我们的controller代码

 /**
    * 手动提交偏移量
    */
@GetMapping("/hello1")
public String hello1() throws Exception {
    // 发送消息
    for (int i = 0; i < 10; i++) {
        String message = "Message " + i;
        kafkaTemplate.send("topic3", message);
        log.info("Sent message: {}", message);
        Thread.sleep(1000);
    }
    return "hello1";
}
@KafkaListener(topics = "topic3", groupId = "my-group", containerFactory = "kafkaListenerContainerFactory")
public void onMessage(String message, Acknowledgment acknowledgment) {
    log.info("listen1 Received message >>> {}", message);
}

这个时候我并没有在监听器里手动提交offset, 运行之后请求下,看下控制台信息

2023-03-21 16:45:38.408  INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 1
2023-03-21 16:45:38.411  INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 1
2023-03-21 16:45:39.415  INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 2
2023-03-21 16:45:39.418  INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 2
2023-03-21 16:45:40.419  INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 3
2023-03-21 16:45:40.423  INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 3
2023-03-21 16:45:41.434  INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 4
2023-03-21 16:45:41.438  INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 4
2023-03-21 16:45:42.447  INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 5
2023-03-21 16:45:42.452  INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 5
2023-03-21 16:45:43.460  INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 6
2023-03-21 16:45:43.463  INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 6
2023-03-21 16:45:44.462  INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 7
2023-03-21 16:45:44.465  INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 7
2023-03-21 16:45:45.474  INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 8
2023-03-21 16:45:45.478  INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 8
2023-03-21 16:45:46.484  INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 9
2023-03-21 16:45:46.488  INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 9

如果符合预期的话,消费者再次启动的时候,应该从上次消费的位置开始消费,下面我们重启应用,过几秒后看下控制台信息

2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 0
2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 1
2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 2
2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 3
2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 4
2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 5
2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 6
2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 7
2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 8
2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 9

从结果来看,符合我们的预期,因为Message 0这条消息并没有手动提交offset所以下次进来的时候还是从这个位置开始消费消息

那怎么手动提交呢?其实很简单,添加如下代码即可

acknowledgment.acknowledge();

这样,在消息被成功消费后,由应用本身手动提交offset这样可以保证我们的消息不会被丢失

结束语

本节提到了ack的概念,它是kafka的一种确认机制,它用于确定消息是否已经被成功处理,下节就结合实际应用场景给大家顺一下这个概念~

本着把自己知道的都告诉大家,如果本文对您有所帮助,点赞+关注鼓励一下呗~

相关文章

项目源码(源码已更新 欢迎star⭐️)

ElasticSearch 专题学习

项目源码(源码已更新 欢迎star⭐️)

往期并发编程内容推荐

推荐 SpringBoot & SpringCloud (源码已更新 欢迎star⭐️)

博客(阅读体验较佳)




相关文章
|
17天前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
48 5
|
19天前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
32 1
|
2月前
|
消息中间件 Java 大数据
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
74 2
|
4月前
|
消息中间件 开发框架 Java
掌握这一招,Spring Boot与Kafka完美融合,顺序消费不再是难题,让你轻松应对业务挑战!
【8月更文挑战第29天】Spring Boot与Kafka集成广泛用于处理分布式消息队列。本文探讨了在Spring Boot中实现Kafka顺序消费的方法,包括使用单个Partition或消息Key确保消息路由到同一Partition,并设置Consumer并发数为1以保证顺序消费。通过示例代码展示了如何配置Kafka Producer和Consumer,并自定义Partitioner。为确保数据正确性,还建议在业务逻辑中增加顺序校验机制。
162 3
|
4月前
|
消息中间件 Java Kafka
|
4月前
|
消息中间件 Java Kafka
|
4月前
|
消息中间件 安全 Java
Spring Boot 基于 SCRAM 认证集成 Kafka 的详解
【8月更文挑战第4天】本文详解Spring Boot结合SCRAM认证集成Kafka的过程。SCRAM为Kafka提供安全身份验证。首先确认Kafka服务已启用SCRAM,并准备认证凭据。接着,在`pom.xml`添加`spring-kafka`依赖,并在`application.properties`中配置Kafka属性,包括SASL_SSL协议与SCRAM-SHA-256机制。创建生产者与消费者类以实现消息的发送与接收功能。最后,通过实际消息传递测试集成效果与认证机制的有效性。
172 4
|
4月前
|
消息中间件 Kafka Java
Spring 框架与 Kafka 联姻,竟引发软件世界的革命风暴!事件驱动架构震撼登场!
【8月更文挑战第31天】《Spring 框架与 Kafka 集成:实现事件驱动架构》介绍如何利用 Spring 框架的强大功能与 Kafka 分布式流平台结合,构建灵活且可扩展的事件驱动系统。通过添加 Spring Kafka 依赖并配置 Kafka 连接信息,可以轻松实现消息的生产和消费。文中详细展示了如何设置 `KafkaTemplate`、`ProducerFactory` 和 `ConsumerFactory`,并通过示例代码说明了生产者发送消息及消费者接收消息的具体实现。这一组合为构建高效可靠的分布式应用程序提供了有力支持。
116 0
|
5月前
|
消息中间件 Java Kafka
spring boot 整合kafka
spring boot 整合kafka
64 8
|
4月前
|
消息中间件 Java Kafka
SpringBoot Kafka SSL接入点PLAIN机制收发消息
SpringBoot Kafka SSL接入点PLAIN机制收发消息
40 0