Kafka两种集群详解和搭建教程

本文涉及的产品
.cn 域名,1个 12个月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: Kafka两种集群详解和搭建教程

Kafka是一个能够支持高并发以及流式消息处理的消息中间件,并且Kafka天生就是支持集群的,今天就主要来介绍一下如何搭建Kafka集群。

Kafka目前支持使用Zookeeper模式搭建集群以及KRaft模式(即无Zookeeper)模式这两种模式搭建集群,这两种模式各有各的好处,今天就来分别介绍一下这两种方式。

1,Kafka集群中的节点类型

我们首先需要了解一下,一个Kafka集群是由下列几种类型的节点构成的,它们充当着不同的作用:

  • Broker节点:即代理节点,是Kafka中的工作节点,充当消息队列的角色,负责储存和处理消息,每个Broker都是一个独立的Kafka服务器,可以在不同的机器上运行,除此之外Broker还负责分区(partition)的管理,将主题(topic)划分为多个分区,并分布在集群的不同Broker上
  • Controller节点:即控制器节点,是集群中的特殊节点,负责储存和管理整个集群元数据状态,它能够监控整个集群中的Broker,在需要时还能够进行平衡操作
  • 混合节点:即同时担任Broker和Controller节点角色的节点

2,两种模式集群的搭建方式

接下来,我就来介绍一下两种模式的集群架构和搭建方式,即Zookeeper模式集群和KRaft模式集群。

(1) Zookeeper模式集群

这是一种比较简单,相对“传统”的搭建方式了!在这种模式下,每个Kafka节点都是依赖于Zookeeper的,使用Zookeeper存储集群中所有节点的元数据。

只要所有的Kafka节点连接到同一个Zookeeper上面(或者同一个Zookeeper集群),这些Kafka节点就构成了一个集群。所以说就算是只有一个Kafka节点在运行,这一个节点也可以称作一个集群。

在Zookeeper模式集群中,Zookeeper节点(或者集群)就充当了Controller的角色,而所有的Kafka节点就充当着Broker的角色。

image.png

下面就来介绍一下搭建过程,这里我在4台Linux虚拟机上分别运行Zookeeper和Kafka来模拟一个集群,一共一个Zookeeper节点和三个Kafka节点构成,如下:

节点名 地址和端口
Zookeeper节点 zookeeper:2181
Kafka节点1 kafka1:9092
Kafka节点2 kafka2:9092
Kafka节点3 kafka3:9092

上述地址例如kafka1等等,是通过修改虚拟机的主机名(hostname)实现的,这样虚拟机之间可以直接通过这些主机名相互访问,这个主机名我们就可以视作实际在服务器上面搭建时,服务器的外网地址或者域名,这里就不再赘述如何修改虚拟机的主机名了,需要保证上述所有虚拟机在一个虚拟机网段中并且能够互相ping通,即上述所有虚拟机需要两两之间可以通过网络互相访问。

运行Kafka和Zookeeper都需要Java 8及其以上运行环境,大家要首先在虚拟机中安装并配置好。

① 搭建Zookeeper

首先我们要运行起一个Zookeeper节点,这里就不再赘述Zookeeper节点如何搭建了!搭建可以查看官方文档,或者使用Docker的方式搭建。

搭建完成并运行Zookeeper之后,我们会把所有的Kafka节点都配置到这一个Zookeeper节点上。

② 配置并运行所有Kafka节点

首先去Kafka官网下载最新版并解压,然后将解压出来的Kafka分别复制到三台虚拟机中

然后修改每台虚拟机的Kafka目录中的配置文件,配置文件位于解压的Kafka文件夹中的config/server.properties,使用文本编辑器打开,并找到下列配置项进行配置:

  • broker.id 表示每个节点的id每个节点需要设置为不一样的整数,我这里分别设置为123
  • zookeeper.connect 表示要使用的Zookeeper的地址和端口,我这里都配置为zookeeper:2181
  • advertised.listeners 表示这个Kafka节点的外网地址,这里分别配置为PLAINTEXT://kafka1:9092PLAINTEXT://kafka2:9092PLAINTEXT://kafka3:9092,需要注意的是这个配置项必须要配置为其所在服务器的外网地址,如果说你是在一台服务器上面配置Kafka并要通过外网访问,这个配置就需要配置为服务器外网地址(域名),并且都以PLAINTEXT://开头

