【Kafka从入门到成神系列 二】Kafka集群参数配置

本文涉及的产品
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【Kafka从入门到成神系列 二】Kafka集群参数配置

一、Kafka 线上部署集群

1. 操作系统

一般来说,我们操作系统主要有 3 种:Linux、Windows、macOS

我们的 Kafka 一般部署在 Linux 系统上,主要在以下三个方面,Linux 更胜一筹

  • I/O 模型的使用
  • 数据网络传输效率
  • 社区支持度

主流的 I/O 模型通常有 5 种类型:阻塞式 I/O、非阻塞式 I/O、I/O 多路复用、信号驱动 I/O、异步 I/O。

每种 I/O 模型都有各自的使用场景,Java 中的 Socket 对象的阻塞模式和非阻塞模式对应前两种模型;而 Linux 中的系统调用 select 函数就属于 I/O 多路复用模型;epoll 系统调用介于第三种和第四种模型之间;至于第五种模型,Window 提供了一个 IOCP 线程模型。


Kafka 客户端底层使用了 Java 的 selector,selector 在 Linux 的实现机制是 epoll,而在 Window 平台实现机制是 select。因此将 Kafka 部署在 Linux 是有优势的,能够获得更高效的 I/O 性能。


网络传输的差距。Kafka 生产者和消费者的消息通过网络传输,消息保存在磁盘。所以,Kafka 需要在磁盘和网络进行大量的数据传输。在 Linux 平台通过零拷贝(Zero Copy)技术,大大的提高了传输的效率。

零拷贝:当数据在磁盘和网络进行传输时避免昂贵的内核态数据拷贝从而实现快速的数据传输。

社区目前对 Window 的 Kafka Bug 不进行修改。Windows 平台上部署 Kafka 只适合于个人测试或用于功能验证,千万不要应用于生产环境。

2. 磁盘

对于 Kafka 来说,使用机械硬盘足够支撑我们的日常业务。Kafka 利用顺序读写操作,一定程度上规避了机械硬盘最大的劣势——随机读写操作慢

另外,关于磁盘阵列(RAID)的使用。使用 RAID 主要优势在于:

  • 提供冗余的磁盘存储空间
  • 提供负载均衡

Kafka 自己实现冗余机制提高可靠性。通过分区的概念,达到负载均衡。

3. 磁盘容量

假如某个业务一天需要向 Kafka 写入一亿条消息。每条消息保存 2 份防止数据丢失,消息默认保存两周时间,一条消息的大小是1KB。

总空间大小 = 1亿 * 2 * 1KB / 1024 / 1024 = 190G

我们 Kafka 除了消息还具有一些索引信息,我们留出 10% 的空间。

14 天总空间大小 = **190G / 90% * 14 = 2966GB **

Kafka 支持数据压缩,假设压缩比为 0.75,最终的磁盘预估大小为:2966GB * 0.75 = 2225GB

规划磁盘容量考虑以下:

  • 新增消息数量
  • 消息留存时间
  • 平均消息大小
  • 备份数
  • 是否启动压缩

4. 带宽

带宽主要分为两种:1Gbps 的千兆网络和10Gbps 的万兆网络。

带宽是 1Gbps,即每秒处理 1Gb 的数据。传输速度为每秒1000兆位。

如果我们业务目前是 1 小时处理 1TB 的业务数据

由于我们的 Kafka 所在的机器还有其他的服务,所以,假设我们 Kafka 会用到 70% 的带宽资源。也就相当于一小时处理:1Gbps * 70% = 700Mbps

这是我们理想状态下的最大带宽,我们不能让 Kafka 服务器使用这么多资源,所以经常要留出 2/3 的资源,相当于每一台使用带宽为:700Mbps / 3 = 240Mbps

我们 1 小时处理 1TB 的数据。最终使用的服务器个数为:1024 * 1024 / 3600 = 291MB,既每秒处理的数据为:291MB

