【kafka运维】Topic的生产和消费运维脚本

简介: 1.Topic的发送kafka-console-producer.sh1.1 生产无key消息## 生产者bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties1.2 生产有key消息加上属性--property parse.key=true

作者石臻臻, CSDN博客之星Top5Kafka Contributornacos Contributor华为云 MVP ,腾讯云TVP, 滴滴Kafka技术专家KnowStreaming PMC)


KnowStreaming  是滴滴开源的Kafka运维管控平台, 有兴趣一起参与参与开发的同学,但是怕自己能力不够的同学,可以联系我,带你一起你参与开源!

1.Topic的发送kafka-console-producer.sh

1.1 生产无key消息

## 生产者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties

1.2 生产有key消息加上属性--property parse.key=true

## 生产者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties  --property parse.key=true



默认消息key与消息value间使用“Tab键”进行分隔,所以消息key以及value中切勿使用转义字符(\t)


可选参数

参数 值类型 说明 有效值
--bootstrap-server String 要连接的服务器必需(除非指定--broker-list) 如:host1:prot1,host2:prot2
--topic String (必需)接收消息的主题名称
--batch-size Integer 单个批处理中发送的消息数 200(默认值)
--compression-codec String 压缩编解码器 none、gzip(默认值)snappy、lz4、zstd
--max-block-ms Long 在发送请求期间,生产者将阻止的最长时间 60000(默认值)
--max-memory-bytes Long 生产者用来缓冲等待发送到服务器的总内存 33554432(默认值)
--max-partition-memory-bytes Long 为分区分配的缓冲区大小 16384
--message-send-max-retries Integer 最大的重试发送次数 3
--metadata-expiry-ms Long 强制更新元数据的时间阈值(ms) 300000
--producer-property String 将自定义属性传递给生成器的机制 如:key=value
--producer.config String 生产者配置属性文件[--producer-property]优先于此配置 配置文件完整路径
--property String 自定义消息读取器 parse.key=true/false key.separator=<key.separator>ignore.error=true/false
--request-required-acks String 生产者请求的确认方式 0、1(默认值)、all
--request-timeout-ms Integer 生产者请求的确认超时时间 1500(默认值)
--retry-backoff-ms Integer 生产者重试前,刷新元数据的等待时间阈值 100(默认值)
--socket-buffer-size Integer TCP接收缓冲大小 102400(默认值)
--timeout Integer 消息排队异步等待处理的时间阈值 1000(默认值)
--sync 同步发送消息
--version 显示 Kafka 版本 不配合其他参数时,显示为本地Kafka版本
--help 打印帮助信息

2. Topic的消费kafka-console-consumer.sh

1. 新客户端从头消费--from-beginning (注意这里是新客户端,如果之前已经消费过了是不会从头消费的)下面没有指定客户端名称,所以每次执行都是新客户端都会从头消费

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test  --from-beginning

2. 正则表达式匹配topic进行消费--whitelist 消费所有的topic

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092  --whitelist '.*'

消费所有的topic,并且还从头消费

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092  --whitelist '.*'   --from-beginning

3.显示key进行消费--property print.key=true

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test  --property print.key=true

4. 指定分区消费--partition 指定起始偏移量消费--offset

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset 100

5. 给客户端命名--group

注意给客户端命名之后,如果之前有过消费,那么--from-beginning 就不会再从头消费了

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test  --group test-group

6. 添加客户端属性--consumer-property

这个参数也可以给客户端添加属性,但是注意 不能多个地方配置同一个属性,他们是互斥的;比如在下面的基础上还加上属性--group test-group 那肯定不行

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test  --consumer-property group.id=test-consumer-group

7. 添加客户端属性--consumer.config

--consumer-property 一样的性质,都是添加客户端的属性,不过这里是指定一个文件,把属性写在文件里面, --consumer-property 的优先级大于 --consumer.config

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test   --consumer.config config/consumer.properties


