一起来学kafka之Kafka集群搭建

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 前言目前正在出一个Kafka专题系列教程, 篇幅会较多, 喜欢的话,给个关注❤️ ~本节给大家讲一下Kafka的一些核心概念以及如何利用docker快速的搭建Kafka集群~好了, 废话不多说直接开整吧~什么是 KafkaKafka是一种高吞吐量、分布式、可扩展的消息中间件系统,最初由LinkedIn公司开发。随着不断的发展,在最新的版本中它定义为分布式的流处理平台,现在在大数据应用中也是十分广泛。

前言

目前正在出一个Kafka专题系列教程, 篇幅会较多, 喜欢的话,给个关注❤️ ~

本节给大家讲一下Kafka的一些核心概念以及如何利用docker快速的搭建Kafka集群~

好了, 废话不多说直接开整吧~

什么是 Kafka

Kafka是一种高吞吐量分布式可扩展消息中间件系统,最初由LinkedIn公司开发。随着不断的发展,在最新的版本中它定义为分布式的流处理平台,现在在大数据应用中也是十分广泛。

它可以处理大量的实时数据流,被广泛应用于日志收集、事件处理、流处理、消息队列等场景。

Kafka的架构包含producer(生产者)、consumer(消费者)、broker(代理服务器)等组件。生产者可以将消息发送到Kafka集群消费者可以从Kafka集群订阅消息并进行处理,而broker则是消息的中转服务器,负责存储和转发消息。

Kafka的特点包括:

  • 高吞吐量:Kafka可以处理海量的数据流,支持每秒百万级别的消息处理。
  • 可扩展性:Kafka的集群可以根据需要进行水平扩展,从而提高系统的性能和容量。
  • 可靠性:Kafka支持多副本机制,可以保证数据的可靠性和高可用性。
  • 灵活性:Kafka支持多种消息格式和协议,可以与各种系统和工具进行集成。

Kafka是一个开源的项目,已经成为了Apache软件基金会的顶级项目.

Kafka & 核心概念

接着,我们看下它的核心概念,这些概念都很重要,在后边的学习中都会遇到,概念一定要搞明白,对于理解Kafka的工作原理和使用方法非常重要。不然学习起来比较懵, 下面一起看一下核心概念:

Topic

Topic是消息的逻辑容器,用于对消息进行分类和存储。在Kafka中,消息会被发布到指定的topic中,并且可以被一个或多个消费者订阅Topic是Kafka的核心概念之一,是实现消息传递的基础。

Producer

Producer是消息的生产者,用于向指定的topic中发送消息。Producer负责将消息发送到Kafka集群中的broker节点,并且可以在发送消息时指定消息的key,以便Kafka将消息分配到指定的partition中。

Consumer

Consumer是消息的消费者,用于从指定的topic中接收消息。Consumer负责从Kafka集群中的broker节点获取消息,并且可以指定从哪个partition中获取消息。消费者可以以不同的方式进行消息消费,例如批量消费、轮询消费等。

Broker

BrokerKafka集群中的一个节点,用于存储和管理消息。Broker是Kafka的核心组件之一,负责接收和处理生产者发送的消息,并将其存储到磁盘中,同时还负责将消息转发给消费者

Partition

PartitionKafka中实现数据分片的机制,一个topic可以被分成多个partition,每个partition都是一个有序消息队列。消息在被发送到一个topic时,会被根据指定的key进行hash计算,然后被分配到对应的partition中。

Offset

Offset是Kafka中的一个重要概念,用于标识每个消息在一个partition中的位置。每个partition都有一个唯一的offset值,消费者可以根据offset来获取指定位置的消息。Kafka还提供了一种特殊的topic,称为__consumer_offsets,用于存储消费者消费的位置信息

Kafka & 主要架构

如图:

                    +---------+        +---------+
                    |Producer |        |Consumer |
                    +---------+        +---------+
                         |                   |
                         |                   |
                    +---------+        +---------+
                    |  Broker |        |  Broker |
                    +---------+        +---------+
                     /          \       /         \
                    /            \     /           \
          +---------+   +---------+ +---------+   +---------+
          |Partition|   |Partition| |Partition|   |Partition|
          +---------+   +---------+ +---------+   +---------+
            /     \        /     \       /     \       /     \
           /       \      /       \     /       \     /       \
   +----------+  +----------+ +----------+ +----------+ +----------+
   |Replica   |  |Replica   | |Replica   | |Replica   | |Replica   |
   +----------+  +----------+ +----------+ +----------+ +----------+
        Leader          Follower      Follower      Follower      Follower
            |                   |            |           |
            |                   |            |           |
          Write               Read         Read       Read
            |                   |            |           |
            +--------+----------+------------+-----------+
                     |                       |
                     |                       |
                 +---------+            +---------+
                 |  Disk   |            |  Memory |
                 +---------+            +---------+

