Kafka监控数据采集

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 上篇文章讲解了运维平台的整体设计,对各个部分并未深入介绍,今天将比较重要的一环——监控数据来源进行讲解。

来源数据可以分为服务端也就是broker端和客户端,服务端因为相对固定通过JMX方式进行抓取即可,客户端的话,本身一般无状态,像现在的分布式应用,配合动态扩容缩容,像现在的K8S应用,每次重新发布IP都不一样,所以无法使用对固定地址JMX的方式抓取,可以改成主动上报的方式,下面分别说明。

Kafka版本为目前最新的:3.2.x

对应文档:Kafka官方文档-监控

环境搭建

  1. 下载解压
$ tar-xzf kafka_2.13-3.2.0.tgz
$ cd kafka_2.13-3.2.0
  1. 启动zookeeper
# -daemon:后台运行# windows的运行脚本在bin/windwos,并去掉-daemon$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

3. 启动broker

# 设置环境变量# Powershell:$env:JMX_PORT=9999# CMD:set JMX_PORT=9999# Linux:JMX_PORT=9999$ bin/kafka-server-start.sh -daemon config/server.properties

如果是远程连接需要修改config/server.propertiesr,因为默认会将localhost注册到zookeeper,连接的时候会从读取到localhost导致无法连接

# config/server.properties:listeners=PLAINTEXT://:9092
# 改成listeners=PLAINTEXT://172.23.70.123:9092
# 172.23.70.123换成自己的IP

如果是使用Windows的WSL,还需要修改bin/kafka-run-class.sh,指定JMX的hostname,不然无法通过JMX连接,这好像只是WSL的问题,其他系统不需要修改(知道原因的大佬,可以分享一下原理)。

# bin/kafka-run-class.sh:# JMX settingsif [ -z"$KAFKA_JMX_OPTS" ]; thenKAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false  -Dcom.sun.management.jmxremote.ssl=false "fi# 在前面加上-Djava.rmi.server.hostname=172.23.70.123# 改成# JMX settingsif [ -z"$KAFKA_JMX_OPTS" ]; thenKAFKA_JMX_OPTS="-Djava.rmi.server.hostname=172.23.70.123 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false  -Dcom.sun.management.jmxremote.ssl=false "fi
  1. 创建Topic
bin/kafka-topics.sh --create--topic test-topic --bootstrap-server localhost:9092
5.
  1. 生成测试数据
# 生成10000条数据bin/kafka-producer-perf-test.sh --num-records10000--record-size=1024--throughput-1--topic test-topic --producer-props bootstrap.servers=localhost:9092
  1. 尝试消费
# --from-beginning 从头消费# --max-messages 1 只消费一条数据bin/kafka-console-consumer.sh --topic test-topic --bootstrap-server localhost:9092 --from-beginning--max-messages1到这里完成了Topic的创建和生成了测试数据,下面我们开始通过JMX抓取

参考

Kafka官方文档-快速开始

服务端

通过jconsole观察

先用通过JDK自带的jconsole对照官方文档观察一下数据

对照这文档可以找到对应的MBean,可以看到有数据单位、数量、1、5、15分钟的数据,这也是后续通过程序可以抓取到的值

通过Java API抓取

通过下面程序可以抓取指定的MBean

/*** JMX工具类*/publicclassJMXUtils {
privatefinalstaticStringJMX_CONNECT_URL="service:jmx:rmi:///jndi/rmi://%s/jmxrmi";
privatefinalstaticStringJMX_CONNECT_HOST="localhost:9999";
/*** 获取MBean attributes* @param mBeanName* @return*/publicstaticMap<String, Object>getMBeanAttrs(StringmBeanName) {
MBeanServerConnectionmbeanConnection=null;
Map<String, Object>result=newHashMap<>();
try (JMXConnectorconnector=JMXConnectorFactory                .connect(newJMXServiceURL(String.format(JMX_CONNECT_URL, JMX_CONNECT_HOST)))) {
mbeanConnection=connector.getMBeanServerConnection();
ObjectNamename=newObjectName(mBeanName);
MBeanInfomBeanInfo=mbeanConnection.getMBeanInfo(name);
for (MBeanAttributeInfoval : mBeanInfo.getAttributes()) {
Objectattribute=mbeanConnection.getAttribute(name, val.getName());
result.put(val.getName(), attribute);
            }
        } catch (Exceptione) {
e.printStackTrace();
        }
returnresult;
    }
publicstaticvoidmain(Stringargv[]) {
Map<String, Object>result=JMXUtils                .getMBeanAttrs("kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec");
/*** 输出样例* name:RateUnit,valus:SECONDS* name:OneMinuteRate,valus:2.0212348203790318E-82* name:EventType,valus:bytes* name:FifteenMinuteRate,valus:1.2405687834521815E-6* name:Count,valus:351* name:FiveMinuteRate,valus:3.76580595627846E-17* name:MeanRate,valus:0.028921775433401117*/for (Map.Entry<String, Object>entry : result.entrySet()) {
System.out.println(String.format("name:%s,valus:%s", entry.getKey(), entry.getValue()));
        }
    }
}


客户端

通过jconsole观察

启动一个消费者并消费test-topic

bin/kafka-console-consumer.sh --topic test-topic --bootstrap-server localhost:9092

通过jconsole连接

可以看到对应这两个部分的监控点数据

主动上报改造

创建Maven项目,引入kafka-client

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.2.0</version></dependency>

新建PushReporter实现MetricsReporter,用于处理获取到的监控数据,在metricChange方法实现主动上报