参数 描述 例子
--group 指定消费者所属组的ID
--topic 被消费的topic
--partition 指定分区 ;除非指定–offset,否则从分区结束(latest)开始消费 --partition 0
--offset 执行消费的起始offset位置 ;默认值: latest; /latest /earliest /偏移量 --offset 10
--whitelist 正则表达式匹配topic;--topic就不用指定了; 匹配到的所有topic都会消费; 当然用了这个参数,--partition--offset等就不能使用了
--consumer-property 将用户定义的属性以key=value的形式传递给使用者 --consumer-property group.id=test-consumer-group
--consumer.config 消费者配置属性文件请注意,[consumer-property]优先于此配置 --consumer.config config/consumer.properties
--property 初始化消息格式化程序的属性 print.timestamp=true,false 、print.key=true,false 、print.value=true,false 、key.separator=<key.separator> 、line.separator=<line.separator>、key.deserializer=<key.deserializer>、value.deserializer=<value.deserializer>
--from-beginning 从存在的最早消息开始,而不是从最新消息开始,注意如果配置了客户端名称并且之前消费过,那就不会从头消费了
--max-messages 消费的最大数据量,若不指定,则持续消费下去 --max-messages 100
--skip-message-on-error 如果处理消息时出错,请跳过它而不是暂停
--isolation-level 设置为read_committed以过滤掉未提交的事务性消息,设置为read_uncommitted以读取所有消息,默认值:read_uncommitted
--formatter kafka.tools.DefaultMessageFormatter、kafka.tools.LoggingMessageFormatter、kafka.tools.NoOpMessageFormatter、kafka.tools.ChecksumMessageFormatter

3. 持续批量推送消息kafka-verifiable-producer.sh

单次发送100条消息--max-messages 100

一共要推送多少条,默认为-1,-1表示一直推送到进程关闭位置

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server  localhost:9092 --max-messages 100

每秒发送最大吞吐量不超过消息 --throughput 100

推送消息时的吞吐量,单位messages/sec。默认为-1,表示没有限制

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server   localhost:9092  --throughput 100

发送的消息体带前缀--value-prefix

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server   localhost:9092   --value-prefix 666

注意 --value-prefix 666必须是整数,发送的消息体的格式是加上一个 点号.  例如: 666.

其他参数: --producer.config CONFIG_FILE 指定producer的配置文件--acks ACKS            每次推送消息的ack值,默认是-1

4. 持续批量拉取消息kafka-verifiable-consumer

持续消费

sh bin/kafka-verifiable-consumer.sh --group-id test_consumer  --bootstrap-server  localhost:9092   --topic test_create_topic4

单次最大消费10条消息--max-messages 10

sh bin/kafka-verifiable-consumer.sh --group-id test_consumer  --bootstrap-server  localhost:9092  --topic test_create_topic4 --max-messages 10


相关可选参数

参数 描述 例子
--bootstrap-server 指定kafka服务 指定连接到的kafka服务; --bootstrap-server localhost:9092
--topic 指定消费的topic
--group-id 消费者id;不指定的话每次都是新的组id
group-instance-id 消费组实例ID,唯一值
--max-messages 单次最大消费的消息数量
--enable-autocommit 是否开启offset自动提交;默认为false
--reset-policy 当以前没有消费记录时,选择要拉取offset的策略,可以是earliest, latest,none。默认是earliest
--assignment-strategy consumer分配分区策略,默认是org.apache.kafka.clients.consumer.RangeAssignor
--consumer.config 指定consumer的配置文件

5More

Kafka专栏持续更新中...(源码、原理、实战、运维、视频、面试视频)


【kafka运维】Kafka全网最全最详细运维命令合集(精品强烈建议收藏!!!)_石臻臻的杂货铺-CSDN博客

【kafka实战】分区重分配可能出现的问题和排查问题思路(生产环境实战,干货!!!非常干!!!建议收藏)

【kafka异常】kafka 常见异常处理方案(持续更新! 建议收藏)

【kafka运维】分区从分配、数据迁移、副本扩缩容 (附教学视频)