注意上述advertised.listeners这个配置项默认情况下是被注释掉了的,大家需要去仔细找一下并去掉注释(开头的#)然后再进行配置。

三台虚拟机配置完成后,分别使用终端进入到Kafka目录下并启动,执行下列命令:

bin/kafka-server-start.sh config/server.properties

在上述三台虚拟机上面都通过这个命令启动Kafka,如图则启动成功:

image.png

到此,整个集群就搭建完成了!大家需要保证上述三台虚拟机中的终端不被关闭。

③ 创建话题测试

我们先在kafka1的虚拟机上面再开一个终端并进入Kafka目录,执行下列命令创建Topic:

bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092

然后去kafka2的虚拟机上面再开一个终端并进入Kafka目录,执行下列命令列出Topic:

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

image.png

可见我们在第一个节点上创建了话题,但是在第二个节点上仍然可以获取这个话题,说明集群创建成功,数据在集群之间可以共享。

(2) KRaft模式集群

在上述传统方案中,Kafka需要依赖Zookeeper完成元数据存放和共享,这样也就暴露出了一些问题:

  • 搭建Kafka集群时还需要额外搭建Zookeeper,增加了运维成本
  • Zookeeper是强一致性的组件(符合CP理论),如果集群中数据发生变化,那么必须要等到其它节点都同步,至少超过一半同步完成,这样节点数多性能差

那么KRaft模式是新版本Kafka中推出的集群模式,这种模式下就完全不需要Zookeeper了!只需要数个Kafka节点就可以直接构成集群,在这时集群中的Kafka节点既有可能是Controller节点也可能是Broker节点,在这个模式中,我们不仅可以手动配置某个节点的角色(是Controller还是Broker),还可以使其同时担任Broker和Controller角色(混合节点)。

在KRaft模式中,集群的节点会通过投票选举的方式,选择出一个主要的Controller节点,这个节点也称作领导者,它将负责维护整个集群的元数据和状态信息,那么其它的Controller节点或者混合节点就称之为追随者,它们会从领导者同步集群元数据和状态信息。如果领导者宕机了,所有的节点会重新投票选举一个新的领导者。

在选举过程中,所有的节点都会参与投票过程,而候选节点只会是Controller节点或者混合节点(即Broker节点不会被选举为领导者)。

需要注意的是,在默认情况下Kafka集群中的Broker节点和Controller节点通常会监听不同的端口:

  • Broker节点是Kafka集群中的数据节点(消息队列),它们负责接收客户端的消息和传递消息给客户端,默认情况下,每个Broker节点会监听9092端口,该端口用于与客户端进行通信,客户端可以将消息发送到这个端口,或者从这个端口接收消息,这个端口可以称作客户端通信端口
  • Controller节点是Kafka集群中的控制器节点,负责管理集群的状态和元数据,Controller节点监听的端口通常是9093,该端口用于集群中其他节点获取元数据或在混合节点选举新的Controller时进行通信,通过该端口,其他节点可以与Controller节点交互,获取集群的元数据信息或参与控制器的选举过程,这个端口可以称作控制器端口
  • 混合节点(即同时担任Broker和Controller角色的节点)中,这两个端口都会被使用,默认情况下混合节点将监听9092端口接收和传递消息给客户端,并监听9093端口用于与其他节点进行元数据交换和控制器选举通信,可见混合节点会同时使用两个端口分别作为客户端通信端口控制器端口

所以需要根据实际情况配置网络设置和防火墙规则,以确保Kafka集群中的节点能够在正确的端口上进行通信。上述提到的两种端口也是可以修改的,当然不建议修改。

同样地,就算是你只是搭建了一个Kafka节点,这一个节点也仍然被视为一个Kafka集群,并且KRaft模式下如果只需要建立一个节点,那么这个节点必须是混合节点。

image.png

下面同样是开启三台虚拟机,搭建三个Kafka节点构成的KRaft模式集群如下:

节点名 地址 节点类型 客户端通信端口 控制器端口
Kafka节点1 kafka1 混合节点 9092 9093
Kafka节点2 kafka2 混合节点 9092 9093
Kafka节点3 kafka3 混合节点 9092 9093

这里就不再赘述下载Kafka的过程了!

① 修改配置文件

在KRaft模式下,配置文件位于Kafka目录中的config/kraft/server.properties,使用文本编辑器打开并找到下列配置以修改:

  • node.id 表示这个节点的id,一个集群中每个节点id不能重复,需要是不小于1的整数,这里三台虚拟机的配置分别为123(类似上述Zookeeper的broker.id配置)
  • controller.quorum.voters 设定投票者列表,即需要配置所有的Controller节点id及其地址端口,配置格式为节点1的id@节点1地址:节点1端口,节点2的id@节点2地址:节点2端口,节点3的id@节点3地址:节点3端口...,这里的端口需要是控制器端口,默认都是9093,上面也提到过了,默认不需要修改,我这里三台虚拟机的都配置为1@kafka1:9093,2@kafka2:9093,3@kafka3:9093(实际在服务器上搭建时替换为服务器的外网地址或者域名)
  • advertised.listeners 表示这个Kafka节点的外网地址,这里分别配置为PLAINTEXT://kafka1:9092PLAINTEXT://kafka2:9092PLAINTEXT://kafka3:9092(和上述Zookeeper模式中的一样,实际在服务器上搭建时替换为服务器的外网地址或者域名)

上述是必须要进行配置的,还有下面配置是可以选择性配置的:

  • process.roles 表示设定这个节点的类型,设定为broker表示设定这个节点为Broker节点,同样地设定controller表示设定为Controller节点,默认是broker,controller表示这个节点会自动切换节点类型,这里先保持默认不变,下面再来详细讨论

② 生成集群ID并使用集群ID格式化数据目录

在KRaft模式下,一个集群需要设定一个id,我们可以使用自带的命令生成,先进入上述任意一台虚拟机并使用终端进入Kafka目录中,执行下列命令生成一个UUID:

bin/kafka-storage.sh random-uuid

image.png

我们这里记录下这个ID以备用。

这个集群ID事实上是一个长度16位的字符串通过Base64编码后得来的,因此你也可以不使用上述命令,直接自定义一个16位长度的纯英文和数字组成的字符串,然后将这个字符串编码为Base64格式作为这个集群ID也可以。可以使用菜鸟工具中的在线Base64编码工具。

然后在上述三台虚拟机中,都使用终端进入Kafka目录后,执行下列命令:

bin/kafka-storage.sh format -t 生成的集群ID -c config/kraft/server.properties

这样,三个Kafka节点都使用了这一个ID完成了集群元数据配置,表示这三个Kafka节点构成一个集群。

③ 启动Kafka

同样地,在三台虚拟机中,都使用终端进入Kafka目录后,执行下列命令:

bin/kafka-server-start.sh config/kraft/server.properties

三台虚拟机全部启动后,这个集群才启动完毕。

④ 创建话题测试

同样地,现在第一个虚拟机的Kafka目录下执行下列命令:

bin/kafka-topics.sh --create --topic my-topic-kraft --bootstrap-server localhost:9092

然后在第二个虚拟机的Kafka目录下查看话题:

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

image-20230612212505591

可见集群节点之间可以互相通信。

无论是在虚拟机还是服务器上,都要保证90929093端口开放,且所有虚拟机/服务器之间都能够两两互相访问(网络连通)!

3,重要配置介绍

无论是那种模式的集群,我们都涉及到了许多配置项,大家通过上述的搭建示例也能够了解到每个配置项的意义,这里就专门来着重介绍一下,Kafka中一些重要的配置项。

(1) listeners

这个配置项用于指定Kafka服务器监听客户端连接的地址和端口,当 Kafka 服务器启动时,它将监听listeners配置项中指定的地址和端口,等待客户端的连接请求。

一般情况下这个配置以PLAINTEXT://或者CONTROLLER://开头,意义如下:

  • 若这个节点是Broker节点,则以PLAINTEXT://开头
  • 若这个节点是Controller节点,则以CONTROLLER://开头
  • 若这个节点是混合节点,则需要同时配置两者开头的地址

这个配置项通常不需要修改,下面给出几个配置示例:

  • PLAINTEXT://:9092 本节点作为Broker节点,监听本机所有可用网卡9092端口(使用9092端口作为客户端通信端口),也是默认配置
  • PLAINTEXT://127.0.0.1:9092 本节点作为Broker节点,监听本地的9092端口,这样仅接受来自本地的请求
  • CONTROLLER://:10000 本节点作为Controller节点,监听本机所有可用网卡10000端口(使用10000端口作为控制器端口
  • PLAINTEXT://:9092,CONTROLLER://:9093 本节点作为混合节点,监听本机所有可用网卡90929093端口,其中9092作为客户端通信端口9093作为控制器端口

(2) advertise.listeners

这个配置容易和listeners混淆,事实上它们是有较大的区别的。

该配置项指定Kafka服务器广播给客户端的地址和端口,通常配置为Kafka所在服务器的外网地址

当客户端(生产者或消费者)尝试连接到Kafka服务器时,它首先会获取Kafka服务器广播的地址和端口,也就是advertise.listeners配置所指定的地址和端口,然后才会使用advertise.listeners配置所指定的地址和端口来建立与Kafka服务器的连接。

相信这时大家会有个疑问:既然客户端要连接Kafka(例如Spring Boot集成Kafka客户端),那一定是已经知道了Kafka对外的地址端口了,那为什么连接的时候还需要获取一下广播的地址端口再进行连接呢?这样是不是有一些多此一举?

事实上,Kafka设计这个配置是为了解决下面较为复杂的网络场景:

  • 多网络接口的主机部署:在一个多网络接口的主机部署Kafka时,Kafka服务器可能会监听多个地址和端口,这些地址和端口可能与客户端实际访问的地址和端口不同,advertise.listeners允许服务器指定一个公开的、可访问的地址和端口,以便客户端能够正确连接
  • NAT/代理环境:在某些网络环境下,Kafka服务器位于一个私有网络中,客户端位于一个公共网络中,两者之间可能存在网络地址转换(NAT)或代理,在这种情况下,Kafka服务器的内部地址和端口对客户端来说是不可访问的。通过使用advertise.listeners,Kafka服务器可以将一个公共地址和端口广播给客户端,使得客户端能够通过公共网络连接到服务器
  • 容器环境:例如你把Kafka放在Docker容器中运行,按照默认配置,Kafka服务端只会监听容器网络的9092端口,我们知道外部不能直接访问容器的网络,而是需要使用网络映射,假设你把Kafka容器的9092端口映射至了宿主机9095端口,也就是说外部需要通过9095端口访问到Kafka容器的9092端口,那么你就配置advertise.listenersPLAINTEXT://服务器外网地址:9095,客户端就可以正确访问容器中的Kafka了

总之,这个配置设置为Kafka服务器所在的外网地址即可!例如PLAINTEXT://69.54.112.239:9092

(3) process.roles

这是KRaft模式下专门的配置,用于配置这个节点的类型,可以配置为下列值:

  • broker 表示这个节点是Broker节点,充当消息队列的角色
  • controller 表示这个节点是Controller节点,充当元数据存放和管理的角色
  • broker,controller 表示这个节点同时担任Broker和Controller的角色,也称作混合节点

如果没有配置这个选项,则Kafka会以Zookeeper模式运行。

这里有下列注意事项:

  • 如果设定节点为controller
    • 不能配置advertised.listeners,可以将其注释掉或者删掉
    • listeners需要配置为CONTROLLER://开头,建议配置为CONTROLLER://:9093
  • 如果设定节点为broker
    • 则需要配置advertised.listeners为服务器外网地址和端口,这和Zookeeper模式中相同
    • listeners需要配置为PLAINTEXT://开头,建议配置为PLAINTEXT://:9092
  • 如果设定节点为混合节点:
    • 同样需要配置advertised.listeners为服务器外网地址和端口
    • listeners需要同时配置CONTROLLER://PLAINTEXT://,建议配置为PLAINTEXT://:9092,CONTROLLER://:9093

在开发环境或者小规模集群,可以全部使用混合节点,如果是生产环境就建议设定好每个节点的类型了!并且通常需要先启动Controller节点再启动Broker节点。

事实上,我们发现Kafka的KRaft配置目录config/kraft下有三个配置文件,其中server.properties是混合节点的配置模板,而broker.propertiescontroller.properties分别是Broker节点和Controller节点的配置模板,大家如果要设定节点类型,可以直接使用对应的配置文件,将对应配置文件需要修改的部分修改一下,然后将上述格式化数据目录命令和启动命令中的配置文件路径改变一下即可,这样可以省略我们设定process.roleslisteners或者控制器节点删除advertise.listeners配置的操作。

(4) controller.quorum.voters

该配置项用于配置集群中Controller节点选举过程中的投票者,集群中所有的Controller节点都需要被罗列在这个配置项中,其配置格式为id1@host1:port1,id2@host2:port2,id3@host3:port3...

有的同学可能认为这里需要把集群中所有节点都写进去,事实上这是错误的,这里只需要写所有的Controller节点和混合节点id、地址和端口即可,这个配置中配置的端口当然是控制器端口

上述集群搭建的例子中,由于所有的节点都是混合节点,因此就全部写在其中了!如果我们手动设定每个节点的类型,例如:

节点名 节点id 地址 服务器通信端口 控制器端口 节点类型
Kafka节点1 1 kafka1 / 9093 Controller
Kafka节点2 2 kafka2 9092 / Broker
Kafka节点3 3 kafka3 9092 / Broker

那么所有节点的controller.quorum.voters都需要配置为1@kafka1:9093

事实上,所有的节点都是通过这个配置中的节点列表,来得知所有的控制器节点信息(以获取集群元数据)并得到投票候选者的,因此集群中所有节点,不论是Broker还是Controller,还是混合节点,都需要配置这一项。

(5) 其它配置

除了上述我们涉及到的一些配置之外,还有下列配置大家可以进行修改:

  • socket.send.buffer.bytes 每次发送的数据包的最大大小(单位:字节)
  • socket.receive.buffer.bytes 每次接收的数据包的最大大小(单位:字节)
  • socket.request.max.bytes 接收的最大请求大小(单位:字节)
  • num.partitions 每个Topic的默认分区数

上述无论是哪个模式的集群,都可以在配置文件中找到这些配置,如果找不到可手动加入。除了修改配置文件之外,我们还可以在启动Kafka的命令中指定配置和值,例如:

bin/kafka-server-start.sh config/server.properties --override zookeeper.connect=127.0.0.1:2181 --override broker.id=1

上述命令在启动时通过命令指定了zookeeper.connect配置值为127.0.0.1:2181,以及broker.id1,可见在后面追加--override 配置名=值即可,注意命令行中指定的配置值会覆盖掉配置文件中的配置值!

4,总结

本文介绍了Kafka的两种集群模式结构及其搭建方式,其中一些概念尤其是KRaft模式可能需要大家多读几遍,并结合实际操作才能明白。

参考文档:

相关文章
|
24天前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
58 4
|
1月前
|
消息中间件 监控 数据可视化
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
54 2
|
22天前
|
消息中间件 存储 Prometheus
Kafka集群如何配置高可用性
Kafka集群如何配置高可用性
|
1月前
|
消息中间件 分布式计算 监控
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
69 6
|
1月前
|
消息中间件 Kafka API
|
2月前
|
消息中间件 Kafka API
kafka使用教程
kafka使用教程
|
3月前
|
消息中间件 Kafka 测试技术
【Kafka揭秘】Leader选举大揭秘!如何打造一个不丢失消息的强大Kafka集群?
【8月更文挑战第24天】Apache Kafka是一款高性能分布式消息系统,利用分区机制支持数据并行处理。每个分区含一个Leader处理所有读写请求,并可有多个副本确保数据安全与容错。关键的Leader选举机制保障了系统的高可用性和数据一致性。选举发生于分区创建、Leader故障或被手动移除时。Kafka提供多种选举策略:内嵌机制自动选择最新数据副本为新Leader;Unclean选举快速恢复服务但可能丢失数据;Delayed Unclean选举则避免短暂故障下的Unclean选举;Preferred选举允许基于性能或地理位置偏好指定特定副本为首选Leader。
80 5
|
3月前
|
消息中间件 监控 Java
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
|
3月前
|
消息中间件 监控 Java
【Kafka节点存活大揭秘】如何让Kafka集群时刻保持“心跳”?探索Broker、Producer和Consumer的生死关头!
【8月更文挑战第24天】在分布式系统如Apache Kafka中,确保节点的健康运行至关重要。Kafka通过Broker、Producer及Consumer间的交互实现这一目标。文章介绍Kafka如何监测节点活性,包括心跳机制、会话超时与故障转移策略。示例Java代码展示了Producer如何通过定期发送心跳维持与Broker的连接。合理配置这些机制能有效保障Kafka集群的稳定与高效运行。
78 2
|
3月前
|
消息中间件 Java Kafka
Linux——Kafka集群搭建
Linux——Kafka集群搭建
49 0
下一篇
无影云桌面