Kafka 3.0新特性 详解(一)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 导语 | kafka3.0的版本已经试推行去zk的kafka架构了,如果去掉了zk,那么在kafka新的版本当中使用什么技术来代替了zk的位置呢,接下来我们一起来一探究竟,了解kafka的内置共识机制和raft算法

一、Kafka简介


Kafka是一款开源的消息引擎系统。一个典型的Kafka体系架构包括若干Producer、若干Broker、若干Consumer,以及一个ZooKeeper集群,如上图所示。其中ZooKeeper是Kafka用来负责集群元数据的管理、控制器的选举等操作的。Producer将消息发送到Broker,Broker负责将收到的消息存储到磁盘中,而Consumer负责从Broker订阅并消费消息。


(一)Kafka核心组件


producer:消息生产者,就是向broker发送消息的客户端。


consumer:消息消费者,就是从broker拉取数据的客户端。


consumer group:消费者组,由多个消费者consumer组成。消费者组内每个消费者负责消费不同的分区,一个分区只能由同一个消费者组内的一个消费者消费;消费者组之间相互独立,互不影响。所有的消费者都属于某个消费者组,即消费者组是一个逻辑上的订阅者。


broker:一台服务器就是一个broker,一个集群由多个broker组成,一个broker可以有多个topic。


topic:可以理解为一个队列,所有的生产者和消费者都是面向topic的。


partition:分区,kafka中的topic为了提高拓展性和实现高可用而将它分布到不同的broker中,一个topic可以分为多个partition,每个partition都是有序的,即消息发送到队列的顺序跟消费时拉取到的顺序是一致的。


replication:副本。一个topic对应的分区partition可以有多个副本,多个副本中只有一个为leader,其余的为follower。为了保证数据的高可用性,leader和follower会尽量均匀的分布在各个broker中,避免了leader所在的服务器宕机而导致topic不可用的问题。



(二)kafka2当中zk的作用


117.png

/admin:主要保存kafka当中的核心的重要信息,包括类似于已经删除的topic就会保存在这个路径下面。


/brokers:主要用于保存kafka集群当中的broker信息,以及没被删除的topic信息。


/cluster: 主要用于保存kafka集群的唯一id信息,每个kafka集群都会给分配要给唯一id,以及对应的版本号。


/config: 集群配置信息。


/controller:kafka集群当中的控制器信息,控制器组件(Controller),是Apache Kafka的核心组件。它的主要作用是在Apache ZooKeeper的帮助下管理和协调整个Kafka集群。


/controller_epoch:主要用于保存记录controller的选举的次数。


/isr_change_notification:isr列表发生变更时候的通知,在kafka当中由于存在ISR列表变更的情况发生,为了保证ISR列表更新的及时性,定义了isr_change_notification这个节点,主要用于通知Controller来及时将ISR列表进行变更。


/latest_producer_id_block:使用/latest_producer_id_block节点来保存PID块,主要用于能够保证生产者的任意写入请求都能够得到响应。


/log_dir_event_notification:主要用于保存当broker当中某些LogDir出现异常时候,例如磁盘损坏,文件读写失败等异常时候,向ZK当中增加一个通知序号,controller监听到这个节点的变化之后,就会做出对应的处理操作。


以上就是kafka在zk当中保留的所有的所有的相关的元数据信息,这些元数据信息保证了kafka集群的正常运行。


二、kafka3的安装配置

在kafka3的版本当中已经彻底去掉了对zk的依赖,如果没有了zk集群,那么kafka当中是如何保存元数据信息的呢,这里我们通过kafka3的集群来一探究竟。


(一)kafka安装配置核心重要参数

  • Controller服务器


不管是kafka2还是kafka3当中,controller控制器都是必不可少的,通过controller控制器来维护kafka集群的正常运行,例如ISR列表的变更,broker的上线或者下线,topic的创建,分区的指定等等各种操作都需要依赖于Controller,在kafka2当中,controller的选举需要通过zk来实现,我们没法控制哪些机器选举成为Controller,而在kafka3当中,我们可以通过配置文件来自己指定哪些机器成为Controller,这样做的好处就是我们可以指定一些配置比较高的机器作为Controller节点,从而保证controller节点的稳健性。


被选中的controller节点参与元数据集群的选举,每个controller节点要么是Active状态,或者就是standBy状态。


118.png

Process.Roles

使用KRaft模式来运行kafka集群的话,我们有一个配置叫做Process.Roles必须配置,这个参数有以下四个值可以进行配置:


Process.Roles=Broker, 服务器在KRaft模式中充当Broker。


