Kafka(四)【Kafka 消费者】(3)

简介: Kafka(四)【Kafka 消费者】

Kafka(四)【Kafka 消费者】(2)https://developer.aliyun.com/article/1532345

4.2、RoundRobin 以及再平衡

1)RoundRobin 分区策略原理

RoundRobin 是针对集群中所有 topic 而言的。它会把所有 topic 的 partition 和所有的 consumer 都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配 partition 都列给消费者。

2)RoundRobin 分区分配策略案例
// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");  

消费者0 消费了分区 0,3,6

消费者1 消费了分区 2,5

消费者2 消费了分区 1,4

3)RoundRobin 分区分配再平衡案例

(1)停止掉0号消费者,快速重新发送消息观看结果(45s以内,越快越好)。

1号消费者:消费到2、5号分区数据

2号消费者:消费到4、1号分区数据

0号消费者的任务会按照RoundRobin的方式,把数据轮询分成0 、6和3号分区数据,分别由1号消费者或者2号消费者消费。

说明:0号消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他broker执行。

(2)再次重新发送消息观看结果(45s以后)。

1号消费者:消费到0、2、4、6号分区数据

2号消费者:消费到1、3、5号分区数据

说明:消费者0已经被踢出消费者组,所以重新按照RoundRobin方式分配。

4.3、Sticky 以及再平衡

       粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。

       粘性分区是Kafka从0.11.x版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。

       粘性分区策略会尽量均匀分配分区并随机分配给每个消费者,比如一共有 0~6 7个分区要分配给3个消费者,那么可能的一种结果就是消费者0:1,4 消费者1:0,3,6 消费者2:2,5

(1)修改分区分配策略为粘性。

注意:3个消费者都应该注释掉,之后重启3个消费者,如果出现报错,全部停止等会再重启,或者修改为全新的消费者组。

 // 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
Sticky 分区分配再平衡案例

(1)停止掉0号消费者(0号消费者消费的是 0,1号分区的数据),快速重新发送消息观看结果(45s以内,越快越好)。

       1号消费者:消费到2、5、3号分区数据。

       2号消费者:消费到4、6号分区数据。

       0号消费者的任务会按照粘性规则,尽可能均衡的随机分成0和1号分区数据,分别由1号消费者或者2号消费者消费。

说明:0号消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他broker执行。

(2)再次重新发送消息观看结果(45s以后)。

       1号消费者:消费到2、3、5号分区数据。

       2号消费者:消费到0、1、4、6号分区数据。

说明:消费者0已经被踢出消费者组,所以重新按照粘性方式分配。

5、offset 位移

5.1、offset 的默认维护位置

__consumer_offsets主题里面采用key和value的方式存储数据。key是group.id+topic+分区号,value就是当前offset的值。每隔一段时间,kafka内部会对这个topic进行compact,也就是 key 保持不变,不断更新value。

1)消费offset案例

(0)思想:__consumer_offsets为Kafka中的topic,那就可以通过消费者进行消费。

(1)在配置文件config/consumer.properties中添加配置exclude.internal.topics=false,默认是true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为false。

(2)采用命令行方式,创建一个新的topic

(3)向刚创建的主题 lyh 中发送数据

(4)启动一个消费者来消费 lyh 主题中的数据

消费数据才会有 offset 生成,同时我们需要指定组 id ,因为如果我们不指定,kafka 默认也会给我们指定一个组id,这样我们就不方便查找了。

(5)查看消费者消费主题 __consumer_offsets

bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server  hadoop102:9092 --consumer.config config/consumer.properties  --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

5.2、自动提交 offset

为了使我们能够专注自己的业务逻辑,Kafka 提供了自动提交 offset 的功能。

参数名称

描述

enable.auto.commit

默认值为true,消费者会自动周期性地向服务器提交偏移量。

auto.commit.interval.ms

如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka提交的频率,默认5s。

配置参数:

// 设置为自动提交 默认为true
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        // 设置自动提交间隔 默认5000ms
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);

5.3、手动提交 offset

       虽然自动提交十分方便,但是由于自动提交的频率通常是固定的,这可能不适应所有场景。如果自动提交的间隔设置得过大,当消费者在自动提交偏移量之前异常退出时,可能会导致 Kafka 未提交偏移量,进而出现重复消费的问题。

       所以 Kafka 也提供了基于事件的手动提交,也就是消费完一批数据之后就提交一个 offset,这样就不用像自动提交那样出现一个攒批的过程,就不用担心出现 offset 丢失这种情况了。而手动提交又分为同步提交异步提交。它俩的相同点是都会将一批数据最高的偏移量提交,不同点是,同步提交会阻塞当前线程,直到提交成功才会继续消费,如果失败会进行重试,但是异步提交消费完数据后不会等待提交完 offset 才消费,也没有失败重试机制,所以可能会出现提交失败

5.3.1、同步提交offset

