Kafka(四)【Kafka 消费者】(2)

简介: Kafka(四)【Kafka 消费者】

Kafka(四)【Kafka 消费者】(1)https://developer.aliyun.com/article/1532344

3、消费者 API

我们分三部分来实践消费者 API,一种是用消费者来消费一个主题(一个消费者消费多个分区),另一种是用一个消费者来只消费一个分区,最后一种是用一个消费者组来消费(也就是消费者组内的每个消费者消费一个分区)。

使用 API 的注意事项:

注意:在消费者API代码中必须配置消费者组id。命令行启动消费者不填写消费者组id会被自动填写随机的消费者组id。

不管有没有消费者组,都需要配置消费者id!(因为独立消费者相当于特殊的消费者组,也就是相同消费者组 id 的消费者只有一个)

3.1、独立消费者案例(订阅主题)

public class CustomConsumer {
    public static void main(String[] args) {
        // 1. 创建消费者配置对象
        Properties properties = new Properties();
 
        // 2. 给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        // 配置k,v反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());        
        // 配置消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
 
        // 创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
 
        // 注册要消费的主题
        List<String> topics = new ArrayList<>();
        topics.add("first");
        consumer.subscribe(topics);
 
        // 拉取数据打印
        while (true){
            // 设置 1s 消费一批数据
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            // 打印消费到的数据
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
            }
        }
 
    }
}

注意:消费者这里是反序列化!

测试:

在 hadoop102 生产数据:

bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first

记得修改分区数为 3:

在 IDEA 消费数据:

3.2、独立消费者案例(订阅分区)

只需要稍微修改一下上面的代码;

List<TopicPartition> topics = new ArrayList<>();
topics.add(new TopicPartition("first",0));
consumer.assign(topics);

可以看到,直接消费主题中所有分区时,我们直接传入一个主题名称即可,而指定消费主题的特定分区时,需要传入一个或多个 TopicPartition 对象。

这次我们使用带回调的生产者来生产消息:

public class CustomProducerCallback {
 
    public static void main(String[] args) throws InterruptedException {
 
        Properties properties = new Properties();
        // 连接集群 bootstrap.servers 多写几个主机地址 防止一个客户端挂掉
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
 
        // 指定对应的 key 和 value 的序列化类型 key.serialize
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
 
        // 1. 创建 Kafka 生产者对象
        // 需要指定键值的类型
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
 
        // 2. 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("like", "test" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null){ // 如果异常为空 说明正常执行
                        System.out.println("topic: "+recordMetadata.topic()+",partition: "+recordMetadata.partition());
                    }
                }
            });
            // 确保数据发往不同的分区
            Thread.sleep(2);
        }
 
        // 3. 关闭资源
        kafkaProducer.close();
    }
}

可以看到我们共往0号分区发了 2 条消息:

观察消费者窗口:

可以看到,消费者只消费到了我们指定的分区数据。

3.3、消费者组案例

      要实现消费者组很简单,我们直接复制上面 3.1 中独立消费者代码为 CustomConsumer1,让 CustomConsumer1 去消费分区1的数据,这样两个 main 方法同时执行就实现相当于两个消费者同时消费了。

我们继续使用上面带回调函数的生产者:

可以看到生产者往主题中发送了 5 条数据,我们观察消费者:

可以看到,消费者0 接收了 0 号分区,而消费者 1 接收了 1号和2号分区的数据。

4、生产经验-分区的分配以及再平衡

4.1、Range 以及再平衡

1)Range 分区策略原理

一个主题有多个分区,而一个消费者组有多个消费者,那么每个消费者消费哪一个分区呢?

目前,Kafka 有 4 种主流的分区分配策略:Range、RoundRobin、Sticky、CooperativeStick(Kafka 3.0 新特性)。可以通过配置参数 partition.assignment.strategy ,修改分区的分配策略。默认策略是 Range + CooperativeStick。Kafka 可以同时使用多个分配策略。

参数名称

描述

heartbeat.interval.ms

Kafka消费者和coordinator之间的心跳时间,默认3s。

该条目的值必须小于 session.timeout.ms,也不应该高于 session.timeout.ms 的1/3。

session.timeout.ms

Kafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。

max.poll.interval.ms

消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。

partition.assignment.strategy

消费者分区分配策略,默认策略是Range +  CooperativeSticky。Kafka可以同时使用多个分区分配策略。可以选择的策略包括:Range、RoundRobin、Sticky、CooperativeSticky

2)Range 分区分配策略案例

Range 是针对每个 topic 而言的。

  • 首先对同一个 topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序
  • 通过 partition数/consumer数 来决定每个消费者应该消费几个分区。如果除不尽,那么前几个消费者会多消费几个分区。

比如上面的 topic 一共 7 个分区,我们的消费者组有 3 个消费者,7/3=2,7%3=1,多 1 个分区没人处理,于是交给消费者0处理。

注意:这种方式容易造成数据倾斜!因为,如果我们有多个 topic 由这一个消费者组来消费,那么每个 topic 如果都把剩余的分区交给前面的消费者,那么我们前面的消费者和后面的消费者的压力差距就会特别大。所以,这种方式只适合于 topic 较少的情况。

1. 我们修改上面创建过的主题 first 的分区数为 7 。

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 7

注意:分区数只能增加,不能减少。

2. 复制CustomConsumer类,创建CustomConsumer2。这样可以由三个消费者CustomConsumer、CustomConsumer1、CustomConsumer2组成消费者组,组名都为“test”,同时启动3个消费者。

