2024年了,如何更好的搭建Kafka集群?

本文涉及的产品
实时数仓Hologres,5000CU*H 100GB 3个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
简介: 我们基于Kraft模式和Docker Compose同时采用最新版Kafka v3.6.1来搭建集群。

Kafka的Kraft模式简单来说就是基于raft协议重新实现了zookeeper的功能。传统的zookeeper集群已经被标记为弃用,将在kafka4.0中完全移除。由于去掉了zk组件,部署也简化了不少。我们基于Kraft模式和Docker Compose同时采用最新版Kafka v3.6.1来搭建集群。

Kafka集群概念介绍

Kraft模式集群由两种角色的节点组成,分别是broker和controller角色。角色类型在节点启动时通过process.roles配置参数指定,允许指定brokercontroller,或者同时指定二者,可就是一个节点兼顾两种角色。

  1. broker类型节点的职责是为客户端提供服务,即接收生产者消息,并推送给消费者。还可以与其它broker之间同步副本消息。broker内就是我们熟悉的topic分区副本等结构了。
  2. controller类型的节点可以参与Leader选举,有投票权和被投票权,只有被选举称为Leader节点时才能为集群提供服务。一个集群中只能有一个Leader节点,如果出现多个Leader的异常情况通常被称为脑裂,这不是我们关注的主题。Leader节点负责维护整个集群的元数据与调度broker协同工作,例如创建删除topic、分区重分配、preferred leader选举、topic分区拓展、broker上线下线等。

Kafka集群带来的好处是允许横向扩展broker节点,并且在线就可以完成扩展。

集群拓扑图

我们在宿主机上搭建5个节点的Kafka集群,集群由两个[broker]节点,两个[broker,controller]节点和一个[controller]节点组成,并且使用Docker容器来部署这五个节点。

image.png

我们规划了三种流量路径:

  1. 外部流量 EXTERNAL://:19092,允许外部客户端(生产者、消费者、集群管理工具都算是外部客户端)连接到集群。我们将容器中的端口映射出来,由于集群部署在单机单网卡电脑上,所有端口不允许重复,我们暴漏出4各端口:19092、29092、39092、49092
  2. 控制流量 CONTROLLER://:9093,我们为控制节点间的通信设置了单独的端口9093,以防数据流量过载造成控制信息下发延迟或失败。
  3. 内部流量 INTERNAL://:9092,broker之间同步副本数据走内部端口9092。

实验软件与版本说明

kafka: 3.6.1
docker: 25.0.1
docker compose: v2.24.2

参数解释

如果你的Docker环境已经备好,那么直接将下面的内容复制到yaml,稍作修改就能快速搭建起Kafka集群。

