【Kafka从入门到成神系列 二】Kafka集群参数配置

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【Kafka从入门到成神系列 二】Kafka集群参数配置

一、Kafka 线上部署集群

1. 操作系统

一般来说,我们操作系统主要有 3 种:Linux、Windows、macOS

我们的 Kafka 一般部署在 Linux 系统上,主要在以下三个方面,Linux 更胜一筹

  • I/O 模型的使用
  • 数据网络传输效率
  • 社区支持度

主流的 I/O 模型通常有 5 种类型:阻塞式 I/O、非阻塞式 I/O、I/O 多路复用、信号驱动 I/O、异步 I/O。

每种 I/O 模型都有各自的使用场景,Java 中的 Socket 对象的阻塞模式和非阻塞模式对应前两种模型;而 Linux 中的系统调用 select 函数就属于 I/O 多路复用模型;epoll 系统调用介于第三种和第四种模型之间;至于第五种模型,Window 提供了一个 IOCP 线程模型。


Kafka 客户端底层使用了 Java 的 selector,selector 在 Linux 的实现机制是 epoll,而在 Window 平台实现机制是 select。因此将 Kafka 部署在 Linux 是有优势的,能够获得更高效的 I/O 性能。


网络传输的差距。Kafka 生产者和消费者的消息通过网络传输,消息保存在磁盘。所以,Kafka 需要在磁盘和网络进行大量的数据传输。在 Linux 平台通过零拷贝(Zero Copy)技术,大大的提高了传输的效率。

零拷贝:当数据在磁盘和网络进行传输时避免昂贵的内核态数据拷贝从而实现快速的数据传输。

社区目前对 Window 的 Kafka Bug 不进行修改。Windows 平台上部署 Kafka 只适合于个人测试或用于功能验证,千万不要应用于生产环境。

2. 磁盘

对于 Kafka 来说,使用机械硬盘足够支撑我们的日常业务。Kafka 利用顺序读写操作,一定程度上规避了机械硬盘最大的劣势——随机读写操作慢

另外,关于磁盘阵列(RAID)的使用。使用 RAID 主要优势在于:

  • 提供冗余的磁盘存储空间
  • 提供负载均衡

Kafka 自己实现冗余机制提高可靠性。通过分区的概念,达到负载均衡。

3. 磁盘容量

假如某个业务一天需要向 Kafka 写入一亿条消息。每条消息保存 2 份防止数据丢失,消息默认保存两周时间,一条消息的大小是1KB。

总空间大小 = 1亿 * 2 * 1KB / 1024 / 1024 = 190G

我们 Kafka 除了消息还具有一些索引信息,我们留出 10% 的空间。

14 天总空间大小 = **190G / 90% * 14 = 2966GB **

Kafka 支持数据压缩,假设压缩比为 0.75,最终的磁盘预估大小为:2966GB * 0.75 = 2225GB

规划磁盘容量考虑以下:

  • 新增消息数量
  • 消息留存时间
  • 平均消息大小
  • 备份数
  • 是否启动压缩

4. 带宽

带宽主要分为两种:1Gbps 的千兆网络和10Gbps 的万兆网络。

带宽是 1Gbps,即每秒处理 1Gb 的数据。传输速度为每秒1000兆位。

如果我们业务目前是 1 小时处理 1TB 的业务数据

由于我们的 Kafka 所在的机器还有其他的服务,所以,假设我们 Kafka 会用到 70% 的带宽资源。也就相当于一小时处理:1Gbps * 70% = 700Mbps

这是我们理想状态下的最大带宽,我们不能让 Kafka 服务器使用这么多资源,所以经常要留出 2/3 的资源,相当于每一台使用带宽为:700Mbps / 3 = 240Mbps

我们 1 小时处理 1TB 的数据。最终使用的服务器个数为:1024 * 1024 / 3600 = 291MB,既每秒处理的数据为:291MB

1Mbps代表每秒传输1,000,000位(bit),即每秒传输的数据量为:1,000,000/8=125,000Byte/s。

我们带宽为 240Mbps

最终的机器数量:290 * 8 / 240 = 10台

如果我们的数据需要复制两份的话,那么我们最终的服务器:10 * 2 + 10 = 30台

因素 考量点 建议
操作系统 操作系统 I/O 模型 将 Kafka 部署在 Linux 系统上
磁盘 磁盘 I/O 性能 普通环境使用机械硬盘,不需要搭载 RAID
磁盘容量 根据消息数、留存时间预估磁盘容量 实际使用建议预留20%~30%的磁盘空间
带宽 根据实际带宽资源和业务预估服务器数量 对于千兆网络来说,建议每台服务器按照700Mbps计算,防止丢包

二、Kafka 集群参数配置

1. Broker 端参数

Broker 需要配置存储信息,既 Broker 使用哪些磁盘。

第一组参数是针对存储的:

  • log.dirs:指定了 Broker 需要使用的若干个文件目录途径。
  • log.dir:表示单个路径,对上面参数的补充

