Kafka(四)【Kafka 消费者】(3)https://developer.aliyun.com/article/1532346
5.4、指定 offset 消费
auto.offset.reset = earliest | latest | none 默认是latest。
当Kafka中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?
(1)earliest:自动将偏移量重置为最早的偏移量,--from-beginning。
(2)latest(默认值):自动将偏移量重置为最新偏移量。
(3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。
(4)任意指定offset位移开始消费
public class CustomConsumerSeek { public static void main(String[] args) { // 1. 创建消费者配置对象 Properties properties = new Properties(); // 2. 给消费者配置对象添加参数 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092"); // 配置k,v序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 配置消费者组id properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test2"); // 创建消费者对象 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); // 注册要消费的主题 consumer.subscribe(Arrays.asList("first")); // 指定消费位置 offset // 获取分区信息 需要时间 Set<TopicPartition> assignment = consumer.assignment(); // 保证分区分配方案制定完毕 while (assignment.size() == 0){ consumer.poll(Duration.ofSeconds(1)); assignment = consumer.assignment(); } for (TopicPartition partition : assignment) { // 指定从 offset=100 的位置开始消费 consumer.seek(partition,33); } // 拉取数据打印 while (true){ // 设置 1s 消费一批数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); // 打印消费到的数据 for (ConsumerRecord<String, String> record : records) { System.out.println(record); } } } }
5.5、指定时间消费
需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据,怎么处理?
获取一天前的消息数据:
public class CustomConsumerSeekTime { public static void main(String[] args) { // 1. 创建消费者配置对象 Properties properties = new Properties(); // 2. 给消费者配置对象添加参数 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092"); // 配置k,v序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 配置消费者组id properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test2"); // 创建消费者对象 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); // 注册要消费的主题 consumer.subscribe(Arrays.asList("first")); // 指定消费位置 offset // 获取分区信息 需要时间 Set<TopicPartition> assignment = consumer.assignment(); // 保证分区分配方案制定完毕 while (assignment.size() == 0){ consumer.poll(Duration.ofSeconds(1)); assignment = consumer.assignment(); } // 希望通过时间获得相应的 offset HashMap<TopicPartition, Long> map = new HashMap<>(); // 遍历每个分区添加到集合 for (TopicPartition topicPartition : assignment) { map.put(topicPartition,System.currentTimeMillis() - 1 * 24 * 3600 * 1000); } // 通过集合得到 map<分区,offset信息> Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer.offsetsForTimes(map); for (TopicPartition partition : assignment) { // 指定时间开始消费 // 把时间转为 offset OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(partition); consumer.seek(partition,offsetAndTimestamp.offset()); } // 拉取数据打印 while (true){ // 设置 1s 消费一批数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); // 打印消费到的数据 for (ConsumerRecord<String, String> record : records) { System.out.println(record); } } } }
5.6、漏消费和重复消费
重复消费:已经消费了数据,但是offset没提交。
漏消费:先提交offset后消费,有可能会造成数据的漏消费。
- 重复消费:在自动提交的时候,提交 offset (默认是5s提交一次最大 offset)和消费者是互不影响的,所以提交完 offset 同时,消费者已经又消费了一些大于当前 offset 的数据,所以如果在下一次提交 offset 之前如果消费者挂掉的话,那么这一部分已经被消费的数据由于没有提交 offset 就会被其它消费者重复消费。
- 漏消费:手动提交的时候,当消费者拿到这个数据的时刻就会提交 offset,但是如果数据在消费者这里还没有被处理就挂机了,那么这个数据就会被漏掉
6、生产经验-消费者事务
正因为有重复消费和漏消费,所以就引入了消费者事务。就像我们之前学 Flink 容错机制的时候讲的输出端一致性保证时用到的两阶段提交(2PC)我们写入 Kafka 的过程其实是一个两段式的提交处理完毕,得到结果写入 Kafka 是基于事物的“预提交”,等到检查点保存完毕才会提交事务,进行正式提交,如果中间出现故障,事故进行回滚,预提交就会被放弃,恢复状态之后也只能恢复所有已确认提交的操作。
这里的消费者事务需要下游消费者(比如 Spark、Flink、MySQL)也支持事务才能做到精确一次消费(比如 HBase 就不支持事务),其实我们上面说的 Flink Sink 连接 Kafka 为保证精确一次而提出的两阶段提交、还有 Flink 事务回滚checkpoint恢复,Kafka重置偏移量都是通过事务确保数据精准一次的例子。
7、生产经验-数据积压(消费者如何提高吞吐量)
- 如果说 Kafka 消费能力不足,则可以考虑增加 topic 的分区数量;并且同时提高消费者组的消费者数量,消费者数 = 分区数(二者缺一不可)
- 如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压
参数名称 |
描述 |
fetch.max.bytes |
默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。 |
max.poll.records |
一次poll拉取数据返回消息的最大条数,默认是500条 |
回忆之前生产者到 Kafka 提高吞吐量的策略:
- batch.size :内存队列中每个批次的大小,默认 16K
- linger.ms:等待时间,修改为 5-100ms
- compression.type:压缩 snappy
- RecordAccumulator:缓冲区大小
这里我们又学习了怎么提高 Kafka 到消费者的吞吐量,这两个应该配合起来使用。
总结
自此,Kafka 的第一遍学习基本上是完成了,之后开学的任务就是在课上把《Kafka 权威指南》看完理解记忆消化,Kafka 是十分重要的内容,需要不断学习加深理解。