Kafka(四)【Kafka 消费者】(4)

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Kafka(四)【Kafka 消费者】

Kafka(四)【Kafka 消费者】(3)https://developer.aliyun.com/article/1532346

5.4、指定 offset 消费

auto.offset.reset = earliest | latest | none   默认是latest

当Kafka中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?

(1)earliest:自动将偏移量重置为最早的偏移量,--from-beginning

(2)latest(默认值):自动将偏移量重置为最新偏移量。

(3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。

(4)任意指定offset位移开始消费

public class CustomConsumerSeek {
    public static void main(String[] args) {
        // 1. 创建消费者配置对象
        Properties properties = new Properties();
 
        // 2. 给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        // 配置k,v序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 配置消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test2");
 
        // 创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
 
        // 注册要消费的主题
        consumer.subscribe(Arrays.asList("first"));
 
        // 指定消费位置 offset
        // 获取分区信息 需要时间
        Set<TopicPartition> assignment = consumer.assignment();
 
        // 保证分区分配方案制定完毕
        while (assignment.size() == 0){
            consumer.poll(Duration.ofSeconds(1));
 
            assignment = consumer.assignment();
        }
 
        for (TopicPartition partition : assignment) {
            // 指定从 offset=100 的位置开始消费
            consumer.seek(partition,33);
        }
 
        // 拉取数据打印
        while (true){
            // 设置 1s 消费一批数据
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            // 打印消费到的数据
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
            }
        }
 
    }
}

5.5、指定时间消费

需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据,怎么处理?

获取一天前的消息数据:

public class CustomConsumerSeekTime {
    public static void main(String[] args) {
        // 1. 创建消费者配置对象
        Properties properties = new Properties();
 
        // 2. 给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        // 配置k,v序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 配置消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test2");
 
        // 创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
 
        // 注册要消费的主题
        consumer.subscribe(Arrays.asList("first"));
 
        // 指定消费位置 offset
        // 获取分区信息 需要时间
        Set<TopicPartition> assignment = consumer.assignment();
 
        // 保证分区分配方案制定完毕
        while (assignment.size() == 0){
            consumer.poll(Duration.ofSeconds(1));
 
            assignment = consumer.assignment();
        }
 
        // 希望通过时间获得相应的 offset
        HashMap<TopicPartition, Long> map = new HashMap<>();
 
        // 遍历每个分区添加到集合
        for (TopicPartition topicPartition : assignment) {
            map.put(topicPartition,System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
        }
 
        // 通过集合得到 map<分区,offset信息>
        Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer.offsetsForTimes(map);
 
        for (TopicPartition partition : assignment) {
            // 指定时间开始消费
            // 把时间转为 offset
            OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(partition);
            consumer.seek(partition,offsetAndTimestamp.offset());
        }
 
        // 拉取数据打印
        while (true){
            // 设置 1s 消费一批数据
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            // 打印消费到的数据
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
            }
        }
 
    }
}

5.6、漏消费和重复消费

重复消费:已经消费了数据,但是offset没提交。

漏消费:先提交offset后消费,有可能会造成数据的漏消费。

  • 重复消费:在自动提交的时候,提交 offset (默认是5s提交一次最大 offset)和消费者是互不影响的,所以提交完 offset 同时,消费者已经又消费了一些大于当前 offset 的数据,所以如果在下一次提交 offset 之前如果消费者挂掉的话,那么这一部分已经被消费的数据由于没有提交 offset 就会被其它消费者重复消费。
  • 漏消费:手动提交的时候,当消费者拿到这个数据的时刻就会提交 offset,但是如果数据在消费者这里还没有被处理就挂机了,那么这个数据就会被漏掉

6、生产经验-消费者事务

       正因为有重复消费和漏消费,所以就引入了消费者事务。就像我们之前学 Flink 容错机制的时候讲的输出端一致性保证时用到的两阶段提交(2PC)我们写入 Kafka 的过程其实是一个两段式的提交处理完毕,得到结果写入 Kafka 是基于事物的“预提交”,等到检查点保存完毕才会提交事务,进行正式提交,如果中间出现故障,事故进行回滚,预提交就会被放弃,恢复状态之后也只能恢复所有已确认提交的操作。

       这里的消费者事务需要下游消费者(比如 Spark、Flink、MySQL)也支持事务才能做到精确一次消费(比如 HBase 就不支持事务),其实我们上面说的 Flink Sink 连接 Kafka 为保证精确一次而提出的两阶段提交、还有 Flink 事务回滚checkpoint恢复,Kafka重置偏移量都是通过事务确保数据精准一次的例子。

7、生产经验-数据积压(消费者如何提高吞吐量)

  • 如果说  Kafka 消费能力不足,则可以考虑增加 topic 的分区数量;并且同时提高消费者组的消费者数量,消费者数 = 分区数(二者缺一不可)
  • 如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压

