Kafka(四)【Kafka 消费者】(3)

简介: Kafka(四)【Kafka 消费者】

Kafka(四)【Kafka 消费者】(2)https://developer.aliyun.com/article/1532345

4.2、RoundRobin 以及再平衡

1)RoundRobin 分区策略原理

RoundRobin 是针对集群中所有 topic 而言的。它会把所有 topic 的 partition 和所有的 consumer 都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配 partition 都列给消费者。

2)RoundRobin 分区分配策略案例
// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");  

消费者0 消费了分区 0,3,6

消费者1 消费了分区 2,5

消费者2 消费了分区 1,4

3)RoundRobin 分区分配再平衡案例

(1)停止掉0号消费者,快速重新发送消息观看结果(45s以内,越快越好)。

1号消费者:消费到2、5号分区数据

2号消费者:消费到4、1号分区数据

0号消费者的任务会按照RoundRobin的方式,把数据轮询分成0 、6和3号分区数据,分别由1号消费者或者2号消费者消费。

说明:0号消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他broker执行。

(2)再次重新发送消息观看结果(45s以后)。

1号消费者:消费到0、2、4、6号分区数据

2号消费者:消费到1、3、5号分区数据

说明:消费者0已经被踢出消费者组,所以重新按照RoundRobin方式分配。

4.3、Sticky 以及再平衡

       粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。

       粘性分区是Kafka从0.11.x版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。

       粘性分区策略会尽量均匀分配分区并随机分配给每个消费者,比如一共有 0~6 7个分区要分配给3个消费者,那么可能的一种结果就是消费者0:1,4 消费者1:0,3,6 消费者2:2,5

(1)修改分区分配策略为粘性。

注意:3个消费者都应该注释掉,之后重启3个消费者,如果出现报错,全部停止等会再重启,或者修改为全新的消费者组。

 // 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
Sticky 分区分配再平衡案例

(1)停止掉0号消费者(0号消费者消费的是 0,1号分区的数据),快速重新发送消息观看结果(45s以内,越快越好)。

       1号消费者:消费到2、5、3号分区数据。

       2号消费者:消费到4、6号分区数据。

       0号消费者的任务会按照粘性规则,尽可能均衡的随机分成0和1号分区数据,分别由1号消费者或者2号消费者消费。

说明:0号消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他broker执行。

(2)再次重新发送消息观看结果(45s以后)。

       1号消费者:消费到2、3、5号分区数据。

       2号消费者:消费到0、1、4、6号分区数据。

说明:消费者0已经被踢出消费者组,所以重新按照粘性方式分配。

5、offset 位移

5.1、offset 的默认维护位置

__consumer_offsets主题里面采用key和value的方式存储数据。key是group.id+topic+分区号,value就是当前offset的值。每隔一段时间,kafka内部会对这个topic进行compact,也就是 key 保持不变,不断更新value。

1)消费offset案例

(0)思想:__consumer_offsets为Kafka中的topic,那就可以通过消费者进行消费。

(1)在配置文件config/consumer.properties中添加配置exclude.internal.topics=false,默认是true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为false。

(2)采用命令行方式,创建一个新的topic

(3)向刚创建的主题 lyh 中发送数据

(4)启动一个消费者来消费 lyh 主题中的数据

消费数据才会有 offset 生成,同时我们需要指定组 id ,因为如果我们不指定,kafka 默认也会给我们指定一个组id,这样我们就不方便查找了。

(5)查看消费者消费主题 __consumer_offsets

bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server  hadoop102:9092 --consumer.config config/consumer.properties  --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

5.2、自动提交 offset

为了使我们能够专注自己的业务逻辑,Kafka 提供了自动提交 offset 的功能。

参数名称

描述

enable.auto.commit

默认值为true,消费者会自动周期性地向服务器提交偏移量。

auto.commit.interval.ms

如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka提交的频率,默认5s。

配置参数:

// 设置为自动提交 默认为true
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        // 设置自动提交间隔 默认5000ms
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);

