Apache Kafka-初体验Kafka(03)-Centos7下搭建kafka集群

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Apache Kafka-初体验Kafka(03)-Centos7下搭建kafka集群


概述

对于kafka来说,一个单独的broker意味着kafka集群中只有一个节点。要想增加kafka集群中的节点数量,只需要多启动几个broker实例即可。

单个节点的安装: Kafka-初体验Kafka(02)-Centos7下搭建单节点kafka_2.11‐1.1.0

这里我们来搭建个3个节点的kafka集群来体验下吧


部署信息

192.168.18.130 —> kafka 、 zookeeper(单节点的zk)

192.168.18.131 —> kafka

192.168.18.132 —> kafka

单节点的 zk , 部署上 130上,事实上生产环境的话,zk也是要搭建集群的,这里演示用的话,用单个节点的zk先。

3个节点的kafka注册到 单节点的zk上。


配置信息

kafka的配置文件主要是配置文件 server.properties

130

[root@artisan config]# grep -Ev '^$|^[#;]' server.properties 
broker.id=0
listeners=PLAINTEXT://192.168.18.130:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.18.130:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
[root@artisan config]# 


131

[root@artisan config]#  grep -Ev '^$|^[#;]' server.properties
broker.id=1
listeners=PLAINTEXT://192.168.18.131:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.18.130:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.de


132

[root@artisan config]#  grep -Ev '^$|^[#;]' server.properties
broker.id=2
listeners=PLAINTEXT://192.168.18.132:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.18.130:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
[root@artisan config]# 

还有一篇博主写的 zk也是高可用的,可以参考下,写的很不错 ZooKeeper+Kafka 高可用集群搭建


验证

启动zookeeper 和 3个 kafka 后,我们创建一个新的topic,副本数设置为3,分区数设置为2

[root@artisan bin]# ./kafka-topics.sh --create --zookeeper 192.168.18.130:2181 --replication-factor 3 --partitions 2 --topic artisan-replicated-topic
Created topic "artisan-replicated-topic".
[root@artisan bin]# 

查看topic的情况

[root@artisan bin]# ./kafka-topics.sh --describe --zookeeper 192.168.18.130:2181 --topic artisan-replicated-topic
Topic:artisan-replicated-topic  PartitionCount:2  ReplicationFactor:3 Configs:
  Topic: artisan-replicated-topic Partition: 0  Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
  Topic: artisan-replicated-topic Partition: 1  Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
[root@artisan bin]# 

我们来解释下上面的输出内容

第一行是所有分区的概要信息,之后的每一行表示每一个partition的信息。

所有分区的概要信息:

Topic:artisan-replicated-topic  PartitionCount:2  ReplicationFactor:3 Configs

每一个partition的信息:

Topic: artisan-replicated-topic Partition: 0  Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: artisan-replicated-topic Partition: 1  Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
  • leader节点负责给定partition的所有读写请求。
  • replicas 表示某个partition在哪几个broker上存在备份。不管这个几点是不是”leader“,甚至这个节点挂了,也会列出
  • isr 是replicas的一个子集,它只列出当前还存活着的,并且已同步备份了该partition的节点。

我们可以运行相同的命令查看之前创建的名称为”artisan“的topic

[root@artisan bin]# ./kafka-topics.sh --describe --zookeeper 192.168.18.130:2181 --topic artisan
Topic:artisan PartitionCount:1  ReplicationFactor:1 Configs:
  Topic: artisan  Partition: 0  Leader: 0 Replicas: 0 Isr: 0
[root@artisan bin]# 

之前设置了topic的partition数量为1,备份因子为1,因此显示就如上所示了。当然我们也可以通过如下命令增加topic的分区数量(目前kafka不支持减少分区):

[root@artisan bin]# ./kafka-topics.sh --alter --partitions 3 --zookeeper 192.168.18.130:2181 --topic artisan
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

重新查看

[root@artisan bin]# ./kafka-topics.sh --describe --zookeeper 192.168.18.130:2181 --topic artisan
Topic:artisan PartitionCount:3  ReplicationFactor:1 Configs:
  Topic: artisan  Partition: 0  Leader: 0 Replicas: 0 Isr: 0
  Topic: artisan  Partition: 1  Leader: 1 Replicas: 1 Isr: 1
  Topic: artisan  Partition: 2  Leader: 2 Replicas: 2 Isr: 2
