Python 基于Python结合pykafka实现kafka生产及消费速率&主题分区偏移实时监控

本文涉及的产品
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: Python 基于Python结合pykafka实现kafka生产及消费速率&主题分区偏移实时监控

基于Python结合pykafka实现kafka生产及消费速率&主题分区偏移实时监控


 

1.测试环境

python 3.4

 

zookeeper-3.4.13.tar.gz

下载地址1:

http://zookeeper.apache.org/releases.html#download

https://www.apache.org/dyn/closer.cgi/zookeeper/

https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

 

下载地址2:

https://pan.baidu.com/s/1dnBgHvySE9pVRZXJVmezyQ

 

kafka_2.12-2.1.0.tgz

下载地址1:

http://kafka.apache.org/downloads.html

下载地址2:

https://pan.baidu.com/s/1VnHkJgy4iQ73j5rLbEL0jw

 

pykafka-2.8.0.tar.gz

下载地址1:

https://pypi.org/project/pykafka/

https://files.pythonhosted.org/packages/55/4b/4828ec5ed766cca0c27de234688122494c5762965e70deeb88b84f5d8d98/pykafka-2.8.0.tar.gz

  

 

2.实现功能

实时采集Kafka生产者主题生产速率,主题消费速率,主题分区偏移,消费组消费速率,支持同时对多个来自不同集群的主题进行实时采集,支持同时对多个消费组实时采集

 

 

 

3.使用前提

1、“主题消费速率”&“消费组消费速率” 统计 依赖“消费组”,所以要统计消费速率,必须存在消费组才能统计;

 

2、“主题消费速率”&“消费组消费速率” 统计 依赖消费者自动、手动提交“offset”,所以所以要统计消费速率,必须确保消费者消费时,会提交消息的offset

 

3、Kafka版本大于等于0.10.1.1

 

 

 

4.使用方法

influxDB主机配置

KafkaMonitor\conf\influxDB.conf

[INFLUXDB]

influxdb_host = 10.203.25.106

influxdb_port = 8086

 

brokers集群配置

KafkaMonitor\conf\brokers.conf

 

[CLUSTER1]

broker1 = 127.0.0.1:9092

 

[bus]

#broker1 =10.202.xxx.xx:9096,10.202.xx.xx:9096,10.202.xxx.x:9096

 

格式说明:

[集群名称]

自定义brokers标识 = broker ip:port配置(如果有多个broker,用英文逗号分隔)

 

如果不想对指定集群进行监控(不监控该集群的主题生产、消费速率,主题分区偏移,消费组消费速率),用 # 号注释掉 该集群的“自定义brokers标识” 所在行即可,如上

 

topics主题配置

KafkaMonitor\conf\brokers.conf

 

[CLUSTER1]

topic1 = MY_TOPIC1

 

[bus]

topic1=NEXT_MARM_CORE_REPORT

#topic2=NEXT_MARM_CORE_EVENT

 

格式说明:

[集群名称]

自定义topic 标识 = topic名称

 

如果不想对指定主题进行监控(不监控该主题的生产、消费速率,主题分区偏移,该主题相关消费组消费速率),用 # 号注释掉 该集群的“自定义 topic标识” 所在行即可,如上

 

注意:每个集群名称下的 自定义 topic 标识不能重复

consumer_groups消费组配置

KafkaMonitor\conf\consumer_groups.conf

 

[CLUSTER1]

groupID1 = MY_TOPIC1|MY_GROUP1:5000


[bus]

#groupID1=NEXT_MARM_CORE_EVENT|NEXT_MARM_CORE_TASK

groupID2=NEXT_MARM_CORE_REPORT|NEXT_MARM_CORE_REPORT,NEXT_MARM_CORE_REPORTTAG

 

格式说明:

[集群名称]

自定义consumer_groups 标识 = 主题名称|消费该主题的消费组名称[:提交msg offset的时间间隔(单位为 毫秒)](如果有多个消费组,彼此之间用逗号分隔)

 

注意:

1、如果有为消费组设置提交msg offset的时间间隔,并且该时间间隔大于统一设置的数据采集频率,那么该消费组的数据采集频率将自动调整为对应的 提交msg offset的时间间隔/1000 + 1

2、主题消费速率的统计依赖消费该主题的所有消费组的数据信息,所以,同一个主题,不要配置在多个“自定义consumer_groups 标识”配置值中