Process.Roles=Controller, 服务器在KRaft模式下充当Controller。


Process.Roles=Broker,Controller,服务器在KRaft模式中同时充当Broker和Controller。


如果process.roles没有设置。那么集群就假定是运行在ZooKeeper模式下。


如果需要从zookeeper模式转换成为KRaft模式,那么需要进行重新格式化。如果一个节点同时是Broker和Controller节点,那么就称之为组合节点。


实际工作当中,如果有条件的话,尽量还是将Broker和Controller节点进行分离部署。避免由于服务器资源不够的情况导致OOM等一系列的问题


Quorum Voters

通过controller.quorum.voters配置来实习哪些节点是Quorum的投票节点,所有想要成为控制器的节点,都必须放到这个配置里面。


每个Broker和每个Controller都必须配置Controller.quorum.voters,该配置当中提供的节点ID必须与提供给服务器的节点ID保持一直。


每个Broker和每个Controller 都必须设置 controller.quorum.voters。需要注意的是,controller.quorum.voters 配置中提供的节点ID必须与提供给服务器的节点ID匹配。


比如在Controller1上,node.Id必须设置为1,以此类推。注意,控制器id不强制要求你从0或1开始。然而,分配节点ID的最简单和最不容易混淆的方法是给每个服务器一个数字ID,然后从0开始。


(二)下载并解压安装包


bigdata01下载kafka的安装包,并进行解压:


[hadoop@bigdata01 kraft]$ cd /opt/soft/[hadoop@bigdata01 soft]$ wget http://archive.apache.org/dist/kafka/3.1.0/kafka_2.12-3.1.0.tgz[hadoop@bigdata01 soft]$ tar -zxf kafka_2.12-3.1.0.tgz -C /opt/install/


修改kafka的配置文件broker.properties:


[hadoop@bigdata01 kafka_2.12-3.1.0]$ cd /opt/install/kafka_2.12-3.1.0/config/kraft/[hadoop@bigdata01 kraft]$ vim broker.properties

修改编辑内容如下:


node.id=1controller.quorum.voters=1@bigdata01:9093listeners=PLAINTEXT://bigdata01:9092advertised.listeners=PLAINTEXT://bigdata01:9092log.dirs=/opt/install/kafka_2.12-3.1.0/kraftlogs


创建两个文件夹:


[hadoop@bigdata01 kafka_2.12-3.1.0]$ mkdir -p /opt/install/kafka_2.12-3.1.0/kraftlogs[hadoop@bigdata01 kafka_2.12-3.1.0]$ mkdir -p /opt/install/kafka_2.12-3.1.0/topiclogs


同步安装包到其他机器上面去。



(三)服务器集群启动


启动kafka服务:


[hadoop@bigdata01 kafka_2.12-3.1.0]$  ./bin/kafka-storage.sh random-uuidYkJwr6RESgSJ


(四)创建kafka的topic


集群启动成功之后,就可以来创建kafka的topic了,使用以下命令来创建kafka的topic:


./bin/kafka-topics.sh --create --topic kafka_test --partitions 3 --replication-factor 2 --bootstr


(五)任意一台机器查看kafka的topic


组成集群之后,任意一台机器就可以通过以下命令来查看到刚才创建的topic了:


[hadoop@bigdata03 ~]$ cd /opt/install/kafka_2.12-3.1.0/[hadoop@bigdata03 kafka_2.12-3.1.0]$ bin/kafka-topics.sh  --list --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092

(六)消息生产与消费


使用命令行来生产以及消费kafka当中的消息:


[hadoop@bigdata01 kafka_2.12-3.1.0]$ bin/kafka-console-producer.sh --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 --topic kafka_test
[hadoop@bigdata02 kafka_2.12-3.1.0]$ bin/kafka-console-consumer.sh --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 --topic kafka_test --from-beginning


三、Kafka当中Raft的介绍

(一)kafka强依赖zk所引发的问题


前面我们已经看到了kafka3集群在没有zk集群的依赖下,也可以正常运行,那么kafka2在zk当中保存的各种重要元数据信息,在kafka3当中如何实现保存的呢?


kafka一直都是使用zk来管理集群以及所有的topic的元数据,并且使用了zk的强一致性来选举集群的controller,controller对整个集群的管理至关重要,包括分区的新增,ISR列表的维护,等等很多功能都需要靠controller来实现,然后使用zk来维护kafka的元数据也存在很多的问题以及存在性能瓶颈。


以下是kafka将元数据保存在zk当中的诸多问题。


元数据存取困难

