跟我学Kafka:如何高效运维之主题篇

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 跟我学Kafka:如何高效运维之主题篇

1、Kafka topic运维命令的基本使用


Kafka提供了kafka-topics脚步用来创建、修改、删除、查询topic,位于${kafka_home}/bin/kafka-topics.sh,其中kafka_home表示Kafka的安装目录。

image.png

image.png

接下来对一些上述一些不那么直观的选项进行单独介绍。


1.1 --replica-assignment


收到指定副本数量和分区信息,该参数不能和--partitions、--replication-factor同时使用。

cc407087d48f090ea9d79cb7831eb6b7.png

其格式为:每一个逗号表示一个分区的配置,每一个分区分布的broker用冒号隔开。


--replication-factor 0:1,1:2,0:2 表示的含义是什么呢?


分区数量为3个,其中分区0(p0)分布在broker 0和1上,分区1(p1)分布在broker 1,2上,分区2(p2)分布在broker 0与2上。从而推出分区数量为3,副本因子为2,每一个分区的第一个broker为Leader,其演示效果如下:

139d763d66b5e5cf83f49d44da771a0c.png


2、Kafka Topic配置项详解


通过kafka-topics脚本在创建topic时可通过--config选项来定制化topic的属性,接下来试图从这些属性来探究Kafka背后的运作机制。


  • cleanup.policy
    数据文件清除机制,支持Broker全局配置,Topic定制化制定,可选策略:delete、compact,默认值为delete。Kafka提供了数据段压缩的功能,按照相同Key只保留最新Key的策略,减少数据段大小,系统主题__consumer_offsets(用于存储消息进度的主题)其清除策略就是compact。
  • compression.type
    压缩类型,Kafka目前支持的压缩算法:gzip,snappy,lz4,zstd,还支持如下两个配置:
  • uncompressed
    不开启压缩
  • producer
    由发送方指定压缩算法,客户端的可选值为gzip,snappy,lz4,zstd。
    数据进行压缩,能节省网络带宽与存储空间,但会增加CPU的性能,故最佳实践:Broker服务端不配置压缩算法,由发送方指定,在发送方进行压缩,服务端原封不动进行存储,并且在消费端解压缩
  • delete.retention.ms
    如果cleanup.policy策略为compact时,针对消息体为null的消息,Kafka会认为对其进行压缩没有意义,立马删除也太草率,故Kafka引入了该参数,用来设置这些body为null的消息,在一次压缩执行后,多久后可被删除,默认值为24h。
  • file.delete.delay.ms
    文件在删除时延迟时间,默认为60s,Kafka中可以支持按topic删除日志文件(数据文件),执行删除之前,首先会将该topic下的分区文件重名为*.deleted,等待file.delete.delay.ms才从文件系统中删除。
  • flush.messages
    按消息条数设置刷盘频率,如果设置为1表示每写一条消息就触发一次刷盘,默认值为Long.MaxValue,在大部分场景官方不建议设置该值,直接利用操作系统的刷盘机制即可,Kafka希望通过副本机制能保证数据的持久可靠存储
  • flush.ms
    按时间间隔设置刷盘频率,默认为Long.MaxValue,Kafka希望借助操作系统的刷盘机制,数据可靠性通过副本机制来保证。(副本机制其实无法保证同机房断电带来的数据丢失)
  • index.interval.bytes
    索引文件的密度,Kafka并不会为每一条消息(消息偏移量)建立索引,而是每隔一定间隔,建立一条索引。该参数就是设置其间隔,默认为4096个字节。
  • max.message.bytes
    一次消息发送(Batch)允许的最大字节数量,默认为1000000,约等于1M。
  • message.downconversion.enable
    是否开启消息格式的自动转化,如果设置为false,Broker不会执行消息格式转化,将不兼容老的客户端消费消息。
  • message.format.version
    可以指定该主题按特定版本的API版本所对应的存储格式进行存储。
  • message.timestamp.type
    设置消息中存储的时间戳的获取方式,可选值:
  • CreateTime
    消息在客户端的创建时间
  • LogAppendTime
    Broker服务端接收到的时间
    默认为CreateTime。
  • message.timestamp.difference.max.ms
    当message.timestamp.type设置为CreateTime时,允许Broker端时间与消息创建时间戳最大的差值,如果超过该参数设置的阔值,Broker会拒绝存储该消息,默认为:Long.MaxValue,表示不开启开机制
  • min.cleanable.dirty.ratio
    控制可压缩的脏数据比例,默认为0.5d,如果一个文件中"脏数据"(未被压缩的数据)低于该阔值,将不继续对该文件进行压缩,该方法生效的条件为cleanup.policy设置为compact
  • min.compaction.lag.ms
    设置一条消息进入到Broker后多久之内不能被compact,默认为0,表示不启用该特性,该方法生效的条件为cleanup.policy设置为compact
  • min.insync.replicas
    如果客户端在消息发送时将ack设置为all,该参数指定必须至少多少个副本写入成功,才能向客户端返回成功,默认为1,这个是一个兜底配置,all的含义表示在ISR中的副本必须全部写入成功。
  • preallocate
    是否开启预热文件(提前创建文件),默认为false。
  • retention.bytes
    一个日志分区保留的最大字节数,默认为-1,表示不限制。
  • retention.ms
    一个日志分区允许保留的最大时长,默认保留7d。
  • segment.bytes
    一个日志段的大小,默认为1G。
  • segment.index.bytes
    一个日志段索引文件的大小,默认为10M。
  • segment.jitter.ms
    段滚动的最大随机差。
  • segment.ms
    Kafka强制滚动一个段的间隔时间,及时该段并未全部填满消息,默认值为7d
  • unclean.leader.election.enable
    是否允许不在ISR中副本在没有ISR副本选择之后竞争成为Leader,这样做有可能丢数据,默认为false。


