使用JMX监控Kafka

简介: JMX监控Kafka
http://blog.csdn.net/eric_sunah/article/details/44980385?utm_source=tuicool

 

使用JMX监控Kafka

标签: KafkaJMX监控
  2952人阅读  评论(3)  收藏  举报
  分类:

目录(?)[+]

Kafka可以配置使用JMX进行运行状态的监控,既可以通过JDK自带Jconsole来观察结果,也可以通过Java API的方式来.
关于监控指标的描述,可以参考: http://kafka.apache.org/documentation.html#monitoring

开启JMX端口

修改bin/kafka-server-start.sh,添加JMX_PORT参数,添加后样子如下
[html]  view plain  copy
 print ?
  1. if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then  
  2.     export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"  
  3.     export JMX_PORT="9999"  
  4. fi  


通过Jconsole测试时候可以连接





通过JavaAPI来访问


通过以下方法获取目标值
[java]  view plain  copy
 print ? 在CODE上查看代码片 派生到我的代码片
  1. public class KafkaDataProvider{  
  2.     protected final Logger LOGGER = LoggerFactory.getLogger(getClass());  
  3.     private static final String MESSAGE_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec";  
  4.     private static final String BYTES_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec";  
  5.     private static final String BYTES_OUT_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec";  
  6.     private static final String PRODUCE_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce";  
  7.     private static final String CONSUMER_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchConsumer";  
  8.     private static final String FLOWER_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchFollower";  
  9.     private static final String ACTIVE_CONTROLLER_COUNT = "kafka.controller:type=KafkaController,name=ActiveControllerCount";  
  10.     private static final String PART_COUNT = "kafka.server:type=ReplicaManager,name=PartitionCount";  
  11.     public String extractMonitorData() {  
  12.         //TODO 通过调用API获得IP以及参数  
  13.         KafkaRoleInfo monitorDataPoint = new KafkaRoleInfo();  
  14.         String jmxURL = "service:jmx:rmi:///jndi/rmi://192.168.40.242:9999/jmxrmi";  
  15.         try {  
  16.             MBeanServerConnection jmxConnection = MetricDataUtils.getMBeanServerConnection(jmxURL);  
  17.             ObjectName messageCountObj = new ObjectName(MESSAGE_IN_PER_SEC);  
  18.             ObjectName bytesInPerSecObj = new ObjectName(BYTES_IN_PER_SEC);  
  19.             ObjectName bytesOutPerSecObj = new ObjectName(BYTES_OUT_PER_SEC);  
  20.             ObjectName produceRequestsPerSecObj = new ObjectName(PRODUCE_REQUEST_PER_SEC);  
  21.             ObjectName consumerRequestsPerSecObj = new ObjectName(CONSUMER_REQUEST_PER_SEC);  
  22.             ObjectName flowerRequestsPerSecObj = new ObjectName(FLOWER_REQUEST_PER_SEC);  
  23.             ObjectName activeControllerCountObj = new ObjectName(ACTIVE_CONTROLLER_COUNT);  
  24.             ObjectName partCountObj = new ObjectName(PART_COUNT);  
  25.             Long messagesInPerSec = (Long) jmxConnection.getAttribute(messageCountObj, "Count");  
  26.             Long bytesInPerSec = (Long) jmxConnection.getAttribute(bytesInPerSecObj, "Count");  
  27.             Long bytesOutPerSec = (Long) jmxConnection.getAttribute(bytesOutPerSecObj, "Count");  
  28.             Long produceRequestCountPerSec = (Long) jmxConnection.getAttribute(produceRequestsPerSecObj, "Count");  
  29.             Long consumerRequestCountPerSec = (Long) jmxConnection.getAttribute(consumerRequestsPerSecObj, "Count");  
  30.             Long flowerRequestCountPerSec = (Long) jmxConnection.getAttribute(flowerRequestsPerSecObj, "Count");  
  31.             Integer activeControllerCount = (Integer) jmxConnection.getAttribute(activeControllerCountObj, "Value");  
  32.             Integer partCount = (Integer) jmxConnection.getAttribute(partCountObj, "Value");  
  33.             monitorDataPoint.setMessagesInPerSec(messagesInPerSec);  
  34.             monitorDataPoint.setBytesInPerSec(bytesInPerSec);  
  35.             monitorDataPoint.setBytesOutPerSec(bytesOutPerSec);  
  36.             monitorDataPoint.setProduceRequestCountPerSec(produceRequestCountPerSec);  
  37.             monitorDataPoint.setConsumerRequestCountPerSec(consumerRequestCountPerSec);  
  38.             monitorDataPoint.setFlowerRequestCountPerSec(flowerRequestCountPerSec);  
  39.             monitorDataPoint.setActiveControllerCount(activeControllerCount);  
  40.             monitorDataPoint.setPartCount(partCount);  
  41.         } catch (IOException e) {  
  42.             e.printStackTrace();  
  43.         } catch (MalformedObjectNameException e) {  
  44.             e.printStackTrace();  
  45.         } catch (AttributeNotFoundException e) {  
  46.             e.printStackTrace();  
  47.         } catch (MBeanException e) {  
  48.             e.printStackTrace();  
  49.         } catch (ReflectionException e) {  
  50.             e.printStackTrace();  
  51.         } catch (InstanceNotFoundException e) {  
  52.             e.printStackTrace();  
  53.         }  
  54.         return monitorDataPoint.toString();  
  55.     }  
  56.     public static void main(String[] args) {  
  57.         System.out.println(new KafkaDataProvider().extractMonitorData());  
  58.     }  
  59.     /** 
  60.      * 获得MBeanServer 的连接 
  61.      * 
  62.      * @param jmxUrl 
  63.      * @return 
  64.      * @throws IOException 
  65.      */  
  66.     public MBeanServerConnection getMBeanServerConnection(String jmxUrl) throws IOException {  
  67.         JMXServiceURL url = new JMXServiceURL(jmxUrl);  
  68.         JMXConnector jmxc = JMXConnectorFactory.connect(url, null);  
  69.         MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();  
  70.         return mbsc;  
  71.     }  
  72. }  

