Kafka(四)【Kafka 消费者】(1)https://developer.aliyun.com/article/1532344
3、消费者 API
我们分三部分来实践消费者 API,一种是用消费者来消费一个主题(一个消费者消费多个分区),另一种是用一个消费者来只消费一个分区,最后一种是用一个消费者组来消费(也就是消费者组内的每个消费者消费一个分区)。
使用 API 的注意事项:
注意:在消费者API代码中必须配置消费者组id。命令行启动消费者不填写消费者组id会被自动填写随机的消费者组id。
不管有没有消费者组,都需要配置消费者id!(因为独立消费者相当于特殊的消费者组,也就是相同消费者组 id 的消费者只有一个)
3.1、独立消费者案例(订阅主题)
public class CustomConsumer { public static void main(String[] args) { // 1. 创建消费者配置对象 Properties properties = new Properties(); // 2. 给消费者配置对象添加参数 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092"); // 配置k,v反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 配置消费者组id properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test"); // 创建消费者对象 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); // 注册要消费的主题 List<String> topics = new ArrayList<>(); topics.add("first"); consumer.subscribe(topics); // 拉取数据打印 while (true){ // 设置 1s 消费一批数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); // 打印消费到的数据 for (ConsumerRecord<String, String> record : records) { System.out.println(record); } } } }
注意:消费者这里是反序列化!
测试:
在 hadoop102 生产数据:
bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
记得修改分区数为 3:
在 IDEA 消费数据:
3.2、独立消费者案例(订阅分区)
只需要稍微修改一下上面的代码;
List<TopicPartition> topics = new ArrayList<>(); topics.add(new TopicPartition("first",0)); consumer.assign(topics);
可以看到,直接消费主题中所有分区时,我们直接传入一个主题名称即可,而指定消费主题的特定分区时,需要传入一个或多个 TopicPartition 对象。
这次我们使用带回调的生产者来生产消息:
public class CustomProducerCallback { public static void main(String[] args) throws InterruptedException { Properties properties = new Properties(); // 连接集群 bootstrap.servers 多写几个主机地址 防止一个客户端挂掉 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092"); // 指定对应的 key 和 value 的序列化类型 key.serialize properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); // 1. 创建 Kafka 生产者对象 // 需要指定键值的类型 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties); // 2. 发送数据 for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord<>("like", "test" + i), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null){ // 如果异常为空 说明正常执行 System.out.println("topic: "+recordMetadata.topic()+",partition: "+recordMetadata.partition()); } } }); // 确保数据发往不同的分区 Thread.sleep(2); } // 3. 关闭资源 kafkaProducer.close(); } }
可以看到我们共往0号分区发了 2 条消息:
观察消费者窗口:
可以看到,消费者只消费到了我们指定的分区数据。
3.3、消费者组案例
要实现消费者组很简单,我们直接复制上面 3.1 中独立消费者代码为 CustomConsumer1,让 CustomConsumer1 去消费分区1的数据,这样两个 main 方法同时执行就实现相当于两个消费者同时消费了。
我们继续使用上面带回调函数的生产者:
可以看到生产者往主题中发送了 5 条数据,我们观察消费者:
可以看到,消费者0 接收了 0 号分区,而消费者 1 接收了 1号和2号分区的数据。
4、生产经验-分区的分配以及再平衡
4.1、Range 以及再平衡
1)Range 分区策略原理
一个主题有多个分区,而一个消费者组有多个消费者,那么每个消费者消费哪一个分区呢?
目前,Kafka 有 4 种主流的分区分配策略:Range、RoundRobin、Sticky、CooperativeStick(Kafka 3.0 新特性)。可以通过配置参数 partition.assignment.strategy ,修改分区的分配策略。默认策略是 Range + CooperativeStick。Kafka 可以同时使用多个分配策略。
参数名称 |
描述 |
heartbeat.interval.ms |
Kafka消费者和coordinator之间的心跳时间,默认3s。 该条目的值必须小于 session.timeout.ms,也不应该高于 session.timeout.ms 的1/3。 |
session.timeout.ms |
Kafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。 |
max.poll.interval.ms |
消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。 |
partition.assignment.strategy |
消费者分区分配策略,默认策略是Range + CooperativeSticky。Kafka可以同时使用多个分区分配策略。可以选择的策略包括:Range、RoundRobin、Sticky、CooperativeSticky |
2)Range 分区分配策略案例
Range 是针对每个 topic 而言的。
- 首先对同一个 topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。
- 通过 partition数/consumer数 来决定每个消费者应该消费几个分区。如果除不尽,那么前几个消费者会多消费几个分区。
比如上面的 topic 一共 7 个分区,我们的消费者组有 3 个消费者,7/3=2,7%3=1,多 1 个分区没人处理,于是交给消费者0处理。
注意:这种方式容易造成数据倾斜!因为,如果我们有多个 topic 由这一个消费者组来消费,那么每个 topic 如果都把剩余的分区交给前面的消费者,那么我们前面的消费者和后面的消费者的压力差距就会特别大。所以,这种方式只适合于 topic 较少的情况。
1. 我们修改上面创建过的主题 first 的分区数为 7 。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 7
注意:分区数只能增加,不能减少。
2. 复制CustomConsumer类,创建CustomConsumer2。这样可以由三个消费者CustomConsumer、CustomConsumer1、CustomConsumer2组成消费者组,组名都为“test”,同时启动3个消费者。
3. 启动CustomProducer生产者,发送 7 条消息,发送到不同的分区。
public class CustomProducer { public static void main(String[] args) throws InterruptedException { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties); for (int i = 0; i < 7; i++) { kafkaProducer.send(new ProducerRecord<>("first", i, "test", "lyh")); } kafkaProducer.close(); } }
说明:Kafka默认的分区分配策略就是Range + CooperativeSticky,所以不需要修改策略。
观查3个消费者分别消费哪些分区的数据:
消费者0 消费了 0、1、2号分区的数据
消费者1 消费了 5、6 号数据
消费者 2 消费了 3、4号数据
3)Range 分区分配再平衡案例
1. 停止掉0号消费者,快速重新发送消息观看结果(45s以内,越快越好)。
1号消费者:消费到3、4号分区数据。
2号消费者:消费到5、6号分区数据。
0号消费者的任务会整体被分配到1号消费者或者2号消费者。
说明:0号消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他broker执行。
2. 再次重新发送消息观看结果(45s以后)。
1号消费者:消费到0、1、2、3号分区数据。
2号消费者:消费到4、5、6号分区数据。
说明:消费者0已经被踢出消费者组,所以重新按照range方式分配。
Kafka(四)【Kafka 消费者】(3)https://developer.aliyun.com/article/1532346