由于同步提交offset有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。以下为同步提交offset的示例。

public class CustomConsumerByHand {
    public static void main(String[] args) {
        // 1. 创建消费者配置对象
        Properties properties = new Properties();
 
        // 2. 给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        // 配置k,v序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 配置消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test2");
        // 修改分区分配策略
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");
 
        // 设置为手动提交 默认为自动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
 
        // 创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
 
        // 注册要消费的主题
        List<String> topics = new ArrayList<>();
        topics.add("first");
        consumer.subscribe(topics);
 
        // 拉取数据打印
        while (true){
            // 设置 1s 消费一批数据
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            // 打印消费到的数据
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
            }
            // 手动提交 offset
            consumer.commitSync();//同步提交
        }
 
    }
}
5.3.2、异步提交offset

虽然同步提交offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交offset的方式。

修改为异步提交 offset 只需要修改上面的代码:

consumer.commitAsync();

通常我们用异步发送多一点,因为这样效率高一点。

Kafka(四)【Kafka 消费者】(4)https://developer.aliyun.com/article/1532347

相关文章
|
13天前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
44 2
|
3月前
|
消息中间件 负载均衡 大数据
揭秘Kafka背后的秘密!再均衡如何上演一场消费者组的‘权力游戏’,让消息处理秒变高能剧情?
【8月更文挑战第24天】Kafka是一款在大数据处理领域备受推崇的产品,以其出色的性能和可扩展性著称。本文通过一个具体案例介绍其核心机制之一——再均衡(Rebalancing)。案例中,“user_activity”主题下10个分区被3个消费者均衡消费。当新消费者加入或原有消费者离开时,Kafka将自动触发再均衡过程,确保所有消费者能有效处理分配给它们的分区。
134 62
|
3月前
|
消息中间件 Kafka API
【Kafka消费新风潮】告别复杂,迎接简洁之美——深度解析Kafka新旧消费者API大比拼!
【8月更文挑战第24天】Apache Kafka作为一个领先的分布式流处理平台,广泛用于实时数据管道和流式应用的构建。随着其发展,消费者API经历了重大更新。旧消费者API(包括“低级”和“高级”API)虽提供灵活性但在消息顺序处理上存在挑战。2017年引入的新消费者API简化了接口,自动管理偏移量,支持更强大的消费组功能,显著降低了开发复杂度。通过对比新旧消费者API的代码示例可以看出,新API极大提高了开发效率和系统可维护性。
129 58
|
1月前
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
32 1
|
5月前
|
消息中间件 分布式计算 Kafka
Kafka(四)【Kafka 消费者】(4)
Kafka(四)【Kafka 消费者】
|
3月前
|
消息中间件 负载均衡 Kafka
【Kafka消费秘籍】深入了解消费者组与独立模式,掌握消息消费的两种超能力!
【8月更文挑战第24天】Apache Kafka是一款高性能的分布式消息系统,支持灵活多样的消费模型以适应不同的应用场景。消息按主题组织,每个主题可划分为多个分区,确保消息顺序性。本文深入探讨了Kafka中的两大核心消费模式:消费者组(Consumer Group)和独立消费者(Standalone Consumer)。消费者组允许多个消费者协同工作,实现负载均衡及故障恢复,是最常用的消费模式。独立消费者模式则适用于需要高度定制化处理逻辑的场景,如消息重放等。通过对比这两种模式的特点和提供的示例代码,开发者可以根据具体需求选择最合适的消费策略,从而更好地利用Kafka构建高效的数据流应用程序。
78 3
|
3月前
|
图形学 C# 开发者
全面掌握Unity游戏开发核心技术:C#脚本编程从入门到精通——详解生命周期方法、事件处理与面向对象设计,助你打造高效稳定的互动娱乐体验
【8月更文挑战第31天】Unity 是一款强大的游戏开发平台,支持多种编程语言,其中 C# 最为常用。本文介绍 C# 在 Unity 中的应用,涵盖脚本生命周期、常用函数、事件处理及面向对象编程等核心概念。通过具体示例,展示如何编写有效的 C# 脚本,包括 Start、Update 和 LateUpdate 等生命周期方法,以及碰撞检测和类继承等高级技巧,帮助开发者掌握 Unity 脚本编程基础,提升游戏开发效率。
75 0
|
4月前
|
消息中间件 存储 负载均衡
深入理解Kafka核心设计及原理(三):消费者
深入理解Kafka核心设计及原理(三):消费者
89 8
|
4月前
|
消息中间件 存储 Kafka
面试题Kafka问题之Kafka的消费者(Consumer)跟踪消息如何解决
面试题Kafka问题之Kafka的消费者(Consumer)跟踪消息如何解决
57 0
|
4月前
|
消息中间件 存储 资源调度
实时计算 Flink版产品使用问题之在消费Kafka的Avro消息,如何配置FlinkKafka消费者的相关参数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
下一篇
无影云桌面