# docker-compose.yaml
services:
  kafka1:
    image: 'bitnami/kafka:3.6.1'
    ports:
      - '19092:19092'
    environment:
      - KAFKA_KRAFT_CLUSTER_ID=EX5bq5NfRe2IX1nhxrSO6g
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_PROCESS_ROLES=broker
      - KAFKA_CFG_LISTENERS=INTERNAL://:9092, EXTERNAL://:19092
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka1:9092, EXTERNAL://192.168.56.103:19092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT, EXTERNAL:PLAINTEXT, CONTROLLER:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=3@kafka3:9093, 4@kafka4:9093, 5@kafka5:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
    networks:
      - kfk-network
  kafka2:
    image: 'bitnami/kafka:3.6.1'
    ports:
      - '29092:19092'
    environment:
      - KAFKA_KRAFT_CLUSTER_ID=EX5bq5NfRe2IX1nhxrSO6g
      - KAFKA_CFG_NODE_ID=2
      - KAFKA_CFG_PROCESS_ROLES=broker
      - KAFKA_CFG_LISTENERS=INTERNAL://:9092, EXTERNAL://:19092
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka2:9092, EXTERNAL://192.168.56.103:29092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT, EXTERNAL:PLAINTEXT, CONTROLLER:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=3@kafka3:9093, 4@kafka4:9093, 5@kafka5:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
    networks:
      - kfk-network
  kafka3:
    image: 'bitnami/kafka:3.6.1'
    ports:
      - '39092:19092'
    environment:
      - KAFKA_KRAFT_CLUSTER_ID=EX5bq5NfRe2IX1nhxrSO6g
      - KAFKA_CFG_NODE_ID=3
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_LISTENERS=INTERNAL://:9092, EXTERNAL://:19092, CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka3:9092, EXTERNAL://192.168.56.103:39092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT, EXTERNAL:PLAINTEXT, CONTROLLER:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=3@kafka3:9093, 4@kafka4:9093, 5@kafka5:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
    networks:
      - kfk-network
  kafka4:
    image: 'bitnami/kafka:3.6.1'
    ports:
      - '49092:19092'
    environment:
      - KAFKA_KRAFT_CLUSTER_ID=EX5bq5NfRe2IX1nhxrSO6g
      - KAFKA_CFG_NODE_ID=4
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_LISTENERS=INTERNAL://:9092, EXTERNAL://:19092, CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka4:9092, EXTERNAL://192.168.56.103:49092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT, EXTERNAL:PLAINTEXT, CONTROLLER:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=3@kafka3:9093, 4@kafka4:9093, 5@kafka5:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
    networks:
      - kfk-network
  kafka5:
    image: 'bitnami/kafka:3.6.1'
    ports:
      - '59092:19092'
    environment:
      - KAFKA_KRAFT_CLUSTER_ID=EX5bq5NfRe2IX1nhxrSO6g
      - KAFKA_CFG_NODE_ID=5
      - KAFKA_CFG_PROCESS_ROLES=controller
      - KAFKA_CFG_LISTENERS=CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT, EXTERNAL:PLAINTEXT, CONTROLLER:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=3@kafka3:9093, 4@kafka4:9093, 5@kafka5:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    networks:
      - kfk-network
networks:
  kfk-network:
    name: kfk-network
    external: true

我这边宿主机IP是192.168.56.103,你需要将它改为你IP地址,执行下面指令应该就能启动成功:

# 创建网络
docker network create kfk-network
# 拉取镜像
docker compose pull
# 运行容器
docker compose up -d

如果你的运行过程中遇到问题可以欢迎留言讨论。

我们不但要把集群运行起来,还要理解这些参数的含义。上面的compose文件中使用环境变量作为启动参数传递给Kafka,与直接修改配置文件效果一样。在Kraft模式下,Kafka有如下三个配置文件可以配置。

  • config/kraft/broker.properties 当节点作为[broker]角色运行时,会去读该文件中的配置项启动。
  • config/kraft/controller.properties 当节点作为[controller]角色运行时,会去读该文件中的配置项启动。
  • config/kraft/server.properties 当节点同时作为[broker, controller]角色运行时,会去读该文件中的配置项启动。

接下来我们来看看这些文件内常用的配置项:

  • KAFKA_KRAFT_CLUSTER_ID:配置集群ID,这个配置项比较特殊,并未在上面列举的配置文件中,而是位于一个自动生成的配置文件meta.properties中,位置不固定,具体位置由bin/kafka-storage.sh命令执行完打印的参数决定。该配置文件中定义了cluster.id,即同一个集群中的所有节点都应该指定相同的集群ID。
  • KAFKA_CFG_NODE_ID:配置节点ID,同一集群内节点ID不允许重复。
  • KAFKA_CFG_PROCESS_ROLES:指定节点的角色,允许指定brokercontroller,或者同时指定二者,可就是一个节点兼顾两种角色。
  • KAFKA_CFG_LISTENERS:列出节点监听器,通常不同的角色会监听不同的端口,它的格式是{LISTENER_NAME}://{hostname}:{port},要搞懂它的配置策略你可以阅读一文搞懂Kafka中的listeners配置策略
  • KAFKA_CFG_ADVERTISED_LISTENERS:listeners选项配置了以怎样的协议监听某个端口。这个配置项类似防火墙的作用,对外公开允许它们使用怎样的IP和端口访问集群,这里外部指的是客户端(包括生产者、消费者、管理脚本等)或其它broker节点。
  • KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP:配置监听器对应的安全协议。
  • KAFKA_CFG_CONTROLLER_QUORUM_VOTERS:指定集群内所有的controller节点,格式为:nodeId@ip:port。
  • KAFKA_CFG_CONTROLLER_LISTENER_NAMES:指定controller监听器名称列表。
  • KAFKA_CFG_INTER_BROKER_LISTENER_NAME:指定其它broker节点与本节点通信的监听器。

