Kafka监控框架介绍

简介: Kafka监控框架介绍

CMAK

CMAK(原Kafka Manager) 是雅虎公司于 2015 年开源的一个 Kafka 监控框架。这个框架用 Scala 语言开发而成,主要用于管理和监控 Kafka 集群。github地址:https://github.com/yahoo/CMAK ,安装前提要求Java版本在11以上。

在其 Github 官网上下载 tar.gz 包之后,我们执行解压缩,可以得到CMAK目录。

之后,我们需要运行 sbt 工具来编译CMAK。sbt 是专门用于构建 Scala 项目的编译构建工具,类似于我们熟知的 Maven 和 Gradle。CMAK 自带了 sbt 命令,我们直接运行它构建项目就可以了:

./sbt clean dist

构建完成后,到target/universal 目录下找到生成的 zip 文件,把它解压,然后修改里面的 conf/application.conf 文件中的 kafka-manager.zkhosts 项,让它指向你环境中的 ZooKeeper 地址,比如:

cmak.zkhosts="kafka1:2181,kafka2:2181,kafka3:2181"

之后进入target/universal/cmak-3.0.0.5/bin目录,运行以下命令启动 CMAK:

bin/cmak

然后访问http://ip:9000就可以访问CMAK了。

点击Add Cluster,填写Zookeeper的地址;选择Kafka版本;勾选上 Enable JMX Polling,这样你才能监控 Kafka 的各种 JMX 指标,其余参数可以保持默认。image.png从这张图中,我们可以发现,CMAK 清晰地列出了当前监控的 Kafka 集群的主题数量、Broker 数量等信息。你可以点击顶部菜单栏的各个条目去查看或者设置具体功能。image.pngimage.png

JMX

JMX的全称为Java Management Extensions。顾名思义,是管理Java的一种扩展。这种机制可以方便的管理、监控正在运行中的Java程序。常用于管理线程,内存,日志Level,服务重启,系统环境等。

Kafka系统默认是没有开启JMX端口的,设置该端口在$KAFKA_HOME/bin/kafka-server-start.sh脚本中,设置内容如下:

#vim bin/kafka-server-start.sh:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    # 添加下面这行,这里的端口不一定非要设置成9999,端口只要可用,均可。
    export JMX_PORT="9999" 
fi

JConsole监控kafka

通过Jconsole连接:image.pngimage.pngimage.png

Java API监控kafka

Kafka Mbean定义参考:http://kafka.apache.org/documentation.html#monitoring