【kafka源码】ReassignPartitionsCommand源码分析(副本扩缩、数据迁移、副本重分配、副本跨路径迁移

【kafka】点击更多....

相关文章
|
9天前
|
运维 Devops
自动化运维:从脚本到DevOps的进化之旅
在数字化时代,自动化运维不仅是提高生产效率的关键,更是企业竞争力的象征。本文将带领读者穿越自动化运维的发展历程,从最初的脚本编写到现代DevOps文化的形成,揭示这一演变如何重塑IT行业的工作模式。通过具体案例,我们将展示自动化工具和实践如何简化复杂任务,优化流程,并促进团队协作。你将发现,自动化运维不仅关乎技术的进步,更体现了人、流程和技术三者之间协同增效的深层逻辑。
|
23天前
|
运维 监控 Python
自动化运维:使用Python脚本简化日常任务
【10月更文挑战第36天】在数字化时代,运维工作的效率和准确性成为企业竞争力的关键。本文将介绍如何通过编写Python脚本来自动化日常的运维任务,不仅提高工作效率,还能降低人为错误的风险。从基础的文件操作到进阶的网络管理,我们将一步步展示Python在自动化运维中的应用,并分享实用的代码示例,帮助读者快速掌握自动化运维的核心技能。
45 3
|
28天前
|
缓存 运维 NoSQL
python常见运维脚本_Python运维常用脚本
python常见运维脚本_Python运维常用脚本
28 3
|
29天前
|
运维 监控 应用服务中间件
自动化运维:如何利用Python脚本提升工作效率
【10月更文挑战第30天】在快节奏的IT行业中,自动化运维已成为提升工作效率和减少人为错误的关键技术。本文将介绍如何使用Python编写简单的自动化脚本,以实现日常运维任务的自动化。通过实际案例,我们将展示如何用Python脚本简化服务器管理、批量配置更新以及监控系统性能等任务。文章不仅提供代码示例,还将深入探讨自动化运维背后的理念,帮助读者理解并应用这一技术来优化他们的工作流程。
|
1月前
|
运维 监控 Linux
自动化运维:如何利用Python脚本优化日常任务##
【10月更文挑战第29天】在现代IT运维中,自动化已成为提升效率、减少人为错误的关键技术。本文将介绍如何通过Python脚本来简化和自动化日常的运维任务,从而让运维人员能够专注于更高层次的工作。从备份管理到系统监控,再到日志分析,我们将一步步展示如何编写实用的Python脚本来处理这些任务。 ##
|
1月前
|
运维 Prometheus 监控
自动化运维之路:从脚本到DevOps
【10月更文挑战第25天】在数字化时代的浪潮中,运维不再是简单的服务器管理,而是成为了企业竞争力的核心。本文将带你走进自动化运维的世界,探索如何通过技术手段提升效率和稳定性,以及实现快速响应市场的能力。我们将一起学习如何从基础的脚本编写进化到全面的DevOps实践,包括工具的选择、流程的优化以及文化的建设。无论你是运维新手还是资深专家,这篇文章都将为你提供有价值的见解和实用的技巧。
31 3
|
2月前
|
人工智能 运维 Devops
自动化运维之路:从脚本到DevOps的转变
【10月更文挑战第7天】在这篇文章中,我们将一起探索自动化运维的演变历程,从最初的简单脚本到现代的DevOps实践。我们将深入理解自动化如何改变了运维工作的本质,并讨论实现这一转变的关键技术和策略。文章将不包含代码示例,而是聚焦于理念、工具和方法论的介绍,旨在为读者提供一个全面的自动化运维框架视图。
|
2月前
|
运维 Java Linux
【运维基础知识】Linux服务器下手写启停Java程序脚本start.sh stop.sh及详细说明
### 启动Java程序脚本 `start.sh` 此脚本用于启动一个Java程序,设置JVM字符集为GBK,最大堆内存为3000M,并将程序的日志输出到`output.log`文件中,同时在后台运行。 ### 停止Java程序脚本 `stop.sh` 此脚本用于停止指定名称的服务(如`QuoteServer`),通过查找并终止该服务的Java进程,输出操作结果以确认是否成功。
49 1
|
2月前
|
人工智能 运维 监控
自动化运维:从脚本到工具的演变之路
【10月更文挑战第8天】在数字化时代的浪潮中,运维不再是简单的硬件维护,它已经演变成一场关于效率、稳定性和创新的技术革命。本文将带您领略自动化运维的魅力,从最初的脚本编写到现代复杂的自动化工具,我们将一探究竟,看看这些工具如何帮助运维人员简化日常任务,提升工作效率,并最终推动业务发展。
|
2月前
|
消息中间件 存储 分布式计算
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
70 4

热门文章

最新文章