集群参数配置
静态参数和动态参数
静态参数是指在 Kafka 启动时配置的参数,一旦设置后,只能通过重启 Kafka 来更改。这些参数通常是对 Kafka 整体行为的全局设置,例如 Kafka 的监听端口、日志目录、副本数量等。静态参数的配置通常在 Kafka 的配置文件(如 server.properties)中进行。
动态参数是指在 Kafka 运行时可以动态修改的参数,而无需重启 Kafka。这些参数通常是对 Kafka 的某个特定组件或功能进行细粒度的调整。动态参数可以通过 Kafka 的命令行工具或 API 进行修改。
Broker
磁盘相关
在 Kafka 中,Broker 是消息队列的核心组件,负责接收、存储和转发消息。为了配置存储信息,我们需要设置一些重要的参数。
- log.dirs:这是一个非常重要的参数,用于指定 Broker 使用的文件目录路径。这个参数没有默认值,因此必须由用户自己指定。在生产环境中,建议为 log.dirs 配置多个路径,以提高读写性能和实现故障转移。具体格式是一个 CSV 格式,多个路径之间用逗号分隔,例如:
/home/kafka1,/home/kafka2,/home/kafka3
。如果有条件的话,最好将这些目录挂载到不同的物理磁盘上,以提高性能和可靠性。 - log.dir:这是 log.dirs 的补充参数,用于指定单个路径。在实际使用中,我们只需要设置 log.dirs 参数即可,不需要设置 log.dir。
为什么要为 log.dirs 配置多个路径呢?这是因为多块物理磁盘同时读写数据可以提高吞吐量,同时也能实现故障转移。在 Kafka 1.1 版本之前,如果 Broker 使用的任何一块磁盘挂掉了,整个 Broker 进程都会关闭。但是从 Kafka 1.1 版本开始,引入了 Failover 功能,坏掉的磁盘上的数据会自动转移到其他正常的磁盘上,Broker 仍然可以正常工作。这个改进使得我们不再依赖 RAID 来提供数据的可靠性,而是通过多块磁盘的故障转移来实现。
需要注意的是,如果使用了多个路径,Kafka 会根据一定的策略将消息分配到不同的路径上,以实现负载均衡。同时,Kafka 也会自动管理磁盘空间,当某个路径的磁盘空间不足时,会自动将消息转移到其他路径上。
总结一下,为了配置存储信息,我们需要设置 log.dirs 参数,为其配置多个路径,最好挂载到不同的物理磁盘上。这样可以提高读写性能和实现故障转移。同时,Kafka 会自动管理磁盘空间和实现负载均衡。这些配置可以在 Kafka 的配置文件中进行设置。
Zookeeper 相关
ZooKeeper 是一个分布式协调框架,用于协调和管理 Kafka 集群的元数据信息。它负责保存 Kafka 集群的配置信息,例如 Broker 的运行状态、Topic 的创建情况、分区信息以及 Leader 副本的位置等。
在 Kafka 中,与 ZooKeeper 相关的最重要的参数是zookeeper.connect
。这个参数是一个 CSV 格式的字符串,用于指定连接到 ZooKeeper 集群的地址和端口。例如,zk1:2181,zk2:2181,zk3:2181
表示连接到三个 ZooKeeper 节点,默认端口为 2181。
如果要让多个 Kafka 集群共享同一个 ZooKeeper 集群,可以使用chroot
参数来进行区分。chroot
是 ZooKeeper 的概念,类似于别名。假设有两个 Kafka 集群,分别命名为 kafka1 和 kafka2,那么可以将zookeeper.connect
参数设置为zk1:2181,zk2:2181,zk3:2181/kafka1
和zk1:2181,zk2:2181,zk3:2181/kafka2
。这样就可以通过chroot
来区分不同的 Kafka 集群。
需要注意的是,chroot
只需要在参数中指定一次,并且应该添加到最后。有时候会遇到这样的错误格式:zk1:2181/kafka1,zk2:2181/kafka2,zk3:2181/kafka3
,这是不正确的。
总结一下,设置与 ZooKeeper 相关的参数时,需要注意以下几点:
zookeeper.connect
参数是一个 CSV 格式的字符串,用于指定连接到 ZooKeeper 集群的地址和端口。- 如果多个 Kafka 集群共享同一个 ZooKeeper 集群,可以使用
chroot
参数来进行区分。 chroot
参数只需要在参数中指定一次,并且应该添加到最后。
下面是一个示例配置:
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka1
这个配置表示连接到三个 ZooKeeper 节点,并使用kafka1
作为chroot
。
Broker 相关
在 Kafka 中,listeners 参数用于指定外部连接者通过什么协议访问 Kafka 服务。它是一个逗号分隔的三元组列表,每个三元组由协议名称、主机名和端口号组成。协议名称可以是标准的协议,如 PLAINTEXT 表示明文传输,SSL 表示使用 SSL 或 TLS 加密传输等,也可以是自定义的协议名称。
举个例子,如果你定义了一个名为 CONTROLLER 的自定义协议,你可以在 listeners 参数中添加 CONTROLLER://localhost:9092
,表示该协议通过 localhost 的 9092 端口进行通信。
需要注意的是,如果你自定义了协议名称,你还需要通过 listener.security.protocol.map
参数告诉 Kafka 使用哪种安全协议。比如,如果你定义了 CONTROLLER 协议,并且该协议使用明文传输数据,你需要设置 listener.security.protocol.map=CONTROLLER:PLAINTEXT
。
另外,主机名和端口号比较直观,不需要过多解释。但是需要注意的是,建议在 Broker 端和客户端应用的配置中都使用主机名而不是 IP 地址。因为在 Kafka 的源代码中,也是使用主机名进行连接的。如果你在某些地方使用了 IP 地址进行连接,可能会导致连接失败的问题。
总结一下,listeners 参数用于指定 Kafka 服务的监听器,告诉外部连接者通过什么协议访问 Kafka。它是一个三元组列表,每个三元组由协议名称、主机名和端口号组成。建议使用主机名而不是 IP 地址进行配置。如果使用自定义协议,还需要通过 listener.security.protocol.map
参数指定安全协议。
Topic 相关
auto.create.topics.enable
该参数用于控制是否允许自动创建 Topic。建议将该参数设置为 false,即不允许自动创建 Topic。
在线上环境中,如果该参数被设置为 true,可能会导致出现很多名字稀奇古怪的 Topic。例如,当我们想要为名为 test 的 Topic 发送事件时,由于拼写错误将 test 写成了 tst,启动生产者程序后,一个名为 tst 的 Topic 就会被自动创建。这种情况下,好的运维应该防止这种情况的发生,特别是对于大公司而言,每个部门被分配的 Topic 应该由运维严格把控,不允许自行创建任何 Topic。
unclean.leader.election.enable
该参数用于控制是否允许 Unclean Leader 选举。Unclean Leader 选举是指在 Kafka 中,当保存数据较多的副本都挂掉时,是否允许从保存数据较少的副本中选举出新的 Leader。
在 Kafka 中,每个分区都有多个副本来提供高可用性,其中只有一个副本对外提供服务,即 Leader 副本。只有保存数据较多的副本才有资格竞选 Leader,而那些落后进度太多的副本没有资格竞选。
如果设置unclean.leader.election.enable
为 false,那么 Kafka 将坚持之前的原则,坚决不允许那些落后太多的副本竞选 Leader。这样做的后果是该分区将不可用,因为没有 Leader。
如果设置unclean.leader.election.enable
为 true,那么 Kafka 允许从那些保存数据较少的副本中选举出新的 Leader。这样做的后果是数据有可能丢失,因为这些副本保存的数据本来就不全,当成为 Leader 后,它本身就变得膨胀了,认为自己的数据才是权威的。
需要注意的是,该参数在最新版的 Kafka 中默认为 false。但是由于社区对该参数的默认值进行了多次更改,所以建议在使用时显式地将其设置为 false。
auto.leader.rebalance.enable
该参数用于控制是否允许 Kafka 定期进行 Leader 选举。建议将该参数设置为 false。
设置auto.leader.rebalance.enable
为 true 表示允许 Kafka 定期对一些 Topic 分区进行 Leader 重选举。需要满足一定的条件才会触发 Leader 重选举。
与unclean.leader.election.enable
参数不同的是,auto.leader.rebalance.enable
并不是选举新的 Leader,而是更换现有的 Leader。例如,如果 Leader A 一直表现良好,但是当auto.leader.rebalance.enable
为 true 时,经过一段时间后,Leader A 可能会被强制卸任,换成 Leader B。
需要注意的是,Leader 的更换代价很高。原本向 Leader A 发送请求的所有客户端都需要切换成向 Leader B 发送请求。而且这种 Leader 的更换本质上没有任何性能收益。
因此,在生产环境中,建议将auto.leader.rebalance.enable
设置为 false,避免不必要的 Leader 更换。
数据留存方面
在 Kafka 中,有一组参数用于控制数据的留存。下面我将逐个介绍这些参数。
log.retention.{hours|minutes|ms}
:这是一组参数,用于控制消息数据在 Kafka 中保存的时间。这三个参数分别是以小时(hours)、分钟(minutes)和毫秒(ms)为单位的时间间隔。优先级上,ms 设置最高,minutes 次之,hours 最低。通常情况下,我们会设置较长的时间间隔,比如 log.retention.hours=168 表示默认保存 7 天的数据,自动删除 7 天前的数据。如果将 Kafka 用作存储系统,那么这个值可能需要相应调大。log.retention.bytes
:这个参数用于指定 Broker 在磁盘上保存的消息数据的总容量大小。默认值为-1,表示没有容量限制,即可以保存任意大小的数据。这个参数在构建云上的多租户 Kafka 集群时发挥作用。假设你要提供一个云上的 Kafka 服务,每个租户只能使用 100GB 的磁盘空间,为了避免某个租户占用过多的磁盘空间,设置这个参数就非常重要了。message.max.bytes
:这个参数用于控制 Broker 能够接收的最大消息大小。默认值为 1000012,即不到 1MB。然而,在实际场景中,超过 1MB 的消息是很常见的。因此,在生产环境中,将这个值设置得比较大是比较保险的做法。这个参数只是一个标尺,仅仅衡量 Broker 能够处理的最大消息大小,即使设置得大一点也不会占用太多磁盘空间。
需要注意的是,这些参数都是可配置的,可以根据实际需求进行调整。在配置文件中,可以通过设置对应的属性来修改这些参数的值。例如,可以在server.properties
文件中添加以下配置来修改log.retention.hours
参数的值:
log.retention.hours=168
这样就将消息数据的保存时间设置为 7 天。
总结一下,这些参数在 Kafka 中起到了重要的作用,可以根据实际需求来调整,以满足不同的业务场景。
Topic 级别参数
Topic 级别的参数在 Kafka 中非常重要,它允许我们为每个 Topic 设置特定的参数值,这些参数会覆盖全局 Broker 参数的值。这样做的好处是可以根据不同的业务需求,为不同的 Topic 设置不同的参数,提高系统的灵活性和效率。
下面我将详细介绍几个重要的 Topic 级别参数,按照用途分组。
- 保存消息方面的参数:
retention.ms
:规定了该 Topic 消息被保存的时长。默认值是 7 天,即该 Topic 只保存最近 7 天的消息。如果设置了这个值,它会覆盖 Broker 端的全局参数值。通过设置不同的 retention.ms 值,我们可以根据业务需求来控制消息的保存时长,避免无效的数据占用过多的存储空间。retention.bytes
:规定了为该 Topic 预留的磁盘空间大小。和全局参数的作用类似,这个值在多租户的 Kafka 集群中非常有用。默认值是-1,表示可以无限使用磁盘空间。通过设置不同的 retention.bytes 值,我们可以根据不同的 Topic 的数据量来合理分配磁盘空间,避免存储空间不足的问题。
- 处理消息大小方面的参数:
max.message.bytes
:决定了 Kafka Broker 能够正常接收该 Topic 的最大消息大小。在很多公司中,Kafka 作为基础架构组件运行,承载了大量的业务数据。如果在全局层面上无法给出一个合适的最大消息值,那么允许不同的业务部门自行设定 Topic 级别的max.message.bytes
参数就显得非常必要了。通过设置不同的max.message.bytes
值,我们可以根据不同的业务需求来控制消息的大小,确保系统能够正常处理各种大小的消息。
通过设置 Topic 级别的参数,我们可以根据不同的业务需求来灵活地调整 Kafka 的配置,提高系统的性能和可用性。同时,这也是 Kafka 作为一个高性能分布式消息系统的重要特性之一。
需要注意的是,Topic 级别的参数只对该 Topic 中的消息生效,不会影响其他 Topic。如果没有为某个 Topic 设置特定的参数值,那么将会使用全局 Broker 参数的默认值。
除了上述介绍的参数,Kafka 还有其他一些 Topic 级别的参数,如cleanup.policy
、compression.type
等,它们都可以根据具体的业务需求进行设置。在实际应用中,我们可以根据不同的场景和需求,灵活地使用这些参数来优化 Kafka 集群的性能和可靠性。
总结一下,Topic 级别的参数允许我们为每个 Topic 设置特定的参数值,覆盖全局 Broker 参数的值。通过设置不同的参数值,我们可以根据业务需求来控制消息的保存时长、磁盘空间使用和消息大小等,提高系统的灵活性和效率。这是 Kafka 作为一个高性能分布式消息系统的重要特性之一。
在 Kafka 中,可以通过两种方式来设置 Topic 级别的参数:在创建 Topic 时设置和修改已存在的 Topic 时设置。
1. 创建 Topic 时设置参数
在创建 Topic 时,可以通过--config
参数来设置 Topic 级别的参数。例如,我们要创建一个名为transaction
的 Topic,并设置retention.ms
为 15552000000,max.message.bytes
为 5242880,可以使用以下命令:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic transaction --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880
在上述命令中,--config
后面的参数用于指定要设置的 Topic 级别参数。
2. 修改已存在的 Topic 时设置参数
可以使用kafka-configs
命令来修改已存在的 Topic 的参数。假设我们要将transaction
Topic 的max.message.bytes
修改为 10485760,可以使用以下命令:
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760
在上述命令中,--entity-type topics
表示要修改的实体类型为 Topic,--entity-name transaction
表示要修改的 Topic 名称,--alter
表示要进行修改操作,--add-config
后面的参数用于指定要修改的 Topic 级别参数及其新值。
个人建议
个人建议始终坚持使用第二种方式来设置 Topic 级别参数,并且在未来,Kafka 社区很有可能统一使用kafka-configs
脚本来调整 Topic 级别参数。这样做的好处是统一了设置参数的方式,减少了学习成本和混淆,同时也更加方便管理和维护。
JVM 参数
JVM 参数对于 Kafka 集群的性能和稳定性非常重要。在设置 JVM 参数之前,首先需要确定 Java 版本。对于 Kafka 来说,不推荐在 Java 6 或 7 的环境上运行,建议至少使用 Java 8。
在 JVM 参数设置中,堆大小是一个关键参数。尽管后面我们还会讨论如何调优 Kafka 性能的问题,但是现在我想给出一个通用的建议:将 JVM 堆大小设置为 6GB,这是目前业界普遍认可的一个合理值。很多人使用默认的堆大小来运行 Kafka,但是默认的 1GB 有点小,因为 Kafka Broker 在与客户端进行交互时会在 JVM 堆上创建大量的 ByteBuffer 实例,堆大小不能太小。
另一个重要的 JVM 参数是垃圾回收器(GC)的设置。如果你仍在使用 Java 7,可以根据以下规则选择合适的垃圾回收器:如果 Broker 所在机器的 CPU 资源非常充裕,建议使用 CMS(Concurrent Mark Sweep)收集器,启用方法是指定-XX:+UseConcMarkSweepGC
。否则,使用吞吐量收集器,启用方法是指定-XX:+UseParallelGC
。如果你使用 Java 8,可以手动设置使用 G1(Garbage First)收集器。在没有任何调优的情况下,G1 表现要比 CMS 更出色,主要体现在更少的 Full GC 和需要调整的参数更少等方面,所以使用 G1 就可以了。
现在我们确定了要设置的 JVM 参数,接下来我们来为 Kafka 进行设置。奇怪的是,这个问题在 Kafka 官网上居然没有被提及。实际上,设置的方法非常简单,你只需要设置下面这两个环境变量即可:
KAFKA_HEAP_OPTS
:指定堆大小。例如,你可以这样设置:export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"
KAFKA_JVM_PERFORMANCE_OPTS
:指定 GC 参数。例如,你可以这样设置:export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
在启动 Kafka Broker 之前,先设置好这两个环境变量,然后执行启动命令,例如:
$ export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g" $ export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true" $ bin/kafka-server-start.sh config/server.properties
这样就完成了 JVM 参数的设置。通过合理的设置,可以提高 Kafka 集群的性能和稳定性。需要注意的是,具体的参数设置可能因环境和需求而有所不同,可以根据实际情况进行调整。
操作系统参数
Kafka 集群通常需要设置一些操作系统参数来优化性能和稳定性。下面是一些常见的操作系统参数设置:
- 文件描述符限制(ulimit -n):文件描述符是操作系统用于跟踪打开文件的标识符。Kafka 集群需要同时打开大量的文件描述符,因此需要增加文件描述符限制。默认情况下,操作系统的文件描述符限制较低,可能会导致 Kafka 进程无法打开足够的文件描述符,从而影响性能。建议将文件描述符限制设置为一个较大的值,例如 ulimit -n 1000000。
- 文件系统类型的选择:Kafka 集群的性能和稳定性受到文件系统的影响。根据官方测试报告,XFS 文件系统的性能要优于 ext4 文件系统。因此,在生产环境中最好选择 XFS 文件系统。最近也有一些关于 Kafka 使用 ZFS 文件系统的报告,显示其性能更强劲,如果条件允许,可以尝试使用 ZFS 文件系统。
- Swap 的调优:Swap 是操作系统用于将内存中不常用的数据暂时存储在磁盘上的机制。一些文章建议将 Swap 设置为 0,完全禁用 Swap,以防止 Kafka 进程使用 Swap 空间。然而,个人认为最好不要将 Swap 设置为 0,而是设置为一个较小的值。这是因为当物理内存耗尽时,操作系统会触发 OOM killer 组件,随机选择一个进程并杀死它,而不给用户任何预警。如果将 Swap 设置为一个较小的值,当开始使用 Swap 空间时,你至少能观察到 Broker 性能的急剧下降,从而有时间进行进一步的调优和问题诊断。建议将 swappiness 设置为接近 0 但不为 0 的值,例如 1。
- 提交时间(Flush 落盘时间):Kafka 发送数据时,并不需要等待数据被写入磁盘才认为成功,只要数据被写入操作系统的页缓存(Page Cache)即可。操作系统会根据 LRU 算法定期将页缓存上的“脏”数据写入物理磁盘。提交时间决定了这个定期的间隔,默认为 5 秒。通常情况下,这个时间间隔可能太频繁,可以适当增加提交时间间隔来降低物理磁盘的写操作。需要注意的是,如果数据在写入磁盘之前发生机器宕机,数据将会丢失。但由于 Kafka 在软件层面提供了多副本的冗余机制,因此可以适当增加提交时间间隔以换取性能。
需要注意的是,以上参数设置是一般情况下的建议,具体的设置还需要根据实际情况和硬件配置进行调整。另外,不同的操作系统和版本可能会有不同的参数设置方式,请参考相应的操作系统文档或官方建议进行设置。
下面是一个示例,展示如何在 Linux 系统上设置 ulimit -n 参数:
# 查看当前文件描述符限制 ulimit -n # 修改文件描述符限制为 1000000 ulimit -n 1000000 # 验证修改是否生效 ulimit -n
请注意,以上示例仅适用于 Linux 系统,其他操作系统可能有不同的设置方式。
参考资料
[1]
首发博客地址: https://blog.zysicyj.top/
[2] [3]系列文章地址: https://blog.zysicyj.top/categories/技术文章/后端技术/系列文章/Kafka/
本文由 mdnice 多平台发布