参数名称

描述

fetch.max.bytes

默认Default: 5242880050 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。

max.poll.records

一次poll拉取数据返回消息的最大条数,默认是500条

回忆之前生产者到 Kafka 提高吞吐量的策略:

  • batch.size :内存队列中每个批次的大小,默认 16K
  • linger.ms:等待时间,修改为 5-100ms
  • compression.type:压缩 snappy
  • RecordAccumulator:缓冲区大小

这里我们又学习了怎么提高 Kafka 到消费者的吞吐量,这两个应该配合起来使用。


总结

       自此,Kafka 的第一遍学习基本上是完成了,之后开学的任务就是在课上把《Kafka 权威指南》看完理解记忆消化,Kafka 是十分重要的内容,需要不断学习加深理解。

相关文章
|
6月前
|
消息中间件 Linux Kafka
linux命令使用消费kafka的生产者、消费者
linux命令使用消费kafka的生产者、消费者
313 16
|
8月前
|
消息中间件 Java Kafka
SpringBoot使用Kafka生产者、消费者
SpringBoot使用Kafka生产者、消费者
413 10
|
9月前
|
消息中间件 Kafka
【赵渝强老师】Kafka的消费者与消费者组
Kafka消费者是从Kafka集群中消费数据的客户端。单消费者模型在数据生产速度超过消费速度时会导致数据堆积。为解决此问题,Kafka引入了消费者组的概念,允许多个消费者共同消费同一主题的消息。消费者组由一个或多个消费者组成,它们动态分配和重新分配主题分区,确保消息处理的高效性和可靠性。视频讲解及示意图详细展示了这一机制。
210 1
|
消息中间件 负载均衡 大数据
揭秘Kafka背后的秘密!再均衡如何上演一场消费者组的‘权力游戏’,让消息处理秒变高能剧情?
【8月更文挑战第24天】Kafka是一款在大数据处理领域备受推崇的产品,以其出色的性能和可扩展性著称。本文通过一个具体案例介绍其核心机制之一——再均衡(Rebalancing)。案例中,“user_activity”主题下10个分区被3个消费者均衡消费。当新消费者加入或原有消费者离开时,Kafka将自动触发再均衡过程,确保所有消费者能有效处理分配给它们的分区。
214 62
|
消息中间件 Kafka API
【Kafka消费新风潮】告别复杂,迎接简洁之美——深度解析Kafka新旧消费者API大比拼!
【8月更文挑战第24天】Apache Kafka作为一个领先的分布式流处理平台,广泛用于实时数据管道和流式应用的构建。随着其发展,消费者API经历了重大更新。旧消费者API(包括“低级”和“高级”API)虽提供灵活性但在消息顺序处理上存在挑战。2017年引入的新消费者API简化了接口,自动管理偏移量,支持更强大的消费组功能,显著降低了开发复杂度。通过对比新旧消费者API的代码示例可以看出,新API极大提高了开发效率和系统可维护性。
279 58
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
445 2
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
163 1
|
消息中间件 负载均衡 Kafka
【Kafka消费秘籍】深入了解消费者组与独立模式,掌握消息消费的两种超能力!
【8月更文挑战第24天】Apache Kafka是一款高性能的分布式消息系统,支持灵活多样的消费模型以适应不同的应用场景。消息按主题组织,每个主题可划分为多个分区,确保消息顺序性。本文深入探讨了Kafka中的两大核心消费模式:消费者组(Consumer Group)和独立消费者(Standalone Consumer)。消费者组允许多个消费者协同工作,实现负载均衡及故障恢复,是最常用的消费模式。独立消费者模式则适用于需要高度定制化处理逻辑的场景,如消息重放等。通过对比这两种模式的特点和提供的示例代码,开发者可以根据具体需求选择最合适的消费策略,从而更好地利用Kafka构建高效的数据流应用程序。
597 3
|
消息中间件 存储 负载均衡
深入理解Kafka核心设计及原理(三):消费者
深入理解Kafka核心设计及原理(三):消费者
337 8
|
图形学 C# 开发者
全面掌握Unity游戏开发核心技术:C#脚本编程从入门到精通——详解生命周期方法、事件处理与面向对象设计,助你打造高效稳定的互动娱乐体验
【8月更文挑战第31天】Unity 是一款强大的游戏开发平台,支持多种编程语言,其中 C# 最为常用。本文介绍 C# 在 Unity 中的应用,涵盖脚本生命周期、常用函数、事件处理及面向对象编程等核心概念。通过具体示例,展示如何编写有效的 C# 脚本,包括 Start、Update 和 LateUpdate 等生命周期方法,以及碰撞检测和类继承等高级技巧,帮助开发者掌握 Unity 脚本编程基础,提升游戏开发效率。
663 0

热门文章

最新文章