探究Kafka原理-2.Kafka基本命令实操(上)

本文涉及的产品
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
RDS DuckDB + QuickBI 企业套餐,8核32GB + QuickBI 专业版
RDS Agent(兼容OpenClaw),2核4GB
简介: 探究Kafka原理-2.Kafka基本命令实操

安装部署


安装 zookeeper 集群


配置zookeeper集群的核心就是,以下每个zookeeper都要有

vi zoo.cfg
dataDir=/opt/apps/data/zkdata
server.1=doitedu01:2888:3888
server.2=doitedu02:2888:3888
server.3=doitedu03:2888:3888


安装 kafka 集群


核心操作如下:

vi server.properties
#为依次增长的:0、1、2、3、4,集群中唯一 id
broker.id=0
# 数据存储的⽬录
# 每一个broker都把自己所管理的数据存储在自己的本地磁盘
log.dirs=/opt/data/kafkadata
#底层存储的数据(日志)留存时长(默认 7 天)
log.retention.hours=168
#底层存储的数据(日志)留存量(默认 1G)
log.retention.bytes=1073741824
#指定 zk 集群地址
zookeeper.connect=doitedu01:2181,doitedu02:2181,doitedu03:2181

查看框架兼容的版本:查看依赖的pom / 要去查看对应框架的源码,然后进入查看


Kafka 运维监控


kafka 自身并没有集成监控管理系统,因此对 kafka 的监控管理比较不便,好在有大量的第三方监控管

理系统来使用,比如Kafka Eagle


Kafka-Eagle 简介



Kafka-Eagle 安装


官方文档地址:https://docs.kafka-eagle.org/


其中修改最核心的配置

######################################
# multi zookeeper & kafka cluster list
# Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
######################################
# 给kafka集群起一个名字
efak.zk.cluster.alias=cluster1
# 告诉它kafka集群所连接的zookeeper在哪里 
cluster1.zk.list=doit01:2181,doit02:2181,doit03:2181
######################################
# broker size online list
######################################
# 告诉集群broker服务器有多少台
cluster1.efak.broker.size=3
# 还需要一种数据库,可以针对性的选择
######################################
# kafka sqlite jdbc driver address 本地的嵌入式数据库
######################################
efak.driver=org.sqlite.JDBC
efak.url=jdbc:sqlite:/opt/data/kafka-eagle/db/ke.db
efak.username=root
efak.password=www.kafka-eagle.org
######################################
# kafka mysql jdbc driver address
######################################
#efak.driver=com.mysql.cj.jdbc.Driver
#efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior
=convertToNull
#efak.username=root
#efak.password=123456
如上,数据库选择的是 sqlite,需要手动创建所配置的 db 文件存放目录:/opt/data/kafka-eagle/db/
如果,数据库选择的是 mysql,则要放一个 mysql 的 jdbc 驱动 jar 包到 kafka-eagle 的 lib 目录中


启动 KafkaEagle



访问 web 界面


初始是一个快速看板,快速得到一些最要紧的信息。


启动,先启动zookeeper,再kafka


kafka默认对客户端暴漏的连接端口是 9092、zookeeper默认暴漏的端口是2181

显示信息比如说有多少分区、分布的均匀率、数据倾斜的比例、Leader倾斜的比例等参数。

Kafka-Eagle的意义:


1.对生产力的提升。

2.监控Kafka集群的手段。


命令行工具


概述


Kafka 中提供了许多命令行工具(位于$KAFKA HOME/bin 目录下)用于管理集群的变更。

命令 描述
kafka-console-consumer.sh 用于消费消息
kafka-console-producer.sh 用于生产消息
kafka-topics.sh 用于管理主题
kafka-server-stop.sh 用于关闭 Kafka 服务
kafka-server-start.sh 用于启动 Kafka 服务
kafka-configs.sh 用于配置管理
kafka-consumer-perf-test.sh 用于测试消费性能
kafka-producer-perf-test.sh 用于测试生产性能
kafka-dump-log.sh 用于查看数据日志内容
kafka-preferred-replica-election.sh 用于优先副本的选举
kafka-reassign-partitions.sh 用于分区重分配


topic 管理操作:kafka-topics


查看 topic 列


bin/kafka-topics.sh --list --zookeeper doit01:2181


查看 topic 状态信息


(1)查看 topic 详细信息

bin/kafka-topics.sh --zookeeper doitedu01:2181 --describe --topic topic-doit29