1Mbps代表每秒传输1,000,000位(bit),即每秒传输的数据量为:1,000,000/8=125,000Byte/s。

我们带宽为 240Mbps

最终的机器数量:290 * 8 / 240 = 10台

如果我们的数据需要复制两份的话,那么我们最终的服务器:10 * 2 + 10 = 30台

因素 考量点 建议
操作系统 操作系统 I/O 模型 将 Kafka 部署在 Linux 系统上
磁盘 磁盘 I/O 性能 普通环境使用机械硬盘,不需要搭载 RAID
磁盘容量 根据消息数、留存时间预估磁盘容量 实际使用建议预留20%~30%的磁盘空间
带宽 根据实际带宽资源和业务预估服务器数量 对于千兆网络来说,建议每台服务器按照700Mbps计算,防止丢包

二、Kafka 集群参数配置

1. Broker 端参数

Broker 需要配置存储信息,既 Broker 使用哪些磁盘。

第一组参数是针对存储的:

  • log.dirs:指定了 Broker 需要使用的若干个文件目录途径。
  • log.dir:表示单个路径,对上面参数的补充

一般设置方式:log.dirs = /home/kafka1,/home/kafka2,/home/kafka3

这样设置的好处:

  • 提升读写性能:多块磁盘具有更高的吞吐量
  • 实现故障转移:Failover ,这是 Kafka 1.1 版本引进的强大功能。在以前,Broker 使用的任何一块磁盘挂掉,都导致整个 Broker 进程关闭。在 1.1 开始,我们挂掉的磁盘的数据会自动转移到其他正常的磁盘上,而且 Broker 还能正常工作。**我们的 Broker 自动在好的路径上重建副本,然后 Leader 同步。Kafka 支持工具能够将某个路径上的数据拷贝到其他的路径上。**这也是我们 Kafka 不使用 RAID(磁盘阵列) 的原因。

第二组参数是针对 Zookeeper 的配置:

Zookeeper 负责协调管理并保存 Kafka 集群的所有元数据信息。比如:Broker、Topic、Partition、Leader 副本的机器等信息

Zookeeper.connect:常规情况:zk1:2181,zk2:2181,zk3:2181。如果当前存在多个 Kafka 集群使用同一套 Zookeeper 集群,

  • ,该参数需要配置成:zk1:2181,zk2:2181,zk3:2181/kafka1zk1:2181,zk2:2181,zk3:2181/kafka2

**第三组参数是与 Broker 连接相关的,***既客户端或其他 Broker 如何与该 Broker 进行通信:

  • listeners:监听器。告诉外部连接者通过什么协议访问主机名和端口开放的 Kafka 服务
  • advertised.listeners:Advertised 表示公开的,既表明这组监听器是 Broker 用于对外发布的。一般用于 外网

监听器:若干个逗号分割的三元组,每个三元组的格式为:<协议名称,主机号,端口号>。这里的协议名称比如 PLAINTEXT 表示明文传输、SSL表示使用 SSL加密或 TLS 加密传输等。主机号一般采用主机名。

第四组参数是关于 Topic 管理的

  • auto.create.topics.enable:是否允许自动创建 Topic
  • auto.leader.election.enable:是否允许 Unclean Leader 选举
  • auto.leader,rebalance.enable:是否允许定期进行 Leader 选举

第一个参数:我们不建议允许自动创建 Topic,因此将 auto.create.topics.enable = false

第二个参数:关闭 Unclean Leader 选举。何为 Unclean,Kafka 中存在 Leader 和 Follower 副本机制,我们的 Follower 副本,如果落后 Leader 太多的情况下,我们不允许该副本竞选 Leader。当我们设置成auto.leader.election.enable = false,就算我们其他副本都挂了,我们也不会让这些落后太多的 Follower 副本竞选 Leader,反之亦然。一般我们会设置成 true