5.3、手动提交 offset

       虽然自动提交十分方便,但是由于自动提交的频率通常是固定的,这可能不适应所有场景。如果自动提交的间隔设置得过大,当消费者在自动提交偏移量之前异常退出时,可能会导致 Kafka 未提交偏移量,进而出现重复消费的问题。

       所以 Kafka 也提供了基于事件的手动提交,也就是消费完一批数据之后就提交一个 offset,这样就不用像自动提交那样出现一个攒批的过程,就不用担心出现 offset 丢失这种情况了。而手动提交又分为同步提交异步提交。它俩的相同点是都会将一批数据最高的偏移量提交,不同点是,同步提交会阻塞当前线程,直到提交成功才会继续消费,如果失败会进行重试,但是异步提交消费完数据后不会等待提交完 offset 才消费,也没有失败重试机制,所以可能会出现提交失败

5.3.1、同步提交offset

由于同步提交offset有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。以下为同步提交offset的示例。

public class CustomConsumerByHand {
    public static void main(String[] args) {
        // 1. 创建消费者配置对象
        Properties properties = new Properties();
 
        // 2. 给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        // 配置k,v序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 配置消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test2");
        // 修改分区分配策略
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");
 
        // 设置为手动提交 默认为自动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
 
        // 创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
 
        // 注册要消费的主题
        List<String> topics = new ArrayList<>();
        topics.add("first");
        consumer.subscribe(topics);
 
        // 拉取数据打印
        while (true){
            // 设置 1s 消费一批数据
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            // 打印消费到的数据
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
            }
            // 手动提交 offset
            consumer.commitSync();//同步提交
        }
 
    }
}
5.3.2、异步提交offset

虽然同步提交offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交offset的方式。

修改为异步提交 offset 只需要修改上面的代码:

consumer.commitAsync();

通常我们用异步发送多一点,因为这样效率高一点。

Kafka(四)【Kafka 消费者】(4)https://developer.aliyun.com/article/1532347

相关文章
|
1月前
|
消息中间件 负载均衡 Kafka
【Kafka面试演练】那Kafka消费者手动提交、自动提交有什么区别?
嗯嗯Ok。分区的作用主要就是为了提高Kafka处理消息吞吐量。每一个topic会被分为多个分区。假如同一个topic下有n个分区、n个消费者,这样的话每个分区就会发送消息给对应的一个消费者,这样n个消费者负载均衡地处理消息。同时生产者会发送消息给不同分区,每个分区分给不同的brocker处理,让集群平坦压力,这样大大提高了Kafka的吞吐量。面试官思考中…
122 4
|
1月前
|
消息中间件 安全 Kafka
深度解析Kafka中消费者的奥秘
深度解析Kafka中消费者的奥秘
54 0
|
1月前
|
消息中间件 Java Kafka
关于kafka消费者超时配置
关于kafka消费者超时配置
175 2
|
1月前
|
消息中间件 分布式计算 Java
探究Kafka原理-3.生产者消费者API原理解析(上)
探究Kafka原理-3.生产者消费者API原理解析
48 0
|
21天前
|
消息中间件 分布式计算 Kafka
Kafka(四)【Kafka 消费者】(4)
Kafka(四)【Kafka 消费者】
|
15天前
|
消息中间件 Kafka
Kafka生产者和消费者相关命令行操作
Kafka生产者和消费者相关命令行操作
18 1
|
24天前
|
消息中间件 Java Kafka
springboot整合kafka消费者最佳实践
springboot整合kafka消费者最佳实践
56 1
|
24天前
|
消息中间件 负载均衡 监控
Kafka消费者:监听模式VS主动拉取,哪种更适合你?
Kafka消费者:监听模式VS主动拉取,哪种更适合你?
17 1
|
21天前
|
消息中间件 Kafka API
Kafka(四)【Kafka 消费者】(2)
Kafka(四)【Kafka 消费者】
|
21天前
|
消息中间件 存储 Java
Kafka(四)【Kafka 消费者】(1)
Kafka(四)【Kafka 消费者】

热门文章

最新文章