3、总结


本文从运维命令开始学习,从使用运维层面全面了解Topic,从而窥探其Kafka内部一些重要特性,为后续从源码角度研究其实现打下坚实基础,本文的最后给出一个分区数量为3,副本因子为3的topic分区图来结束本文的讲解。

0ab8c17428c76580d1977bf310890c77.png

相关文章
|
8月前
|
消息中间件 监控 安全
探究Kafka主题删除失败的根本原因
探究Kafka主题删除失败的根本原因
107 0
|
消息中间件 Java Kafka
Spring Boot集成Kafka动态创建消费者与动态删除主题(实现多消费者的发布订阅模型)
Spring Boot集成Kafka动态创建消费者与动态删除主题(实现多消费者的发布订阅模型)
17444 1
Spring Boot集成Kafka动态创建消费者与动态删除主题(实现多消费者的发布订阅模型)
|
4月前
|
消息中间件 运维 Linux
linux之centos运维kafka
linux之centos运维kafka
|
6月前
|
消息中间件 存储 Kafka
深入理解Kafka核心设计及原理(四):主题管理
深入理解Kafka核心设计及原理(四):主题管理
90 8
|
7月前
|
消息中间件 监控 安全
探究Kafka主题删除失败的根本原因
探究Kafka主题删除失败的根本原因
64 0
|
消息中间件 存储 Kafka
Kafka主题,分区,副本介绍
今天分享一下kafka的主题(topic),分区(partition)和副本(replication),主题是Kafka中很重要的部分,消息的生产和消费都要以主题为基础,一个主题可以对应多个分区,一个分区属于某个主题,一个分区又可以对应多个副本,副本分为leader和follower。
149 0
|
消息中间件 存储 分布式计算
运维zookeeper+kafka
zookeeper+kafka
186 1
|
消息中间件 SQL JSON
【大数据开发运维解决方案】Kylin消费Kafka数据流式构建cube
文章开始之前先说明环境情况,这里kylin消费的kafka数据是从Oracle 数据库用Ogg For Bigdata以json格式将数据投递到kafka topic的,投递的时候,关于insert和update 之前的数据投递到名为 ZTVOUCHER_INS 的topic,而delete和update之后的数据投递到名为 ZTVOUCHER_DEL 的topic中,这里主要介绍kylin如何消费数据创建流式cube。
【大数据开发运维解决方案】Kylin消费Kafka数据流式构建cube
|
API Apache
Apache Kafka-通过API获取主题所有分区的积压消息数量
Apache Kafka-通过API获取主题所有分区的积压消息数量
187 0
BXA
|
消息中间件 存储 Prometheus
Kafka运维与监控
Kafka是由Apache Software Foundation开发的一款分布式流处理平台和消息队列系统 可以处理大规模的实时数据流,具有高吞吐量、低延迟、持久性和可扩展性等优点 常用于数据架构、数据管道、日志聚合、事件驱动等场景,对Kafka的运维和监控十分必要 本文旨在介绍Kafka的运维和监控相关内容
BXA
361 0

热门文章

最新文章