【Kakfa集群】

简介: 【Kakfa集群】


本文内容:


对于kafka来说,一个单独的broker意味着kafka集群中只有一个节点。要想增加kafka集群中的节点数量,只需要多启动几个broker实例即可。为了有更好的理解,现在我们在一台机器上同时启动三个broker实例。

集群搭建

首先,我们需要建立好其他2个broker的配置文件:

cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

配置文件的需要修改的内容分别如下:

config/server-1.properties:

#broker.id属性在kafka集群中必须要是唯一
broker.id=1
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://106.14.132.94:9093   
log.dir=/usr/local/data/kafka-logs-1
#kafka连接zookeeper的地址,要把多个kafka实例组成集群,对应连接的zookeeper必须相同
zookeeper.connect=106.14.132.94:2181

config/server-2.properties:

broker.id=2
listeners=PLAINTEXT://106.14.132.94:9094
log.dir=/usr/local/data/kafka-logs-2
zookeeper.connect=106.14.132.94:2181

目前我们已经有一个zookeeper实例和一个broker实例在运行了,现在我们只需要在启动2个broker实例即可:

bin/kafka-server-start.sh -daemon config/server-1.properties
bin/kafka-server-start.sh -daemon config/server-2.properties

查看zookeeper确认集群节点是否都注册成功:

ls /brokers/ids

现在我们创建一个新的topic,副本数设置为3,分区数设置为2:

bin/kafka-topics.sh --create --zookeeper 106.14.132.94:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic

查看下topic的情况

 bin/kafka-topics.sh --describe --zookeeper 106.14.132.94:2181 --topic my-replicated-topic

以下是输出内容的解释,第一行是所有分区的概要信息,之后的每一行表示每一个partition的信息。

leader节点负责给定partition的所有读写请求,同一个主题不同分区leader副本一般不一样(为了容灾)

replicas 表示某个partition在哪几个broker上存在备份。不管这个几点是不是”leader“,甚至这个节点挂了,也会列出。

isr 是replicas的一个子集,它只列出当前还存活着的,并且已同步备份了该partition的节点。

现在我们向新建的 my-replicated-topic 中发送一些message,kafka集群可以加上所有kafka节点:

bin/kafka-console-producer.sh --broker-list 106.14.132.94:9092,106.14.132.94:9093,106.14.132.94:9094 --topic my-replicated-topic

my test msg 1

my test msg 2

现在开始消费:

bin/kafka-console-consumer.sh --bootstrap-server 106.14.132.94:9092,106.14.132.94:9093,106.14.132.94:9094 --from-beginning --topic my-replicated-topic

my test msg 1

my test msg 2

现在我们来测试我们容错性,因为broker1目前是my-replicated-topic的分区0的leader,所以我们要将其kill

ps -ef | grep server.properties
kill 14776

现在再执行命令:

bin/kafka-topics.sh --describe --zookeeper 106.14.132.94:9092 --topic my-replicated-topic

我们可以看到,分区0的leader节点已经变成了broker 0。要注意的是,在Isr中,已经没有了1号节点。leader的选举也是从ISR(in-sync replica)中进行的。

此时,我们依然可以 消费新消息:

bin/kafka-console-consumer.sh --bootstrap-server 106.14.132.94:9092,106.14.132.94:9093,106.14.132.94:9094 --from-beginning --topic my-replicated-topic

my test msg 1

my test msg 2

查看主题分区对应的leader信息:

get /brokers/topics/my-relicated-topic/partitions/0/state

kafka将很多集群关键信息记录在zookeeper里,保证自己的无状态,从而在水平扩容时非常方便。

集群消费

log的partitions分布在kafka集群中不同的broker上,每个broker可以请求备份其他broker上partition上的数据。kafka集群支持配置一个partition备份的数量。针对每个partition,都有一个broker起到“leader”的作用,0个或多个其他的broker作为“follwers”的作用。leader处理所有的针对这个partition的读写请求,而followers被动复制leader的结果,不提供读写(主要是为了保证多副本数据与消费的一致性)。如果这个leader失效了,其中的一个follower将会自动的变成新的leader。

