Python 基于Python结合pykafka实现kafka生产及消费速率&主题分区偏移实时监控

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,182元/月
MSE Nacos/ZooKeeper 企业版试用,1600元额度,限量50份
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Python 基于Python结合pykafka实现kafka生产及消费速率&主题分区偏移实时监控

基于Python结合pykafka实现kafka生产及消费速率&主题分区偏移实时监控


 

1.测试环境

python 3.4

 

zookeeper-3.4.13.tar.gz

下载地址1:

http://zookeeper.apache.org/releases.html#download

https://www.apache.org/dyn/closer.cgi/zookeeper/

https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

 

下载地址2:

https://pan.baidu.com/s/1dnBgHvySE9pVRZXJVmezyQ

 

kafka_2.12-2.1.0.tgz

下载地址1:

http://kafka.apache.org/downloads.html

下载地址2:

https://pan.baidu.com/s/1VnHkJgy4iQ73j5rLbEL0jw

 

pykafka-2.8.0.tar.gz

下载地址1:

https://pypi.org/project/pykafka/

https://files.pythonhosted.org/packages/55/4b/4828ec5ed766cca0c27de234688122494c5762965e70deeb88b84f5d8d98/pykafka-2.8.0.tar.gz

  

 

2.实现功能

实时采集Kafka生产者主题生产速率,主题消费速率,主题分区偏移,消费组消费速率,支持同时对多个来自不同集群的主题进行实时采集,支持同时对多个消费组实时采集

 

 

 

3.使用前提

1、“主题消费速率”&“消费组消费速率” 统计 依赖“消费组”,所以要统计消费速率,必须存在消费组才能统计;

 

2、“主题消费速率”&“消费组消费速率” 统计 依赖消费者自动、手动提交“offset”,所以所以要统计消费速率,必须确保消费者消费时,会提交消息的offset

 

3、Kafka版本大于等于0.10.1.1

 

 

 

4.使用方法

influxDB主机配置

KafkaMonitor\conf\influxDB.conf

[INFLUXDB]

influxdb_host = 10.203.25.106

influxdb_port = 8086

 

brokers集群配置

KafkaMonitor\conf\brokers.conf

 

[CLUSTER1]

broker1 = 127.0.0.1:9092

 

[bus]

#broker1 =10.202.xxx.xx:9096,10.202.xx.xx:9096,10.202.xxx.x:9096

 

格式说明:

[集群名称]

自定义brokers标识 = broker ip:port配置(如果有多个broker,用英文逗号分隔)

 

如果不想对指定集群进行监控(不监控该集群的主题生产、消费速率,主题分区偏移,消费组消费速率),用 # 号注释掉 该集群的“自定义brokers标识” 所在行即可,如上

 

topics主题配置

KafkaMonitor\conf\brokers.conf

 

[CLUSTER1]

topic1 = MY_TOPIC1

 

[bus]

topic1=NEXT_MARM_CORE_REPORT

#topic2=NEXT_MARM_CORE_EVENT

 

格式说明:

[集群名称]

自定义topic 标识 = topic名称

 

如果不想对指定主题进行监控(不监控该主题的生产、消费速率,主题分区偏移,该主题相关消费组消费速率),用 # 号注释掉 该集群的“自定义 topic标识” 所在行即可,如上

 

注意:每个集群名称下的 自定义 topic 标识不能重复

consumer_groups消费组配置

KafkaMonitor\conf\consumer_groups.conf

 

[CLUSTER1]

groupID1 = MY_TOPIC1|MY_GROUP1:5000


[bus]

#groupID1=NEXT_MARM_CORE_EVENT|NEXT_MARM_CORE_TASK

groupID2=NEXT_MARM_CORE_REPORT|NEXT_MARM_CORE_REPORT,NEXT_MARM_CORE_REPORTTAG

 

格式说明:

[集群名称]

自定义consumer_groups 标识 = 主题名称|消费该主题的消费组名称[:提交msg offset的时间间隔(单位为 毫秒)](如果有多个消费组,彼此之间用逗号分隔)

 

注意:

1、如果有为消费组设置提交msg offset的时间间隔,并且该时间间隔大于统一设置的数据采集频率,那么该消费组的数据采集频率将自动调整为对应的 提交msg offset的时间间隔/1000 + 1

2、主题消费速率的统计依赖消费该主题的所有消费组的数据信息,所以,同一个主题,不要配置在多个“自定义consumer_groups 标识”配置值中

3、主题消费速率数据采集频率取最大值 max(统一设置的数据采集频率,max(消费该主题的消费组提交msg offset的时间间隔/1000 + 1))

 

如果不想对指定消费组进行监控(不监控该消费组消费速率,消费组关联的主题消费速率),用 # 号注释掉 该集群的“自定义consumer_groups 标识” 所在行即可,如上,,或者把对应消费组及其提交msg offset的时间间隔信息删除即可。

 

运行程序

python main.py 采集频率(单位 秒) 采集时长

eg:

每5秒采集一次,总共采集120秒

python main.py 5 120

 

 

 

 

注意:

如果(根据配置自动调整后的)采集频率时间间隔大于单次程序采样耗时,则处理完成后立即进行下一次采样,忽略采样频率设置,实际采集时长变长,但是采集次数不变 int(采集时长/采样频率)

 

grafana图表配置

数据源配置

 

 

说明:Database db_+brokers.conf中配置的集群名称

 

Dashboard变量配置

 

 

 

 

Dashboard Pannel主要配置项

 

 

 

效果展示

 

 

参考链接:

https://pykafka.readthedocs.io/en/latest/index.html

 

源码下载地址:

https://gitee.com/ishouke/KafkaMonitor

目录
相关文章
|
11月前
|
消息中间件 负载均衡 Kafka
【赵渝强老师】Kafka的主题与分区
Kafka 中的消息按主题分类,生产者发送消息到特定主题,消费者订阅主题消费。主题可分多个分区,每个分区仅属一个主题。消息追加到分区时,Broker 分配唯一偏移量地址,确保消息在分区内的顺序性。Kafka 保证分区有序而非主题有序。示例中,Topic A 有 3 个分区,分区可分布于不同 Broker 上,支持负载均衡和容错。视频讲解及图示详见原文。
258 2
|
11月前
|
消息中间件 监控 负载均衡
在Kafka中,如何进行主题的分区和复制?
在Kafka中,如何进行主题的分区和复制?
|
消息中间件 监控 负载均衡
在Kafka中,如何进行主题的分区和复制?
在Kafka中,如何进行主题的分区和复制?
|
消息中间件 JSON 大数据
大数据-65 Kafka 高级特性 分区 Broker自动再平衡 ISR 副本 宕机恢复再重平衡 实测
大数据-65 Kafka 高级特性 分区 Broker自动再平衡 ISR 副本 宕机恢复再重平衡 实测
267 4
|
消息中间件 分布式计算 算法
大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器
大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器
175 3
|
消息中间件 JSON 大数据
大数据-66 Kafka 高级特性 分区Partition 副本因子Replication Factor replicas动态修改 线上动态修改副本数
大数据-66 Kafka 高级特性 分区Partition 副本因子Replication Factor replicas动态修改 线上动态修改副本数
317 1
|
11月前
|
消息中间件 Kafka
【赵渝强老师】Kafka分区的副本机制
在Kafka中,每个主题可有多个分区,每个分区有多个副本。其中仅有一个副本为Leader,负责对外服务,其余为Follower。当Leader所在Broker宕机时,Follower可被选为新的Leader,实现高可用。文中附有示意图及视频讲解。
306 0
|
9月前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
435 1
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
287 1

推荐镜像

更多