3.【kafka运维】Topic的生产和消费运维脚本

简介: 3.【kafka运维】Topic的生产和消费运维脚本

文章目录

1.Topic的发送kafka-console-producer.sh

2. Topic的消费kafka-console-consumer.sh

3. 持续批量推送消息kafka-verifiable-producer.sh

4. 持续批量拉取消息kafka-verifiable-consumer

More

日常运维 、问题排查 怎么能够少了滴滴开源的

滴滴开源LogiKM一站式Kafka监控与管控平台


1.Topic的发送kafka-console-producer.sh

1.1 生产无key消息

## 生产者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties

1.2 生产有key消息

加上属性--property parse.key=true

## 生产者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties  --property parse.key=true


默认消息key与消息value间使用“Tab键”进行分隔,所以消息key以及value中切勿使用转义字符(\t)


可选参数


参数 值类型 说明 有效值

–bootstrap-server String 要连接的服务器必需(除非指定–broker-list) 如:host1:prot1,host2:prot2

–topic String (必需)接收消息的主题名称

–batch-size Integer 单个批处理中发送的消息数 200(默认值)

–compression-codec String 压缩编解码器 none、gzip(默认值)snappy、lz4、zstd

–max-block-ms Long 在发送请求期间,生产者将阻止的最长时间 60000(默认值)

–max-memory-bytes Long 生产者用来缓冲等待发送到服务器的总内存 33554432(默认值)

–max-partition-memory-bytes Long 为分区分配的缓冲区大小 16384

–message-send-max-retries Integer 最大的重试发送次数 3

–metadata-expiry-ms Long 强制更新元数据的时间阈值(ms) 300000

–producer-property String 将自定义属性传递给生成器的机制 如:key=value

–producer.config String 生产者配置属性文件[–producer-property]优先于此配置 配置文件完整路径

–property String 自定义消息读取器 parse.key=true/false key.separator=<key.separator>ignore.error=true/false

–request-required-acks String 生产者请求的确认方式 0、1(默认值)、all

–request-timeout-ms Integer 生产者请求的确认超时时间 1500(默认值)

–retry-backoff-ms Integer 生产者重试前,刷新元数据的等待时间阈值 100(默认值)

–socket-buffer-size Integer TCP接收缓冲大小 102400(默认值)

–timeout Integer 消息排队异步等待处理的时间阈值 1000(默认值)

–sync 同步发送消息  

–version 显示 Kafka 版本 不配合其他参数时,显示为本地Kafka版本

–help 打印帮助信息  

2. Topic的消费kafka-console-consumer.sh

1. 新客户端从头消费--from-beginning (注意这里是新客户端,如果之前已经消费过了是不会从头消费的)

下面没有指定客户端名称,所以每次执行都是新客户端都会从头消费


sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning


2. 正则表达式匹配topic进行消费--whitelist

消费所有的topic


sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist ‘.*’


消费所有的topic,并且还从头消费


sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist ‘.*’ --from-beginning


3.显示key进行消费--property print.key=true


sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --property print.key=true


4. 指定分区消费--partition 指定起始偏移量消费--offset


sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset 100


5. 给客户端命名--group


注意给客户端命名之后,如果之前有过消费,那么--from-beginning就不会再从头消费了


sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group test-group


6. 添加客户端属性--consumer-property


这个参数也可以给客户端添加属性,但是注意 不能多个地方配置同一个属性,他们是互斥的;比如在下面的基础上还加上属性--group test-group 那肯定不行


sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=test-consumer-group


7. 添加客户端属性--consumer.config


跟--consumer-property 一样的性质,都是添加客户端的属性,不过这里是指定一个文件,把属性写在文件里面, --consumer-property 的优先级大于 --consumer.config


sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer.properties


参数 描述 例子

--group 指定消费者所属组的ID

--topic 被消费的topic

--partition 指定分区 ;除非指定–offset,否则从分区结束(latest)开始消费 --partition 0

--offset 执行消费的起始offset位置 ;默认值: latest; /latest /earliest /偏移量 --offset 10

