Implementation
package com.artisan.bootkafka.controller;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.*;

public class TopicBacklog {

    public static int getTotalBacklog(String topic) {
        // Kafka client configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "ip:port");
        props.put("group.id", "attack-consumer");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        // Create the KafkaConsumer; try-with-resources ensures it is closed
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Look up all partitions of the topic to query
            List<PartitionInfo> partitions = consumer.partitionsFor(topic);
            List<TopicPartition> topicPartitions = new ArrayList<>();
            for (PartitionInfo partition : partitions) {
                topicPartitions.add(new TopicPartition(partition.topic(), partition.partition()));
            }
            // Assign the partitions manually (no subscription, no rebalance)
            consumer.assign(topicPartitions);

            // Total number of unconsumed messages
            int totalBacklog = 0;
            // For each partition, compute its backlog and accumulate
            for (TopicPartition tp : topicPartitions) {
                // Current position of this consumer group on the partition
                long currentOffset = consumer.position(tp);
                // Latest (end) offset of the partition
                long endOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);
                totalBacklog += Math.toIntExact(endOffset - currentOffset);
            }
            // Return the total number of unconsumed messages
            return totalBacklog;
        }
    }

    public static Map<String, Integer> getAllTopicsBacklog() {
        // Kafka client configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "ip:port");
        props.put("group.id", "attack-consumer");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // List all topics in the cluster
            Map<String, List<PartitionInfo>> topicMap = consumer.listTopics();
            // Backlog per topic
            Map<String, Integer> backlogMap = new HashMap<>();

            // For each topic, compute its total backlog
            for (String topic : topicMap.keySet()) {
                List<PartitionInfo> partitions = consumer.partitionsFor(topic);
                List<TopicPartition> topicPartitions = new ArrayList<>();
                for (PartitionInfo partition : partitions) {
                    topicPartitions.add(new TopicPartition(partition.topic(), partition.partition()));
                }
                // Assign the partitions manually
                consumer.assign(topicPartitions);

                int backlog = 0;
                for (TopicPartition tp : topicPartitions) {
                    long currentOffset = consumer.position(tp);
                    long endOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);
                    backlog += Math.toIntExact(endOffset - currentOffset);
                }
                backlogMap.put(topic, backlog);
            }
            // Return the per-topic backlog
            return backlogMap;
        }
    }

    public static void main(String[] args) {
        int backlog = getTotalBacklog("topic-test");
        System.out.println(backlog);
        getAllTopicsBacklog().forEach((topic, backlogCount) ->
                System.out.println(topic + " - " + backlogCount));
    }
}
Cross-check the result: the program reports a backlog of 23 for topic-test.
There are two methods here. The second one, Map<String, Integer> getAllTopicsBacklog(), returns a backlog figure for every topic, but only the numbers for the matching consumer group are accurate: the group id "attack-consumer" is hard-coded, so the reported lag reflects that group's committed offsets only.
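If you need the lag of a specific consumer group without attaching a live consumer, the AdminClient offset APIs are an alternative. The following is only a minimal sketch, not part of the original implementation: it assumes Kafka clients 2.5+ (for listOffsets), reuses the "ip:port" placeholder and the "attack-consumer" group id from the code above, and the class and method names (GroupBacklogSketch, getGroupBacklog) are hypothetical.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

public class GroupBacklogSketch {

    // Sum the lag of one consumer group across all partitions it has committed offsets for.
    public static long getGroupBacklog(String groupId) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "ip:port"); // same placeholder as above

        try (AdminClient admin = AdminClient.create(props)) {
            // Committed offsets of the group, per partition
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets(groupId)
                         .partitionsToOffsetAndMetadata()
                         .get();

            // Latest end offsets of those same partitions
            Map<TopicPartition, OffsetSpec> request = committed.keySet().stream()
                    .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
                    admin.listOffsets(request).all().get();

            // Lag = end offset - committed offset, summed over partitions
            return committed.entrySet().stream()
                    .mapToLong(e -> endOffsets.get(e.getKey()).offset() - e.getValue().offset())
                    .sum();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(getGroupBacklog("attack-consumer"));
    }
}

Because this reads the group's committed offsets from the broker instead of relying on a consumer's current position, it also works when the group is offline, and the group id is passed in rather than hard-coded.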