一般设置方式:log.dirs = /home/kafka1,/home/kafka2,/home/kafka3

这样设置的好处:

  • 提升读写性能:多块磁盘具有更高的吞吐量
  • 实现故障转移:Failover ,这是 Kafka 1.1 版本引进的强大功能。在以前,Broker 使用的任何一块磁盘挂掉,都导致整个 Broker 进程关闭。在 1.1 开始,我们挂掉的磁盘的数据会自动转移到其他正常的磁盘上,而且 Broker 还能正常工作。**我们的 Broker 自动在好的路径上重建副本,然后 Leader 同步。Kafka 支持工具能够将某个路径上的数据拷贝到其他的路径上。**这也是我们 Kafka 不使用 RAID(磁盘阵列) 的原因。

第二组参数是针对 Zookeeper 的配置:

Zookeeper 负责协调管理并保存 Kafka 集群的所有元数据信息。比如:Broker、Topic、Partition、Leader 副本的机器等信息

Zookeeper.connect:常规情况:zk1:2181,zk2:2181,zk3:2181。如果当前存在多个 Kafka 集群使用同一套 Zookeeper 集群,

  • ,该参数需要配置成:zk1:2181,zk2:2181,zk3:2181/kafka1zk1:2181,zk2:2181,zk3:2181/kafka2

**第三组参数是与 Broker 连接相关的,***既客户端或其他 Broker 如何与该 Broker 进行通信:

  • listeners:监听器。告诉外部连接者通过什么协议访问主机名和端口开放的 Kafka 服务
  • advertised.listeners:Advertised 表示公开的,既表明这组监听器是 Broker 用于对外发布的。一般用于 外网

监听器:若干个逗号分割的三元组,每个三元组的格式为:<协议名称,主机号,端口号>。这里的协议名称比如 PLAINTEXT 表示明文传输、SSL表示使用 SSL加密或 TLS 加密传输等。主机号一般采用主机名。

第四组参数是关于 Topic 管理的

  • auto.create.topics.enable:是否允许自动创建 Topic
  • auto.leader.election.enable:是否允许 Unclean Leader 选举
  • auto.leader,rebalance.enable:是否允许定期进行 Leader 选举

第一个参数:我们不建议允许自动创建 Topic,因此将 auto.create.topics.enable = false

第二个参数:关闭 Unclean Leader 选举。何为 Unclean,Kafka 中存在 Leader 和 Follower 副本机制,我们的 Follower 副本,如果落后 Leader 太多的情况下,我们不允许该副本竞选 Leader。当我们设置成auto.leader.election.enable = false,就算我们其他副本都挂了,我们也不会让这些落后太多的 Follower 副本竞选 Leader,反之亦然。一般我们会设置成 true

第三个参数:如果我们设置成auto.leader,rebalance.enable = true,我们允许我们的 Kafka 定期的对一些 Topic 分区进行 Leader 重选。如果设置成 true就算我们的 LeaderA 当的好好的,我们也会将其替换掉,这将会浪费大量的资源,我们直接建议将此参数设置成 false

第五组参数是数据留存方面的

  • log.retention.{hour|minutes|ms}:控制一条消息保存多少时间
  • log.retention.bytes:指定 Broker 为消息保存磁盘容量的大小
  • message.max.bytes:控制 Broker 能够接受的最大消息

第一个参数:我们一般会将 log.retention.hour = 168,表示保存 7 天的数据。

第二个参数:默认是 -1,表明你在这台 Broker 保存多少数据都可以。主要应用在云租户,控制用户的磁盘空间。

第三个参数:建议设置的大一点,反正没什么危害,如果设置小了,可能导致消息丢失。

2. Topic参数配置

当我们设置了 Topic 级别参数和全局 Broker 参数,我们的 Topic 级别参数会覆盖全局 Broker 参数的值。

在我们日常的业务中,不同的部门使用不同的 Topic,我们不可能为所有部门的 Topic 划分同样的留存时间。这个时候,设置 Topic 级别参数覆盖 Broker,就是一个不错的选择。

从保存消息来看:

  • retention.ms:规定了该 Topic 消息被保存的时长。默认是 七 天
  • retrnyion.bytes:规定了为该 Topic 预留多大的磁盘空间。

从处理消息大小来看:

  • max.message,bytes:Topic 的最大消息大小

Topic 级别参数设置:

  • 创建 Topic 时进行设置
  • 修改 Topic 时设置

第一种方式:bin/kafka-topics.sh--bootstrap-serverlocalhost:9092--create--topictransaction--partitions1--replicaation-factor1--configretention.ms=15552000000--configmax.message.bytes=5242880

第二种方式:bin/kafka-configs.sh--zookeeperlocalhost:2181--entity-typetopics--entity-nametransaction--alter--add-configmax.message.bytes=10485760

一般建议使用第二种方式来设置,在未来,Kafka 社区可能会统一使用 Kafka-configs 脚本来调整 Topic 参数。