--whitelist 正则表达式匹配topic;--topic就不用指定了; 匹配到的所有topic都会消费; 当然用了这个参数,--partition --offset等就不能使用了

--consumer-property 将用户定义的属性以key=value的形式传递给使用者 --consumer-propertygroup.id=test-consumer-group

--consumer.config 消费者配置属性文件请注意,[consumer-property]优先于此配置 --consumer.config config/consumer.properties

--property 初始化消息格式化程序的属性 print.timestamp=true,false 、print.key=true,false 、print.value=true,false 、key.separator=<key.separator> 、line.separator=<line.separator>、key.deserializer=<key.deserializer>、value.deserializer=<value.deserializer>

--from-beginning 从存在的最早消息开始,而不是从最新消息开始,注意如果配置了客户端名称并且之前消费过,那就不会从头消费了

--max-messages 消费的最大数据量,若不指定,则持续消费下去 --max-messages 100

--skip-message-on-error 如果处理消息时出错,请跳过它而不是暂停

--isolation-level 设置为read_committed以过滤掉未提交的事务性消息,设置为read_uncommitted以读取所有消息,默认值:read_uncommitted

--formatter kafka.tools.DefaultMessageFormatter、kafka.tools.LoggingMessageFormatter、kafka.tools.NoOpMessageFormatter、kafka.tools.ChecksumMessageFormatter

3. 持续批量推送消息kafka-verifiable-producer.sh

单次发送100条消息--max-messages 100


一共要推送多少条,默认为-1,-1表示一直推送到进程关闭位置


sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --max-messages 100


每秒发送最大吞吐量不超过消息 --throughput 100


推送消息时的吞吐量,单位messages/sec。默认为-1,表示没有限制


sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --throughput 100


发送的消息体带前缀--value-prefix


sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --value-prefix 666


注意--value-prefix 666必须是整数,发送的消息体的格式是加上一个 点号. 例如: 666.


其他参数:

--producer.config CONFIG_FILE 指定producer的配置文件

--acks ACKS 每次推送消息的ack值,默认是-1


4. 持续批量拉取消息kafka-verifiable-consumer

持续消费


sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4


单次最大消费10条消息--max-messages 10


sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4 --max-messages 10


相关可选参数


参数 描述 例子

--bootstrap-server 指定kafka服务 指定连接到的kafka服务; –bootstrap-server localhost:9092

--topic 指定消费的topic

--group-id 消费者id;不指定的话每次都是新的组id

group-instance-id 消费组实例ID,唯一值

--max-messages 单次最大消费的消息数量

--enable-autocommit 是否开启offset自动提交;默认为false

--reset-policy 当以前没有消费记录时,选择要拉取offset的策略,可以是earliest, latest,none。默认是earliest

--assignment-strategy consumer分配分区策略,默认是org.apache.kafka.clients.consumer.RangeAssignor

--consumer.config 指定consumer的配置文件

More

Kafka专栏持续更新中…(源码、原理、实战、运维、视频、面试视频)