[root@artisan bin]# 

现在我们向新建的 artisan-replicated-topic 中发送一些message,kafka集群可以加上所有kafka节点:

[root@artisan bin]# ./kafka-console-producer.sh --broker-list 192.168.18.130:9092,192.168.18.131:9092,192.168.18.132:9092 --topic  artisan-replicated-topic
>artisan message test 1
>artisan message test 2
>artisan message test 3
>

现在131broker 开始消费:

[root@artisan bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.18.131:9092 --from-beginning --topic artisan-replicated-topic
artisan message test 1
artisan message test 3
artisan message test 2

现在我们来测试我们 容错性 ,因为broker1目前是artisan-replicated-topic的分区0的 leader,所以我们要将其kill , kill 掉 131 节点的 kafka

再执行

[root@artisan bin]# ./kafka-topics.sh --describe --zookeeper 192.168.18.130:2181 --topic artisan-replicated-topic
Topic:artisan-replicated-topic  PartitionCount:2  ReplicationFactor:3 Configs:
  Topic: artisan-replicated-topic Partition: 0  Leader: 2 Replicas: 1,2,0 Isr: 0,2
  Topic: artisan-replicated-topic Partition: 1  Leader: 2 Replicas: 2,0,1 Isr: 2,0
[root@artisan bin]# 

我们可以看到,分区0的leader节点已经变成了broker 2。要注意的是,在Isr中,已经没有了1号节点。leader的选举也是从ISR(in-sync replica)中进行的。

131 重新起来以后,还是可以消费消息

[root@artisan bin]# ./kafka-server-start.sh -daemon ../config/server.properties 
[root@artisan bin]# jps
24436 Kafka
24455 Jps
[root@artisan bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.18.131:9092 --from-beginning --topic artisan-replicated-topic
artisan message test 1
artisan message test 3
artisan message test 2

再看下

[root@artisan bin]# ./kafka-topics.sh --describe --zookeeper 192.168.18.130:2181 --topic artisan-replicated-topic
Topic:artisan-replicated-topic  PartitionCount:2  ReplicationFactor:3 Configs:
  Topic: artisan-replicated-topic Partition: 0  Leader: 1 Replicas: 1,2,0 Isr: 0,2,1
  Topic: artisan-replicated-topic Partition: 1  Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
[root@artisan bin]# 
[root@artisan bin]# 


相关文章
|
2天前
|
消息中间件 Java Kafka
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
本文介绍了Kafka集群的搭建过程,涵盖从虚拟机安装到集群测试的详细步骤。首先规划了集群架构,包括三台Kafka Broker节点,并说明了分布式环境下的服务进程配置。接着,通过VMware导入模板机并克隆出三台虚拟机(kafka-broker1、kafka-broker2、kafka-broker3),分别设置IP地址和主机名。随后,依次安装JDK、ZooKeeper和Kafka,并配置相应的环境变量与启动脚本,确保各组件能正常运行。最后,通过编写启停脚本简化集群的操作流程,并对集群进行测试,验证其功能完整性。整个过程强调了自动化脚本的应用,提高了部署效率。
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
|
26天前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
63 5
|
6天前
|
消息中间件 存储 Kafka
2024最全Kafka集群方案汇总
Apache Kafka 是一个高吞吐量、可扩展、可靠的分布式消息系统,广泛应用于数据驱动的应用场景。Kafka 支持集群架构,具备高可用性和容错性。其核心组件包括 Broker(服务器实例)、Topic(消息分类)、Partition(有序消息序列)、Producer(消息发布者)和 Consumer(消息消费者)。每个分区有 Leader 和 Follower,确保数据冗余和高可用。Kafka 2.8+ 引入了不依赖 Zookeeper 的 KRaft 协议,进一步简化了集群管理。常见的集群部署方案包括单节点和多节点集群,后者适用于生产环境以确保高可用性。
13 0
|
28天前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
43 1
|
1月前
|
消息中间件 存储 Prometheus
Kafka集群如何配置高可用性
Kafka集群如何配置高可用性
|
1月前
|
消息中间件 Ubuntu Java
Ubuntu系统上安装Apache Kafka
Ubuntu系统上安装Apache Kafka
|
13天前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
296 33
The Past, Present and Future of Apache Flink
|
2月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
851 13
Apache Flink 2.0-preview released
|
2月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
92 3
|
3月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。