使用JMX监控Kafka-阿里云开发者社区

开发者社区> 微服务> 正文

使用JMX监控Kafka

简介: JMX监控Kafka
http://blog.csdn.net/eric_sunah/article/details/44980385?utm_source=tuicool

 

使用JMX监控Kafka

标签: KafkaJMX监控
 2952人阅读 评论(3) 收藏 举报
category_icon.jpg 分类:
Kafka(8) arrow_triangle%20_down.jpg

目录(?)[+]

Kafka可以配置使用JMX进行运行状态的监控,既可以通过JDK自带Jconsole来观察结果,也可以通过Java API的方式来.
关于监控指标的描述,可以参考:http://kafka.apache.org/documentation.html#monitoring

开启JMX端口

修改bin/kafka-server-start.sh,添加JMX_PORT参数,添加后样子如下
[html] view plain copy
 print?
  1. if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then  
  2.     export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"  
  3.     export JMX_PORT="9999"  
  4. fi  


通过Jconsole测试时候可以连接





通过JavaAPI来访问


通过以下方法获取目标值
[java] view plain copy
 print?在CODE上查看代码片派生到我的代码片
  1. public class KafkaDataProvider{  
  2.     protected final Logger LOGGER = LoggerFactory.getLogger(getClass());  
  3.     private static final String MESSAGE_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec";  
  4.     private static final String BYTES_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec";  
  5.     private static final String BYTES_OUT_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec";  
  6.     private static final String PRODUCE_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce";  
  7.     private static final String CONSUMER_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchConsumer";  
  8.     private static final String FLOWER_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchFollower";  
  9.     private static final String ACTIVE_CONTROLLER_COUNT = "kafka.controller:type=KafkaController,name=ActiveControllerCount";  
  10.     private static final String PART_COUNT = "kafka.server:type=ReplicaManager,name=PartitionCount";  
  11.     public String extractMonitorData() {  
  12.         //TODO 通过调用API获得IP以及参数  
  13.         KafkaRoleInfo monitorDataPoint = new KafkaRoleInfo();  
  14.         String jmxURL = "service:jmx:rmi:///jndi/rmi://192.168.40.242:9999/jmxrmi";  
  15.         try {  
  16.             MBeanServerConnection jmxConnection = MetricDataUtils.getMBeanServerConnection(jmxURL);  
  17.             ObjectName messageCountObj = new ObjectName(MESSAGE_IN_PER_SEC);  
  18.             ObjectName bytesInPerSecObj = new ObjectName(BYTES_IN_PER_SEC);  
  19.             ObjectName bytesOutPerSecObj = new ObjectName(BYTES_OUT_PER_SEC);  
  20.             ObjectName produceRequestsPerSecObj = new ObjectName(PRODUCE_REQUEST_PER_SEC);  
  21.             ObjectName consumerRequestsPerSecObj = new ObjectName(CONSUMER_REQUEST_PER_SEC);  
  22.             ObjectName flowerRequestsPerSecObj = new ObjectName(FLOWER_REQUEST_PER_SEC);  
  23.             ObjectName activeControllerCountObj = new ObjectName(ACTIVE_CONTROLLER_COUNT);  
  24.             ObjectName partCountObj = new ObjectName(PART_COUNT);  
  25.             Long messagesInPerSec = (Long) jmxConnection.getAttribute(messageCountObj, "Count");  
  26.             Long bytesInPerSec = (Long) jmxConnection.getAttribute(bytesInPerSecObj, "Count");  
  27.             Long bytesOutPerSec = (Long) jmxConnection.getAttribute(bytesOutPerSecObj, "Count");  
  28.             Long produceRequestCountPerSec = (Long) jmxConnection.getAttribute(produceRequestsPerSecObj, "Count");  
  29.             Long consumerRequestCountPerSec = (Long) jmxConnection.getAttribute(consumerRequestsPerSecObj, "Count");  
  30.             Long flowerRequestCountPerSec = (Long) jmxConnection.getAttribute(flowerRequestsPerSecObj, "Count");  
  31.             Integer activeControllerCount = (Integer) jmxConnection.getAttribute(activeControllerCountObj, "Value");  
  32.             Integer partCount = (Integer) jmxConnection.getAttribute(partCountObj, "Value");  
  33.             monitorDataPoint.setMessagesInPerSec(messagesInPerSec);  
  34.             monitorDataPoint.setBytesInPerSec(bytesInPerSec);  
  35.             monitorDataPoint.setBytesOutPerSec(bytesOutPerSec);  
  36.             monitorDataPoint.setProduceRequestCountPerSec(produceRequestCountPerSec);  
  37.             monitorDataPoint.setConsumerRequestCountPerSec(consumerRequestCountPerSec);  
  38.             monitorDataPoint.setFlowerRequestCountPerSec(flowerRequestCountPerSec);  
  39.             monitorDataPoint.setActiveControllerCount(activeControllerCount);  
  40.             monitorDataPoint.setPartCount(partCount);  
  41.         } catch (IOException e) {  
  42.             e.printStackTrace();  
  43.         } catch (MalformedObjectNameException e) {  
  44.             e.printStackTrace();  
  45.         } catch (AttributeNotFoundException e) {  
  46.             e.printStackTrace();  
  47.         } catch (MBeanException e) {  
  48.             e.printStackTrace();  
  49.         } catch (ReflectionException e) {  
  50.             e.printStackTrace();  
  51.         } catch (InstanceNotFoundException e) {  
  52.             e.printStackTrace();  
  53.         }  
  54.         return monitorDataPoint.toString();  
  55.     }  
  56.     public static void main(String[] args) {  
  57.         System.out.println(new KafkaDataProvider().extractMonitorData());  
  58.     }  
  59.     /** 
  60.      * 获得MBeanServer 的连接 
  61.      * 
  62.      * @param jmxUrl 
  63.      * @return 
  64.      * @throws IOException 
  65.      */  
  66.     public MBeanServerConnection getMBeanServerConnection(String jmxUrl) throws IOException {  
  67.         JMXServiceURL url = new JMXServiceURL(jmxUrl);  
  68.         JMXConnector jmxc = JMXConnectorFactory.connect(url, null);  
  69.         MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();  
  70.         return mbsc;  
  71.     }  
  72. }  

其他工具

除了自己编写定制化的监控程序外


kafka-web-console

https://github.com/claudemamo/kafka-web-console
部署sbt:
http://www.scala-sbt.org/0.13/tutorial/Manual-Installation.html
http://www.scala-sbt.org/release/tutorial/zh-cn/Installing-sbt-on-Linux.html

KafkaOffsetMonitor

https://github.com/quantifind/KafkaOffsetMonitor/releases/tag/v0.2.0
java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk localhost:12181 --port 8080 --refresh 5.minutes --retain 1.day

Mx4jLoader

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
微服务
使用钉钉扫一扫加入圈子
+ 订阅

构建可靠、高效、易扩展的技术基石

其他文章