安装部署
安装 zookeeper 集群
配置zookeeper集群的核心就是,以下每个zookeeper都要有
vi zoo.cfg dataDir=/opt/apps/data/zkdata server.1=doitedu01:2888:3888 server.2=doitedu02:2888:3888 server.3=doitedu03:2888:3888
安装 kafka 集群
核心操作如下:
vi server.properties #为依次增长的:0、1、2、3、4,集群中唯一 id broker.id=0 # 数据存储的⽬录 # 每一个broker都把自己所管理的数据存储在自己的本地磁盘 log.dirs=/opt/data/kafkadata #底层存储的数据(日志)留存时长(默认 7 天) log.retention.hours=168 #底层存储的数据(日志)留存量(默认 1G) log.retention.bytes=1073741824 #指定 zk 集群地址 zookeeper.connect=doitedu01:2181,doitedu02:2181,doitedu03:2181
查看框架兼容的版本:查看依赖的pom / 要去查看对应框架的源码,然后进入查看
Kafka 运维监控
kafka 自身并没有集成监控管理系统,因此对 kafka 的监控管理比较不便,好在有大量的第三方监控管
理系统来使用,比如Kafka Eagle
Kafka-Eagle 简介
Kafka-Eagle 安装
官方文档地址:https://docs.kafka-eagle.org/
其中修改最核心的配置
###################################### # multi zookeeper & kafka cluster list # Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead ###################################### # 给kafka集群起一个名字 efak.zk.cluster.alias=cluster1 # 告诉它kafka集群所连接的zookeeper在哪里 cluster1.zk.list=doit01:2181,doit02:2181,doit03:2181 ###################################### # broker size online list ###################################### # 告诉集群broker服务器有多少台 cluster1.efak.broker.size=3 # 还需要一种数据库,可以针对性的选择 ###################################### # kafka sqlite jdbc driver address 本地的嵌入式数据库 ###################################### efak.driver=org.sqlite.JDBC efak.url=jdbc:sqlite:/opt/data/kafka-eagle/db/ke.db efak.username=root efak.password=www.kafka-eagle.org ###################################### # kafka mysql jdbc driver address ###################################### #efak.driver=com.mysql.cj.jdbc.Driver #efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior =convertToNull #efak.username=root #efak.password=123456 如上,数据库选择的是 sqlite,需要手动创建所配置的 db 文件存放目录:/opt/data/kafka-eagle/db/ 如果,数据库选择的是 mysql,则要放一个 mysql 的 jdbc 驱动 jar 包到 kafka-eagle 的 lib 目录中
启动 KafkaEagle
访问 web 界面
初始是一个快速看板,快速得到一些最要紧的信息。
启动,先启动zookeeper,再kafka
kafka默认对客户端暴漏的连接端口是 9092、zookeeper默认暴漏的端口是2181
显示信息比如说有多少分区、分布的均匀率、数据倾斜的比例、Leader倾斜的比例等参数。
Kafka-Eagle的意义:
1.对生产力的提升。
2.监控Kafka集群的手段。
命令行工具
概述
Kafka 中提供了许多命令行工具(位于$KAFKA HOME/bin 目录下)用于管理集群的变更。
命令 | 描述 |
kafka-console-consumer.sh | 用于消费消息 |
kafka-console-producer.sh | 用于生产消息 |
kafka-topics.sh | 用于管理主题 |
kafka-server-stop.sh | 用于关闭 Kafka 服务 |
kafka-server-start.sh | 用于启动 Kafka 服务 |
kafka-configs.sh | 用于配置管理 |
kafka-consumer-perf-test.sh | 用于测试消费性能 |
kafka-producer-perf-test.sh | 用于测试生产性能 |
kafka-dump-log.sh | 用于查看数据日志内容 |
kafka-preferred-replica-election.sh | 用于优先副本的选举 |
kafka-reassign-partitions.sh | 用于分区重分配 |
topic 管理操作:kafka-topics
查看 topic 列
bin/kafka-topics.sh --list --zookeeper doit01:2181
查看 topic 状态信息
(1)查看 topic 详细信息
bin/kafka-topics.sh --zookeeper doitedu01:2181 --describe --topic topic-doit29
从上面的结果中,可以看出,topic 的分区数量,以及每个分区的副本数量,Configs:compression.type=gzip 代表着我们topic的数据是压缩的。
创建 topic
./kafka-topics.sh --zookeeper doitedu01:2181 --create --replication-factor 3 --partitions 3 --topic test
参数解释:
--replication-factor 副本数量 --partitions 分区数量 --topic topic 名称
本方式,副本的存储位置是系统自动决定的;
手动指定分配方案:分区数,副本数,存储位置
bin/kafka-topics.sh --create --topic tpc-1 --zookeeper doitedu01:2181 --replica-assignment 0:1:3,1:2:6
该 topic,将有如下 partition: partition0 ,所在节点: broker0、broker1、broker3 partition1 ,所在节点: broker1、broker2、broker6 而顺序的不同有可能导致leader位置的不同
删除 topic
bin/kafka-topics.sh --zookeeper doitedu01:2181 --delete --topic test
删除 topic,server.properties 中需要一个参数处于启用状态: delete.topic.enable = true
使用 kafka-topics .sh 脚本删除主题的行为本质上只是在 ZooKeeper 中的 /admin/delete_topics 路径
下建一个与待删除主题同名的节点,以标记该主题为待删除的状态。然后由 Kafka 控制器异步完成。
增加分区数
kafka-topics.sh --zookeeper doit01:2181 --alter --topic doitedu-tpc2 --partitions 3
Kafka 只支持增加分区,不支持减少分区(增加分区,不涉及历史数据的合并,是一个轻量级的操作,而减少分区,必然涉及到历史数据的转移合并,代价太大) 原因是:减少分区,代价太大(数据的转移,日志段拼接合并)
如果真的需要实现此功能,则完全可以重新创建一个分区数较小的主题,然后将现有主题中的消息按
照既定的逻辑复制过去;
动态配置 topic 参数
通过管理命令,可以为已创建的 topic 增加、修改、删除 topic level 参数
添加/修改 指定 topic 的配置参数:
kafka-topics.sh --alter --topic tpc2 --config compression.type=gzip --zookeeper doit01:2181 # --config compression.type=gzip 修改或添加参数配置 # --add-config compression.type=gzip 添加参数配置 # --delete-config compression.type 删除配置参
topic 配置参数文档地址: https://kafka.apache.org/documentation/#topicconfigs
当然其核心的配置参数有蛮多的
- broker的配置参数
- consumer的配置参数
- producer的配置参数
- topic的配置参数
探究Kafka原理-2.Kafka基本命令实操(下):https://developer.aliyun.com/article/1413714