由上面这些配置可以看出,凡是以KAFKA_CFG_开头的环境变量,在配置kraft配置文件中都有对应的配置项。例如KAFKA_CFG_NODE_ID在配置文件中就是node.id

测试生产消费

# 创建主题
bin/kafka-topics.sh --create --topic test-tip --bootstrap-server=192.168.56.103:19092
# 往cssyy主题推送消息
bin/kafka-console-producer.sh --bootstrap-server=192.168.56.103:19092 --topic cssyy
>程序饲养员放假啦
>程序饲养员上班啦
# 从头开始消费cssyy主题消息
bin/kafka-console-consumer.sh --bootstrap-server=192.168.56.103:19092 --topic cssyy --from-beginning
程序饲养员放假啦
程序饲养员上班啦

查看集群当前状态参数

# 查看集群信息
bin/kafka-metadata-quorum.sh --bootstrap-server 192.168.56.103:19092:19092 desbe --status
ClusterId:              EX5bq5NfRe2IX1nhxrSO6g
LeaderId:               4
LeaderEpoch:            1
HighWatermark:          2817
MaxFollowerLag:         0
MaxFollowerLagTimeMs:   341
CurrentVoters:          [3,4,5]
CurrentObservers:       [1,2]

# 查看节点复制信息
bin/kafka-metadata-quorum.sh --bootstrap-server 192.168.56.103:19092:19092 describe --replication
NodeId  LogEndOffset    Lag     LastFetchTimestamp      LastCaughtUpTimestamp   Status
4       2959            0       1706702371530           1706702371530           Leader
3       2959            0       1706702371114           1706702371114           Follower
5       2959            0       1706702371114           1706702371114           Follower
1       2959            0       1706702371112           1706702371112           Observer
2       2959            0       1706702371113           1706702371113           Observer

最好的办法是有个图形化的工具来查看这些信息,Kafka免费的工具实在没有好用的,也就是Offset Explorer还勉强能用,如果你有更好用的工具可以分享一下。

image.png

相关文章
|
18天前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
50 4
|
1月前
|
消息中间件 监控 数据可视化
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
51 2
|
16天前
|
消息中间件 存储 Prometheus
Kafka集群如何配置高可用性
Kafka集群如何配置高可用性
|
1月前
|
消息中间件 分布式计算 监控
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
61 6
|
6月前
|
消息中间件 数据可视化 关系型数据库
ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统
ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统
199 0
|
3月前
|
消息中间件 Kafka 测试技术
【Kafka揭秘】Leader选举大揭秘!如何打造一个不丢失消息的强大Kafka集群?
【8月更文挑战第24天】Apache Kafka是一款高性能分布式消息系统,利用分区机制支持数据并行处理。每个分区含一个Leader处理所有读写请求,并可有多个副本确保数据安全与容错。关键的Leader选举机制保障了系统的高可用性和数据一致性。选举发生于分区创建、Leader故障或被手动移除时。Kafka提供多种选举策略:内嵌机制自动选择最新数据副本为新Leader;Unclean选举快速恢复服务但可能丢失数据;Delayed Unclean选举则避免短暂故障下的Unclean选举;Preferred选举允许基于性能或地理位置偏好指定特定副本为首选Leader。
69 5
|
3月前
|
消息中间件 监控 Java
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
|
3月前
|
消息中间件 监控 Java
【Kafka节点存活大揭秘】如何让Kafka集群时刻保持“心跳”?探索Broker、Producer和Consumer的生死关头!
【8月更文挑战第24天】在分布式系统如Apache Kafka中,确保节点的健康运行至关重要。Kafka通过Broker、Producer及Consumer间的交互实现这一目标。文章介绍Kafka如何监测节点活性,包括心跳机制、会话超时与故障转移策略。示例Java代码展示了Producer如何通过定期发送心跳维持与Broker的连接。合理配置这些机制能有效保障Kafka集群的稳定与高效运行。
70 2
|
4月前
|
消息中间件 Kafka
kafka 集群环境搭建
kafka 集群环境搭建
69 8
|
3月前
|
消息中间件 Java Kafka
Linux——Kafka集群搭建
Linux——Kafka集群搭建
48 0