Kafka常用命令大全及kafka-console-consumer.sh及参数说明

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 该文章汇总了Kafka常用命令,包括集群管理、Topic操作、生产者与消费者的命令行工具使用方法等,适用于Kafka的日常运维和开发需求。

kafka常用命令

一、脚本简介

1、kafka-acls.sh #配置,查看kafka集群鉴权信息
2、kafka-configs.sh #查看,修改kafka配置
3、kafka-console-consumer.sh #消费命令
4、kafka-console-producer.sh #生产命令
5、kafka-consumer-groups.sh #查看消费者组,重置消费位点等
6、kafka-consumer-perf-test.sh #kafka自带消费性能测试命令
7、kafka-mirror-maker.sh #kafka集群间同步命令
8、kafka-preferred-replica-election.sh #重新选举topic分区leader
9、kafka-producer-perf-test.sh #kafka自带生产性能测试命令
10、kafka-reassign-partitions.sh #kafka数据重平衡命令
11、kafka-run-class.sh #kafka执行脚本
12、kafka-server-start.sh #进程启动
13、kafka-server-stop.sh #进程停止
14、kafka-topics.sh #查询topic状态,新建,删除,扩容

二、常用命令大全

###1、集群相关命令###
bin/zookeeper-server-start.sh config/zookeeper.properties & # 启动zookeeper
bin/zookeeper-server-stop.sh # 停止zookeeper

bin/zkServer.sh start
bin/zkServer.sh stop

bin/kafka-server-start.sh $path/server.properties # 前台启动broker
bin/kafka-server-start.sh -daemon $path/server.properties # 后台启动broker
bin/kafka-server-stop.sh # 关闭broker

###2、topic###
bin/kafka-topics.sh --create --zookeeper $zkhost --replication-factor $replCount --partitions $partitionCount --topic $topicName #指定副本数、pritition数创建topic
bin/kafka-topics.sh --zookeeper $zkhost --alter --topic $topicName --partitions $partitionCount #扩容分区、只能扩,不能缩(涉及数据迁移、合并所以不支持))
bin/kafka-topics.sh --zookeeper $zkhost --alter --config.compression.type=gzip --topic $topicName  #动态配置topic参数
    --config.compression.type=gzip #修改或添加参数配置
    --add-config.compression.type=gzip #添加参数配置
    --delete-config.compression.type=gzip #删除参数配置
bin/kafka-topics.sh --zookeeper $zkhost --delete --topic $topicName #删除topic、需关注配置文件delete.topic.enble=true,否则无法真正删除
bin/kafka-topics.sh --zookeeper $zkhost --list #查看topic列表
bin/kafka-topics.sh --zookeeper $zkhost --describe #查看所有topic详细信息
    --topic $topicname   #指定topic查看详细信息

####################topic详细信息####################
Topic:test    PartitionCount:3    ReplicationFactor:3    Configs:
    Topic: test    Partition: 0    Leader: 0    Replicas: 0,1,2    Isr: 0,2,1
    Topic: test    Partition: 1    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: test    Partition: 2    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    #Topic:topic名称 
    #prititionCount:partition数量
    #replicationFactor:副本数量
    #configs:其他配置
    #Partition:prititonId
    #Leader:leader节点
    #Replicas:挂载节点
    #ISR:存活节点
######################################################


###3、消费者组###
bin/kafka-consumer-groups.sh --bootstrap-server $nodes --list #消费者列表查询(待验证)
bin/kafka-consumer-groups.sh --bootstrap-server $nodes --describe --group $groupName #显示某个消费组的消费详情(待验证)
bin/kafka-consumer-groups.sh --bootstrap-server $nodes --group $groupname --reset-offsets --all-topics --to-earliest --execute # 重设消费者组位移(待验证) 
    --to-latest --execute # 最新处    
    --to-offset 2000 --execute # 某个位置
    --to-datetime 2019-09-15T00:00:00.000 # 调整到某个时间之后的最早位移
bin/kafka-consumer-groups.sh --zookeeper $zkhost --delete --group $groupname # 删除消费者组
#####消费组的消费详情#######
TOPIC        PARTITION    CURRENT-OFFSET       LOG-END-OFFSET    LAG         CONSUMER-ID    HOST    CLIENT-ID
topic名字    分区id        当前已消费的条数    总条数        未消费的条数    消费id       主机ip    客户端id
############################


