大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(正在更新…)

章节内容

上节我们完成了如下的内容:


Kafka 集群模式搭建

Kafka 集群模式的应用场景

实机云服务器搭建

6d2444c0a9e48f2ad70412545587961e_47e39d43078e4fe79895b0a9859372a9.png 监控度量指标

Kafka使用Yammer Metrics在服务器和Scala客户端中报告指标,Java客户端使用Kafka Metrics,它是一种内置的度量标准注册表,可最大程度的减少拉入客户端应用程序的传递依赖项。

两者都通过JMX公开指标,并且可以配置为使用可插拔的统计报告器报告统计信息,以连接到你的监控系统中。


JMX

export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \
                       -Dcom.sun.management.jmxremote.port=9999 \
                       -Dcom.sun.management.jmxremote.authenticate=false \
                       -Dcom.sun.management.jmxremote.ssl=false \
                       -Djava.rmi.server.hostname=${服务器的IP,尽量写IP,不要hostname或者域名}"

接着我们启动Kafka:

kafka-server-start.sh /opt/servers/kafka_2.12-2.7.2/config/server.properties

JConsole

在本机上启动 jconsole 服务,我们运行如下指令:(本机要有JDK)

启动窗口如下图所示:

我们输入Kafka的地址和端口:

连接成功之后页面如下图:

我们选择 MBean 选项卡:

可以看到对应的数据情况:

详细监控指标

http://kafka.apache.org/10/documentation.html#monitoring

OS监控项

Broker指标

Producer和Topic指标

Consumer指标

获取监控指标

我们可以通过编程的方式来获取到Kafka的指标信息:

编写代码

package icu.wzk.kafka;

import javax.management.MBeanServerConnection;
import javax.management.ObjectInstance;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.util.Iterator;
import java.util.Set;

public class JMXMonitorDemo {

    public static void main(String[] args) throws Exception {
        String jmxServiceUrl = "service:jmx:rmi:///jndi/rmi://h121.wzk.icu:9999/jmxrmi";
        JMXServiceURL jmxUrl = null;
        JMXConnector jmxc = null;
        MBeanServerConnection jmxs = null;
        ObjectName mbeanObjectName = null;
        Iterator sampleIter = null;
        Set sampleSet = null;

        // 创建JMXServiceURL 对象
        jmxUrl = new JMXServiceURL(jmxServiceUrl);
        // 建立指定的URL服务器的连接
        jmxc = JMXConnectorFactory.connect(jmxUrl);
        // 返回代表远程MBean服务器的MBeanServiceConnection对象
        jmxs = jmxc.getMBeanServerConnection();
        // 根据传入的字符串,创建ObjectName对象
        mbeanObjectName = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec");
        // 指定ObjectName对应的MBeans
        sampleSet = jmxs.queryMBeans(null, mbeanObjectName);
        // 迭代器
        sampleIter = sampleSet.iterator();
        if (!sampleSet.isEmpty()) {
            // 如果返回了 则打印信息
            while (sampleIter.hasNext()) {
                ObjectInstance sampleObject = (ObjectInstance) sampleIter.next();
                ObjectName objectName = sampleObject.getObjectName();
                // 查看指定MBean指定属性的值
                String count = jmxs.getAttribute(objectName, "Count").toString();
                System.out.println("count: " + count);
            }
        }
        // 关闭
        jmxc.close();
    }
}

运行测试

控制台输出结果如下:

Kafka Eagle

我们可以使用 kafka-eagle 管理 Kafka集群。


核心模块

面板可视化

主题管理,包含创建主题、删除主题、主题列举、主题配置、主题查询

消费者应用:对不同消费者应用进行监控,包含KafkaAPI、FlinkAPI、SparkAPI、StormAPI、FlumeAPI、LogStashAPI等等

集群管理:包含对Kafka集群和ZooKeeper集群的详情展示,其内容包含Kafka启动时间、Kafka端口号、ZooKeeperLeader角色等。同时,还有多集群切换管理,ZooKeeperClient操作入口

集群监控:包含对Broker、Kafka核心指标、ZooKeeper核心指标进行监控,并绘制历史趋势图

告警功能:对消费者应用数据积压情况进行告警,以及对Kafka和ZooKeeper监控度进行告警,同时,支持邮件、微信、钉钉告警通知

系统管理:包含用户创建、用户角色分配、资源访问进行管理

整体架构

可视化:负责展示主题列表、集群健康、消费应用等

采集器:数据采集的来源包含ZooKeeper、Kafka JMX & 内部Topic、KafkaAPI(2.x以后版本)

数据存储:目前Kafka Eagle存储采用MySQL或SQLite,数据库和表的创建均是自动完成的,按照官方文档配置好即可,启动Kafka Eagle就会自动创建,用来存储元数据和监控数据

监控:负责见消费者应用消费情况,集群健康状态

告警:对监控到的异常进行告警通知,支持邮件、微信、钉钉等方式

权限管理:对访问用户进行权限管理,对于管理员、开发者、访问者等不同角色的用户,分配不用的访问权限

