Apache Kafka-通过API获取主题所有分区的积压消息数量

简介: Apache Kafka-通过API获取主题所有分区的积压消息数量

20191116123525638.png

实现

package com.artisan.bootkafka.controller;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;
public class TopicBacklog {
    public static int getTotalBacklog(String topic) {
        // Kafka客户端配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "ip:port");
        props.put("group.id", "attack-consumer");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        // 创建KafkaConsumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 订阅要查询的主题
        List<PartitionInfo> partitions = consumer.partitionsFor(topic);
        List<TopicPartition> topicPartitions = new ArrayList<>();
        for (PartitionInfo partition : partitions) {
            topicPartitions.add(new TopicPartition(partition.topic(), partition.partition()));
        }
        // 手动分配分区
        consumer.assign(topicPartitions);
        // 记录未消费消息总数
        int totalBacklog = 0;
        // 遍历每个分区获取其未消费消息数并累加
        for (PartitionInfo partition : partitions) {
            TopicPartition tp = new TopicPartition(partition.topic(), partition.partition());
            // 获取消费者的当前偏移量
            long latestOffset = consumer.position(tp);
            long endOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);
            int backlog = Math.toIntExact(endOffset - latestOffset);
            totalBacklog += backlog;
        }
        // 返回未消费消息总数
        return totalBacklog;
    }
    public static Map<String, Integer> getAllTopicsBacklog() {
        // Kafka客户端配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "ip:port");
        props.put("group.id", "attack-consumer");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 获取所有主题列表
        Map<String, List<PartitionInfo>> topicMap = consumer.listTopics();
        // 记录每个主题未消费消息总数
        Map<String, Integer> backlogMap = new HashMap<>();
        // 遍历每个主题,计算其未消费消息数
        for (String topic : topicMap.keySet()) {
            // 订阅要查询的主题
            List<PartitionInfo> partitions = consumer.partitionsFor(topic);
            List<TopicPartition> topicPartitions = new ArrayList<>();
            for (PartitionInfo partition : partitions) {
                topicPartitions.add(new TopicPartition(partition.topic(), partition.partition()));
            }
            // 手动分配分区
            consumer.assign(topicPartitions);
            int backlog = 0;
            for (PartitionInfo partition : partitions) {
                TopicPartition tp = new TopicPartition(partition.topic(), partition.partition());
                long latestOffset = consumer.position(tp);
                long endOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);
                backlog += Math.toIntExact(endOffset - latestOffset);
            }
            backlogMap.put(topic, backlog);
        }
        // 返回每个主题未消费消息总数
        return backlogMap;
    }
    public static void main(String[] args) {
        int backlog = getTotalBacklog("topic-test");
        System.out.println(backlog);
        getAllTopicsBacklog().forEach((topic, backlogCount) -> System.out.println(topic + " - " + backlogCount));
    }
}

4d08fb7fffb243838f66eb739a37c8c4.png

ceb4522816634ff88d317d181e9bd9bc.png


核对一下,23 。


有2个方法,第二个方法 Map<String, Integer> getAllTopicsBacklog() 虽然会返回所有的Topic 的积压量,但只有 对应的 消费组的数据是准确的。


相关文章
|
4月前
|
消息中间件 安全 Kafka
Apache Kafka安全加固指南:保护你的消息传递系统
【10月更文挑战第24天】在现代企业环境中,数据的安全性和隐私保护至关重要。Apache Kafka作为一款广泛使用的分布式流处理平台,其安全性直接影响着业务的稳定性和用户数据的安全。作为一名资深的Kafka使用者,我深知加强Kafka安全性的重要性。本文将从个人角度出发,分享我在实践中积累的经验,帮助读者了解如何有效地保护Kafka消息传递系统的安全性。
225 7
|
4月前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
171 5
|
4月前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
163 4
|
4月前
|
消息中间件 监控 大数据
优化Apache Kafka性能:最佳实践与调优策略
【10月更文挑战第24天】作为一名已经对Apache Kafka有所了解并有实际使用经验的开发者,我深知在大数据处理和实时数据流传输中,Kafka的重要性不言而喻。然而,在面对日益增长的数据量和业务需求时,如何保证系统的高性能和稳定性成为了摆在我们面前的一个挑战。本文将从我的个人视角出发,分享一些关于如何通过合理的配置和调优来提高Kafka性能的经验和建议。
141 4
|
4月前
|
监控 负载均衡 API
Apache Apisix轻松打造亿级流量Api网关
Apache APISIX 是一个动态、实时、高性能的 API 网关,提供负载均衡、动态上行、灰度发布、熔断、鉴权、可观测等丰富的流量管理功能。适用于处理传统南北向流量、服务间东西向流量及 k8s 入口控制。Airflow 是一个可编程、调度和监控的工作流平台,基于有向无环图 (DAG) 定义和执行任务,提供丰富的命令行工具和 Web 管理界面,方便系统运维和管理。
Apache Apisix轻松打造亿级流量Api网关
|
4月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
143 5
|
4月前
|
消息中间件 Java Kafka
初识Apache Kafka:搭建你的第一个消息队列系统
【10月更文挑战第24天】在数字化转型的浪潮中,数据成为了企业决策的关键因素之一。而高效的数据处理能力,则成为了企业在竞争中脱颖而出的重要武器。在这个背景下,消息队列作为连接不同系统和服务的桥梁,其重要性日益凸显。Apache Kafka 是一款开源的消息队列系统,以其高吞吐量、可扩展性和持久性等特点受到了广泛欢迎。作为一名技术爱好者,我对 Apache Kafka 产生了浓厚的兴趣,并决定亲手搭建一套属于自己的消息队列系统。
149 2
初识Apache Kafka:搭建你的第一个消息队列系统
|
4月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
102 1
|
4月前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
143 2
|
4月前
|
消息中间件 Ubuntu Java
Ubuntu系统上安装Apache Kafka
Ubuntu系统上安装Apache Kafka

推荐镜像

更多