Kafka(四)【Kafka 消费者】(2)https://developer.aliyun.com/article/1532345
4.2、RoundRobin 以及再平衡
1)RoundRobin 分区策略原理
RoundRobin 是针对集群中所有 topic 而言的。它会把所有 topic 的 partition 和所有的 consumer 都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配 partition 都列给消费者。
2)RoundRobin 分区分配策略案例
// 修改分区分配策略 properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");
消费者0 消费了分区 0,3,6
消费者1 消费了分区 2,5
消费者2 消费了分区 1,4
3)RoundRobin 分区分配再平衡案例
(1)停止掉0号消费者,快速重新发送消息观看结果(45s以内,越快越好)。
1号消费者:消费到2、5号分区数据
2号消费者:消费到4、1号分区数据
0号消费者的任务会按照RoundRobin的方式,把数据轮询分成0 、6和3号分区数据,分别由1号消费者或者2号消费者消费。
说明:0号消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他broker执行。
(2)再次重新发送消息观看结果(45s以后)。
1号消费者:消费到0、2、4、6号分区数据
2号消费者:消费到1、3、5号分区数据
说明:消费者0已经被踢出消费者组,所以重新按照RoundRobin方式分配。
4.3、Sticky 以及再平衡
粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
粘性分区是Kafka从0.11.x版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。
粘性分区策略会尽量均匀分配分区并随机分配给每个消费者,比如一共有 0~6 7个分区要分配给3个消费者,那么可能的一种结果就是消费者0:1,4 消费者1:0,3,6 消费者2:2,5
(1)修改分区分配策略为粘性。
注意:3个消费者都应该注释掉,之后重启3个消费者,如果出现报错,全部停止等会再重启,或者修改为全新的消费者组。
// 修改分区分配策略 properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
Sticky 分区分配再平衡案例
(1)停止掉0号消费者(0号消费者消费的是 0,1号分区的数据),快速重新发送消息观看结果(45s以内,越快越好)。
1号消费者:消费到2、5、3号分区数据。
2号消费者:消费到4、6号分区数据。
0号消费者的任务会按照粘性规则,尽可能均衡的随机分成0和1号分区数据,分别由1号消费者或者2号消费者消费。
说明:0号消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他broker执行。
(2)再次重新发送消息观看结果(45s以后)。
1号消费者:消费到2、3、5号分区数据。
2号消费者:消费到0、1、4、6号分区数据。
说明:消费者0已经被踢出消费者组,所以重新按照粘性方式分配。
5、offset 位移
5.1、offset 的默认维护位置
__consumer_offsets主题里面采用key和value的方式存储数据。key是group.id+topic+分区号,value就是当前offset的值。每隔一段时间,kafka内部会对这个topic进行compact,也就是 key 保持不变,不断更新value。
1)消费offset案例
(0)思想:__consumer_offsets为Kafka中的topic,那就可以通过消费者进行消费。
(1)在配置文件config/consumer.properties中添加配置exclude.internal.topics=false,默认是true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为false。
(2)采用命令行方式,创建一个新的topic
(3)向刚创建的主题 lyh 中发送数据
(4)启动一个消费者来消费 lyh 主题中的数据
消费数据才会有 offset 生成,同时我们需要指定组 id ,因为如果我们不指定,kafka 默认也会给我们指定一个组id,这样我们就不方便查找了。
(5)查看消费者消费主题 __consumer_offsets
bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server hadoop102:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
5.2、自动提交 offset
为了使我们能够专注自己的业务逻辑,Kafka 提供了自动提交 offset 的功能。
参数名称 |
描述 |
默认值为true,消费者会自动周期性地向服务器提交偏移量。 |
|
auto.commit.interval.ms |
如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka提交的频率,默认5s。 |
配置参数:
// 设置为自动提交 默认为true properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true); // 设置自动提交间隔 默认5000ms properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
5.3、手动提交 offset
虽然自动提交十分方便,但是由于自动提交的频率通常是固定的,这可能不适应所有场景。如果自动提交的间隔设置得过大,当消费者在自动提交偏移量之前异常退出时,可能会导致 Kafka 未提交偏移量,进而出现重复消费的问题。
所以 Kafka 也提供了基于事件的手动提交,也就是消费完一批数据之后就提交一个 offset,这样就不用像自动提交那样出现一个攒批的过程,就不用担心出现 offset 丢失这种情况了。而手动提交又分为同步提交和异步提交。它俩的相同点是都会将一批数据最高的偏移量提交,不同点是,同步提交会阻塞当前线程,直到提交成功才会继续消费,如果失败会进行重试,但是异步提交消费完数据后不会等待提交完 offset 才消费,也没有失败重试机制,所以可能会出现提交失败。
5.3.1、同步提交offset
由于同步提交offset有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。以下为同步提交offset的示例。
public class CustomConsumerByHand { public static void main(String[] args) { // 1. 创建消费者配置对象 Properties properties = new Properties(); // 2. 给消费者配置对象添加参数 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092"); // 配置k,v序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 配置消费者组id properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test2"); // 修改分区分配策略 properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor"); // 设置为手动提交 默认为自动提交 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); // 创建消费者对象 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); // 注册要消费的主题 List<String> topics = new ArrayList<>(); topics.add("first"); consumer.subscribe(topics); // 拉取数据打印 while (true){ // 设置 1s 消费一批数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); // 打印消费到的数据 for (ConsumerRecord<String, String> record : records) { System.out.println(record); } // 手动提交 offset consumer.commitSync();//同步提交 } } }
5.3.2、异步提交offset
虽然同步提交offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交offset的方式。
修改为异步提交 offset 只需要修改上面的代码:
consumer.commitAsync();
通常我们用异步发送多一点,因为这样效率高一点。
Kafka(四)【Kafka 消费者】(4)https://developer.aliyun.com/article/1532347