开发者社区> 浦涛> 正文

使用JMX监控Kafka

简介: JMX监控Kafka
+关注继续查看
http://blog.csdn.net/eric_sunah/article/details/44980385?utm_source=tuicool

 

使用JMX监控Kafka

标签: KafkaJMX监控
 2952人阅读 评论(3) 收藏 举报
category_icon.jpg 分类:
Kafka(8) arrow_triangle%20_down.jpg

目录(?)[+]

Kafka可以配置使用JMX进行运行状态的监控,既可以通过JDK自带Jconsole来观察结果,也可以通过Java API的方式来.
关于监控指标的描述,可以参考:http://kafka.apache.org/documentation.html#monitoring

开启JMX端口

修改bin/kafka-server-start.sh,添加JMX_PORT参数,添加后样子如下
[html] view plain copy
 print?
  1. if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then  
  2.     export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"  
  3.     export JMX_PORT="9999"  
  4. fi  


通过Jconsole测试时候可以连接





通过JavaAPI来访问


通过以下方法获取目标值
[java] view plain copy
 print?在CODE上查看代码片派生到我的代码片
  1. public class KafkaDataProvider{  
  2.     protected final Logger LOGGER = LoggerFactory.getLogger(getClass());  
  3.     private static final String MESSAGE_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec";  
  4.     private static final String BYTES_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec";  
  5.     private static final String BYTES_OUT_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec";  
  6.     private static final String PRODUCE_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce";  
  7.     private static final String CONSUMER_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchConsumer";  
  8.     private static final String FLOWER_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchFollower";  
  9.     private static final String ACTIVE_CONTROLLER_COUNT = "kafka.controller:type=KafkaController,name=ActiveControllerCount";  
  10.     private static final String PART_COUNT = "kafka.server:type=ReplicaManager,name=PartitionCount";  
  11.     public String extractMonitorData() {  
  12.         //TODO 通过调用API获得IP以及参数  
  13.         KafkaRoleInfo monitorDataPoint = new KafkaRoleInfo();  
  14.         String jmxURL = "service:jmx:rmi:///jndi/rmi://192.168.40.242:9999/jmxrmi";  
  15.         try {  
  16.             MBeanServerConnection jmxConnection = MetricDataUtils.getMBeanServerConnection(jmxURL);  
  17.             ObjectName messageCountObj = new ObjectName(MESSAGE_IN_PER_SEC);  
  18.             ObjectName bytesInPerSecObj = new ObjectName(BYTES_IN_PER_SEC);  
  19.             ObjectName bytesOutPerSecObj = new ObjectName(BYTES_OUT_PER_SEC);  
  20.             ObjectName produceRequestsPerSecObj = new ObjectName(PRODUCE_REQUEST_PER_SEC);  
  21.             ObjectName consumerRequestsPerSecObj = new ObjectName(CONSUMER_REQUEST_PER_SEC);  
  22.             ObjectName flowerRequestsPerSecObj = new ObjectName(FLOWER_REQUEST_PER_SEC);  
  23.             ObjectName activeControllerCountObj = new ObjectName(ACTIVE_CONTROLLER_COUNT);  
  24.             ObjectName partCountObj = new ObjectName(PART_COUNT);  
  25.             Long messagesInPerSec = (Long) jmxConnection.getAttribute(messageCountObj, "Count");  
  26.             Long bytesInPerSec = (Long) jmxConnection.getAttribute(bytesInPerSecObj, "Count");  
  27.             Long bytesOutPerSec = (Long) jmxConnection.getAttribute(bytesOutPerSecObj, "Count");  
  28.             Long produceRequestCountPerSec = (Long) jmxConnection.getAttribute(produceRequestsPerSecObj, "Count");  
  29.             Long consumerRequestCountPerSec = (Long) jmxConnection.getAttribute(consumerRequestsPerSecObj, "Count");  
  30.             Long flowerRequestCountPerSec = (Long) jmxConnection.getAttribute(flowerRequestsPerSecObj, "Count");  
  31.             Integer activeControllerCount = (Integer) jmxConnection.getAttribute(activeControllerCountObj, "Value");  
  32.             Integer partCount = (Integer) jmxConnection.getAttribute(partCountObj, "Value");  
  33.             monitorDataPoint.setMessagesInPerSec(messagesInPerSec);  
  34.             monitorDataPoint.setBytesInPerSec(bytesInPerSec);  
  35.             monitorDataPoint.setBytesOutPerSec(bytesOutPerSec);  
  36.             monitorDataPoint.setProduceRequestCountPerSec(produceRequestCountPerSec);  
  37.             monitorDataPoint.setConsumerRequestCountPerSec(consumerRequestCountPerSec);  
  38.             monitorDataPoint.setFlowerRequestCountPerSec(flowerRequestCountPerSec);  
  39.             monitorDataPoint.setActiveControllerCount(activeControllerCount);  
  40.             monitorDataPoint.setPartCount(partCount);  
  41.         } catch (IOException e) {  
  42.             e.printStackTrace();  
  43.         } catch (MalformedObjectNameException e) {  
  44.             e.printStackTrace();  
  45.         } catch (AttributeNotFoundException e) {  
  46.             e.printStackTrace();  
  47.         } catch (MBeanException e) {  
  48.             e.printStackTrace();  
  49.         } catch (ReflectionException e) {  
  50.             e.printStackTrace();  
  51.         } catch (InstanceNotFoundException e) {  
  52.             e.printStackTrace();  
  53.         }  
  54.         return monitorDataPoint.toString();  
  55.     }  
  56.     public static void main(String[] args) {  
  57.         System.out.println(new KafkaDataProvider().extractMonitorData());  
  58.     }  
  59.     /** 
  60.      * 获得MBeanServer 的连接 
  61.      * 
  62.      * @param jmxUrl 
  63.      * @return 
  64.      * @throws IOException 
  65.      */  
  66.     public MBeanServerConnection getMBeanServerConnection(String jmxUrl) throws IOException {  
  67.         JMXServiceURL url = new JMXServiceURL(jmxUrl);  
  68.         JMXConnector jmxc = JMXConnectorFactory.connect(url, null);  
  69.         MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();  
  70.         return mbsc;  
  71.     }  
  72. }  