3. JVM参数

一般我们会将 Kafka 布置在 Java 8 环境下

在 JVM 端,堆的大小极为重要。一般我们业界会将 JVM 堆的大小设置成 6GB

垃圾回收器的选择:

  • 如果 Broker 机器 CPU 比较充足,建议使用 CMS 收集器。使用方法:
  • ``XX:+UseCurrentMarkSweepGC`
  • 否则,使用吞吐量收集器:使用方法:-XX:+UseParallelGC

如果你已经使用了 Java 8 了,那么就用默认的 G1 收集器就可以了

为 Kafka 设置参数:

  • KAFKA_HEAP_OPTS:指定堆大小
  • KAFKA_JVM_PERFORMANCE_OPTS:指定 GC 参数

例子如下:

$> export KAFKA_HEAP_OPTS=--Xms6g  --Xmx6g
$> export  KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true
$> bin/kafka-server-start.sh config/server.properties

4. 操作系统参数

一般我们会关注下面几个:

  • 文件描述符限制
  • 文件系统类型
  • Swappiness
  • 提交时间

首先是 ulimit -n。一般我们都会将该值设置成一个超大的值,这个值其实没有特殊的含义,但是如果你设置小了,会经常报 Too many open files 的错误

其次是文件系统的选择,一般我们的文件系统有:EXT3、EXT4 、 XFS、ZFS 等。根据 Kafka 官网的测试,ZFS > XFS > EXT4 > EXT3

然后是 swap 的调优。首先解释一下 Swappiness 的意思:**Swappiness 是 Linux 内核的一个属性,它改变了换出运行时内存之间的平衡,而不是从系统页面缓存中删除页面。swappiness=0 的时候表示最大限度使用物理内存,然后才是swap空间,swappiness=100 的时候表示积极的使用swap分区,并且把内存上的数据及时的搬运到swap空间里面。主要做的是从内存到磁盘的交换,会加大系统IO,严重影响系统的性能。**我们一般将此参数设置为 1,同时设置告警,当我们的 Broker 进行内存到磁盘的交换时,我们会对其进一步调优和诊断问题。如果我们直接设置成 0,物理内存耗尽时,会直接触发  

OOM killer 随机挑选一个进程然后 kill 掉。

最后是关于 Flush 的落盘时间。当一条消息写入 Kafka 时,kafka 并不会直接去写入磁盘,而是会写入操作系统的页缓存(Page Cache)随后操作系统会根据 LRU 算法定期的将页缓存上的 数据落到物理磁盘中。这个定期的时间由我们自己设置,默认为 5 秒。



相关文章
|
23天前
|
消息中间件 安全 Kafka
2024年了,如何更好的搭建Kafka集群?
我们基于Kraft模式和Docker Compose同时采用最新版Kafka v3.6.1来搭建集群。
2024年了,如何更好的搭建Kafka集群?
|
24天前
|
消息中间件 安全 Kafka
一文搞懂Kafka中的listeners配置策略
1. listeners中的plaintext controller external是什么意思? 2. Kraft模式下controller和broker有何区别? 3. 集群节点之间同步什么数据,通过哪个端口,是否可以自定义端口? 4. 客户端通过哪个端口连接到kafka,通过9092连接的是什么,broker还是controller? 5. 为controller配置了单独的端口有什么用? 6. control.plane.listener.name与controller.listener.names有何区别?
|
1月前
|
消息中间件 存储 数据可视化
kafka高可用集群搭建
kafka高可用集群搭建
32 0
|
2月前
|
消息中间件 Java Kafka
kafka入门demo
kafka入门demo
22 0
|
1月前
|
消息中间件 Kafka Linux
Apache Kafka-初体验Kafka(03)-Centos7下搭建kafka集群
Apache Kafka-初体验Kafka(03)-Centos7下搭建kafka集群
34 0
|
1月前
|
消息中间件 数据可视化 关系型数据库
ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统
ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统
33 0
|
4天前
|
消息中间件 缓存 Java
Kafka Consumer java api 配置
Kafka Consumer java api 配置
|
1月前
|
消息中间件 监控 负载均衡
Kafka高级应用:如何配置处理MQ百万级消息队列?
在大数据时代,Apache Kafka作为一款高性能的分布式消息队列系统,广泛应用于处理大规模数据流。本文将深入探讨在Kafka环境中处理百万级消息队列的高级应用技巧。
133 0
|
1月前
|
消息中间件 Java Kafka
Apache Kafka-初体验Kafka(02)-Centos7下搭建单节点kafka_配置参数详解_基本命令实操
Apache Kafka-初体验Kafka(02)-Centos7下搭建单节点kafka_配置参数详解_基本命令实操
40 0
|
1月前
|
消息中间件 存储 分布式计算
Apache Kafka-初体验Kafka(01)-入门整体认识kafka
Apache Kafka-初体验Kafka(01)-入门整体认识kafka
32 0

相关产品

  • 云迁移中心