开发者社区> 问答> 正文

如何获取每个分区的最新偏移量,然后仅消耗该偏移量?

我正在尝试检查接收大量数据的主题中是否缺少键。由于这项工作是按需运行的,因此它需要一些标准来知道何时搜索了它关心的所有记录。我们确定这将是作业启动时每个分区的最新偏移量。

我的问题首先是如何获取该主题的所有分区信息,而又不实际使用它(我需要使用此信息为每个分区创建单独的使用者,以跟踪其偏移量与最大偏移量)。

其次,当消费者看到达到最大偏移量后如何停止它。

编辑:我发现了一种获取分区的方法,该方法是将单个使用者订阅到该主题,进行虚拟poll,然后使用partitionsFor(...)。不知道这是否是“推荐”的方式。

展开
收起
垚tutu 2019-12-04 16:54:08 1224 0
1 条回答
写回答
取消 提交回答
  • #include

    您可以使用consumer.partitionsFor和consumer.endOffsets来获取分区和上一个偏移量

    分区

    /*Get metadata about the partitions for a given topic. This method will issue a remote call to the server if it does  not already have any metadata about the given topic.*/ 
        public java.util.List<PartitionInfo> partitionsFor(java.lang.String topic)
    
    

    endOffsets

    /*Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1*/
    public java.util.Map<TopicPartition,java.lang.Long> endOffsets(java.util.Collection<TopicPartition> partitions)
    
    

    以下是示例代码

    Properties consumerProperties = new Properties();
    consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerid");
    Consumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProperties);    
    List<PartitionInfo> parts = consumer.partitionsFor(topic);
    consumer.assign(partitions);
    Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
    for (TopicPartition tp : offsets.keySet()) {
        OffsetAndMetadata commitOffset = consumer.committed(new 
        TopicPartition(tp.topic(), tp.partition()));
        //Consumer offset for partition tp
        long offset=offsets.get(tp);
        //Consumed committed offset
        long consumedOffset=commitOffset.offset();
    }
    
    2019-12-04 16:54:35
    赞同 展开评论 打赏
问答地址:
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载