其他工具

除了自己编写定制化的监控程序外


kafka-web-console

https://github.com/claudemamo/kafka-web-console
部署sbt:
http://www.scala-sbt.org/0.13/tutorial/Manual-Installation.html
http://www.scala-sbt.org/release/tutorial/zh-cn/Installing-sbt-on-Linux.html

KafkaOffsetMonitor

https://github.com/quantifind/KafkaOffsetMonitor/releases/tag/v0.2.0
java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk localhost:12181 --port 8080 --refresh 5.minutes --retain 1.day

Mx4jLoader

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
K8S的Kafka监控(Prometheus+Grafana)
本文实战操作如何在K8S环境对kafka做监控(Prometheus+Grafana)
185 0
问题盘点|使用 Prometheus 监控 Kafka,我们该关注哪些指标
Kafka 作为当前广泛使用的中间件产品,承担了重要/核心业务数据流转,其稳定运行关乎整个业务系统可用性。本文旨在分享阿里云 Prometheus 在阿里云 Kafka 和自建 Kafka 的监控实践。
310 0
Kafka监控与指标之UnderReplicatedPartitions
Kafka监控 Kafka 使用 Yammer Metrics 在服务器中报告指标,Java 客户端使用 Kafka Metrics,这是一个内置的指标注册表. 两者都通过 JMX 公开指标 启用JMX并上报指标 Kafka 默认禁用远程 JMX,Kafka启动JMX方式
221 0
Kafka监控数据采集
上篇文章讲解了运维平台的整体设计,对各个部分并未深入介绍,今天将比较重要的一环——监控数据来源进行讲解。
241 0
Sentry 监控 - Snuba 数据中台架构简介(Kafka+Clickhouse)
Sentry 监控 - Snuba 数据中台架构简介(Kafka+Clickhouse)
384 0
Kafka监控必备——Kafka-Eagle 2.0.2正式发布
对于经常使用Kafka的同学,拥有一个炫酷又实用的监控系统是非常有必要的。可以实时的监控数据流的情况,了解实时数据流的变化。
203 0
最佳实践|从Producer 到 Consumer,如何有效监控 Kafka
对于运维人而言,如何安装维护一套监控系统,或如何进行技术选型,从来不是工作重点。如何借助工具对所需的应用、组件进行监控,发现并解决问题才是重中之重。随着 Prometheus 逐渐成为云原生时代可观测标准,为了帮助更多运维人用好 Prometheus,阿里云云原生团队将定期更新 Prometheus 最佳实践系列。第一期我们讲解了《最佳实践|Spring Boot 应用如何接入 Prometheus 监控》,今天将为大家带来,消息队列产品 Kafka 的监控最佳实践。
457 0
Kafka监控框架介绍
Kafka监控框架介绍
289 0
CentOS6.9安装Filebeat监控Nginx的访问日志发送到Kafka
一、下载地址: 官方:https://www.elastic.co/cn/downloads/beats/filebeat 百度云盘:https://pan.baidu.com/s/1dvhqb0   二、安装 tar zxvf filebeat-6.
2674 0
Kafka监控架构设计
目前的Kafka监控产品有很多,比如Kafka Manager、 Kafka Monitor、KafkaOffsetMonitor、Kafka Web Console、Burrow等,都有各自的优缺点,就个人而言用的最多的还是Kafka Manager,不过这些并不是十分的完美。
2811 0
+关注
浦涛
vip java高级开发工程师
文章
问答
文章排行榜
最热
最新
相关电子书
更多
消息队列 Kafka 版差异化特性
立即下载
2019大数据技术公开课第五季—kafka 数据如何同步到 MaxCompute
立即下载
任庆盛|Flink CDC + Kafka 加速业务实时化
立即下载