来源数据可以分为服务端也就是broker端和客户端,服务端因为相对固定通过JMX方式进行抓取即可,客户端的话,本身一般无状态,像现在的分布式应用,配合动态扩容缩容,像现在的K8S应用,每次重新发布IP都不一样,所以无法使用对固定地址JMX的方式抓取,可以改成主动上报的方式,下面分别说明。
Kafka版本为目前最新的:3.2.x
对应文档:Kafka官方文档-监控
环境搭建
- 下载解压
$ tar-xzf kafka_2.13-3.2.0.tgz $ cd kafka_2.13-3.2.0
- 启动zookeeper
# -daemon:后台运行# windows的运行脚本在bin/windwos,并去掉-daemon$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
3. 启动broker
# 设置环境变量# Powershell:$env:JMX_PORT=9999# CMD:set JMX_PORT=9999# Linux:JMX_PORT=9999$ bin/kafka-server-start.sh -daemon config/server.properties
如果是远程连接需要修改config/server.propertiesr,因为默认会将localhost注册到zookeeper,连接的时候会从读取到localhost导致无法连接
# config/server.properties:listeners=PLAINTEXT://:9092 # 改成listeners=PLAINTEXT://172.23.70.123:9092 # 172.23.70.123换成自己的IP
如果是使用Windows的WSL,还需要修改bin/kafka-run-class.sh,指定JMX的hostname,不然无法通过JMX连接,这好像只是WSL的问题,其他系统不需要修改(知道原因的大佬,可以分享一下原理)。
# bin/kafka-run-class.sh:# JMX settingsif [ -z"$KAFKA_JMX_OPTS" ]; thenKAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false "fi# 在前面加上-Djava.rmi.server.hostname=172.23.70.123# 改成# JMX settingsif [ -z"$KAFKA_JMX_OPTS" ]; thenKAFKA_JMX_OPTS="-Djava.rmi.server.hostname=172.23.70.123 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false "fi
- 创建Topic
bin/kafka-topics.sh --create--topic test-topic --bootstrap-server localhost:9092 5.
- 生成测试数据
# 生成10000条数据bin/kafka-producer-perf-test.sh --num-records10000--record-size=1024--throughput-1--topic test-topic --producer-props bootstrap.servers=localhost:9092
- 尝试消费
# --from-beginning 从头消费# --max-messages 1 只消费一条数据bin/kafka-console-consumer.sh --topic test-topic --bootstrap-server localhost:9092 --from-beginning--max-messages1到这里完成了Topic的创建和生成了测试数据,下面我们开始通过JMX抓取
参考
服务端
通过jconsole观察
先用通过JDK自带的jconsole对照官方文档观察一下数据
对照这文档可以找到对应的MBean,可以看到有数据单位、数量、1、5、15分钟的数据,这也是后续通过程序可以抓取到的值
通过Java API抓取
通过下面程序可以抓取指定的MBean
/*** JMX工具类*/publicclassJMXUtils { privatefinalstaticStringJMX_CONNECT_URL="service:jmx:rmi:///jndi/rmi://%s/jmxrmi"; privatefinalstaticStringJMX_CONNECT_HOST="localhost:9999"; /*** 获取MBean attributes* @param mBeanName* @return*/publicstaticMap<String, Object>getMBeanAttrs(StringmBeanName) { MBeanServerConnectionmbeanConnection=null; Map<String, Object>result=newHashMap<>(); try (JMXConnectorconnector=JMXConnectorFactory .connect(newJMXServiceURL(String.format(JMX_CONNECT_URL, JMX_CONNECT_HOST)))) { mbeanConnection=connector.getMBeanServerConnection(); ObjectNamename=newObjectName(mBeanName); MBeanInfomBeanInfo=mbeanConnection.getMBeanInfo(name); for (MBeanAttributeInfoval : mBeanInfo.getAttributes()) { Objectattribute=mbeanConnection.getAttribute(name, val.getName()); result.put(val.getName(), attribute); } } catch (Exceptione) { e.printStackTrace(); } returnresult; } publicstaticvoidmain(Stringargv[]) { Map<String, Object>result=JMXUtils .getMBeanAttrs("kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec"); /*** 输出样例* name:RateUnit,valus:SECONDS* name:OneMinuteRate,valus:2.0212348203790318E-82* name:EventType,valus:bytes* name:FifteenMinuteRate,valus:1.2405687834521815E-6* name:Count,valus:351* name:FiveMinuteRate,valus:3.76580595627846E-17* name:MeanRate,valus:0.028921775433401117*/for (Map.Entry<String, Object>entry : result.entrySet()) { System.out.println(String.format("name:%s,valus:%s", entry.getKey(), entry.getValue())); } } }
客户端
通过jconsole观察
启动一个消费者并消费test-topic
bin/kafka-console-consumer.sh --topic test-topic --bootstrap-server localhost:9092
通过jconsole连接
可以看到对应这两个部分的监控点数据
主动上报改造
创建Maven项目,引入kafka-client
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.2.0</version></dependency>
新建PushReporter实现MetricsReporter,用于处理获取到的监控数据,在metricChange方法实现主动上报
publicclassPushReporterimplementsMetricsReporter { privateHttpClienthttpClient; publicvoidconfigure(Map<String, ?>configs) { // TODO Auto-generated method stub } publicvoidinit(List<KafkaMetric>metrics) { // 初始化HTTP客户端httpClient=HttpClient.newBuilder().build(); } publicvoidmetricChange(KafkaMetricmetric) { // 向自己的收集接口发送数据Stringurl="https://www.baidu.com"; try { HttpRequestrequest=HttpRequest.newBuilder(newURI(url)).build(); HttpResponse<String>response=httpClient.send(request, HttpResponse.BodyHandlers.ofString()); System.out.println(String.format("metricName:%s, metricValue:%s, response:%s", metric.metricName().name(), metric.metricValue(), response.statusCode())); } catch (IOException|InterruptedException|URISyntaxExceptione) { e.printStackTrace(); } } publicvoidmetricRemoval(KafkaMetricmetric) { // TODO Auto-generated method stub } publicvoidclose() { // TODO Auto-generated method stub } }
创建并启动消费者
publicclassKafkaConsumerClient { publicstaticvoidmain(String[] args) { Propertiesprops=newProperties(); props.setProperty("bootstrap.servers", "localhost:9092"); props.setProperty("group.id", "test-consumer"); props.setProperty("enable.auto.commit", "false"); props.setProperty("auto.commit.interval.ms", "1000"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 上面实现的PushReporterprops.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "com.xdd.kafka.monitor.metric.PushReporter"); try (KafkaConsumer<String, String>consumer=newKafkaConsumer<>(props)) { consumer.subscribe(Arrays.asList("test-topic")); while (true) { ConsumerRecords<String, String>records=consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String>record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } } }
源码分析
在KafkaConsumer的这个方法会构建监控点,读取上面配置的ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,继续看看getConfiguredInstances方法
继续跟下去会到下面这两个方法,可以看到读取到上面配置的PushReporter
添加到监听列表中,只要加到监听列表里之后,在监控点变更时就会回调PushReporter实现的各种方法
详细可以看Metrics的registerMetric和其他方法
因为PushReporter只对baidu.com发送请求,然后打印监控数据和响应结果,所以可以看到以下日志打印
metricName:count, metricValue:1.0, response:200 metricName:fetch-throttle-time-avg, metricValue:NaN, response:200 metricName:fetch-throttle-time-max, metricValue:NaN, response:200
总结
至此实现了服务端和客户端的监控数据收集,补充了上篇运维平台的监控数据采集实现,也通过追踪源码的方式了解客户端是如何完成上报的,感谢阅读。