我正在尝试检查接收大量数据的主题中是否缺少键。由于这项工作是按需运行的,因此它需要一些标准来知道何时搜索了它关心的所有记录。我们确定这将是作业启动时每个分区的最新偏移量。
我的问题首先是如何获取该主题的所有分区信息,而又不实际使用它(我需要使用此信息为每个分区创建单独的使用者,以跟踪其偏移量与最大偏移量)。
其次,当消费者看到达到最大偏移量后如何停止它。
编辑:我发现了一种获取分区的方法,该方法是将单个使用者订阅到该主题,进行虚拟poll,然后使用partitionsFor(...)。不知道这是否是“推荐”的方式。
您可以使用consumer.partitionsFor和consumer.endOffsets来获取分区和上一个偏移量
分区
/*Get metadata about the partitions for a given topic. This method will issue a remote call to the server if it does not already have any metadata about the given topic.*/
public java.util.List<PartitionInfo> partitionsFor(java.lang.String topic)
endOffsets
/*Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1*/
public java.util.Map<TopicPartition,java.lang.Long> endOffsets(java.util.Collection<TopicPartition> partitions)
。
以下是示例代码
Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerid");
Consumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProperties);
List<PartitionInfo> parts = consumer.partitionsFor(topic);
consumer.assign(partitions);
Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
for (TopicPartition tp : offsets.keySet()) {
OffsetAndMetadata commitOffset = consumer.committed(new
TopicPartition(tp.topic(), tp.partition()));
//Consumer offset for partition tp
long offset=offsets.get(tp);
//Consumed committed offset
long consumedOffset=commitOffset.offset();
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。