publicclassPushReporterimplementsMetricsReporter {
privateHttpClienthttpClient;
@Overridepublicvoidconfigure(Map<String, ?>configs) {
// TODO Auto-generated method stub    }
@Overridepublicvoidinit(List<KafkaMetric>metrics) {
// 初始化HTTP客户端httpClient=HttpClient.newBuilder().build();
    }
@OverridepublicvoidmetricChange(KafkaMetricmetric) {
// 向自己的收集接口发送数据Stringurl="https://www.baidu.com";
try {
HttpRequestrequest=HttpRequest.newBuilder(newURI(url)).build();
HttpResponse<String>response=httpClient.send(request, HttpResponse.BodyHandlers.ofString());
System.out.println(String.format("metricName:%s, metricValue:%s, response:%s", metric.metricName().name(),
metric.metricValue(), response.statusCode()));
        } catch (IOException|InterruptedException|URISyntaxExceptione) {
e.printStackTrace();
        }
    }
@OverridepublicvoidmetricRemoval(KafkaMetricmetric) {
// TODO Auto-generated method stub    }
@Overridepublicvoidclose() {
// TODO Auto-generated method stub    }
}


创建并启动消费者

publicclassKafkaConsumerClient {
publicstaticvoidmain(String[] args) {
Propertiesprops=newProperties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test-consumer");
props.setProperty("enable.auto.commit", "false");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 上面实现的PushReporterprops.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "com.xdd.kafka.monitor.metric.PushReporter");
try (KafkaConsumer<String, String>consumer=newKafkaConsumer<>(props)) {
consumer.subscribe(Arrays.asList("test-topic"));
while (true) {
ConsumerRecords<String, String>records=consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String>record : records)
System.out.printf("offset = %d, key = %s, value = %s%n",
record.offset(),
record.key(),
record.value());
            }
        }
    }
}


源码分析

在KafkaConsumer的这个方法会构建监控点,读取上面配置的ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,继续看看getConfiguredInstances方法

继续跟下去会到下面这两个方法,可以看到读取到上面配置的PushReporter

添加到监听列表中,只要加到监听列表里之后,在监控点变更时就会回调PushReporter实现的各种方法

详细可以看Metrics的registerMetric和其他方法

因为PushReporter只对baidu.com发送请求,然后打印监控数据和响应结果,所以可以看到以下日志打印

metricName:count, metricValue:1.0, response:200
metricName:fetch-throttle-time-avg, metricValue:NaN, response:200
metricName:fetch-throttle-time-max, metricValue:NaN, response:200


总结

至此实现了服务端和客户端的监控数据收集,补充了上篇运维平台的监控数据采集实现,也通过追踪源码的方式了解客户端是如何完成上报的,感谢阅读。

目录
相关文章
|
消息中间件 监控 Kafka
Kafka集群监控系统Kafka Eagle部署与体验
Kafka集群监控系统Kafka Eagle部署与体验
535 0
Kafka集群监控系统Kafka Eagle部署与体验
|
6天前
|
消息中间件 数据采集 关系型数据库
大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka
大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka
23 1
|
11天前
|
数据采集 消息中间件 存储
实时数据处理的终极武器:Databricks与Confluent联手打造数据采集与分析的全新篇章!
【8月更文挑战第9天】利用Databricks与Confluent打造实时数据处理方案。Confluent的Kafka负责数据采集,通过主题接收IoT及应用数据;Databricks运用Structured Streaming处理Kafka数据,并以Delta Lake存储,支持ACID事务。这套组合实现了从数据采集、存储到分析的全流程自动化,满足企业对大数据实时处理的需求。
23 3
|
2月前
|
消息中间件 监控 Java
「布道师系列文章」宝兰德徐清康解析 Kafka 和 AutoMQ 的监控
本文由北京宝兰德公司解决方案总监徐清康撰写,探讨了Kafka和AutoMQ集群的监控。
189 2
「布道师系列文章」宝兰德徐清康解析 Kafka 和 AutoMQ 的监控
|
1月前
|
消息中间件 监控 Java
使用 JMX 监控 Kafka 集群性能指标
使用 JMX 监控 Kafka 集群性能指标
108 1
|
3月前
|
消息中间件 数据采集 分布式计算
【数据采集与预处理】数据接入工具Kafka
【数据采集与预处理】数据接入工具Kafka
56 1
【数据采集与预处理】数据接入工具Kafka
|
1月前
|
消息中间件 NoSQL Kafka
基于Kafka的nginx日志收集分析与监控平台(3)
基于Kafka的nginx日志收集分析与监控平台(3)
|
1月前
|
消息中间件 监控 Kafka
基于Kafka的nginx日志收集分析与监控平台(2)
基于Kafka的nginx日志收集分析与监控平台(2)
|
1月前
|
消息中间件 负载均衡 应用服务中间件
基于Kafka的nginx日志收集分析与监控平台(1)
基于Kafka的nginx日志收集分析与监控平台(1)
|
消息中间件 数据采集 存储
大数据数据采集的数据采集(收集/聚合)的Flume之数据采集流程的Channel的Kafka Channel
在大数据处理和管理中,数据采集是非常重要的一环。为了更加高效地进行数据采集,Flume作为一种流式数据采集工具得到了广泛的应用。其中,Flume的Channel模块是实现数据缓存和传输的核心模块之一。本文将介绍Flume中的Kafka Channel,讲解其数据采集流程。
169 0

热门文章

最新文章