下载项目

# Github 地址
# https://github.com/smartloli/EFAK

wget https://github.com/smartloli/kafka-eagle-bin/archive/v3.0.1.tar.gz
mv v3.0.1.tar.gz kafka-eagle-v3.0.1.tar.gz
tar -zxvf kafka-eagle-v3.0.1.tar.gz
cd kafka-eagle-bin-3.0.1/
tar -zxvf efak-web-3.0.1-bin.tar.gz
mv efak-web-3.0.1/ /opt/servers/

下载过程如下图所示:

整理好的项目如下所示:

配置项目

cd /opt/servers/efak-web-3.0.1
• 1

修改配置文件

vim conf/system-config.properties

文件按照自己的需要修改,我这里修改了部分:

efak.zk.cluster.alias=cluster1
cluster1.zk.list=h121.wzk.icu:2181,h122.wzk.icu:2181,h123.wzk.icu:2181
######################################
# kafka sqlite jdbc driver address
######################################
efak.driver=org.sqlite.JDBC
efak.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
efak.username=root
efak.password=www.kafka-eagle.org

# 我注释掉了MySQL

此时我们需要新建一个文件夹:

mkdir -p /hadoop/kafka-eagle/db/

环境变量

vim /etc/profile

# efak
export KE_HOME=/opt/servers/efak-web-3.0.1
export PATH=$PATH:$KE_HOME/bin

启动服务

./bin/ke.sh start

启动我们的服务,如下图所示:

访问服务

http://h121.wzk.icu:8048

admin
123456

运行结果如下图所示:

打开之后,填写账号密码:

62a5074764c700da30745cd2af92d3f1_c744df43845d4133b81d0531021f6b8d.png

目录
相关文章
|
4天前
|
消息中间件 人工智能 安全
秒级灾备恢复:Kafka 2025 AI自愈集群下载及跨云Topic迁移终极教程
Apache Kafka 2025作为企业级实时数据中枢,实现五大革新:量子安全传输(CRYSTALS-Kyber抗量子加密算法)、联邦学习总线(支持TensorFlow Federated/Horizontal FL框架)、AI自愈集群(MTTR缩短至30秒内)、多模态数据处理(原生支持视频流、3D点云等)和跨云弹性扩展(AWS/GCP/Azure间自动迁移)。平台采用混合云基础设施矩阵与软件依赖拓扑设计,提供智能部署架构。安装流程涵盖抗量子安装包获取、量子密钥配置及联邦学习总线设置。
|
3月前
|
消息中间件 Java Kafka
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
本文介绍了Kafka集群的搭建过程,涵盖从虚拟机安装到集群测试的详细步骤。首先规划了集群架构,包括三台Kafka Broker节点,并说明了分布式环境下的服务进程配置。接着,通过VMware导入模板机并克隆出三台虚拟机(kafka-broker1、kafka-broker2、kafka-broker3),分别设置IP地址和主机名。随后,依次安装JDK、ZooKeeper和Kafka,并配置相应的环境变量与启动脚本,确保各组件能正常运行。最后,通过编写启停脚本简化集群的操作流程,并对集群进行测试,验证其功能完整性。整个过程强调了自动化脚本的应用,提高了部署效率。
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
|
3月前
|
消息中间件 存储 Kafka
2024最全Kafka集群方案汇总
Apache Kafka 是一个高吞吐量、可扩展、可靠的分布式消息系统,广泛应用于数据驱动的应用场景。Kafka 支持集群架构,具备高可用性和容错性。其核心组件包括 Broker(服务器实例)、Topic(消息分类)、Partition(有序消息序列)、Producer(消息发布者)和 Consumer(消息消费者)。每个分区有 Leader 和 Follower,确保数据冗余和高可用。Kafka 2.8+ 引入了不依赖 Zookeeper 的 KRaft 协议,进一步简化了集群管理。常见的集群部署方案包括单节点和多节点集群,后者适用于生产环境以确保高可用性。
155 0
|
4月前
|
SQL 存储 算法
基于对象 - 事件模式的数据计算问题
基于对象-事件模式的数据计算是商业中最常见的数据分析任务之一。对象如用户、账号、商品等,通过唯一ID记录其相关事件,如操作日志、交易记录等。这种模式下的统计任务包括无序计算(如交易次数、通话时长)和有序计算(如漏斗分析、连续交易检测)。尽管SQL在处理无序计算时表现尚可,但在有序计算中却显得力不从心,主要原因是其对跨行记录运算的支持较弱,且大表JOIN和大结果集GROUP BY的性能较差。相比之下,SPL语言通过强化离散性和有序集合的支持,能够高效地处理这类计算任务,避免了大表JOIN和复杂的GROUP BY操作,从而显著提升了计算效率。
|
4月前
|
消息中间件 存储 Prometheus
Kafka集群如何配置高可用性
Kafka集群如何配置高可用性
|
2月前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
5月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
207 1
|
5月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
103 1
|
7月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
298 0
|
7月前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。