###4、生产、消费者###
bin/kafka-console-producer.sh --broker-list $nodes --topic $topicName # 使用生产者
bin/kafka-console-consumer.sh --bootstrap-server $nodes --topic $topicName --from-beginning # 使用消费者
    --from-beginning #为可选参数,表示要从头消费消息
    --from-earliest #从最早的消息开始消费(待验证)
    --from-latest #从最新的消息开始消费
    --指定offset #从指定的位置开始消费
bin/kafka-console-consumer.sh --bootstrap-server $nodes --topic $topicName --from-beginning --consumer-property group.id=$groupname # 指定groupid
bin/kafka-console-consumer.sh --bootstrap-server $nodes --topic $topicName --from-beginning --partition $partitionId # 指定分区
bin/kafka-console-consumer.sh --bootstrap-server $nodes --topic $topicName --new-consumer --from-beginning --consumer.config config/consumer.properties # 新消费者
bin/kafka-simple-consumer-shell.sh --brist localhost:9092 --topic test --partition 0 --offset 1234  --max-messages 10 # 高级点的用法(不常使用,未验证)


###5、切换leader###
./bin/kafka-preferred-replica-election.sh --zookeeper 10.161.68.123:2181,10.161.68.124:2181,10.161.68.125:2181  #切换leader,此处指定一个node


###6、压测命令###
bin/kafka-producer-perf-test.sh --topic $topicName --num-records 100 --record-size 1 --throughput 100  --producer-props bootstrap.servers=$nodes #(未验证)


###7、迁移分区###
#可先修改保存时间避免大量数据同步#
bin/kafka-configs.sh --zookeeper=$zkhost --alter --entity-type topics --add-config retention.ms=86400000 --entity-name $topicName


#创建json,json格式如下#
{
   "version":1, "partitions":[
{
   "topic":"__consumer_offsets","partition":0,"replicas":[0,1]},
{
   "topic":"__consumer_offsets","partition":1,"replicas":[0,1]},
{
   "topic":"__consumer_offsets","partition":2,"replicas":[0,1]},
{
   "topic":"__consumer_offsets","partition":3,"replicas":[0,1]},
{
   "topic":"__consumer_offsets","partition":4,"replicas":[0,1]},
{
   "topic":"__consumer_offsets","partition":5,"replicas":[0,1]},
{
   "topic":"__consumer_offsets","partition":6,"replicas":[0,1]}
]}
#执行#
bin/kafka-reassign-partitions.sh --zookeeper $zkhost --reassignment-json-file $jsonFile --execute
#验证#
bin/kafka-reassign-partitions.sh --zookeeper $zkhost --reassignment-json-file $jsonFile --verify


###8、动态参数配置###
bin/kafka-configs.sh --zookeeper=$zkhost --alter --add-config '$具体参数' --entity-type $entitytype --entity-name $entityname #只可修改支持动态配置的参数,其他的报错
    #entityType常用的包括以下几个方面,根据entityType的不同前面选择zk 或者 broker
    topics 、 users、 brokers、 concumers、 groups

kafka-console-consumer.sh 详解

概述

kafka-console-consumer.sh 脚本是一个简易的消费者控制台。该 shell 脚本的功能通过调用 kafka.tools 包下的 ConsoleConsumer 类,并将提供的命令行参数全部传给该类实现。

注意:Kafka 从 2.2 版本开始将 kafka-topic.sh 脚本中的 −−zookeeper 参数标注为 “过时”,推荐使用 −−bootstrap-server 参数。若读者依旧使用的是 2.1 及以下版本,请将下述的 --bootstrap-server 参数及其值手动替换为 --zookeeper zk1:2181,zk2:2181,zk:2181。一定要注意两者参数值所指向的集群地址是不同的。

消息消费

表示从 latest 位移位置开始消费该主题的所有分区消息,即仅消费正在写入的消息。

bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName

从开始位置消费

bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --from-beginning --topic topicName

表示从指定主题中有效的起始位移位置开始消费所有分区的消息。
显示key消费

bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --property print.key=true --topic topicName

消费出的消息结果将打印出消息体的 key 和 value。

参数说明

若还需要为你的消息添加其他属性,请参考下述列表。