元数据的存取过于困难,每次重新选举的controller需要把整个集群的元数据重新restore,非常的耗时且影响集群的可用性。


元数据更新网络开销大

整个元数据的更新操作也是以全量推的方式进行,网络的开销也会非常大。


强耦合违背软件设计原则

Zookeeper对于运维来说,维护Zookeeper也需要一定的开销,并且kafka强耦合与zk也并不好,还得时刻担心zk的宕机问题,违背软件设计的高内聚,低耦合的原则。


网络分区复杂度高

Zookeeper本身并不能兼顾到broker与broker之间通信的状态,这就会导致网络分区的复杂度成几何倍数增长。


zk本身不适合做消息队列

zookeeper不适合做消息队列,因为zookeeper有1M的消息大小限制 zookeeper的children太多会极大的影响性能znode太大也会影响性能 znode太大会导致重启zkserver耗时10-15分钟 zookeeper仅使用内存作为存储,所以不能存储太多东西。


并发访问zk问题多

最好单线程操作zk客户端,不要并发,临界、竞态问题太多。


基于以上各种问题,所以提出了脱离zk的方案,转向自助研发强一致性的元数据解决方案,也就是KIP-500。


KIP-500议案提出了在Kafka中处理元数据的更好方法。基本思想是"Kafka on Kafka",将Kafka的元数据存储在Kafka本身中,无需增加额外的外部存储比如ZooKeeper等。


去zookeeper之后的kafka新的架构


0.png

在KIP-500中,Kafka控制器会将其元数据存储在Kafka分区中,而不是存储在ZooKeeper中。但是,由于控制器依赖于该分区,因此分区本身不能依赖控制器来进行领导者选举之类的事情。而是,管理该分区的节点必须实现自我管理的Raft仲裁。


在kafka3.0的新的版本当中,使用了新的KRaft协议,使用该协议来保证在元数据仲裁中准确的复制元数据,这个协议类似于zk当中的zab协议以及类似于Raft协议,但是KRaft协议使用的是基于事件驱动的模式,与ZAB协议和Raft协议还有点不一样


在kafka3.0之前的的版本当中,主要是借助于controller来进行leader partition的选举,而在3.0协议当中,使用了KRaft来实现自己选择leader,并最终令所有节点达成共识,这样简化了controller的选举过程,效果更加高效。


(二)kakfa3 Raft


前面我们已经知道了在kafka3当中可以不用再依赖于zk来保存kafka当中的元数据了,转而使用Kafka Raft来实现元数据的一致性,简称KRaft,并且将元数据保存在kafka自己的服务器当中,大大提高了kafka的元数据管理的性能。


KRaft运行模式的Kafka集群,不会将元数据存储在Apache ZooKeeper中。即部署新集群的时候,无需部署ZooKeeper集群,因为Kafka将元数据存储在Controller节点的KRaft Quorum中。KRaft可以带来很多好处,比如可以支持更多的分区,更快速的切换Controller,也可以避免Controller缓存的元数据和Zookeeper存储的数据不一致带来的一系列问题。


在新的版本当中,控制器Controller节点我们可以自己进行指定,这样最大的好处就是我们可以自己选择一些配置比较好的机器成为Controller节点,而不像在之前的版本当中,我们无法指定哪台机器成为Controller节点,而且controller节点与broker节点可以运行在同一台机器上,并且控制器controller节点不再向broker推送更新消息,而是让Broker从这个Controller Leader节点进行拉去元数据的更新。




1.png


(三)如何查看kafka3当中的元数据信息


在kafka3当中,不再使用zk来保存元数据信息了,那么在kafka3当中如何查看元数据信息呢,我们也可以通过kafka自带的命令来进行查看元数据信息,在KRaft中,有两个命令常用命令脚本,kafka-dump-log.sh和kakfa-metadata-shell.sh需要我们来进行关注,因为我们可以通过这两个脚本来查看kafka当中保存的元数据信息。


Kafka-dump-log.sh脚本来导出元数据信息


KRaft模式下,所有的元数据信息都保存到了一个内部的topic上面,叫做@metadata,例如Broker的信息,Topic的信息等,我们都可以去到这个topic上面进行查看,我们可以通过kafka-dump-log.sh这个脚本来进行查看该topic的信息。


Kafka-dump-log.sh是一个之前就有的工具,用来查看Topic的的文件内容。这工具加了一个参数–cluster-metadata-decoder用来,查看元数据日志,如下所示:

[hadoop@bigdata01 kafka_2.12-3.1.0]$ cd /opt/install/kafka_2.12-3.1.0[hadoop@bigdata01 kafka_2.12-3.1.0]$ bin/kafka-dump-log.sh  --cluster-metadata-decoder --skip-record-metadata  --files  /opt/install/kafka_2.12-3.1.0/topiclogs/__cluster_metadata-0/00000000000000000000.index,/opt/install/kafka_2.12-3.1.0/topiclogs/__cluster_metadata-0/00000000000000000000.log  >>/opt/metadata.txt


kafka-metadata-shell.sh直接查看元数据信息


平时我们用zk的时候,习惯了用zk命令行查看数据,简单快捷。bin目录下自带了kafka-metadata-shell.sh工具,可以允许你像zk一样方便的查看数据。


使用kafka-metadata-shell.sh脚本进入kafka的元数据客户端


[hadoop@bigdata01 kafka_2.12-3.1.0]$ bin/kafka-metadata-shell.sh --snapshot /opt/install/kafka_2.12-3.1.0/topiclogs/__cluster_metadata-0/00000000000000000000.log

四、Raft算法介绍


raft算法中文版本翻译介绍:


https://github.com/maemual/raft-zh_cn/blob/master/raft-zh_cn.md


著名的CAP原则又称CAP定理的提出,真正奠基了分布式系统的诞生,CAP定理指的是在一个分布式系统中,[一致性]、[可用性](Availability)、[分区容错性](Partition tolerance),这三个要素最多只能同时实现两点,不可能三者兼顾(nosql)。


分布式系统为了提高系统的可靠性,一般都会选择使用多副本的方式来进行实现,例如hdfs当中数据的多副本,kafka集群当中分区的多副本等,但是一旦有了多副本的话,那么久面临副本之间一致性的问题,而一致性算法就是 用于解决分布式环境下多副本的数据一致性的问题。业界最著名的一致性算法就是大名鼎鼎的Paxos,但是Paxos比较晦涩难懂,不太容易理解,所以还有一种叫做Raft的算法,更加简单容易理解的实现了一致性算法。


(一)Raft协议的工作原理

  • Raft协议当中的角色分布


Raft协议将分布式系统当中的角色分为Leader(领导者),Follower(跟从者)以及Candidate(候选者)


Leader:主节点的角色,主要是接收客户端请求,并向Follower同步日志,当日志同步到过半及以上节点之后,告诉follower进行提交日志。


Follower:从节点的角色,接受并持久化Leader同步的日志,在Leader通知可以提交日志之后,进行提交保存的日志。


Candidate:Leader选举过程中的临时角色。


2.png


Raft协议当中的底层原理


Raft协议当中会选举出Leader节点,Leader作为主节点,完全负责replicate log的管理。Leader负责接受所有客户端的请求,然后复制到Follower节点,如果leader故障,那么follower会重新选举leader,Raft协议的一致性,概括主要可以分为以下三个重要部分:


Leader选举


日志复制


安全性


其中Leader选举和日志复制是Raft协议当中最为重要的。


Raft协议要求系统当中,任意一个时刻,只有一个leader,正常工作期间,只有Leader和Follower角色,并且Raft协议采用了类似网络租期的方式来进行管理维护整个集群,Raft协议将时间分为一个个的时间段(term),也叫作任期,每一个任期都会选举一个Leader来管理维护整个集群,如果这个时间段的Leader宕机,那么这一个任期结束,继续重新选举leader。


Raft算法将时间划分成为任意不同长度的任期(term)。任期用连续的数字进行表示。每一个任期的开始都是一次选举(election),一个或多个候选人会试图成为领导人。如果一个候选人赢得了选举,它就会在该任期的剩余时间担任领导人。在某些情况下,选票会被瓜分,有可能没有选出领导人,那么,将会开始另一个任期,并且立刻开始下一次选举。Raft算法保证在给定的一个任期最多只有一个领导人。


3.png


Leader选举的过程


Raft使用心跳来进行触发leader选举,当服务器启动时,初始化为follower角色。leader向所有Follower发送周期性心跳,如果Follower在选举超时间内没有收到Leader的心跳,就会认为leader宕机,稍后发起leader的选举。


每个Follower都会有一个倒计时时钟,是一个随机的值,表示的是Follower等待成为Leader的时间,倒计时时钟先跑完,就会当选成为Leader,这样做得好处就是每一个节点都有机会成为Leader。



4.png


当满足以下三个条件之一时,Quorum中的某个节点就会触发选举:


向Leader发送Fetch请求后,在超时阈值quorum.fetch.timeout.ms之后仍然没有得到Fetch响应,表示Leader疑似失败。