其他工具

除了自己编写定制化的监控程序外


kafka-web-console

https://github.com/claudemamo/kafka-web-console
部署sbt:
http://www.scala-sbt.org/0.13/tutorial/Manual-Installation.html
http://www.scala-sbt.org/release/tutorial/zh-cn/Installing-sbt-on-Linux.html

KafkaOffsetMonitor

https://github.com/quantifind/KafkaOffsetMonitor/releases/tag/v0.2.0
java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk localhost:12181 --port 8080 --refresh 5.minutes --retain 1.day

Mx4jLoader

目录
相关文章
|
20天前
|
消息中间件 监控 数据可视化
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
41 2
|
3月前
|
消息中间件 监控 Java
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
|
5月前
|
消息中间件 监控 Java
「布道师系列文章」宝兰德徐清康解析 Kafka 和 AutoMQ 的监控
本文由北京宝兰德公司解决方案总监徐清康撰写,探讨了Kafka和AutoMQ集群的监控。
225 2
「布道师系列文章」宝兰德徐清康解析 Kafka 和 AutoMQ 的监控
|
4月前
|
消息中间件 监控 Java
使用 JMX 监控 Kafka 集群性能指标
使用 JMX 监控 Kafka 集群性能指标
422 1
|
4月前
|
消息中间件 NoSQL Kafka
基于Kafka的nginx日志收集分析与监控平台(3)
基于Kafka的nginx日志收集分析与监控平台(3)
|
4月前
|
消息中间件 监控 Kafka
基于Kafka的nginx日志收集分析与监控平台(2)
基于Kafka的nginx日志收集分析与监控平台(2)
|
4月前
|
消息中间件 负载均衡 应用服务中间件
基于Kafka的nginx日志收集分析与监控平台(1)
基于Kafka的nginx日志收集分析与监控平台(1)
|
12天前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
21天前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
29 1
|
3月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
195 9

相关实验场景

更多