Apache Kafka: what the auto.offset.reset parameter (earliest, latest, none) means


Official documentation


https://kafka.apache.org/documentation/

Pick the version that matches your setup; I chose 2.4.x here.

https://kafka.apache.org/24/documentation.html

Then go to the consumer configuration section:


https://kafka.apache.org/24/documentation.html#consumerconfigs


Search the page for auto.offset.reset.


Let's read the official description closely.


Interpreting the parameter


What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted)


no initial offset || current offset does not exist


no initial offset


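For example: consumer group ConsumerGroupA contains a consumer A1 that has already consumed 100 records. You then start a new consumer, but its consumer group has a different name from ConsumerGroupA; let's call it ConsumerGroupB.

Because ConsumerGroupB has no committed offset for the partition, auto.offset.reset comes into play when the consumer in this new group starts consuming it.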


current offset does not exist


Kafka provides an API for resuming consumption from a recorded offset. If the specified offset no longer exists on the server, this parameter applies as well.
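
The two trigger conditions can be modelled in a few lines of plain Java. This is a simplified sketch, not Kafka client code: the class ResetTrigger and the method needsReset are hypothetical names, and the valid range of a partition is assumed to run from logStart (oldest retained offset) to logEnd (next offset to be written).

```java
import java.util.OptionalLong;

final class ResetTrigger {

    /**
     * Returns true when the consumer cannot simply resume and the
     * reset policy has to decide where to start.
     */
    static boolean needsReset(OptionalLong committed, long logStart, long logEnd) {
        if (!committed.isPresent()) {
            return true; // "no initial offset": the group never committed anything
        }
        long offset = committed.getAsLong();
        // "current offset does not exist": the committed offset is outside
        // the range the broker still holds (e.g. the data was deleted)
        return offset < logStart || offset > logEnd;
    }

    public static void main(String[] args) {
        // ConsumerGroupB is new, so it has no committed offset.
        System.out.println(needsReset(OptionalLong.empty(), 0L, 52L));
        // ConsumerGroupA committed offset 100 on a partition holding offsets 0..199.
        System.out.println(needsReset(OptionalLong.of(100L), 0L, 200L));
        // Committed offset 10, but retention already removed everything below 40.
        System.out.println(needsReset(OptionalLong.of(10L), 40L, 52L));
    }
}
```

In this model a committed offset equal to logEnd is still valid: the group is simply caught up and waits for new records.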


earliest: automatically reset the offset to the earliest offset

latest: automatically reset the offset to the latest offset

none: throw exception to the consumer if no previous offset is found for the consumer’s group

anything else: throw exception to the consumer.
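
The four cases quoted above can be read as a small decision function. The sketch below is a plain-Java model for illustration only; OffsetResetPolicyDemo and resolve are hypothetical names, the exception types are stand-ins chosen for this example, and the real client performs these checks internally.

```java
final class OffsetResetPolicyDemo {

    /**
     * Picks the starting offset once a reset is needed.
     *
     * @param policy   value of auto.offset.reset
     * @param logStart oldest offset still available in the partition
     * @param logEnd   offset the next produced record will get
     */
    static long resolve(String policy, long logStart, long logEnd) {
        switch (policy) {
            case "earliest":
                return logStart; // read everything that is still retained
            case "latest":
                return logEnd;   // read only records produced from now on
            case "none":
                // stand-in for the exception the consumer receives
                throw new IllegalStateException("no offset found and no reset policy");
            default:
                // stand-in for "anything else: throw exception to the consumer"
                throw new IllegalArgumentException("invalid auto.offset.reset value: " + policy);
        }
    }

    public static void main(String[] args) {
        System.out.println("earliest starts at " + resolve("earliest", 0L, 52L));
        System.out.println("latest starts at " + resolve("latest", 0L, 52L));
    }
}
```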



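So what does this mean?


When a topic is consumed by a new consumer group, or consumption starts from a specified offset that does not exist, how should the consumer proceed?


latest (default): consume only the messages sent to the topic after the consumer has started

earliest: the first run starts from the beginning, and later runs continue from the recorded offset. This is different from consumer.seekToBeginning, which starts from the beginning every time.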
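
The difference between earliest and seekToBeginning shows up on the second start of the same group. The following is a minimal simulation in plain Java, not Kafka client code; RestartDemo and its methods are hypothetical names, and it assumes the committed offset is still within the retained range.

```java
import java.util.OptionalLong;

final class RestartDemo {

    /** Start position of one run when relying on auto.offset.reset=earliest. */
    static long startWithEarliest(OptionalLong committed, long logStart) {
        // The policy only applies when there is no committed offset.
        return committed.isPresent() ? committed.getAsLong() : logStart;
    }

    /** Start position of one run when the application seeks to the beginning itself. */
    static long startWithSeekToBeginning(long logStart) {
        return logStart; // the committed offset is ignored
    }

    public static void main(String[] args) {
        long logStart = 0L;
        long logEnd = 52L; // assumed: the partition holds offsets 0..51
        OptionalLong committed = OptionalLong.empty();

        // Run 1: brand-new group, so the reset policy decides.
        long firstRun = startWithEarliest(committed, logStart);
        // The run reads everything and commits its position.
        committed = OptionalLong.of(logEnd);

        // Run 2: the same group restarts and the committed offset wins.
        long secondRun = startWithEarliest(committed, logStart);

        System.out.println("earliest: run 1 starts at " + firstRun + ", run 2 starts at " + secondRun);
        System.out.println("seekToBeginning: every run starts at " + startWithSeekToBeginning(logStart));
    }
}
```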


Code




POM dependencies

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring-Kafka dependency -->
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>


Configuration file

[Screenshot: configuration file]
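
Whatever else the configuration file contains, the setting under test is the consumer property auto.offset.reset (in a Spring Boot application it is typically set through spring.kafka.consumer.auto-offset-reset). As a rough sketch, the same consumer settings can be expressed as plain java.util.Properties. The class and method names, the broker address localhost:9092 and the group id are assumptions for illustration, not values taken from the original project.

```java
import java.util.Properties;

final class ConsumerProps {

    /** Builds the minimal set of consumer properties relevant to this article. */
    static Properties consumerProps(String groupId, String resetPolicy) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.setProperty("group.id", groupId);
        // One of: earliest, latest (the default), none
        props.setProperty("auto.offset.reset", resetPolicy);
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("ConsumerGroup-demo", "earliest");
        System.out.println(props.getProperty("auto.offset.reset"));
    }
}
```

Switching the second argument between earliest, latest and none corresponds to the three test runs later in the article.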


Producer

package com.artisan.springkafka.producer;

import com.artisan.springkafka.constants.TOPIC;
import com.artisan.springkafka.domain.MessageMock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.Random;
import java.util.concurrent.ExecutionException;

/**
 * Mock producer that sends MessageMock records to TOPIC.TOPIC.
 *
 * @author 小工匠
 * @version 1.0
 * @date 2021/2/17 22:25
 * @mark: show me the code , change the world
 */
@Component
public class ArtisanProducerMock {

    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    /**
     * Synchronous send: blocks until the send has completed.
     *
     * @return the result of the send, including the record metadata
     * @throws ExecutionException   if the send failed
     * @throws InterruptedException if the waiting thread was interrupted
     */
    public SendResult<Object, Object> sendMsgSync() throws ExecutionException, InterruptedException {
        // Build a mock message with a random id
        Integer id = new Random().nextInt(100);
        MessageMock messageMock = new MessageMock(id, "artisanTestMessage-" + id);
        // Wait synchronously for the result
        return kafkaTemplate.send(TOPIC.TOPIC, messageMock).get();
    }

    /**
     * Asynchronous send: returns immediately with a future the caller can attach a callback to.
     */
    public ListenableFuture<SendResult<Object, Object>> sendMsgASync() throws ExecutionException, InterruptedException {
        // Build a mock message with a random id
        Integer id = new Random().nextInt(100);
        MessageMock messageMock = new MessageMock(id, "messageSendByAsync-" + id);
        // Send without waiting for the result
        return kafkaTemplate.send(TOPIC.TOPIC, messageMock);
    }
}


Consumer

package com.artisan.springkafka.consumer;

import com.artisan.springkafka.domain.MessageMock;
import com.artisan.springkafka.constants.TOPIC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Mock consumer used to observe the effect of auto.offset.reset.
 *
 * @author 小工匠
 * @version 1.0
 * @date 2021/2/17 22:33
 * @mark: show me the code , change the world
 */
@Component
public class ArtisanCosumerMock {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    private static final String CONSUMER_GROUP_PREFIX = "ConsumerGroup-";

    /**
     * The groupId gets a random UUID through SpEL on every start, so each run uses a
     * brand-new consumer group. This is what makes it possible to test auto.offset.reset,
     * for example with the value earliest.
     * Note that the ")" after the closing "}" is literal text and becomes part of the group id.
     *
     * @param messageMock the received message
     */
    @KafkaListener(topics = TOPIC.TOPIC, groupId = CONSUMER_GROUP_PREFIX + TOPIC.TOPIC + "-" + "#{T(java.util.UUID).randomUUID()})")
    public void onMessage(MessageMock messageMock) {
        // The log text is Chinese for: "message received, thread: {}, content: {}"
        logger.info("【接受到消息][线程:{} 消息内容:{}]", Thread.currentThread().getName(), messageMock);
    }
}
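
The listener relies on a fresh group id for every start. Outside of SpEL, the same idea can be sketched in plain Java; GroupIds and randomGroupId are hypothetical names used only for this illustration.

```java
import java.util.UUID;

final class GroupIds {

    /** Builds a group id that is, for practical purposes, unique per call. */
    static String randomGroupId(String prefix, String topic) {
        return prefix + topic + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        // Two starts of the application would end up in two different groups,
        // so neither finds a committed offset and auto.offset.reset applies.
        System.out.println(randomGroupId("ConsumerGroup-", "MOCK_TOPIC"));
        System.out.println(randomGroupId("ConsumerGroup-", "MOCK_TOPIC"));
    }
}
```

A random group id is convenient for this kind of experiment, but every run leaves behind a group that is never used again, so it is a testing device rather than something to carry over to a long-running service.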


See the comment on the listener method.


Unit test

package com.artisan.springkafka.produceTest;

import com.artisan.springkafka.SpringkafkaApplication;
import com.artisan.springkafka.producer.ArtisanProducerMock;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.support.SendResult;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

/**
 * Sends one message and then keeps the JVM alive so the listener can consume.
 *
 * @author 小工匠
 * @version 1.0
 * @date 2021/2/17 22:40
 * @mark: show me the code , change the world
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringkafkaApplication.class)
public class ProduceMockTest {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private ArtisanProducerMock artisanProducerMock;

    @Test
    public void testSyncSend() throws ExecutionException, InterruptedException {
        SendResult<?, ?> sendResult = artisanProducerMock.sendMsgSync();
        logger.info("testSyncSend Result =  topic:[{}] , partition:[{}], offset:[{}]",
                sendResult.getRecordMetadata().topic(),
                sendResult.getRecordMetadata().partition(),
                sendResult.getRecordMetadata().offset());
        // Block (indefinitely) so the consumer has time to receive; stop the test manually
        new CountDownLatch(1).await();
    }

    @Test
    public void testAsynSend() throws ExecutionException, InterruptedException {
        artisanProducerMock.sendMsgASync().addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                // Pass the throwable as the last argument so the stack trace is logged
                logger.error("Send failed", throwable);
            }

            @Override
            public void onSuccess(SendResult<Object, Object> result) {
                logger.info("Callback Result =  topic:[{}] , partition:[{}], offset:[{}]",
                        result.getRecordMetadata().topic(),
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
            }
        });
        // Block (indefinitely) so the consumer has time to receive; stop the test manually
        new CountDownLatch(1).await();
    }
}


Testing

Before looking at the results, check what is already in the topic.


[Screenshot: records currently in the topic]


earliest

With earliest, a newly started ConsumerGroup is expected to consume from the very beginning. Let's verify.

2021-02-23 16:54:38.014  INFO 5684 --- [           main] c.a.s.produceTest.ProduceMockTest        : testSyncSend Result =  topic:[MOCK_TOPIC] , partition:[0], offset:[51]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=1, name='artisanTestMessage'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=1, name='artisanTestMessage'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=1, name='artisanTestMessage'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=1, name='artisanTestMessage'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=1, name='artisanTestMessage'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=1, name='artisanTestMessage'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=1, name='artisanTestMessage9'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=68, name='artisanTestMessage-68'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=48, name='artisanTestMessage-48'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=49, name='artisanTestMessage-49'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=81, name='artisanTestMessage-81'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=60, name='artisanTestMessage-60'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=51, name='artisanTestMessage-51'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=51, name='artisanTestMessage-51'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=60, name='artisanTestMessage-60'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=6, name='artisanTestMessage-6'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=23, name='artisanTestMessage-23'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=37, name='messageSendByAsync-37'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=25, name='messageSendByAsync-25'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=16, name='messageSendByAsync-16'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=57, name='messageSendByAsync-57'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=79, name='messageSendByAsync-79'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=19, name='messageSendByAsync-19'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=42, name='messageSendByAsync-42'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=70, name='messageSendByAsync-70'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=24, name='messageSendByAsync-24'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=84, name='messageSendByAsync-84'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=56, name='messageSendByAsync-56'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=82, name='messageSendByAsync-82'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=25, name='messageSendByAsync-25'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=6, name='messageSendByAsync-6'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=94, name='messageSendByAsync-94'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=20, name='messageSendByAsync-20'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=52, name='messageSendByAsync-52'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=24, name='messageSendByAsync-24'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=32, name='messageSendByAsync-32'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=13, name='messageSendByAsync-13'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=45, name='messageSendByAsync-45'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=29, name='messageSendByAsync-29'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=18, name='messageSendByAsync-18'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=13, name='messageSendByAsync-13'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=57, name='messageSendByAsync-57'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=53, name='messageSendByAsync-53'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=46, name='messageSendByAsync-46'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=33, name='messageSendByAsync-33'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=49, name='artisanTestMessage-49'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=99, name='artisanTestMessage-99'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=83, name='artisanTestMessage-83'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=61, name='artisanTestMessage-61'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=79, name='artisanTestMessage-79'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=99, name='artisanTestMessage-99'}]
2021-02-23 16:54:38.075  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=43, name='artisanTestMessage-43'}]


latest (default)

2021-02-23 16:55:21.541  INFO 21472 --- [           main] c.a.s.produceTest.ProduceMockTest        : testSyncSend Result =  topic:[MOCK_TOPIC] , partition:[0], offset:[52]
2021-02-23 16:55:21.587  INFO 21472 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=79, name='artisanTestMessage-79'}]


none

2021-02-23 16:55:44.726 ERROR 19956 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-ConsumerGroup-MOCK_TOPIC-d52dc4e5-535f-4151-ad5a-85b7c0488b9d)-1, groupId=ConsumerGroup-MOCK_TOPIC-d52dc4e5-535f-4151-ad5a-85b7c0488b9d)] User provided listener org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener failed on invocation of onPartitionsAssigned for partitions [MOCK_TOPIC-0]
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [MOCK_TOPIC-0]
  at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetInitializingPositions(SubscriptionState.java:671) ~[kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2422) [kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1759) [kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1718) [kafka-clients-2.6.0.jar:na]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.seekPartitions(KafkaMessageListenerContainer.java:1031) [spring-kafka-2.6.4.jar:2.6.4]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$3800(KafkaMessageListenerContainer.java:462) [spring-kafka-2.6.4.jar:2.6.4]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener.onPartitionsAssigned(KafkaMessageListenerContainer.java:2631) ~[spring-kafka-2.6.4.jar:2.6.4]
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:293) [kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:430) [kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440) [kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359) [kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513) [kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268) [kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) [kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) [kafka-clients-2.6.0.jar:na]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1269) [spring-kafka-2.6.4.jar:2.6.4]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1160) [spring-kafka-2.6.4.jar:2.6.4]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1073) [spring-kafka-2.6.4.jar:2.6.4]
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_261]
  at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_261]
  at java.lang.Thread.run(Thread.java:748) [na:1.8.0_261]
2021-02-23 16:55:44.726 ERROR 19956 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : No offset and no reset policy
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [MOCK_TOPIC-0]
  at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetInitializingPositions(SubscriptionState.java:671) ~[kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2422) ~[kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1759) ~[kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1718) ~[kafka-clients-2.6.0.jar:na]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.seekPartitions(KafkaMessageListenerContainer.java:1031) ~[spring-kafka-2.6.4.jar:2.6.4]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$3800(KafkaMessageListenerContainer.java:462) ~[spring-kafka-2.6.4.jar:2.6.4]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener.onPartitionsAssigned(KafkaMessageListenerContainer.java:2631) ~[spring-kafka-2.6.4.jar:2.6.4]
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:293) ~[kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:430) ~[kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440) ~[kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359) ~[kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513) ~[kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268) ~[kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) ~[kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[kafka-clients-2.6.0.jar:na]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1269) ~[spring-kafka-2.6.4.jar:2.6.4]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1160) ~[spring-kafka-2.6.4.jar:2.6.4]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1073) ~[spring-kafka-2.6.4.jar:2.6.4]
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_261]
  at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_261]
  at java.lang.Thread.run(Thread.java:748) [na:1.8.0_261]
2021-02-23 16:55:44.726 ERROR 19956 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Fatal consumer exception; stopping container
2021-02-23 16:55:44.730  INFO 19956 --- [ntainer#0-0-C-1] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService
2021-02-23 16:55:44.749  INFO 19956 --- [           main] c.a.s.produceTest.ProduceMockTest        : testSyncSend Result =  topic:[MOCK_TOPIC] , partition:[0], offset:[53]

exception




For the last two settings, a quick look is enough.

Got it, friend?


Source code

https://github.com/yangshangwei/boot2/tree/master/springkafka_auto.offset.reset