从当前Leader收到了EndQuorumEpoch请求,表示Leader已退位。


Candidate状态下,在超时阈值quorum.election.timeout.ms之后仍然没有收到多数票,也没有Candidate赢得选举,表示此次选举作废,重新进行选举。


具体详细过程实现描述如下:


增加节点本地的current term,切换到candidate状态。


自己给自己投一票。


给其他节点发送RequestVote RPCs,要求其他节点也投自己一票。


等待其他节点的投票回复。


整个过程中的投票过程可以用下图进行表述。


5.png


leader节点选举的限制

  • 每个节点只能投一票,投给自己或者投给别人。
  • 候选人所知道的日志信息,一定不能比自己的更少,即能被选举成为leader节点,一定包含了所有已经提交的日志。
  • 先到先得的原则


数据一致性保证(日志复制机制)


前面通过选举机制之后,选举出来了leader节点,然后leader节点对外提供服务,所有的客户端的请求都会发送到leader节点,由leader节点来调度这些并发请求的处理顺序,保证所有节点的状态一致,leader会把请求作为日志条目(Log entries)加入到他的日志当中,然后并行的向其他服务器发起AppendEntries RPC复制日志条目。当这条请求日志被成功复制到大多数服务器上面之后,Leader将这条日志应用到它的状态机并向客户端返回执行结果。


客户端的每个请求都包含被复制状态机执行的指令


leader将客户端请求作为一条心得日志添加到日志文件中,然后并行发起RPC给其他的服务器,让他们复制这条信息到自己的日志文件中保存。


如果这条日志被成功复制,也就是大部分的follower都保存好了执行指令日志,leader就应用这条日志到自己的状态机中,并返回给客户端。


如果follower宕机或者运行缓慢或者数据丢失,leader会不断地进行重试,直至所有在线的follower都成功复制了所有的日志条目。


6.png


目录
相关文章
|
2月前
|
消息中间件 Kafka Linux
Kafka【付诸实践 03】Offset Explorer Kafka 的终极 UI 工具安装+简单上手+关键特性测试(一篇学会使用 Offset Explorer)
【2月更文挑战第21天】Kafka【付诸实践 03】Offset Explorer Kafka 的终极 UI 工具安装+简单上手+关键特性测试(一篇学会使用 Offset Explorer)
234 2
|
5月前
|
消息中间件 Kafka Linux
Kafka【应用 01】Offset Explorer Kafka 的终极 UI 工具安装+简单上手+关键特性测试(一篇学会使用 Offset Explorer)
Kafka【应用 01】Offset Explorer Kafka 的终极 UI 工具安装+简单上手+关键特性测试(一篇学会使用 Offset Explorer)
226 0
|
5月前
|
消息中间件 NoSQL Kafka
初学Kafka:特性介绍
初学Kafka:特性介绍
42 1
|
10月前
|
消息中间件 存储 缓存
RabbitMQ和Kafka特性比较分析
RabbitMQ和Kafka特性比较分析
80 0
|
消息中间件 Kafka
|
消息中间件 存储 缓存
【kafka】浅谈kafka常考特性
【kafka】浅谈kafka常考特性
276 2
【kafka】浅谈kafka常考特性
|
消息中间件 存储 缓存
Kafka 3.0新特性 详解(二)
导语 | kafka3.0的版本已经试推行去zk的kafka架构了,如果去掉了zk,那么在kafka新的版本当中使用什么技术来代替了zk的位置呢,接下来我们一起来一探究竟,了解kafka的内置共识机制和raft算法
833 0
Kafka 3.0新特性 详解(二)
|
消息中间件 监控 Kafka
【干货】Kafka 事务特性分析
特性背景 消息事务是指一系列的生产、消费操作可以要么都完成,要么都失败,类似数据库的事务。这个特性在0.10.2的版本是不支持的,从0.11版本开始才支持。
1289 0
|
存储 消息中间件 Kafka
kafka0.9.0 新特性(对比0.8)
image.png 1、引入新的Consumer API 0.9.0相比0.8.2,引入了一个新的Consumer API,这个API不再使用high level和low level的基于zookeeper的client;不过仍然支持0.8.0的client。
861 0
|
消息中间件 安全 Kafka
kafka0.8--0.11各个版本特性预览介绍
kafka-0.8.2 新特性  producer不再区分同步(sync)和异步方式(async),所有的请求以异步方式发送,这样提升了客户端效率。producer请求会返回一个应答对象,包括偏移量或者错误信。
1694 0

热门文章

最新文章