Producers

生产者将消息发送到topic中去,同时负责选择将message发送到topic的哪一个partition中。通过round-robin做简单的负载均衡。也可以根据消息中的某一个关键字来进行区分。通常第二种方式使用的更多。

Consumers

传统的消息传递模式有2种:队列( queue) 和(publish-subscribe)

queue模式:多个consumer从服务器中读取数据,消息只会到达一个consumer。

publish-subscribe模式:消息会被广播给所有的consumer。

Kafka基于这2种模式提供了一种consumer的抽象概念:消费组(consumer group)。

queue模式:所有的consumer都位于同一个consumer group 下。

publish-subscribe模式:所有的consumer都有着自己唯一的consumer group。

上图说明:由2个broker组成的kafka集群,某个主题总共有4个partition(P0-P3),分别位于不同的broker上。这个集群由2个Consumer Group消费, A有2个consumer instances ,B有4个。通常一个topic会有几个consumer group,每个consumer group都是一个逻辑上的订阅者( logical subscriber )。每个consumer group由多个consumer instance组成,从而达到可扩展和容灾的功能。

总结

以上就是今天要讲的内容,还希望各位读者大大能够在评论区积极参与讨论,给文章提出一些宝贵的意见或者建议📝,合理的内容,我会采纳更新博文,重新分享给大家。

相关文章
|
消息中间件 存储 数据可视化
kafka高可用集群搭建
kafka高可用集群搭建
375 0
|
监控 时序数据库
Telegraf+Influxdb+Chronograf+Kapacitor主机性能监控告警
一.简述 通过TICK(Telegraf+Influxdb+Chronograf+Kapacitor)进行主机性能监控告警,职责描述如下: Telegraf的职能是数据采集,用于主机性能数据,包括主机CPU、内存、IO、进程状态、服务状态等 Influxdb的职能是时序数据库,用于存储Teleg.
5360 0
|
5月前
|
机器学习/深度学习 人工智能 自然语言处理
47_历史里程碑:从ELIZA到Transformer
在当今的数字时代,我们已经习惯于与智能助手对话、向大语言模型提问,甚至依赖它们生成创意内容。然而,这看似理所当然的人机对话能力,实际上经历了长达半个多世纪的曲折发展历程。从1966年麻省理工学院的简陋程序,到2017年Google提出的革命性架构,聊天AI的演变不仅是技术的进步,更是人类对自身语言本质探索的缩影。
|
消息中间件 安全 Kafka
Kafka启动后需要开放什么端口?
Kafka启动后需要开放什么端口?
4803 7
|
Oracle 关系型数据库 Java
实时计算 Flink版操作报错合集之如何通过savepoint恢复Flink CDC任务
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
关系型数据库 MySQL
Mysql any、some、in、all、exists 关键字
Mysql any、some、in、all、exists 关键字
380 0
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之如何从checkpoint启动
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
Kubernetes 网络协议 网络虚拟化
WireGuard 系列文章(九):基于 K3S+WireGuard+Kilo 搭建跨多云的统一 K8S 集群
WireGuard 系列文章(九):基于 K3S+WireGuard+Kilo 搭建跨多云的统一 K8S 集群
主机状态监控,通过top命令查看CPU、内存使用情况,ctrl + c退出,输入top整个页面就变成一个任务管理器的形式了,Ctrl + C直接退出,Q也可以退掉了
主机状态监控,通过top命令查看CPU、内存使用情况,ctrl + c退出,输入top整个页面就变成一个任务管理器的形式了,Ctrl + C直接退出,Q也可以退掉了
|
存储 分布式数据库 Apache
【Flink】Flink的Checkpoint 存在哪里?
【4月更文挑战第19天】【Flink】Flink的Checkpoint 存在哪里?

热门文章

最新文章