3. 启动CustomProducer生产者,发送 7 条消息,发送到不同的分区。

public class CustomProducer {
    public static void main(String[] args) throws InterruptedException {
 
        Properties properties = new Properties();
 
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
 
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
 
        for (int i = 0; i < 7; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", i, "test", "lyh"));
        }
 
        kafkaProducer.close();
    }
}

说明:Kafka默认的分区分配策略就是Range + CooperativeSticky,所以不需要修改策略。

观查3个消费者分别消费哪些分区的数据:

消费者0 消费了 0、1、2号分区的数据

消费者1 消费了 5、6 号数据

消费者 2 消费了 3、4号数据

3)Range 分区分配再平衡案例

1. 停止掉0号消费者,快速重新发送消息观看结果(45s以内,越快越好)。

       1号消费者:消费到3、4号分区数据。

       2号消费者:消费到5、6号分区数据。

       0号消费者的任务会整体被分配到1号消费者或者2号消费者。

说明:0号消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他broker执行。

2. 再次重新发送消息观看结果(45s以后)。

       1号消费者:消费到0、1、2、3号分区数据。

       2号消费者:消费到4、5、6号分区数据。

说明:消费者0已经被踢出消费者组,所以重新按照range方式分配。

Kafka(四)【Kafka 消费者】(3)https://developer.aliyun.com/article/1532346

相关文章
|
1月前
|
消息中间件 负载均衡 大数据
揭秘Kafka背后的秘密!再均衡如何上演一场消费者组的‘权力游戏’,让消息处理秒变高能剧情?
【8月更文挑战第24天】Kafka是一款在大数据处理领域备受推崇的产品,以其出色的性能和可扩展性著称。本文通过一个具体案例介绍其核心机制之一——再均衡(Rebalancing)。案例中,“user_activity”主题下10个分区被3个消费者均衡消费。当新消费者加入或原有消费者离开时,Kafka将自动触发再均衡过程,确保所有消费者能有效处理分配给它们的分区。
124 62
|
1月前
|
消息中间件 Kafka API
【Kafka消费新风潮】告别复杂,迎接简洁之美——深度解析Kafka新旧消费者API大比拼!
【8月更文挑战第24天】Apache Kafka作为一个领先的分布式流处理平台,广泛用于实时数据管道和流式应用的构建。随着其发展,消费者API经历了重大更新。旧消费者API(包括“低级”和“高级”API)虽提供灵活性但在消息顺序处理上存在挑战。2017年引入的新消费者API简化了接口,自动管理偏移量,支持更强大的消费组功能,显著降低了开发复杂度。通过对比新旧消费者API的代码示例可以看出,新API极大提高了开发效率和系统可维护性。
113 58
|
3月前
|
消息中间件 分布式计算 Kafka
Kafka(四)【Kafka 消费者】(4)
Kafka(四)【Kafka 消费者】
|
1月前
|
消息中间件 负载均衡 Kafka
【Kafka消费秘籍】深入了解消费者组与独立模式,掌握消息消费的两种超能力!
【8月更文挑战第24天】Apache Kafka是一款高性能的分布式消息系统,支持灵活多样的消费模型以适应不同的应用场景。消息按主题组织,每个主题可划分为多个分区,确保消息顺序性。本文深入探讨了Kafka中的两大核心消费模式:消费者组(Consumer Group)和独立消费者(Standalone Consumer)。消费者组允许多个消费者协同工作,实现负载均衡及故障恢复,是最常用的消费模式。独立消费者模式则适用于需要高度定制化处理逻辑的场景,如消息重放等。通过对比这两种模式的特点和提供的示例代码,开发者可以根据具体需求选择最合适的消费策略,从而更好地利用Kafka构建高效的数据流应用程序。
36 3
|
28天前
|
图形学 C# 开发者
全面掌握Unity游戏开发核心技术:C#脚本编程从入门到精通——详解生命周期方法、事件处理与面向对象设计,助你打造高效稳定的互动娱乐体验
【8月更文挑战第31天】Unity 是一款强大的游戏开发平台,支持多种编程语言,其中 C# 最为常用。本文介绍 C# 在 Unity 中的应用,涵盖脚本生命周期、常用函数、事件处理及面向对象编程等核心概念。通过具体示例,展示如何编写有效的 C# 脚本,包括 Start、Update 和 LateUpdate 等生命周期方法,以及碰撞检测和类继承等高级技巧,帮助开发者掌握 Unity 脚本编程基础,提升游戏开发效率。
38 0
|
2月前
|
消息中间件 存储 负载均衡
深入理解Kafka核心设计及原理(三):消费者
深入理解Kafka核心设计及原理(三):消费者
69 8
|
2月前
|
消息中间件 存储 Kafka
面试题Kafka问题之Kafka的消费者(Consumer)跟踪消息如何解决
面试题Kafka问题之Kafka的消费者(Consumer)跟踪消息如何解决
48 0
|
2月前
|
消息中间件 存储 资源调度
实时计算 Flink版产品使用问题之在消费Kafka的Avro消息,如何配置FlinkKafka消费者的相关参数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
消息中间件 Kafka
Kafka生产者和消费者相关命令行操作
Kafka生产者和消费者相关命令行操作
75 1
|
2月前
|
消息中间件 Java Kafka
使用Java编写Kafka生产者和消费者示例
使用Java编写Kafka生产者和消费者示例
45 0