Apache Kafka-通过API获取主题所有分区的积压消息数量

简介: Apache Kafka-通过API获取主题所有分区的积压消息数量

20191116123525638.png

实现

package com.artisan.bootkafka.controller;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;
public class TopicBacklog {
    public static int getTotalBacklog(String topic) {
        // Kafka客户端配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "ip:port");
        props.put("group.id", "attack-consumer");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        // 创建KafkaConsumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 订阅要查询的主题
        List<PartitionInfo> partitions = consumer.partitionsFor(topic);
        List<TopicPartition> topicPartitions = new ArrayList<>();
        for (PartitionInfo partition : partitions) {
            topicPartitions.add(new TopicPartition(partition.topic(), partition.partition()));
        }
        // 手动分配分区
        consumer.assign(topicPartitions);
        // 记录未消费消息总数
        int totalBacklog = 0;
        // 遍历每个分区获取其未消费消息数并累加
        for (PartitionInfo partition : partitions) {
            TopicPartition tp = new TopicPartition(partition.topic(), partition.partition());
            // 获取消费者的当前偏移量
            long latestOffset = consumer.position(tp);
            long endOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);
            int backlog = Math.toIntExact(endOffset - latestOffset);
            totalBacklog += backlog;
        }
        // 返回未消费消息总数
        return totalBacklog;
    }
    public static Map<String, Integer> getAllTopicsBacklog() {
        // Kafka客户端配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "ip:port");
        props.put("group.id", "attack-consumer");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 获取所有主题列表
        Map<String, List<PartitionInfo>> topicMap = consumer.listTopics();
        // 记录每个主题未消费消息总数
        Map<String, Integer> backlogMap = new HashMap<>();
        // 遍历每个主题,计算其未消费消息数
        for (String topic : topicMap.keySet()) {
            // 订阅要查询的主题
            List<PartitionInfo> partitions = consumer.partitionsFor(topic);
            List<TopicPartition> topicPartitions = new ArrayList<>();
            for (PartitionInfo partition : partitions) {
                topicPartitions.add(new TopicPartition(partition.topic(), partition.partition()));
            }
            // 手动分配分区
            consumer.assign(topicPartitions);
            int backlog = 0;
            for (PartitionInfo partition : partitions) {
                TopicPartition tp = new TopicPartition(partition.topic(), partition.partition());
                long latestOffset = consumer.position(tp);
                long endOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);
                backlog += Math.toIntExact(endOffset - latestOffset);
            }
            backlogMap.put(topic, backlog);
        }
        // 返回每个主题未消费消息总数
        return backlogMap;
    }
    public static void main(String[] args) {
        int backlog = getTotalBacklog("topic-test");
        System.out.println(backlog);
        getAllTopicsBacklog().forEach((topic, backlogCount) -> System.out.println(topic + " - " + backlogCount));
    }
}

4d08fb7fffb243838f66eb739a37c8c4.png

ceb4522816634ff88d317d181e9bd9bc.png


核对一下,23 。


有2个方法,第二个方法 Map<String, Integer> getAllTopicsBacklog() 虽然会返回所有的Topic 的积压量,但只有 对应的 消费组的数据是准确的。


相关文章
|
5月前
|
消息中间件 分布式计算 算法
大数据-63 Kafka 高级特性 分区 副本机制 宕机恢复 Leader选举
大数据-63 Kafka 高级特性 分区 副本机制 宕机恢复 Leader选举
83 5
大数据-63 Kafka 高级特性 分区 副本机制 宕机恢复 Leader选举
|
5月前
|
消息中间件 SQL 分布式计算
大数据-64 Kafka 高级特性 分区Partition 分区重新分配 实机实测重分配
大数据-64 Kafka 高级特性 分区Partition 分区重新分配 实机实测重分配
214 7
|
4月前
|
消息中间件 负载均衡 Kafka
【赵渝强老师】Kafka的主题与分区
Kafka 中的消息按主题分类,生产者发送消息到特定主题,消费者订阅主题消费。主题可分多个分区,每个分区仅属一个主题。消息追加到分区时,Broker 分配唯一偏移量地址,确保消息在分区内的顺序性。Kafka 保证分区有序而非主题有序。示例中,Topic A 有 3 个分区,分区可分布于不同 Broker 上,支持负载均衡和容错。视频讲解及图示详见原文。
104 2
|
4月前
|
消息中间件 监控 负载均衡
在Kafka中,如何进行主题的分区和复制?
在Kafka中,如何进行主题的分区和复制?
|
5月前
|
消息中间件 监控 负载均衡
在Kafka中,如何进行主题的分区和复制?
在Kafka中,如何进行主题的分区和复制?
|
4月前
|
消息中间件 Kafka
【赵渝强老师】Kafka分区的副本机制
在Kafka中,每个主题可有多个分区,每个分区有多个副本。其中仅有一个副本为Leader,负责对外服务,其余为Follower。当Leader所在Broker宕机时,Follower可被选为新的Leader,实现高可用。文中附有示意图及视频讲解。
157 0
|
5月前
|
消息中间件 JSON 大数据
大数据-65 Kafka 高级特性 分区 Broker自动再平衡 ISR 副本 宕机恢复再重平衡 实测
大数据-65 Kafka 高级特性 分区 Broker自动再平衡 ISR 副本 宕机恢复再重平衡 实测
136 4
|
5月前
|
消息中间件 分布式计算 算法
大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器
大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器
92 3
|
5月前
|
消息中间件 JSON 大数据
大数据-66 Kafka 高级特性 分区Partition 副本因子Replication Factor replicas动态修改 线上动态修改副本数
大数据-66 Kafka 高级特性 分区Partition 副本因子Replication Factor replicas动态修改 线上动态修改副本数
161 1
|
5月前
|
存储 分布式计算 大数据
大数据-145 Apache Kudu 架构解读 Master Table 分区 读写
大数据-145 Apache Kudu 架构解读 Master Table 分区 读写
75 0

推荐镜像

更多