前言
目前正在出一个Kafka专题系列教程, 篇幅会较多, 喜欢的话,给个关注❤️ ~
本节给大家讲一下Kafka的一些核心概念以及如何利用docker快速的搭建Kafka集群~
好了, 废话不多说直接开整吧~
什么是 Kafka
Kafka是一种高吞吐量、分布式、可扩展的消息中间件系统,最初由LinkedIn公司开发。随着不断的发展,在最新的版本中它定义为分布式的流处理平台,现在在大数据应用中也是十分广泛。
它可以处理大量的实时数据流,被广泛应用于日志收集、事件处理、流处理、消息队列等场景。
Kafka的架构包含producer(生产者)、consumer(消费者)、broker(代理服务器)等组件。生产者可以将消息发送到Kafka集群,消费者可以从Kafka集群订阅消息并进行处理,而broker则是消息的中转服务器,负责存储和转发消息。
Kafka的特点包括:
高吞吐量:Kafka可以处理海量的数据流,支持每秒百万级别的消息处理。可扩展性:Kafka的集群可以根据需要进行水平扩展,从而提高系统的性能和容量。可靠性:Kafka支持多副本机制,可以保证数据的可靠性和高可用性。灵活性:Kafka支持多种消息格式和协议,可以与各种系统和工具进行集成。
Kafka是一个开源的项目,已经成为了Apache软件基金会的顶级项目.
Kafka & 核心概念
接着,我们看下它的核心概念,这些概念都很重要,在后边的学习中都会遇到,概念一定要搞明白,对于理解Kafka的工作原理和使用方法非常重要。不然学习起来比较懵, 下面一起看一下核心概念:
Topic
Topic是消息的逻辑容器,用于对消息进行分类和存储。在Kafka中,消息会被发布到指定的topic中,并且可以被一个或多个消费者订阅。Topic是Kafka的核心概念之一,是实现消息传递的基础。
Producer
Producer是消息的生产者,用于向指定的topic中发送消息。Producer负责将消息发送到Kafka集群中的broker节点,并且可以在发送消息时指定消息的key,以便Kafka将消息分配到指定的partition中。
Consumer
Consumer是消息的消费者,用于从指定的topic中接收消息。Consumer负责从Kafka集群中的broker节点获取消息,并且可以指定从哪个partition中获取消息。消费者可以以不同的方式进行消息消费,例如批量消费、轮询消费等。
Broker
Broker是Kafka集群中的一个节点,用于存储和管理消息。Broker是Kafka的核心组件之一,负责接收和处理生产者发送的消息,并将其存储到磁盘中,同时还负责将消息转发给消费者。
Partition
Partition是Kafka中实现数据分片的机制,一个topic可以被分成多个partition,每个partition都是一个有序的消息队列。消息在被发送到一个topic时,会被根据指定的key进行hash计算,然后被分配到对应的partition中。
Offset
Offset是Kafka中的一个重要概念,用于标识每个消息在一个partition中的位置。每个partition都有一个唯一的offset值,消费者可以根据offset来获取指定位置的消息。Kafka还提供了一种特殊的topic,称为__consumer_offsets,用于存储消费者消费的位置信息。
Kafka & 主要架构
如图:
+---------+ +---------+ |Producer | |Consumer | +---------+ +---------+ | | | | +---------+ +---------+ | Broker | | Broker | +---------+ +---------+ / \ / \ / \ / \ +---------+ +---------+ +---------+ +---------+ |Partition| |Partition| |Partition| |Partition| +---------+ +---------+ +---------+ +---------+ / \ / \ / \ / \ / \ / \ / \ / \ +----------+ +----------+ +----------+ +----------+ +----------+ |Replica | |Replica | |Replica | |Replica | |Replica | +----------+ +----------+ +----------+ +----------+ +----------+ Leader Follower Follower Follower Follower | | | | | | | | Write Read Read Read | | | | +--------+----------+------------+-----------+ | | | | +---------+ +---------+ | Disk | | Memory | +---------+ +---------+
在这个流程图中,主要有以下几个流程:
Producer将消息发送到Broker节点,Broker将消息存储到对应的Partition中。- 每个
Partition可以有多个Replica,其中一个Replica被选为Leader,其余Replica为Follower。 Leader负责处理消息的写操作,将消息追加到Partition中。Follower负责与Leader保持同步,定期从Leader中拉取消息并复制到本地副本中,以保证数据的一致性。Consumer从Broker中读取消息,可以指定消费某个Topic中的指定Partition中的消息,也可以进行批量消费或实时消费。Broker将消息存储在磁盘中,同时也会缓存部分消息到内存中,以提高读写性能。
为了提高集群的可用性和稳定性, 架构中还会引入ZooKeeper, ZooKeeper用于维护Kafka集群中的Broker节点信息、Partition信息、Topic信息等。
Leader & Follower
上述提到了leader和Follower,有的小伙伴可能不知道是啥,这里讲下为啥会有这个?
在Kafka的分布式架构中,每个Partition可以有多个Replica,其中一个Replica被选为Leader,其余Replica为Follower。
Leader是指在一个Partition中,负责处理该Partition所有消息的读写操作的Replica。当Producer发送消息到该Partition时,消息会首先被发送到Leader所在的Replica,Leader再将消息追加到Partition中,然后将消息复制到所有Follower的Replica中。在读取数据时,Consumer也只能从Leader所在的Replica中读取消息,而Follower只负责与Leader保持同步,不参与读写操作。
Leader的选举方式与ZooKeeper密切相关,当Leader所在的Replica出现故障时,ZooKeeper会自动选举新的Leader,以保证Partition中的数据一致性和可用性。由于只有Leader负责读写操作,因此可以有效避免数据的冲突和重复`。
ZooKeeper
如果有小伙伴不知道ZooKeeper是啥,给大家简要介绍一下:
ZooKeeper是一个分布式协调服务,常用于分布式系统中的协调与通知。Kafka使用ZooKeeper来进行集群管理、Leader选举、存储Metadata信息等。ZooKeeper是Kafka的重要组成部分,没有ZooKeeper的支持,Kafka集群无法正常运行,往后的发展趋势可能会不用强依赖ZooKeeper`。
在Kafka中,每个Broker都会向ZooKeeper注册自己的节点信息,包括Broker ID、IP地址和端口号等。同时,每个Partition的Metadata信息也会存储在ZooKeeper中,包括该Partition的Replica信息、Leader信息、ISR信息等。当Broker加入或退出集群时,ZooKeeper会自动通知其他Broker更新集群的状态信息。在Leader选举时,ZooKeeper会根据预设的算法选举出新的Leader,并通知其他Broker更新Partition`的状态信息。
除了Kafka之外,ZooKeeper还被广泛应用于Hadoop、HBase、Solr等其他分布式系统中,是一个非常成熟和稳定的分布式协调服务。再举一个,Dubbo RPC服务开发框架,如果有用过Dubbo的小伙伴,ZooKeeper一定不会陌生。
这块知识点,大家一定要搞懂,也是面试的热点问题~
Kafka & 集群搭建
这里教大家如何使用docker部署Kafka集群,需要大家安装好docker, 如果不会安装的可以参考之前的文章es集群搭建。
给大家提前准备好了docker-compose.yml文件,配置有点多,如果没有一些docker基础,可能会看不懂,不过没关系,不影响我们的部署,
直接执行docker-compose up -d就完了。因为安装的东西比较多,包含zookeeper集群,kafka集群,kafka-ui管理后台,这个ui后台是一个开源的系统,界面比较整洁,推荐给大家,命令执行后稍稍等待一会~
version: '3.1' services: zoo1: image: confluentinc/cp-zookeeper:7.3.2 hostname: zoo1 container_name: zoo1 ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_SERVER_ID: 1 ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888 volumes: - ./data/zookeeper/zoo1/data:/data - ./data/zookeeper/zoo1/datalog:/datalog zoo2: image: confluentinc/cp-zookeeper:7.3.2 hostname: zoo2 container_name: zoo2 ports: - "2182:2182" environment: ZOOKEEPER_CLIENT_PORT: 2182 ZOOKEEPER_SERVER_ID: 2 ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888 volumes: - ./data/zookeeper/zoo2/data:/data - ./data/zookeeper/zoo2/datalog:/datalog zoo3: image: confluentinc/cp-zookeeper:7.3.2 hostname: zoo3 container_name: zoo3 ports: - "2183:2183" environment: ZOOKEEPER_CLIENT_PORT: 2183 ZOOKEEPER_SERVER_ID: 3 ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888 volumes: - ./data/zookeeper/zoo3/data:/data - ./data/zookeeper/zoo3/datalog:/datalog kafka1: image: confluentinc/cp-kafka:7.3.2 hostname: kafka1 container_name: kafka1 ports: - "9092:9092" - "29092:29092" environment: KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183" KAFKA_BROKER_ID: 1 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" volumes: - ./data/kafka_data1:/kafka/data depends_on: - zoo1 - zoo2 - zoo3 kafka2: image: confluentinc/cp-kafka:7.3.2 hostname: kafka2 container_name: kafka2 ports: - "9093:9093" - "29093:29093" environment: KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183" KAFKA_BROKER_ID: 2 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" volumes: - ./data/kafka_data2:/kafka/data depends_on: - zoo1 - zoo2 - zoo3 kafka3: image: confluentinc/cp-kafka:7.3.2 hostname: kafka3 container_name: kafka3 ports: - "9094:9094" - "29094:29094" environment: KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183" KAFKA_BROKER_ID: 3 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" volumes: - ./data/kafka_data3:/kafka/data depends_on: - zoo1 - zoo2 - zoo3 kafka-ui: container_name: kafka-ui image: provectuslabs/kafka-ui:latest ports: - 9999:8080 depends_on: - kafka1 - kafka2 - kafka3 environment: KAFKA_CLUSTERS_0_NAME: k1 KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:29092 KAFKA_CLUSTERS_1_NAME: k2 KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka2:29093 KAFKA_CLUSTERS_2_NAME: k3 KAFKA_CLUSTERS_2_BOOTSTRAPSERVERS: kafka3:29094
然后浏览器打开localhost:9999可以访问UI后台, 我们可以通过后台新建topic来验证集群是否工作。
这里再给大家推荐一个GUI工具,Kafka Assistant这个工具方便我们日常开发测试使用,界面也很简洁,直接桌面端安装,利用它发送几条消息。
然后我们到后台页面,观察三个节点,发现topic里都有test,并且消息都是存在的
结束语
本节到这里就结束, 概念有点多,需要好好理理,后边会结合实际案例给大家继续讲它的概念和工作原理。下节带大家看下Springboot整合Kafka实战
本着把自己知道的都告诉大家,如果本文对您有所帮助,点赞+关注鼓励一下呗~
ElasticSearch 专题学习
- 利用docker搭建es集群
- 一起来学ElasticSearch(一)
- 一起来学ElasticSearch(二)
- 一起来学ElasticSearch(三)
- 一起来学ElasticSearch(四)
- 一起来学ElasticSearch(五)
- 一起来学ElasticSearch(六)
- 一起来学ElasticSearch(七)
- 一起来学ElasticSearch(八)
- 一起来学ElasticSearch(九)
- 一起来学ElasticSearch(十)
- 一起来学ElasticSearch之整合SpringBoot(一)
- 一起来学ElasticSearch之整合SpringBoot(二)
- 一起来学ElasticSearch之整合SpringBoot(三)
项目源码(源码已更新 欢迎star⭐️)
往期并发编程内容推荐
- Java多线程专题之线程与进程概述
- Java多线程专题之线程类和接口入门
- Java多线程专题之进阶学习Thread(含源码分析)
- Java多线程专题之Callable、Future与FutureTask(含源码分析)
- 面试官: 有了解过线程组和线程优先级吗
- 面试官: 说一下线程的生命周期过程
- 面试官: 说一下线程间的通信
- 面试官: 说一下Java的共享内存模型
- 面试官: 有了解过指令重排吗,什么是happens-before
- 面试官: 有了解过volatile关键字吗 说说看
- 面试官: 有了解过Synchronized吗 说说看
- Java多线程专题之Lock锁的使用
- 面试官: 有了解过ReentrantLock的底层实现吗?说说看
- 面试官: 有了解过CAS和原子操作吗?说说看
- Java多线程专题之线程池的基本使用
- 面试官: 有了解过线程池的工作原理吗?说说看
- 面试官: 线程池是如何做到线程复用的?有了解过吗,说说看
- 面试官: 阻塞队列有了解过吗?说说看
- 面试官: 阻塞队列的底层实现有了解过吗? 说说看
- 面试官: 同步容器和并发容器有用过吗? 说说看
- 面试官: CopyOnWrite容器有了解过吗? 说说看
- 面试官: Semaphore在项目中有使用过吗?说说看(源码剖析)
- 面试官: Exchanger在项目中有使用过吗?说说看(源码剖析)
- 面试官: CountDownLatch有了解过吗?说说看(源码剖析)
- 面试官: CyclicBarrier有了解过吗?说说看(源码剖析)
- 面试官: Phaser有了解过吗?说说看
- 面试官: Fork/Join 有了解过吗?说说看(含源码分析)
- 面试官: Stream并行流有了解过吗?说说看
推荐 SpringBoot & SpringCloud (源码已更新 欢迎star⭐️)
- springboot-all
地址: github.com/qiuChenglei…- SpringBoot系列教程合集
- 一起来学SpringCloud合集
- SpringCloud整合 Oauth2+Gateway+Jwt+Nacos 实现授权码模式的服务认证(一)
- SpringCloud整合 Oauth2+Gateway+Jwt+Nacos 实现授权码模式的服务认证(二)