简介
Apache 软件基金会发布了包含许多新特性和改进的 Kafka 3.3.1。这是第一个标志着可以在生产环境中使用 KRaft(Kafka Raft)共识协议的版本。在几年的开发过程中,它先是在 Kafka 2.8 早期访问版本中发布,然后又在 Kafka 3.0 预览版本中发布。
KRaft 是一种共识协议,可以直接在 Kafka 中管理元数据。元数据的管理被整合到了 Kafka 当中,而不需要使用像 ZooKeeper 这样的第三方工具,这大大简化了 Kafka 的架构。这种新的 KRaft 模式提高了分区的可伸缩性和弹性,同时简化了 Kafka 的部署,现在可以不依赖 ZooKeeper 单独部署 Kafka 了。
KRaft 使用了 Raft 共识算法的一种基于事件的变体,因此得名。
基础环境
服务器三台
172.16.1.1 bigdata-1
172.16.1.2 bigdata-2
172.16.1.3 bigdata-3
JDK 1.8
安装
下载
下载地址:https://downloads.apache.org/kafka/3.4.0/kafka_2.12-3.4.0.tgz
在/opt
目录下解压
tar -zxvf kafka_2.12-3.4.0.tgz
安装
进入/opt/kafka_2.12-3.4.0/config/kraft
目录
cd /opt/kafka_2.12-3.4.0/config/kraft
修改server.properties
配置文件
process.roles=broker,controller
# 节点ID,自己设置每个节点的值要不同
node.id=1
# Controller节点配置,用于管理状态的节点(替换Zookeeper作用)
controller.quorum.voters=1@172.16.1.1:9093,2@172.16.1.2:9093,3@172.16.1.3:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
# 使用IP端口,每个节点填写自己节点的IP
advertised.listeners=PLAINTEXT://172.16.1.1:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# 数据存储位置
log.dirs=/opt/kafka/data
三台服务器配置文件都需要修改,不同的地方为
# 节点ID,自己设置每个节点的值要不同
node.id=1
# 使用IP端口,每个节点填写自己节点的IP
advertised.listeners=PLAINTEXT://172.16.1.1:9092
初始化集群
在其中一台服务器上执行下面命令生成一个uuid
> sh bin/kafka-storage.sh random-uuid
2kBbskpoS0aYrSeJk-HVfw
用该uuid格式化kafka
存储目录,三台服务器都要执行以下命令
sh bin/kafka-storage.sh format -t 2kBbskpoS0aYrSeJk-HVfw -c config/kraft/server.properties
启动集群
三台都需要启动
sh bin/kafka-server-start.sh -daemon config/kraft/server.properties
在日志中可以查看到启动成功
[2023-04-06 21:10:22,705] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)
在进程中可以看到Kafka进程
# jps
29128 Jps
28875 Kafka
验证
创建Topic
# sh bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test --partitions 3 --replication-factor 3
Created topic test.
查看Topic详情
# sh bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe test
Topic: test TopicId: LB20VbGWTvOi7IgVqTsC3g PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: test Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: test Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
运行之后额外添加一个数据目录
添加数据目录不能直接更改配置直接运行,直接运行会提示下面这个错误
org.apache.kafka.common.KafkaException: No `meta.properties` found in /data2 (have you run `kafka-storage.sh` to format the directory?)
at kafka.server.BrokerMetadataCheckpoint$.$anonfun$getBrokerMetadataAndOfflineDirs$2(BrokerMetadataCheckpoint.scala:179)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at kafka.server.BrokerMetadataCheckpoint$.getBrokerMetadataAndOfflineDirs(BrokerMetadataCheckpoint.scala:168)
at kafka.server.KafkaRaftServer$.initializeLogDirs(KafkaRaftServer.scala:141)
at kafka.server.KafkaRaftServer.<init>(KafkaRaftServer.scala:57)
at kafka.Kafka$.buildServer(Kafka.scala:83)
at kafka.Kafka$.main(Kafka.scala:91)
at kafka.Kafka.main(Kafka.scala)
需要先执行下面这个命令,重新加载配置的目录,然后再启动kafka服务
cluster-id 在原配置的data目录下 meta.properties 文件中
--ignore-formatted 命令会跳过之前已经创建的目录
> bin/kafka-storage.sh format --ignore-formatted --config /data/kafka_2.12-3.5.1/config/kraft/server.properties --cluster-id Bu0LsdAcTc2mZbecpe6fUg
Formatting /data2 with metadata.version 3.5-IV2.