参数 值类型 说明 有效值
–topic string 被消费的topic
–whitelist string 正则表达式,指定要包含以供使用的主题的白名单
–partition integer 指定分区除非指定’–offset’,否则从分区结束(latest)开始消费
–offset string 执行消费的起始offset位置默认值:latest latest earliest
–consumer-property string 将用户定义的属性以key=value的形式传递给使用者
–consumer.config string 消费者配置属性文件请注意,[consumer-property]优先于此配置
–formatter string 用于格式化kafka消息以供显示的类的名称默认值:kafka.tools.DefaultMessageFormatter
–property string 初始化消息格式化程序的属性
–from-beginning 从存在的最早消息开始,而不是从最新消息开始
–max-messages integer 消费的最大数据量,若不指定,则持续消费下去
–timeout-ms integer 在指定时间间隔内没有消息可用时退出
–skip-message-on-error 如果处理消息时出错,请跳过它而不是暂停
–bootstrap-server string 必需(除非使用旧版本的消费者),要连接的服务器
–key-deserializer string
–value-deserializer string
–enable-systest-events string 除记录消费的消息外,还记录消费者的生命周期(用于系统测试)
–isolation-level string 设置为read_committed以过滤掉未提交的事务性消息,设置为read_uncommitted以读取所有消息,默认值:read_uncommitted
–blacklist string 要从消费中排除的主题黑名单
–csv-reporter-enabled 如果设置,将启用csv metrics报告器
–delete-consumer-offsets string 如果指定,则启动时删除zookeeper中的消费者信息
–metrics-dir 输出csv度量值,需与[csv-reporter-enable]配合使用
–zookeeper string 必需(仅当使用旧的使用者时)连接zookeeper的字符串。可以给出多个URL以允许故障转移
=
相关文章
|
7月前
|
消息中间件 缓存 关系型数据库
Flink CDC产品常见问题之upsert-kafka增加参数报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
消息中间件 SQL 分布式计算
大数据-62 Kafka 高级特性 主题 kafka-topics相关操作参数 KafkaAdminClient 偏移量管理
大数据-62 Kafka 高级特性 主题 kafka-topics相关操作参数 KafkaAdminClient 偏移量管理
41 6
|
2月前
|
消息中间件 存储 负载均衡
大数据-60 Kafka 高级特性 消息消费01-消费组图例 心跳机制图例 附参数详解与建议值
大数据-60 Kafka 高级特性 消息消费01-消费组图例 心跳机制图例 附参数详解与建议值
68 3
|
7月前
|
消息中间件 关系型数据库 Kafka
实时计算 Flink版产品使用合集之想要加快消费 Kafka 数据的速度,该怎么配置参数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
185 2
|
3月前
|
消息中间件 JSON 安全
Kafka常用命令归纳
本文档详细介绍了Kafka 2.2及以上版本中Topic的操作命令,包括创建、查看、修改及删除Topic,以及动态调整主题参数和限速。此外,还涵盖了数据生产和消费的相关命令与性能测试方法,并对内部Topic(如`__consumer_offsets`和`__transaction_state`)的操作进行了说明。最后,提供了常见错误处理方案及Kafka推荐配置,帮助用户更好地管理和优化Kafka集群。
|
7月前
|
消息中间件 Java Kafka
Kafka【环境搭建 01】kafka_2.12-2.6.0 单机版安装+参数配置及说明+添加到service服务+开机启动配置+验证+chkconfig配置说明(一篇入门kafka)
【2月更文挑战第19天】Kafka【环境搭建 01】kafka_2.12-2.6.0 单机版安装+参数配置及说明+添加到service服务+开机启动配置+验证+chkconfig配置说明(一篇入门kafka)
298 1
|
4月前
|
消息中间件 Java 大数据
"深入理解Kafka单线程Consumer:核心参数配置、Java实现与实战指南"
【8月更文挑战第10天】在大数据领域,Apache Kafka以高吞吐和可扩展性成为主流数据流处理平台。Kafka的单线程Consumer因其实现简单且易于管理而在多种场景中受到欢迎。本文解析单线程Consumer的工作机制,强调其在错误处理和状态管理方面的优势,并通过详细参数说明及示例代码展示如何有效地使用KafkaConsumer类。了解这些内容将帮助开发者优化实时数据处理系统的性能与可靠性。
108 7
|
5月前
|
消息中间件 Java Kafka
kafka Linux环境搭建安装及命令创建队列生产消费消息
kafka Linux环境搭建安装及命令创建队列生产消费消息
115 4
|
5月前
|
消息中间件 存储 资源调度
实时计算 Flink版产品使用问题之在消费Kafka的Avro消息,如何配置FlinkKafka消费者的相关参数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
消息中间件 存储 运维
Kafka重要配置参数全面解读(重要)
Kafka重要配置参数全面解读(重要)
236 2
下一篇
DataWorks