Kafka(四)【Kafka 消费者】(4)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Kafka(四)【Kafka 消费者】

Kafka(四)【Kafka 消费者】(3)https://developer.aliyun.com/article/1532346

5.4、指定 offset 消费

auto.offset.reset = earliest | latest | none   默认是latest

当Kafka中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?

(1)earliest:自动将偏移量重置为最早的偏移量,--from-beginning

(2)latest(默认值):自动将偏移量重置为最新偏移量。

(3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。

(4)任意指定offset位移开始消费

public class CustomConsumerSeek {
    public static void main(String[] args) {
        // 1. 创建消费者配置对象
        Properties properties = new Properties();
 
        // 2. 给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        // 配置k,v序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 配置消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test2");
 
        // 创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
 
        // 注册要消费的主题
        consumer.subscribe(Arrays.asList("first"));
 
        // 指定消费位置 offset
        // 获取分区信息 需要时间
        Set<TopicPartition> assignment = consumer.assignment();
 
        // 保证分区分配方案制定完毕
        while (assignment.size() == 0){
            consumer.poll(Duration.ofSeconds(1));
 
            assignment = consumer.assignment();
        }
 
        for (TopicPartition partition : assignment) {
            // 指定从 offset=100 的位置开始消费
            consumer.seek(partition,33);
        }
 
        // 拉取数据打印
        while (true){
            // 设置 1s 消费一批数据
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            // 打印消费到的数据
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
            }
        }
 
    }
}

5.5、指定时间消费

需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据,怎么处理?

获取一天前的消息数据:

public class CustomConsumerSeekTime {
    public static void main(String[] args) {
        // 1. 创建消费者配置对象
        Properties properties = new Properties();
 
        // 2. 给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        // 配置k,v序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 配置消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test2");
 
        // 创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
 
        // 注册要消费的主题
        consumer.subscribe(Arrays.asList("first"));
 
        // 指定消费位置 offset
        // 获取分区信息 需要时间
        Set<TopicPartition> assignment = consumer.assignment();
 
        // 保证分区分配方案制定完毕
        while (assignment.size() == 0){
            consumer.poll(Duration.ofSeconds(1));
 
            assignment = consumer.assignment();
        }
 
        // 希望通过时间获得相应的 offset
        HashMap<TopicPartition, Long> map = new HashMap<>();
 
        // 遍历每个分区添加到集合
        for (TopicPartition topicPartition : assignment) {
            map.put(topicPartition,System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
        }
 
        // 通过集合得到 map<分区,offset信息>
        Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer.offsetsForTimes(map);
 
        for (TopicPartition partition : assignment) {
            // 指定时间开始消费
            // 把时间转为 offset
            OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(partition);
            consumer.seek(partition,offsetAndTimestamp.offset());
        }
 
        // 拉取数据打印
        while (true){
            // 设置 1s 消费一批数据
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            // 打印消费到的数据
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
            }
        }
 
    }
}

5.6、漏消费和重复消费

重复消费:已经消费了数据,但是offset没提交。

漏消费:先提交offset后消费,有可能会造成数据的漏消费。

  • 重复消费:在自动提交的时候,提交 offset (默认是5s提交一次最大 offset)和消费者是互不影响的,所以提交完 offset 同时,消费者已经又消费了一些大于当前 offset 的数据,所以如果在下一次提交 offset 之前如果消费者挂掉的话,那么这一部分已经被消费的数据由于没有提交 offset 就会被其它消费者重复消费。
  • 漏消费:手动提交的时候,当消费者拿到这个数据的时刻就会提交 offset,但是如果数据在消费者这里还没有被处理就挂机了,那么这个数据就会被漏掉

6、生产经验-消费者事务

       正因为有重复消费和漏消费,所以就引入了消费者事务。就像我们之前学 Flink 容错机制的时候讲的输出端一致性保证时用到的两阶段提交(2PC)我们写入 Kafka 的过程其实是一个两段式的提交处理完毕,得到结果写入 Kafka 是基于事物的“预提交”,等到检查点保存完毕才会提交事务,进行正式提交,如果中间出现故障,事故进行回滚,预提交就会被放弃,恢复状态之后也只能恢复所有已确认提交的操作。

       这里的消费者事务需要下游消费者(比如 Spark、Flink、MySQL)也支持事务才能做到精确一次消费(比如 HBase 就不支持事务),其实我们上面说的 Flink Sink 连接 Kafka 为保证精确一次而提出的两阶段提交、还有 Flink 事务回滚checkpoint恢复,Kafka重置偏移量都是通过事务确保数据精准一次的例子。

7、生产经验-数据积压(消费者如何提高吞吐量)

  • 如果说  Kafka 消费能力不足,则可以考虑增加 topic 的分区数量;并且同时提高消费者组的消费者数量,消费者数 = 分区数(二者缺一不可)
  • 如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压

参数名称

描述

fetch.max.bytes

默认Default: 5242880050 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。

max.poll.records

一次poll拉取数据返回消息的最大条数,默认是500条

回忆之前生产者到 Kafka 提高吞吐量的策略:

  • batch.size :内存队列中每个批次的大小,默认 16K
  • linger.ms:等待时间,修改为 5-100ms
  • compression.type:压缩 snappy
  • RecordAccumulator:缓冲区大小

这里我们又学习了怎么提高 Kafka 到消费者的吞吐量,这两个应该配合起来使用。


总结

       自此,Kafka 的第一遍学习基本上是完成了,之后开学的任务就是在课上把《Kafka 权威指南》看完理解记忆消化,Kafka 是十分重要的内容,需要不断学习加深理解。

相关文章
|
1月前
|
消息中间件 负载均衡 Kafka
【Kafka面试演练】那Kafka消费者手动提交、自动提交有什么区别?
嗯嗯Ok。分区的作用主要就是为了提高Kafka处理消息吞吐量。每一个topic会被分为多个分区。假如同一个topic下有n个分区、n个消费者,这样的话每个分区就会发送消息给对应的一个消费者,这样n个消费者负载均衡地处理消息。同时生产者会发送消息给不同分区,每个分区分给不同的brocker处理,让集群平坦压力,这样大大提高了Kafka的吞吐量。面试官思考中…
122 4
|
1月前
|
消息中间件 安全 Kafka
深度解析Kafka中消费者的奥秘
深度解析Kafka中消费者的奥秘
54 0
|
1月前
|
消息中间件 Java Kafka
关于kafka消费者超时配置
关于kafka消费者超时配置
175 2
|
1月前
|
消息中间件 分布式计算 Java
探究Kafka原理-3.生产者消费者API原理解析(上)
探究Kafka原理-3.生产者消费者API原理解析
48 0
|
15天前
|
消息中间件 Kafka
Kafka生产者和消费者相关命令行操作
Kafka生产者和消费者相关命令行操作
18 1
|
24天前
|
消息中间件 Java Kafka
springboot整合kafka消费者最佳实践
springboot整合kafka消费者最佳实践
56 1
|
24天前
|
消息中间件 负载均衡 监控
Kafka消费者:监听模式VS主动拉取,哪种更适合你?
Kafka消费者:监听模式VS主动拉取,哪种更适合你?
17 1
|
21天前
|
消息中间件 存储 算法
Kafka(四)【Kafka 消费者】(3)
Kafka(四)【Kafka 消费者】
|
21天前
|
消息中间件 Kafka API
Kafka(四)【Kafka 消费者】(2)
Kafka(四)【Kafka 消费者】
|
21天前
|
消息中间件 存储 Java
Kafka(四)【Kafka 消费者】(1)
Kafka(四)【Kafka 消费者】

热门文章

最新文章