package monitor;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.io.IOException;
public class kafkaMonitor {
    //获取的Kafka指标数据
    private static final String MESSAGE_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec";
    private static final String BYTES_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec";
    private static final String BYTES_OUT_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec";
    private static final String BYTES_REJECTED_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec";
    private static final String FAILED_FETCH_REQUESTS_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec";
    private static final String FAILED_PRODUCE_REQUESTS_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec";
    private static final String PRODUCE_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=8";
    //JXM连接信息
    private static final String JXM_URL = "service:jmx:rmi:///jndi/rmi://11.8.36.125:9999/jmxrmi";
    public void extractMonitorData() {
        try {
            MBeanServerConnection jmxConnection = this.getMBeanServerConnection(JXM_URL);
            ObjectName messageCountObj = new ObjectName(MESSAGE_IN_PER_SEC);
            ObjectName bytesInPerSecObj = new ObjectName(BYTES_IN_PER_SEC);
            ObjectName bytesOutPerSecObj = new ObjectName(BYTES_OUT_PER_SEC);
            ObjectName bytesRejectedPerSecObj = new ObjectName(BYTES_REJECTED_PER_SEC);
            ObjectName failedFetchRequestsPerSecObj = new ObjectName(FAILED_FETCH_REQUESTS_PER_SEC);
            ObjectName failedProduceRequestsPerSecObj = new ObjectName(FAILED_PRODUCE_REQUESTS_PER_SEC);
            ObjectName produceRequestPerSecObj = new ObjectName(PRODUCE_REQUEST_PER_SEC);
            printObjectNameDetails(messageCountObj, "Messages in /sec", jmxConnection);
            printObjectNameDetails(bytesInPerSecObj, "Bytes in /sec", jmxConnection);
            printObjectNameDetails(bytesOutPerSecObj, "Bytes out /sec", jmxConnection);
            printObjectNameDetails(bytesRejectedPerSecObj, "Bytes rejected /sec", jmxConnection);
            printObjectNameDetails(failedFetchRequestsPerSecObj, "Failed fetch request /sec", jmxConnection);
            printObjectNameDetails(failedProduceRequestsPerSecObj, "Failed produce request /sec", jmxConnection);
            printObjectNameDetails(produceRequestPerSecObj, "Produce request in /sec", jmxConnection);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
        new kafkaMonitor().extractMonitorData();
    }
    /**
     * 获得 MBeanServer 的连接
     *
     * @param jmxUrl
     * @return
     * @throws IOException
     */
    private MBeanServerConnection getMBeanServerConnection(String jmxUrl) throws IOException {
        JMXServiceURL url = new JMXServiceURL(jmxUrl);
        JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
        MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
        return mbsc;
    }
    /**
     * 打印 ObjectName 对象详细信息
     * @param objectName
     * @param printTitle
     * @param jmxConnection
     */
    private void printObjectNameDetails(ObjectName objectName, String printTitle, MBeanServerConnection jmxConnection) {
        try {
            System.out.println("----------"+ printTitle +"----------");
            System.out.println("TotalCount: " + (Long) jmxConnection.getAttribute(objectName, "Count"));
            System.out.println("MeanRate: " + String.format("%.2f", (double) jmxConnection.getAttribute(objectName, "MeanRate")));
            System.out.println("OneMinuteRate: " + String.format("%.2f", (double) jmxConnection.getAttribute(objectName, "OneMinuteRate")));
            System.out.println("FiveMinuteRate: " + String.format("%.2f", (double) jmxConnection.getAttribute(objectName, "FiveMinuteRate")));
            System.out.println("FifteenMinuteRate: " + String.format("%.2f", (double) jmxConnection.getAttribute(objectName, "FifteenMinuteRate")));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行程序,可以看到打印如下信息:分别是消息接收情况统计,bytes接收统计,bytes输出统计,bytes拒绝统计,失败拉取请求统计,失败生产消息统计,生产消息统计。

每一项统计均包括总量统计,平均速率,最近一分钟速率,最近5分钟速率,最近15分钟速率。

----------Messages in /sec----------
TotalCount: 1701
MeanRate: 0.00
OneMinuteRate: 0.16
FiveMinuteRate: 0.05
FifteenMinuteRate: 0.02
----------Bytes in /sec----------
TotalCount: 176547
MeanRate: 0.07
OneMinuteRate: 12.90
FiveMinuteRate: 3.98
FifteenMinuteRate: 1.43
----------Bytes out /sec----------
TotalCount: 171300
MeanRate: 0.06
OneMinuteRate: 0.00
FiveMinuteRate: 0.00
FifteenMinuteRate: 0.00
----------Bytes rejected /sec----------
TotalCount: 0
MeanRate: 0.00
OneMinuteRate: 0.00
FiveMinuteRate: 0.00
FifteenMinuteRate: 0.00
----------Failed fetch request /sec----------
TotalCount: 0
MeanRate: 0.00
OneMinuteRate: 0.00
FiveMinuteRate: 0.00
FifteenMinuteRate: 0.00
----------Failed produce request /sec----------
TotalCount: 0
MeanRate: 0.00
OneMinuteRate: 0.00
FiveMinuteRate: 0.00
FifteenMinuteRate: 0.00
----------Produce request in /sec----------
TotalCount: 1
MeanRate: 0.03
OneMinuteRate: 0.12
FiveMinuteRate: 0.18
FifteenMinuteRate: 0.19

JMX_Exporter + Prometheus + Grafana

JMX_Exporter 通过HTTP的方式暴露 metrics 数据, Prometheus 主动抓取 metrics 数据,Grafana对接Promethues的数据进行展示。

confluent公司提供了一个demo示例,通过docker-compose快速搭建JMX_Exporter + Prometheus + Grafana监控框架,github地址:https://github.com/confluentinc/jmx-monitoring-stacksimage.pngimage.png

Jolokia + Elasticsearch + Kibana

Jolokia也是通过JMX的方式来获取Kafka运行状态指标,通过Elasticsearch做数据的存储,搜索,Kibana做图表的展示。

image.pngimage.pngimage.png

Confluent Control Center

Confluent 公司发布的 Control Center, 这是目前已知的最强大的Kafka 监控框架。2014 年,Kafka 的 3 个创始人 Jay Kreps、Naha Narkhede 和饶军离开 LinkedIn 创办了 Confluent 公司,专注于提供基于 Kafka 的企业级流处理解决方案。

Control Center 不但能够实时地监控 Kafka 集群,而且还能够帮助你操作和搭建基于 Kafka 的实时流处理应用。更棒的是,Control Center 提供了统一式的主题管理功能。你可以在这里享受到 Kafka 主题和 Schema 的一站式管理服务。image.pngimage.pngimage.png

Kafka Eagle

Kafka Eagle是由国人维护的,目前还在积极地演进着。根据 Kafka Eagle 官网的描述,除了提供常规的监控功能之外,还开放了告警功能(Alert),非常值得一试。

github地址:https://github.com/smartloli/kafka-eagle/image.pngimage.png

目录
相关文章
|
消息中间件 监控 Kafka
Kafka集群监控系统Kafka Eagle部署与体验
Kafka集群监控系统Kafka Eagle部署与体验
567 0
Kafka集群监控系统Kafka Eagle部署与体验
|
2月前
|
消息中间件 监控 数据可视化
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
57 2
|
4月前
|
消息中间件 监控 Java
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
|
4月前
|
消息中间件 Kafka Java
Spring 框架与 Kafka 联姻,竟引发软件世界的革命风暴!事件驱动架构震撼登场!
【8月更文挑战第31天】《Spring 框架与 Kafka 集成:实现事件驱动架构》介绍如何利用 Spring 框架的强大功能与 Kafka 分布式流平台结合,构建灵活且可扩展的事件驱动系统。通过添加 Spring Kafka 依赖并配置 Kafka 连接信息,可以轻松实现消息的生产和消费。文中详细展示了如何设置 `KafkaTemplate`、`ProducerFactory` 和 `ConsumerFactory`,并通过示例代码说明了生产者发送消息及消费者接收消息的具体实现。这一组合为构建高效可靠的分布式应用程序提供了有力支持。
113 0
|
6月前
|
消息中间件 监控 Java
「布道师系列文章」宝兰德徐清康解析 Kafka 和 AutoMQ 的监控
本文由北京宝兰德公司解决方案总监徐清康撰写,探讨了Kafka和AutoMQ集群的监控。
244 2
「布道师系列文章」宝兰德徐清康解析 Kafka 和 AutoMQ 的监控
|
5月前
|
消息中间件 监控 Java
使用 JMX 监控 Kafka 集群性能指标
使用 JMX 监控 Kafka 集群性能指标
553 1
|
5月前
|
消息中间件 NoSQL Kafka
基于Kafka的nginx日志收集分析与监控平台(3)
基于Kafka的nginx日志收集分析与监控平台(3)
|
5月前
|
消息中间件 监控 Kafka
基于Kafka的nginx日志收集分析与监控平台(2)
基于Kafka的nginx日志收集分析与监控平台(2)
|
5月前
|
消息中间件 负载均衡 应用服务中间件
基于Kafka的nginx日志收集分析与监控平台(1)
基于Kafka的nginx日志收集分析与监控平台(1)
|
5月前
|
消息中间件 Java Kafka
Java中的流处理框架:Kafka Streams与Flink
Java中的流处理框架:Kafka Streams与Flink