第三个参数:如果我们设置成auto.leader,rebalance.enable = true,我们允许我们的 Kafka 定期的对一些 Topic 分区进行 Leader 重选。如果设置成 true就算我们的 LeaderA 当的好好的,我们也会将其替换掉,这将会浪费大量的资源,我们直接建议将此参数设置成 false

第五组参数是数据留存方面的

  • log.retention.{hour|minutes|ms}:控制一条消息保存多少时间
  • log.retention.bytes:指定 Broker 为消息保存磁盘容量的大小
  • message.max.bytes:控制 Broker 能够接受的最大消息

第一个参数:我们一般会将 log.retention.hour = 168,表示保存 7 天的数据。

第二个参数:默认是 -1,表明你在这台 Broker 保存多少数据都可以。主要应用在云租户,控制用户的磁盘空间。

第三个参数:建议设置的大一点,反正没什么危害,如果设置小了,可能导致消息丢失。

2. Topic参数配置

当我们设置了 Topic 级别参数和全局 Broker 参数,我们的 Topic 级别参数会覆盖全局 Broker 参数的值。

在我们日常的业务中,不同的部门使用不同的 Topic,我们不可能为所有部门的 Topic 划分同样的留存时间。这个时候,设置 Topic 级别参数覆盖 Broker,就是一个不错的选择。

从保存消息来看:

  • retention.ms:规定了该 Topic 消息被保存的时长。默认是 七 天
  • retrnyion.bytes:规定了为该 Topic 预留多大的磁盘空间。

从处理消息大小来看:

  • max.message,bytes:Topic 的最大消息大小

Topic 级别参数设置:

  • 创建 Topic 时进行设置
  • 修改 Topic 时设置

第一种方式:bin/kafka-topics.sh--bootstrap-serverlocalhost:9092--create--topictransaction--partitions1--replicaation-factor1--configretention.ms=15552000000--configmax.message.bytes=5242880

第二种方式:bin/kafka-configs.sh--zookeeperlocalhost:2181--entity-typetopics--entity-nametransaction--alter--add-configmax.message.bytes=10485760

一般建议使用第二种方式来设置,在未来,Kafka 社区可能会统一使用 Kafka-configs 脚本来调整 Topic 参数。

3. JVM参数

一般我们会将 Kafka 布置在 Java 8 环境下

在 JVM 端,堆的大小极为重要。一般我们业界会将 JVM 堆的大小设置成 6GB