在这个流程图中,主要有以下几个流程:

  • Producer将消息发送到Broker节点,Broker将消息存储到对应的Partition中。
  • 每个Partition可以有多个Replica,其中一个Replica被选为Leader,其余ReplicaFollower
  • Leader负责处理消息的写操作,将消息追加到Partition中。
  • Follower负责与Leader保持同步,定期从Leader中拉取消息并复制到本地副本中,以保证数据的一致性。
  • ConsumerBroker中读取消息,可以指定消费某个Topic中的指定Partition中的消息,也可以进行批量消费或实时消费。
  • Broker将消息存储在磁盘中,同时也会缓存部分消息到内存中,以提高读写性能。

为了提高集群的可用性和稳定性, 架构中还会引入ZooKeeper, ZooKeeper用于维护Kafka集群中的Broker节点信息、Partition信息、Topic信息等。

Leader & Follower

上述提到了leaderFollower,有的小伙伴可能不知道是啥,这里讲下为啥会有这个?

Kafka分布式架构中,每个Partition可以有多个Replica,其中一个Replica被选为Leader,其余Replica为Follower

Leader是指在一个Partition中,负责处理该Partition所有消息的读写操作的Replica。当Producer发送消息到该Partition时,消息会首先被发送到Leader所在的ReplicaLeader再将消息追加到Partition中,然后将消息复制到所有FollowerReplica中。在读取数据时,Consumer也只能从Leader所在的Replica中读取消息,而Follower只负责与Leader保持同步,不参与读写操作。

Leader的选举方式与ZooKeeper密切相关,当Leader所在的Replica出现故障时,ZooKeeper会自动选举新的Leader,以保证Partition中的数据一致性可用性。由于只有Leader负责读写操作,因此可以有效避免数据的冲突和重复`。

ZooKeeper

如果有小伙伴不知道ZooKeeper是啥,给大家简要介绍一下:

ZooKeeper是一个分布式协调服务,常用于分布式系统中的协调与通知Kafka使用ZooKeeper来进行集群管理Leader选举存储Metadata信息等。ZooKeeper是Kafka的重要组成部分,没有ZooKeeper的支持,Kafka集群无法正常运行,往后的发展趋势可能会不用强依赖ZooKeeper`。