目录
相关文章
|
6月前
|
存储 人工智能 运维
别再靠脚本“救火”了!让智能数据治理接管你的运维世界
别再靠脚本“救火”了!让智能数据治理接管你的运维世界
346 14
消息中间件 Java Kafka
595 0
|
9月前
|
机器学习/深度学习 消息中间件 人工智能
别只会写脚本了!看看机器学习是怎么帮运维“摸鱼”的
别只会写脚本了!看看机器学习是怎么帮运维“摸鱼”的
233 13
|
消息中间件 存储 NoSQL
RocketMQ实战—6.生产优化及运维方案
本文围绕RocketMQ集群的使用与优化,详细探讨了六个关键问题。首先,介绍了如何通过ACL配置实现RocketMQ集群的权限控制,防止不同团队间误用Topic。其次,讲解了消息轨迹功能的开启与追踪流程,帮助定位和排查问题。接着,分析了百万消息积压的处理方法,包括直接丢弃、扩容消费者或通过新Topic间接扩容等策略。此外,提出了针对RocketMQ集群崩溃的金融级高可用方案,确保消息不丢失。同时,讨论了为RocketMQ增加限流功能的重要性及实现方式,以提升系统稳定性。最后,分享了从Kafka迁移到RocketMQ的双写双读方案,确保数据一致性与平稳过渡。
|
消息中间件 人工智能 安全
秒级灾备恢复:Kafka 2025 AI自愈集群下载及跨云Topic迁移终极教程
Apache Kafka 2025作为企业级实时数据中枢,实现五大革新:量子安全传输(CRYSTALS-Kyber抗量子加密算法)、联邦学习总线(支持TensorFlow Federated/Horizontal FL框架)、AI自愈集群(MTTR缩短至30秒内)、多模态数据处理(原生支持视频流、3D点云等)和跨云弹性扩展(AWS/GCP/Azure间自动迁移)。平台采用混合云基础设施矩阵与软件依赖拓扑设计,提供智能部署架构。安装流程涵盖抗量子安装包获取、量子密钥配置及联邦学习总线设置。
|
消息中间件 Java Kafka
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
本文介绍了Kafka集群的搭建过程,涵盖从虚拟机安装到集群测试的详细步骤。首先规划了集群架构,包括三台Kafka Broker节点,并说明了分布式环境下的服务进程配置。接着,通过VMware导入模板机并克隆出三台虚拟机(kafka-broker1、kafka-broker2、kafka-broker3),分别设置IP地址和主机名。随后,依次安装JDK、ZooKeeper和Kafka,并配置相应的环境变量与启动脚本,确保各组件能正常运行。最后,通过编写启停脚本简化集群的操作流程,并对集群进行测试,验证其功能完整性。整个过程强调了自动化脚本的应用,提高了部署效率。
3391 1
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
|
运维 Kubernetes Devops
自动化运维:从脚本到工具的演进之旅
在数字化浪潮中,自动化运维成为提升效率、保障系统稳定的关键。本文将探索自动化运维的发展脉络,从基础的Shell脚本编写到复杂的自动化工具应用,揭示这一技术变革如何重塑IT运维领域。我们将通过实际案例,展示自动化运维在简化工作流程、提高响应速度和降低人为错误中的重要作用。无论你是初学者还是资深专家,这篇文章都将为你提供宝贵的洞见和实用的技巧。
|
运维 Devops
自动化运维:从脚本到DevOps的进化之旅
在数字化时代,自动化运维不仅是提高生产效率的关键,更是企业竞争力的象征。本文将带领读者穿越自动化运维的发展历程,从最初的脚本编写到现代DevOps文化的形成,揭示这一演变如何重塑IT行业的工作模式。通过具体案例,我们将展示自动化工具和实践如何简化复杂任务,优化流程,并促进团队协作。你将发现,自动化运维不仅关乎技术的进步,更体现了人、流程和技术三者之间协同增效的深层逻辑。
|
机器学习/深度学习 人工智能 运维
自动化运维之路:从脚本到工具的演进
在IT运维领域,效率和准确性是衡量工作成效的关键指标。随着技术的发展,自动化运维逐渐成为提升这两个指标的重要手段。本文将带领读者了解自动化运维的演变历程,从最初的简单脚本编写到现今复杂的自动化工具应用,展示如何通过技术提升运维效率。文章不仅介绍理论和实践案例,还提供了代码示例,帮助读者理解自动化运维的实际应用场景。
|
运维 监控 网络安全
自动化运维的崛起:如何利用Python脚本简化日常任务
【10月更文挑战第43天】在数字化时代的浪潮中,运维工作已从繁琐的手工操作转变为高效的自动化流程。本文将引导您了解如何运用Python编写脚本,以实现日常运维任务的自动化,从而提升工作效率和准确性。我们将通过一个实际案例,展示如何使用Python来自动部署应用、监控服务器状态并生成报告。文章不仅适合运维新手入门,也能为有经验的运维工程师提供新的视角和灵感。