从上面的结果中,可以看出,topic 的分区数量,以及每个分区的副本数量,Configs:compression.type=gzip 代表着我们topic的数据是压缩的。


创建 topic


./kafka-topics.sh --zookeeper doitedu01:2181 --create --replication-factor 3
--partitions 3 --topic test

参数解释:

--replication-factor 副本数量
--partitions 分区数量
--topic topic 名称

本方式,副本的存储位置是系统自动决定的;


手动指定分配方案:分区数,副本数,存储位置

bin/kafka-topics.sh --create --topic tpc-1 --zookeeper doitedu01:2181
--replica-assignment 0:1:3,1:2:6
该 topic,将有如下 partition:
partition0 ,所在节点: broker0、broker1、broker3
partition1 ,所在节点: broker1、broker2、broker6
而顺序的不同有可能导致leader位置的不同


删除 topic


bin/kafka-topics.sh --zookeeper doitedu01:2181 --delete --topic test

删除 topic,server.properties 中需要一个参数处于启用状态: delete.topic.enable = true


使用 kafka-topics .sh 脚本删除主题的行为本质上只是在 ZooKeeper 中的 /admin/delete_topics 路径

下建一个与待删除主题同名的节点,以标记该主题为待删除的状态。然后由 Kafka 控制器异步完成。


增加分区数


kafka-topics.sh --zookeeper doit01:2181 --alter --topic doitedu-tpc2 --partitions 3
Kafka 只支持增加分区,不支持减少分区(增加分区,不涉及历史数据的合并,是一个轻量级的操作,而减少分区,必然涉及到历史数据的转移合并,代价太大)
原因是:减少分区,代价太大(数据的转移,日志段拼接合并)


如果真的需要实现此功能,则完全可以重新创建一个分区数较小的主题,然后将现有主题中的消息按

照既定的逻辑复制过去;


动态配置 topic 参数


通过管理命令,可以为已创建的 topic 增加、修改、删除 topic level 参数


添加/修改 指定 topic 的配置参数:

kafka-topics.sh --alter --topic tpc2 --config compression.type=gzip --zookeeper doit01:2181
# --config compression.type=gzip 修改或添加参数配置
# --add-config compression.type=gzip 添加参数配置
# --delete-config compression.type 删除配置参

topic 配置参数文档地址: https://kafka.apache.org/documentation/#topicconfigs


当然其核心的配置参数有蛮多的


  • broker的配置参数
  • consumer的配置参数
  • producer的配置参数
  • topic的配置参数


探究Kafka原理-2.Kafka基本命令实操(下):https://developer.aliyun.com/article/1413714

目录
相关文章
|
10月前
|
消息中间件 Java Kafka
Java 事件驱动架构设计实战与 Kafka 生态系统组件实操全流程指南
本指南详解Java事件驱动架构与Kafka生态实操,涵盖环境搭建、事件模型定义、生产者与消费者实现、事件测试及高级特性,助你快速构建高可扩展分布式系统。
474 7
|
消息中间件 存储 缓存
大厂面试高频:Kafka 工作原理 ( 详细图解 )
本文详细解析了 Kafka 的核心架构和实现原理,消息中间件是亿级互联网架构的基石,大厂面试高频,非常重要,建议收藏。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:Kafka 工作原理 ( 详细图解 )
|
消息中间件 存储 缓存
一文带你秒懂 Kafka工作原理!
Apache Kafka 是一个高吞吐量、低延迟的分布式消息系统,广泛应用于实时数据处理、日志收集和消息队列等领域。它最初由LinkedIn开发,2011年成为Apache项目。Kafka支持消息的发布与订阅,具备高效的消息持久化能力,适用于TB级数据的处理。
|
消息中间件 Kafka API
原理剖析| Kafka Exactly Once 语义实现原理:幂等性与事务消息
原理剖析| Kafka Exactly Once 语义实现原理:幂等性与事务消息
426 0
|
消息中间件 缓存 大数据
大数据-57 Kafka 高级特性 消息发送相关01-基本流程与原理剖析
大数据-57 Kafka 高级特性 消息发送相关01-基本流程与原理剖析
268 3
|
消息中间件 缓存 分布式计算
大数据-59 Kafka 高级特性 消息发送03-自定义拦截器、整体原理剖析
大数据-59 Kafka 高级特性 消息发送03-自定义拦截器、整体原理剖析
254 2
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
1292 0
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
677 1