Kafka Raft集群搭建

本文涉及的产品
实时数仓Hologres,5000CU*H 100GB 3个月
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: Kafka Raft集群搭建

简介

Apache 软件基金会发布了包含许多新特性和改进的 Kafka 3.3.1。这是第一个标志着可以在生产环境中使用 KRaft(Kafka Raft)共识协议的版本。在几年的开发过程中,它先是在 Kafka 2.8 早期访问版本中发布,然后又在 Kafka 3.0 预览版本中发布。

KRaft 是一种共识协议,可以直接在 Kafka 中管理元数据。元数据的管理被整合到了 Kafka 当中,而不需要使用像 ZooKeeper 这样的第三方工具,这大大简化了 Kafka 的架构。这种新的 KRaft 模式提高了分区的可伸缩性和弹性,同时简化了 Kafka 的部署,现在可以不依赖 ZooKeeper 单独部署 Kafka 了。

KRaft 使用了 Raft 共识算法的一种基于事件的变体,因此得名。

基础环境

服务器三台

172.16.1.1 bigdata-1

172.16.1.2 bigdata-2

172.16.1.3 bigdata-3

JDK 1.8

安装

下载

下载地址:https://downloads.apache.org/kafka/3.4.0/kafka_2.12-3.4.0.tgz

/opt目录下解压

tar -zxvf kafka_2.12-3.4.0.tgz

安装

进入/opt/kafka_2.12-3.4.0/config/kraft目录

cd /opt/kafka_2.12-3.4.0/config/kraft

修改server.properties配置文件

process.roles=broker,controller
# 节点ID,自己设置每个节点的值要不同
node.id=1
# Controller节点配置,用于管理状态的节点(替换Zookeeper作用)
controller.quorum.voters=1@172.16.1.1:9093,2@172.16.1.2:9093,3@172.16.1.3:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
# 使用IP端口,每个节点填写自己节点的IP
advertised.listeners=PLAINTEXT://172.16.1.1:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# 数据存储位置
log.dirs=/opt/kafka/data

三台服务器配置文件都需要修改,不同的地方为

# 节点ID,自己设置每个节点的值要不同
node.id=1
# 使用IP端口,每个节点填写自己节点的IP
advertised.listeners=PLAINTEXT://172.16.1.1:9092

初始化集群

在其中一台服务器上执行下面命令生成一个uuid

> sh bin/kafka-storage.sh random-uuid
 2kBbskpoS0aYrSeJk-HVfw

用该uuid格式化kafka存储目录,三台服务器都要执行以下命令

sh bin/kafka-storage.sh format -t 2kBbskpoS0aYrSeJk-HVfw -c config/kraft/server.properties

启动集群

三台都需要启动

sh bin/kafka-server-start.sh -daemon config/kraft/server.properties

在日志中可以查看到启动成功

[2023-04-06 21:10:22,705] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)

在进程中可以看到Kafka进程

# jps
29128 Jps
28875 Kafka

验证

创建Topic

# sh bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test --partitions 3 --replication-factor 3
Created topic test.

查看Topic详情

# sh bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe test  
Topic: test     TopicId: LB20VbGWTvOi7IgVqTsC3g PartitionCount: 3       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: test     Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        Topic: test     Partition: 1    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1
        Topic: test     Partition: 2    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2

运行之后额外添加一个数据目录

添加数据目录不能直接更改配置直接运行,直接运行会提示下面这个错误

org.apache.kafka.common.KafkaException: No `meta.properties` found in /data2 (have you run `kafka-storage.sh` to format the directory?)
        at kafka.server.BrokerMetadataCheckpoint$.$anonfun$getBrokerMetadataAndOfflineDirs$2(BrokerMetadataCheckpoint.scala:179)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at kafka.server.BrokerMetadataCheckpoint$.getBrokerMetadataAndOfflineDirs(BrokerMetadataCheckpoint.scala:168)
        at kafka.server.KafkaRaftServer$.initializeLogDirs(KafkaRaftServer.scala:141)
        at kafka.server.KafkaRaftServer.<init>(KafkaRaftServer.scala:57)
        at kafka.Kafka$.buildServer(Kafka.scala:83)
        at kafka.Kafka$.main(Kafka.scala:91)
        at kafka.Kafka.main(Kafka.scala)

需要先执行下面这个命令,重新加载配置的目录,然后再启动kafka服务

cluster-id 在原配置的data目录下 meta.properties 文件中

--ignore-formatted 命令会跳过之前已经创建的目录

> bin/kafka-storage.sh format --ignore-formatted --config /data/kafka_2.12-3.5.1/config/kraft/server.properties --cluster-id Bu0LsdAcTc2mZbecpe6fUg

Formatting /data2 with metadata.version 3.5-IV2.
目录
相关文章
|
5月前
|
消息中间件 安全 Kafka
2024年了,如何更好的搭建Kafka集群?
我们基于Kraft模式和Docker Compose同时采用最新版Kafka v3.6.1来搭建集群。
1005 2
2024年了,如何更好的搭建Kafka集群?
|
5月前
|
消息中间件 存储 数据可视化
kafka高可用集群搭建
kafka高可用集群搭建
115 0
|
5月前
|
消息中间件 Kafka Linux
Apache Kafka-初体验Kafka(03)-Centos7下搭建kafka集群
Apache Kafka-初体验Kafka(03)-Centos7下搭建kafka集群
130 0
|
2月前
|
消息中间件 Kafka 测试技术
【Kafka揭秘】Leader选举大揭秘!如何打造一个不丢失消息的强大Kafka集群?
【8月更文挑战第24天】Apache Kafka是一款高性能分布式消息系统,利用分区机制支持数据并行处理。每个分区含一个Leader处理所有读写请求,并可有多个副本确保数据安全与容错。关键的Leader选举机制保障了系统的高可用性和数据一致性。选举发生于分区创建、Leader故障或被手动移除时。Kafka提供多种选举策略:内嵌机制自动选择最新数据副本为新Leader;Unclean选举快速恢复服务但可能丢失数据;Delayed Unclean选举则避免短暂故障下的Unclean选举;Preferred选举允许基于性能或地理位置偏好指定特定副本为首选Leader。
45 5
|
2月前
|
消息中间件 监控 Java
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
|
2月前
|
消息中间件 监控 Java
【Kafka节点存活大揭秘】如何让Kafka集群时刻保持“心跳”?探索Broker、Producer和Consumer的生死关头!
【8月更文挑战第24天】在分布式系统如Apache Kafka中,确保节点的健康运行至关重要。Kafka通过Broker、Producer及Consumer间的交互实现这一目标。文章介绍Kafka如何监测节点活性,包括心跳机制、会话超时与故障转移策略。示例Java代码展示了Producer如何通过定期发送心跳维持与Broker的连接。合理配置这些机制能有效保障Kafka集群的稳定与高效运行。
38 2
|
5月前
|
消息中间件 数据可视化 关系型数据库
ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统
ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统
193 0
|
3月前
|
消息中间件 Kafka
kafka 集群环境搭建
kafka 集群环境搭建
61 8
|
2月前
|
消息中间件 Java Kafka
Linux——Kafka集群搭建
Linux——Kafka集群搭建
38 0
|
2月前
|
消息中间件 Kafka Apache
部署安装kafka集群
部署安装kafka集群

相关实验场景

更多
下一篇
无影云桌面