垃圾回收器的选择:

  • 如果 Broker 机器 CPU 比较充足,建议使用 CMS 收集器。使用方法:
  • ``XX:+UseCurrentMarkSweepGC`
  • 否则,使用吞吐量收集器:使用方法:-XX:+UseParallelGC

如果你已经使用了 Java 8 了,那么就用默认的 G1 收集器就可以了

为 Kafka 设置参数:

  • KAFKA_HEAP_OPTS:指定堆大小
  • KAFKA_JVM_PERFORMANCE_OPTS:指定 GC 参数

例子如下:

$> export KAFKA_HEAP_OPTS=--Xms6g  --Xmx6g
$> export  KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true
$> bin/kafka-server-start.sh config/server.properties

4. 操作系统参数

一般我们会关注下面几个:

  • 文件描述符限制
  • 文件系统类型
  • Swappiness
  • 提交时间

首先是 ulimit -n。一般我们都会将该值设置成一个超大的值,这个值其实没有特殊的含义,但是如果你设置小了,会经常报 Too many open files 的错误

其次是文件系统的选择,一般我们的文件系统有:EXT3、EXT4 、 XFS、ZFS 等。根据 Kafka 官网的测试,ZFS > XFS > EXT4 > EXT3

然后是 swap 的调优。首先解释一下 Swappiness 的意思:**Swappiness 是 Linux 内核的一个属性,它改变了换出运行时内存之间的平衡,而不是从系统页面缓存中删除页面。swappiness=0 的时候表示最大限度使用物理内存,然后才是swap空间,swappiness=100 的时候表示积极的使用swap分区,并且把内存上的数据及时的搬运到swap空间里面。主要做的是从内存到磁盘的交换,会加大系统IO,严重影响系统的性能。**我们一般将此参数设置为 1,同时设置告警,当我们的 Broker 进行内存到磁盘的交换时,我们会对其进一步调优和诊断问题。如果我们直接设置成 0,物理内存耗尽时,会直接触发  

OOM killer 随机挑选一个进程然后 kill 掉。

最后是关于 Flush 的落盘时间。当一条消息写入 Kafka 时,kafka 并不会直接去写入磁盘,而是会写入操作系统的页缓存(Page Cache)随后操作系统会根据 LRU 算法定期的将页缓存上的 数据落到物理磁盘中。这个定期的时间由我们自己设置,默认为 5 秒。



相关文章
|
7天前
|
消息中间件 Kafka 测试技术
【Kafka揭秘】Leader选举大揭秘!如何打造一个不丢失消息的强大Kafka集群?
【8月更文挑战第24天】Apache Kafka是一款高性能分布式消息系统,利用分区机制支持数据并行处理。每个分区含一个Leader处理所有读写请求,并可有多个副本确保数据安全与容错。关键的Leader选举机制保障了系统的高可用性和数据一致性。选举发生于分区创建、Leader故障或被手动移除时。Kafka提供多种选举策略:内嵌机制自动选择最新数据副本为新Leader;Unclean选举快速恢复服务但可能丢失数据;Delayed Unclean选举则避免短暂故障下的Unclean选举;Preferred选举允许基于性能或地理位置偏好指定特定副本为首选Leader。
28 5
|
5天前
|
消息中间件 监控 Java
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
|
7天前
|
消息中间件 监控 Java
【Kafka节点存活大揭秘】如何让Kafka集群时刻保持“心跳”?探索Broker、Producer和Consumer的生死关头!
【8月更文挑战第24天】在分布式系统如Apache Kafka中,确保节点的健康运行至关重要。Kafka通过Broker、Producer及Consumer间的交互实现这一目标。文章介绍Kafka如何监测节点活性,包括心跳机制、会话超时与故障转移策略。示例Java代码展示了Producer如何通过定期发送心跳维持与Broker的连接。合理配置这些机制能有效保障Kafka集群的稳定与高效运行。
18 2
|
21天前
|
消息中间件 Java 大数据
"深入理解Kafka单线程Consumer:核心参数配置、Java实现与实战指南"
【8月更文挑战第10天】在大数据领域,Apache Kafka以高吞吐和可扩展性成为主流数据流处理平台。Kafka的单线程Consumer因其实现简单且易于管理而在多种场景中受到欢迎。本文解析单线程Consumer的工作机制,强调其在错误处理和状态管理方面的优势,并通过详细参数说明及示例代码展示如何有效地使用KafkaConsumer类。了解这些内容将帮助开发者优化实时数据处理系统的性能与可靠性。
52 7
|
2月前
|
消息中间件 Kafka
kafka 集群环境搭建
kafka 集群环境搭建
52 8
|
2月前
|
消息中间件 Kafka
面试题Kafka问题之RabbitMQ的路由配置工作如何解决
面试题Kafka问题之RabbitMQ的路由配置工作如何解决
42 1
|
19天前
|
消息中间件 Java Kafka
Linux——Kafka集群搭建
Linux——Kafka集群搭建
26 0
|
24天前
|
消息中间件 Kafka Apache
部署安装kafka集群
部署安装kafka集群
|
2月前
|
消息中间件 存储 缓存
微服务数据问题之Kafka的默认复制配置如何解决
微服务数据问题之Kafka的默认复制配置如何解决
|
2月前
|
消息中间件 存储 资源调度
实时计算 Flink版产品使用问题之在消费Kafka的Avro消息,如何配置FlinkKafka消费者的相关参数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

热门文章

最新文章

下一篇
云函数