CMAK
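CMAK (formerly Kafka Manager) is a Kafka monitoring framework that Yahoo open-sourced in 2015. Written in Scala, it is used mainly to manage and monitor Kafka clusters. GitHub: https://github.com/yahoo/CMAK. It requires Java 11 or later.
Download the tar.gz package from its GitHub page and extract it to get the CMAK directory.
Next, build CMAK with sbt. sbt is a build tool dedicated to Scala projects, similar to the familiar Maven and Gradle. CMAK ships with its own sbt launcher script, so we can run it directly to build the project: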
```shell
./sbt clean dist
```
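When the build finishes, find the generated zip file under target/universal and unzip it. Then edit the cmak.zkhosts item in its conf/application.conf (named kafka-manager.zkhosts in the older Kafka Manager releases) so that it points to the ZooKeeper addresses in your environment, for example: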
```text
cmak.zkhosts="kafka1:2181,kafka2:2181,kafka3:2181"
```
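The cmak.zkhosts value follows the usual ZooKeeper connection-string format: comma-separated host:port pairs, optionally followed by a /chroot path. As a minimal sketch (the ZkHosts class is hypothetical and not part of CMAK), the snippet below splits such a string and rejects entries without a valid port, which can catch typos before CMAK fails to connect.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of CMAK: validates a ZooKeeper connection string
// such as the cmak.zkhosts value and returns its host:port entries.
final class ZkHosts {
    static List<String> parse(String zkhosts) {
        String hostsPart = zkhosts;
        int slash = zkhosts.indexOf('/');
        if (slash >= 0) {
            // Everything after the first '/' is an optional chroot path, not a host
            hostsPart = zkhosts.substring(0, slash);
        }
        List<String> result = new ArrayList<>();
        for (String entry : hostsPart.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("expected host:port, got '" + trimmed + "'");
            }
            // Integer.parseInt throws NumberFormatException (an IllegalArgumentException) for a non-numeric port
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("port out of range: " + port);
            }
            result.add(trimmed);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("kafka1:2181,kafka2:2181,kafka3:2181"));
    }
}
```

Running main prints the three entries; a value such as "kafka1" (no port) raises an IllegalArgumentException.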
Then change into the unzipped target/universal/cmak-3.0.0.5 directory and start CMAK with:
```shell
bin/cmak
```
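Then open http://ip:9000 in a browser, where ip is the address of the host running CMAK (9000 is CMAK's default HTTP port).
Click Add Cluster, enter the ZooKeeper address, and select the Kafka version. Check Enable JMX Polling so that CMAK can collect Kafka's JMX metrics (this only works if the brokers expose a JMX port, as described in the JMX section below); the other parameters can keep their defaults. The cluster overview then clearly lists the number of topics, the number of brokers, and other information about the monitored Kafka cluster. The entries in the top menu bar let you view or configure specific features.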
JMX
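JMX stands for Java Management Extensions. As the name suggests, it is a management extension for Java: a mechanism for conveniently managing and monitoring running Java programs. It is commonly used to inspect threads and memory, change log levels, restart services, and view the system environment.
Kafka does not open a JMX port by default. The port is configured in the $KAFKA_HOME/bin/kafka-server-start.sh script by setting the JMX_PORT environment variable; kafka-run-class.sh, which that script calls, then enables remote JMX on that port. The setting looks like this: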
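To see the mechanism on a plain JVM before involving Kafka, the sketch below reads the thread count and heap usage of the current JVM from its own platform MBean server. It shows both typed access (MemoryMXBean) and generic access by ObjectName plus attribute name; the class name LocalJmxDemo is illustrative only.

```java
import java.lang.management.ManagementFactory;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Reads JVM statistics through the local JVM's platform MBean server.
final class LocalJmxDemo {
    // Typed access through the platform MXBean interface
    static long heapUsedBytes() {
        return ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
    }

    // Generic access by ObjectName and attribute name, the same style used for Kafka MBeans
    static int threadCount() {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        try {
            return (Integer) server.getAttribute(new ObjectName("java.lang:type=Threading"), "ThreadCount");
        } catch (JMException e) {
            throw new IllegalStateException("could not read ThreadCount", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Live threads: " + threadCount());
        System.out.println("Heap used (bytes): " + heapUsedBytes());
    }
}
```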
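```shell
# vim bin/kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    # -XX:PermSize is ignored on Java 8 and later (PermGen was removed); -XX:MetaspaceSize is the closest equivalent
    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:MetaspaceSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    # Add the following line. The port does not have to be 9999; any free port works.
    export JMX_PORT="9999"
fi
```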
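After restarting the broker with JMX_PORT set, a quick way to confirm the port is reachable is to connect and list the MBeans in the kafka.server domain. This is a minimal sketch; the host and port default to localhost and 9999 here purely as assumptions and can be passed as arguments.

```java
import java.io.IOException;
import java.util.Set;
import java.util.TreeSet;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Checks that a broker's JMX port answers and lists its kafka.server MBeans.
final class JmxPortCheck {
    // Standard RMI JMX service URL for a host and JMX_PORT
    static String jmxUrl(String host, int port) {
        return "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi";
    }

    // Sorted canonical names of every MBean in the kafka.server domain
    static Set<String> kafkaServerMBeans(MBeanServerConnection conn)
            throws IOException, MalformedObjectNameException {
        Set<String> names = new TreeSet<>();
        for (ObjectName name : conn.queryNames(new ObjectName("kafka.server:*"), null)) {
            names.add(name.getCanonicalName());
        }
        return names;
    }

    public static void main(String[] args) throws Exception {
        // Assumed defaults; pass the broker host and JMX_PORT as arguments instead
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9999;
        try (JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(jmxUrl(host, port)), null)) {
            Set<String> names = kafkaServerMBeans(connector.getMBeanServerConnection());
            System.out.println(names.size() + " kafka.server MBeans found");
            names.forEach(System.out::println);
        }
    }
}
```

A connection error here usually means the port is closed or blocked; an empty list means the JVM answered but is not a Kafka broker.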
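Monitoring Kafka with JConsole
Start jconsole (shipped with the JDK), choose Remote Process, and connect to the broker's host and the JMX_PORT configured above (for example host:9999). Kafka's metrics then appear under the MBeans tab, grouped by domains such as kafka.server and kafka.network.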
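JConsole's MBeans tab essentially reads the MBean's metadata and then every readable attribute. A rough programmatic equivalent is sketched below; MBeanDump is an illustrative name, and the demo reads java.lang:type=Runtime locally, while against Kafka you would pass a remote connection and a name such as kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec.

```java
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.management.JMException;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

// Reads every readable attribute of one MBean, similar to JConsole's MBeans tab.
final class MBeanDump {
    static Map<String, Object> readAttributes(MBeanServerConnection conn, ObjectName name)
            throws IOException, JMException {
        Map<String, Object> values = new LinkedHashMap<>();
        for (MBeanAttributeInfo attr : conn.getMBeanInfo(name).getAttributes()) {
            if (!attr.isReadable()) {
                continue;
            }
            try {
                values.put(attr.getName(), conn.getAttribute(name, attr.getName()));
            } catch (JMException | RuntimeException e) {
                // Some attributes may be unsupported; record the failure instead of aborting
                values.put(attr.getName(), "<unavailable: " + e.getClass().getSimpleName() + ">");
            }
        }
        return values;
    }

    public static void main(String[] args) throws Exception {
        ObjectName runtime = new ObjectName("java.lang:type=Runtime");
        readAttributes(ManagementFactory.getPlatformMBeanServer(), runtime)
            .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```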
Monitoring Kafka with the Java API
Kafka MBean definitions: http://kafka.apache.org/documentation.html#monitoring
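The reference lists fixed MBean names, but some metrics exist once per topic (a topic=... key) or carry extra tags such as the Produce request version, so a hard-coded version=8 only matches clients using that request version. ObjectName patterns can match these variants. This is a sketch; TopicMetrics and the sample topic "orders" are hypothetical.

```java
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import javax.management.JMException;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

// Matching Kafka MBeans with ObjectName patterns instead of fixed names.
final class TopicMetrics {
    // Property-value wildcard: matches the per-topic variant of MessagesInPerSec
    static final String PER_TOPIC_MESSAGES_IN =
        "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*";
    // Trailing ",*" allows extra keys, such as whatever request version tag the broker reports
    static final String PRODUCE_REQUESTS_ANY_VERSION =
        "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,*";

    // topic -> OneMinuteRate for every per-topic MessagesInPerSec MBean
    static Map<String, Double> oneMinuteRateByTopic(MBeanServerConnection conn)
            throws IOException, JMException {
        Map<String, Double> rates = new TreeMap<>();
        for (ObjectName name : conn.queryNames(new ObjectName(PER_TOPIC_MESSAGES_IN), null)) {
            Number rate = (Number) conn.getAttribute(name, "OneMinuteRate");
            rates.put(name.getKeyProperty("topic"), rate.doubleValue());
        }
        return rates;
    }

    public static void main(String[] args) throws Exception {
        ObjectName pattern = new ObjectName(PER_TOPIC_MESSAGES_IN);
        ObjectName sample = new ObjectName(
            "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=orders");
        System.out.println(pattern.apply(sample) + " topic=" + sample.getKeyProperty("topic"));
    }
}
```

Running main prints "true topic=orders". Note that the broker-wide MessagesInPerSec MBean (no topic key) does not match the per-topic pattern, because a pattern without a trailing ",*" requires exactly the listed keys.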
```java
package monitor;

import java.io.IOException;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class KafkaMonitor {
    // Kafka metrics to collect (MBean ObjectNames)
    private static final String MESSAGE_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec";
    private static final String BYTES_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec";
    private static final String BYTES_OUT_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec";
    private static final String BYTES_REJECTED_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec";
    private static final String FAILED_FETCH_REQUESTS_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec";
    private static final String FAILED_PRODUCE_REQUESTS_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec";
    // The version tag is the Produce request version sent by clients, so it differs between client versions
    private static final String PRODUCE_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=8";

    // JMX connection URL: broker host and the JMX_PORT configured above
    private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://11.8.36.125:9999/jmxrmi";

    // Rate attributes exposed by each Kafka meter MBean
    private static final String[] RATE_ATTRIBUTES = {"MeanRate", "OneMinuteRate", "FiveMinuteRate", "FifteenMinuteRate"};

    public void extractMonitorData() {
        // try-with-resources closes the JMX connection when we are done
        try (JMXConnector jmxc = getJmxConnector(JMX_URL)) {
            MBeanServerConnection jmxConnection = jmxc.getMBeanServerConnection();
            printObjectNameDetails(new ObjectName(MESSAGE_IN_PER_SEC), "Messages in /sec", jmxConnection);
            printObjectNameDetails(new ObjectName(BYTES_IN_PER_SEC), "Bytes in /sec", jmxConnection);
            printObjectNameDetails(new ObjectName(BYTES_OUT_PER_SEC), "Bytes out /sec", jmxConnection);
            printObjectNameDetails(new ObjectName(BYTES_REJECTED_PER_SEC), "Bytes rejected /sec", jmxConnection);
            printObjectNameDetails(new ObjectName(FAILED_FETCH_REQUESTS_PER_SEC), "Failed fetch request /sec", jmxConnection);
            printObjectNameDetails(new ObjectName(FAILED_PRODUCE_REQUESTS_PER_SEC), "Failed produce request /sec", jmxConnection);
            printObjectNameDetails(new ObjectName(PRODUCE_REQUEST_PER_SEC), "Produce request in /sec", jmxConnection);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new KafkaMonitor().extractMonitorData();
    }

    /**
     * Opens a JMX connection; the caller is responsible for closing it.
     *
     * @param jmxUrl JMX service URL
     * @return an open JMXConnector
     * @throws IOException if the connection cannot be established
     */
    private JMXConnector getJmxConnector(String jmxUrl) throws IOException {
        JMXServiceURL url = new JMXServiceURL(jmxUrl);
        return JMXConnectorFactory.connect(url, null);
    }

    /**
     * Prints the total count and the rates of one Kafka meter MBean.
     *
     * @param objectName    MBean to read
     * @param printTitle    heading printed before the values
     * @param jmxConnection open MBean server connection
     */
    private void printObjectNameDetails(ObjectName objectName, String printTitle,
                                        MBeanServerConnection jmxConnection) {
        try {
            System.out.println("----------" + printTitle + "----------");
            System.out.println("TotalCount: " + jmxConnection.getAttribute(objectName, "Count"));
            for (String attribute : RATE_ATTRIBUTES) {
                double rate = ((Number) jmxConnection.getAttribute(objectName, attribute)).doubleValue();
                System.out.println(attribute + ": " + String.format("%.2f", rate));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
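Running the program prints the following statistics: messages received, bytes in, bytes out, bytes rejected, failed fetch requests, failed produce requests, and produce requests.
Each entry shows the total count, the mean rate since the metric was created, and the one-, five-, and fifteen-minute rates, which are exponentially weighted moving averages in events per second rather than plain counts over the last one, five, or fifteen minutes.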
```text
----------Messages in /sec----------
TotalCount: 1701
MeanRate: 0.00
OneMinuteRate: 0.16
FiveMinuteRate: 0.05
FifteenMinuteRate: 0.02
----------Bytes in /sec----------
TotalCount: 176547
MeanRate: 0.07
OneMinuteRate: 12.90
FiveMinuteRate: 3.98
FifteenMinuteRate: 1.43
----------Bytes out /sec----------
TotalCount: 171300
MeanRate: 0.06
OneMinuteRate: 0.00
FiveMinuteRate: 0.00
FifteenMinuteRate: 0.00
----------Bytes rejected /sec----------
TotalCount: 0
MeanRate: 0.00
OneMinuteRate: 0.00
FiveMinuteRate: 0.00
FifteenMinuteRate: 0.00
----------Failed fetch request /sec----------
TotalCount: 0
MeanRate: 0.00
OneMinuteRate: 0.00
FiveMinuteRate: 0.00
FifteenMinuteRate: 0.00
----------Failed produce request /sec----------
TotalCount: 0
MeanRate: 0.00
OneMinuteRate: 0.00
FiveMinuteRate: 0.00
FifteenMinuteRate: 0.00
----------Produce request in /sec----------
TotalCount: 1
MeanRate: 0.03
OneMinuteRate: 0.12
FiveMinuteRate: 0.18
FifteenMinuteRate: 0.19
```
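The broker's meters come from the Yammer Metrics library, which explains why MeanRate and OneMinuteRate can differ so much in the output: MeanRate is the total count divided by the metric's lifetime, while OneMinuteRate is an exponentially weighted moving average updated every 5 seconds. The class below is a simplified re-implementation for illustration only, not the library's own code.

```java
// Simplified illustration of a Yammer/Dropwizard-style EWMA rate, as behind OneMinuteRate.
// Not the library's code: the real meter calls tick() from a 5-second timer.
final class EwmaSketch {
    private static final double TICK_SECONDS = 5.0;
    private final double alpha;
    private long uncounted = 0;
    private double ratePerSecond = 0.0;
    private boolean initialized = false;

    EwmaSketch(double windowMinutes) {
        // alpha = 1 - e^(-tick / window): the weight given to the newest 5-second sample
        this.alpha = 1 - Math.exp(-TICK_SECONDS / (windowMinutes * 60.0));
    }

    // Record events that happened since the last tick
    void mark(long events) {
        uncounted += events;
    }

    // Fold the last 5 seconds into the moving average
    void tick() {
        double instantRate = uncounted / TICK_SECONDS;
        uncounted = 0;
        if (initialized) {
            ratePerSecond += alpha * (instantRate - ratePerSecond);
        } else {
            ratePerSecond = instantRate;
            initialized = true;
        }
    }

    double rate() {
        return ratePerSecond;
    }

    public static void main(String[] args) {
        EwmaSketch oneMinute = new EwmaSketch(1);
        oneMinute.mark(10); // 10 events in the first 5-second interval
        oneMinute.tick();
        System.out.printf("after burst: %.3f/s%n", oneMinute.rate());
        for (int i = 0; i < 12; i++) {
            oneMinute.tick(); // one idle minute
        }
        System.out.printf("after 60 idle seconds: %.3f/s%n", oneMinute.rate());
    }
}
```

The demo prints 2.000/s after the burst and 0.736/s (2·e⁻¹) after a fully idle minute, showing that a "one-minute rate" decays gradually instead of dropping to zero once the last minute is quiet.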
JMX_Exporter + Prometheus + Grafana
JMX_Exporter exposes metrics over HTTP, Prometheus actively scrapes (pulls) those metrics, and Grafana reads the data from Prometheus and visualizes it.
Confluent provides a demo that uses docker-compose to quickly set up a JMX_Exporter + Prometheus + Grafana monitoring stack. GitHub: https://github.com/confluentinc/jmx-monitoring-stacks
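To make the pull model concrete, here is a toy exporter: it reads values through JMX and serves them as Prometheus text on /metrics. This is not JMX_Exporter itself; the metric names and the port 9404 are arbitrary assumptions, and it reads local JVM values where a Kafka version would read broker MBeans.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy illustration of the JMX_Exporter idea: expose JMX values over HTTP for Prometheus to scrape.
final class ToyExporter {
    // Renders gauges in the Prometheus text exposition format
    static String render(Map<String, Double> gauges) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Double> e : gauges.entrySet()) {
            sb.append("# TYPE ").append(e.getKey()).append(" gauge\n");
            sb.append(e.getKey()).append(' ').append(e.getValue()).append('\n');
        }
        return sb.toString();
    }

    // Collects local JVM values via JMX; metric names are assumptions
    static Map<String, Double> collect() {
        Map<String, Double> gauges = new LinkedHashMap<>();
        gauges.put("jvm_threads_live", (double) ManagementFactory.getThreadMXBean().getThreadCount());
        gauges.put("jvm_heap_used_bytes",
            (double) ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed());
        return gauges;
    }

    public static void main(String[] args) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(9404), 0);
        server.createContext("/metrics", exchange -> {
            byte[] body = render(collect()).getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().set("Content-Type", "text/plain; version=0.0.4");
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        // Prometheus would be configured to scrape http://<host>:9404/metrics
        server.start();
    }
}
```

Values are collected at scrape time, which is the key property of the pull model: Prometheus decides when to read, and the exporter keeps no history.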
Jolokia + Elasticsearch + Kibana
Jolokia also obtains Kafka's runtime metrics through JMX and exposes them over HTTP as JSON; Elasticsearch stores and indexes the data, and Kibana charts it.
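Jolokia's HTTP interface can be tried with a single GET request of the form /jolokia/read/<mbean name>/<attribute>. The sketch below assumes a Jolokia JVM agent attached to the broker on its default port 8778; the host name broker1 is a placeholder, and shipping the returned JSON into Elasticsearch is left out.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch: read one Kafka MBean attribute through a Jolokia agent (host and port are assumptions).
final class JolokiaRead {
    // Jolokia GET syntax: /jolokia/read/<mbean name>/<attribute>
    static String readUrl(String host, int port, String mbean, String attribute) {
        return "http://" + host + ":" + port + "/jolokia/read/" + mbean + "/" + attribute;
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        String url = readUrl("broker1", 8778,
            "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec", "OneMinuteRate");
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response = client.send(
            HttpRequest.newBuilder(URI.create(url)).GET().build(),
            HttpResponse.BodyHandlers.ofString());
        // The JSON body carries the attribute in its "value" field
        System.out.println(response.body());
    }
}
```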
Confluent Control Center
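Control Center is a commercial product from Confluent and arguably the most powerful Kafka monitoring framework currently available. In 2014, Kafka's three creators, Jay Kreps, Neha Narkhede, and Jun Rao, left LinkedIn and founded Confluent, which focuses on enterprise-grade stream-processing solutions built on Kafka.
Control Center not only monitors Kafka clusters in real time, but also helps you operate and build Kafka-based real-time stream-processing applications. It also offers unified topic management, so Kafka topics and their schemas can be managed in one place.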
Kafka Eagle
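Kafka Eagle is maintained by Chinese developers and is still under active development. According to its official website, besides the usual monitoring features it also provides alerting (Alert), which makes it well worth trying.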