Kafka中,每个Broker都会向ZooKeeper注册自己的节点信息,包括Broker ID、IP地址和端口号等。同时,每个PartitionMetadata信息也会存储在ZooKeeper中,包括该PartitionReplica信息、Leader信息、ISR信息等。当Broker加入或退出集群时,ZooKeeper会自动通知其他Broker更新集群的状态信息。在Leader选举时,ZooKeeper会根据预设的算法选举出新的Leader,并通知其他Broker更新Partition`的状态信息。

除了Kafka之外,ZooKeeper还被广泛应用于Hadoop、HBase、Solr等其他分布式系统中,是一个非常成熟和稳定的分布式协调服务。再举一个,Dubbo RPC服务开发框架,如果有用过Dubbo的小伙伴,ZooKeeper一定不会陌生。

这块知识点,大家一定要搞懂,也是面试的热点问题~

Kafka & 集群搭建

这里教大家如何使用docker部署Kafka集群,需要大家安装好docker, 如果不会安装的可以参考之前的文章es集群搭建

给大家提前准备好了docker-compose.yml文件,配置有点多,如果没有一些docker基础,可能会看不懂,不过没关系,不影响我们的部署,

直接执行docker-compose up -d就完了。因为安装的东西比较多,包含zookeeper集群,kafka集群,kafka-ui管理后台,这个ui后台是一个开源的系统,界面比较整洁,推荐给大家,命令执行后稍稍等待一会~

version: '3.1'
services:
  zoo1:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zoo1
    container_name: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888
    volumes:
      - ./data/zookeeper/zoo1/data:/data
      - ./data/zookeeper/zoo1/datalog:/datalog  
  zoo2:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zoo2
    container_name: zoo2
    ports:
      - "2182:2182"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2182
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888
    volumes:
      - ./data/zookeeper/zoo2/data:/data
      - ./data/zookeeper/zoo2/datalog:/datalog    
  zoo3:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zoo3
    container_name: zoo3
    ports:
      - "2183:2183"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2183
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888
    volumes:
      - ./data/zookeeper/zoo3/data:/data
      - ./data/zookeeper/zoo3/datalog:/datalog    
  kafka1:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    volumes:
      - ./data/kafka_data1:/kafka/data  
    depends_on:
      - zoo1
      - zoo2
      - zoo3
  kafka2:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka2
    container_name: kafka2
    ports:
      - "9093:9093"
      - "29093:29093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
      KAFKA_BROKER_ID: 2
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    volumes:
      - ./data/kafka_data2:/kafka/data    
    depends_on:
      - zoo1
      - zoo2
      - zoo3
  kafka3:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka3
    container_name: kafka3
    ports:
      - "9094:9094"
      - "29094:29094"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
      KAFKA_BROKER_ID: 3
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    volumes:
      - ./data/kafka_data3:/kafka/data    
    depends_on:
      - zoo1
      - zoo2
      - zoo3
  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - 9999:8080
    depends_on:
      - kafka1
      - kafka2
      - kafka3
    environment:
      KAFKA_CLUSTERS_0_NAME: k1
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:29092
      KAFKA_CLUSTERS_1_NAME: k2
      KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka2:29093
      KAFKA_CLUSTERS_2_NAME: k3
      KAFKA_CLUSTERS_2_BOOTSTRAPSERVERS: kafka3:29094

然后浏览器打开localhost:9999可以访问UI后台, 我们可以通过后台新建topic来验证集群是否工作。

这里再给大家推荐一个GUI工具,Kafka Assistant这个工具方便我们日常开发测试使用,界面也很简洁,直接桌面端安装,利用它发送几条消息。

然后我们到后台页面,观察三个节点,发现topic里都有test,并且消息都是存在的

结束语

本节到这里就结束, 概念有点多,需要好好理理,后边会结合实际案例给大家继续讲它的概念和工作原理。下节带大家看下Springboot整合Kafka实战

本着把自己知道的都告诉大家,如果本文对您有所帮助,点赞+关注鼓励一下呗~

ElasticSearch 专题学习

项目源码(源码已更新 欢迎star⭐️)

往期并发编程内容推荐

推荐 SpringBoot & SpringCloud (源码已更新 欢迎star⭐️)

博客(阅读体验较佳)















相关文章
|
6月前
|
消息中间件 安全 Kafka
2024年了,如何更好的搭建Kafka集群?
我们基于Kraft模式和Docker Compose同时采用最新版Kafka v3.6.1来搭建集群。
1352 2
2024年了,如何更好的搭建Kafka集群?
|
18天前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
51 4
|
1月前
|
消息中间件 监控 数据可视化
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
51 2
|
16天前
|
消息中间件 存储 Prometheus
Kafka集群如何配置高可用性
Kafka集群如何配置高可用性
|
1月前
|
消息中间件 分布式计算 监控
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
61 6
|
3月前
|
消息中间件 Kafka 测试技术
【Kafka揭秘】Leader选举大揭秘!如何打造一个不丢失消息的强大Kafka集群?
【8月更文挑战第24天】Apache Kafka是一款高性能分布式消息系统,利用分区机制支持数据并行处理。每个分区含一个Leader处理所有读写请求,并可有多个副本确保数据安全与容错。关键的Leader选举机制保障了系统的高可用性和数据一致性。选举发生于分区创建、Leader故障或被手动移除时。Kafka提供多种选举策略:内嵌机制自动选择最新数据副本为新Leader;Unclean选举快速恢复服务但可能丢失数据;Delayed Unclean选举则避免短暂故障下的Unclean选举;Preferred选举允许基于性能或地理位置偏好指定特定副本为首选Leader。
69 5
|
3月前
|
消息中间件 监控 Java
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
|
3月前
|
消息中间件 监控 Java
【Kafka节点存活大揭秘】如何让Kafka集群时刻保持“心跳”?探索Broker、Producer和Consumer的生死关头!
【8月更文挑战第24天】在分布式系统如Apache Kafka中,确保节点的健康运行至关重要。Kafka通过Broker、Producer及Consumer间的交互实现这一目标。文章介绍Kafka如何监测节点活性,包括心跳机制、会话超时与故障转移策略。示例Java代码展示了Producer如何通过定期发送心跳维持与Broker的连接。合理配置这些机制能有效保障Kafka集群的稳定与高效运行。
70 2
|
4月前
|
消息中间件 Kafka
kafka 集群环境搭建
kafka 集群环境搭建
69 8
|
3月前
|
消息中间件 Java Kafka
Linux——Kafka集群搭建
Linux——Kafka集群搭建
48 0