3、主题消费速率数据采集频率取最大值 max(统一设置的数据采集频率,max(消费该主题的消费组提交msg offset的时间间隔/1000 + 1))

 

如果不想对指定消费组进行监控(不监控该消费组消费速率,消费组关联的主题消费速率),用 # 号注释掉 该集群的“自定义consumer_groups 标识” 所在行即可,如上,,或者把对应消费组及其提交msg offset的时间间隔信息删除即可。

 

运行程序

python main.py 采集频率(单位 秒) 采集时长

eg:

每5秒采集一次,总共采集120秒

python main.py 5 120

 

 

 

 

注意:

如果(根据配置自动调整后的)采集频率时间间隔大于单次程序采样耗时,则处理完成后立即进行下一次采样,忽略采样频率设置,实际采集时长变长,但是采集次数不变 int(采集时长/采样频率)

 

grafana图表配置

数据源配置

 

 

说明:Database db_+brokers.conf中配置的集群名称

 

Dashboard变量配置

 

 

 

 

Dashboard Pannel主要配置项

 

 

 

效果展示

 

 

参考链接:

https://pykafka.readthedocs.io/en/latest/index.html

 

源码下载地址:

https://gitee.com/ishouke/KafkaMonitor

目录
相关文章
|
6月前
|
数据采集 自然语言处理 算法
如何使用Python的Gensim库进行自然语言处理和主题建模?
使用Gensim库进行Python自然语言处理和主题建模,包括:1) 安装Gensim;2) 导入`corpora`, `models`, `nltk`等相关模块;3) 对文本数据进行预处理,如分词和去除停用词;4) 创建字典和语料库;5) 使用LDA算法训练模型;6) 查看每个主题的主要关键词。代码示例展示了从数据预处理到主题提取的完整流程。
157 3
|
6月前
|
消息中间件 分布式计算 监控
Python面试:消息队列(RabbitMQ、Kafka)基础知识与应用
【4月更文挑战第18天】本文探讨了Python面试中RabbitMQ与Kafka的常见问题和易错点,包括两者的基础概念、特性对比、Python客户端使用、消息队列应用场景及消息可靠性保证。重点讲解了消息丢失与重复的避免策略,并提供了实战代码示例,帮助读者提升在分布式系统中使用消息队列的能力。
200 2
|
2月前
|
消息中间件 Kafka Python
|
2月前
|
消息中间件 Kafka API
python之kafka日志
python之kafka日志
25 3
|
3月前
|
监控 数据可视化 前端开发
基于python django生产数据与计划大屏,可链接数据库
本文介绍了一个基于Python Django框架开发的生产数据与计划大屏系统,该系统能够实时采集和展示生产数据,支持数据可视化和实时更新,以提高生产监控的效率和质量。
|
4月前
|
消息中间件 存储 Kafka
深入理解Kafka核心设计及原理(四):主题管理
深入理解Kafka核心设计及原理(四):主题管理
74 8
|
3月前
|
消息中间件 SQL Java
实时数仓 Hologres产品使用合集之如何用python将kafka数据写入
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
4月前
|
数据采集 自然语言处理 大数据
​「Python大数据」LDA主题分析模型
使用Python进行文本聚类,流程包括读取VOC数据、jieba分词、去除停用词,应用LDA模型(n_components=5)进行主题分析,并通过pyLDAvis生成可视化HTML。关键代码涉及数据预处理、CountVectorizer、LatentDirichletAllocation以及HTML文件的本地化处理。停用词和业务术语列表用于优化分词效果。
207 0
​「Python大数据」LDA主题分析模型
|
4月前
|
存储 消息中间件 数据挖掘
Python实时数据分析:利用丰富的库(如Pandas, PySpark, Kafka)进行流处理,涵盖数据获取、预处理、处理、存储及展示。
【7月更文挑战第5天】Python实时数据分析:利用丰富的库(如Pandas, PySpark, Kafka)进行流处理,涵盖数据获取、预处理、处理、存储及展示。示例代码展示了从Kafka消费数据,计算社交媒体活跃度和物联网设备状态,并可视化结果。适用于监控、故障检测等场景。通过学习和实践,提升实时数据分析能力。
100 0
|
6月前
|
开发工具 Python
国外的大学图书馆也像国内的一样吗?用Python脚本抓取期刊的主题标题!
国外的大学图书馆也像国内的一样吗?用Python脚本抓取期刊的主题标题!