前言
目前正在出一个Kafka专题
系列教程, 篇幅会较多, 喜欢的话,给个关注❤️ ~
本节给大家讲一下Kafka
的一些核心概念以及如何利用docker
快速的搭建Kafka集群
~
好了, 废话不多说直接开整吧~
什么是 Kafka
Kafka
是一种高吞吐量
、分布式
、可扩展
的消息中间件系统
,最初由LinkedIn
公司开发。随着不断的发展,在最新的版本中它定义为分布式的流处理平台
,现在在大数据
应用中也是十分广泛。
它可以处理大量的实时数据流
,被广泛应用于日志收集、事件处理、流处理、消息队列
等场景。
Kafka
的架构包含producer(生产者)、consumer(消费者)、broker(代理服务器)
等组件。生产者
可以将消息发送到Kafka集群
,消费者
可以从Kafka
集群订阅
消息并进行处理,而broker
则是消息的中转服务器,负责存储和转发
消息。
Kafka
的特点包括:
高吞吐量
:Kafka可以处理海量的数据流,支持每秒百万级别的消息处理。可扩展性
:Kafka的集群可以根据需要进行水平扩展,从而提高系统的性能和容量。可靠性
:Kafka支持多副本机制,可以保证数据的可靠性和高可用性。灵活性
:Kafka支持多种消息格式和协议,可以与各种系统和工具进行集成。
Kafka
是一个开源的项目,已经成为了Apache
软件基金会的顶级项目.
Kafka & 核心概念
接着,我们看下它的核心概念,这些概念都很重要,在后边的学习中都会遇到,概念一定要搞明白,对于理解Kafka
的工作原理和使用方法非常重要。不然学习起来比较懵, 下面一起看一下核心概念:
Topic
Topic
是消息的逻辑容器
,用于对消息进行分类和存储。在Kafka
中,消息会被发布到指定的topic
中,并且可以被一个或多个消费者订阅
。Topic
是Kafka的核心概念之一,是实现消息传递的基础。
Producer
Producer
是消息的生产者
,用于向指定的topic
中发送消息。Producer
负责将消息发送到Kafka集群
中的broker
节点,并且可以在发送消息时指定消息的key
,以便Kafka
将消息分配到指定的partition
中。
Consumer
Consumer
是消息的消费者
,用于从指定的topic
中接收消息。Consumer
负责从Kafka
集群中的broker
节点获取消息,并且可以指定从哪个partition
中获取消息。消费者可以以不同的方式进行消息消费,例如批量消费、轮询消费
等。
Broker
Broker
是Kafka集群
中的一个节点
,用于存储和管理消息
。Broker是Kafka的核心组件
之一,负责接收和处理
生产者发送的消息,并将其存储到磁盘
中,同时还负责将消息转发给消费者
。
Partition
Partition
是Kafka
中实现数据分片
的机制,一个topic
可以被分成多个partition
,每个partition
都是一个有序
的消息队列
。消息在被发送到一个topic
时,会被根据指定的key
进行hash
计算,然后被分配到对应的partition
中。
Offset
Offset
是Kafka中的一个重要概念,用于标识
每个消息在一个partition
中的位置。每个partition
都有一个唯一的offset
值,消费者可以根据offset
来获取指定位置的消息。Kafka还提供了一种特殊的topic
,称为__consumer_offsets
,用于存储消费者消费的位置信息
。
Kafka & 主要架构
如图:
+---------+ +---------+ |Producer | |Consumer | +---------+ +---------+ | | | | +---------+ +---------+ | Broker | | Broker | +---------+ +---------+ / \ / \ / \ / \ +---------+ +---------+ +---------+ +---------+ |Partition| |Partition| |Partition| |Partition| +---------+ +---------+ +---------+ +---------+ / \ / \ / \ / \ / \ / \ / \ / \ +----------+ +----------+ +----------+ +----------+ +----------+ |Replica | |Replica | |Replica | |Replica | |Replica | +----------+ +----------+ +----------+ +----------+ +----------+ Leader Follower Follower Follower Follower | | | | | | | | Write Read Read Read | | | | +--------+----------+------------+-----------+ | | | | +---------+ +---------+ | Disk | | Memory | +---------+ +---------+
在这个流程图中,主要有以下几个流程:
Producer
将消息发送到Broker
节点,Broker
将消息存储到对应的Partition
中。- 每个
Partition
可以有多个Replica
,其中一个Replica
被选为Leader
,其余Replica
为Follower
。 Leader
负责处理消息的写操作,将消息追加到Partition
中。Follower
负责与Leader
保持同步,定期从Leader
中拉取消息并复制到本地副本中,以保证数据的一致性。Consumer
从Broker
中读取消息,可以指定消费某个Topic
中的指定Partition
中的消息,也可以进行批量消费或实时消费。Broker
将消息存储在磁盘中,同时也会缓存部分消息到内存中,以提高读写性能。
为了提高集群的可用性和稳定性
, 架构中还会引入ZooKeeper
, ZooKeeper
用于维护Kafka集群
中的Broker节点
信息、Partition
信息、Topic
信息等。
Leader & Follower
上述提到了leader
和Follower
,有的小伙伴可能不知道是啥,这里讲下为啥会有这个?
在Kafka
的分布式
架构中,每个Partition
可以有多个Replica
,其中一个Replica
被选为Leader
,其余Replica为Follower
。
Leader
是指在一个Partition
中,负责处理该Partition
所有消息的读写操作的Replica
。当Producer
发送消息到该Partition
时,消息会首先被发送到Leader
所在的Replica
,Leader
再将消息追加到Partition
中,然后将消息复制到所有Follower
的Replica
中。在读取数据时,Consumer
也只能从Leader
所在的Replica
中读取消息,而Follower
只负责与Leader
保持同步,不参与读写操作。
Leader
的选举方式与ZooKeeper
密切相关,当Leader
所在的Replica
出现故障时,ZooKeeper
会自动选举新的Leader
,以保证Partition
中的数据一致性
和可用性
。由于只有Leader
负责读写操作,因此可以有效避免数据的
冲突和重复`。
ZooKeeper
如果有小伙伴不知道ZooKeeper
是啥,给大家简要介绍一下:
ZooKeeper
是一个分布式协调服务
,常用于分布式系统中的协调与通知
。Kafka
使用ZooKeeper
来进行集群管理
、Leader选举
、存储Metadata
信息等。ZooKeeper是
Kafka的重要组成部分,没有
ZooKeeper的支持,
Kafka集群无法正常运行,往后的发展趋势可能会不用强依赖
ZooKeeper`。
在Kafka
中,每个Broker
都会向ZooKeeper注册自己的节点信息
,包括Broker ID、IP地址和端口号
等。同时,每个Partition
的Metadata
信息也会存储在ZooKeeper
中,包括该Partitio
n的
Replica信息、
Leader信息、
ISR信息等。当
Broker加入或退出集群时,
ZooKeeper会自动通知其他
Broker更新集群的状态信息。在
Leader选举时,
ZooKeeper会根据预设的算法选举出新的
Leader,并通知其他
Broker更新
Partition`的状态信息。
除了Kafka
之外,ZooKeeper
还被广泛应用于Hadoop、HBase、Solr
等其他分布式系统中,是一个非常成熟和稳定的分布式协调服务。再举一个,Dubbo RPC
服务开发框架,如果有用过Dubbo
的小伙伴,ZooKeeper
一定不会陌生。
这块知识点,大家一定要搞懂,也是面试
的热点问题~
Kafka & 集群搭建
这里教大家如何使用docker
部署Kafka集群
,需要大家安装好docker
, 如果不会安装的可以参考之前的文章es集群搭建
。
给大家提前准备好了docker-compose.yml
文件,配置有点多,如果没有一些docker
基础,可能会看不懂,不过没关系,不影响我们的部署,
直接执行docker-compose up -d
就完了。因为安装的东西比较多,包含zookeeper集群,kafka集群,kafka-ui管理后台
,这个ui
后台是一个开源的系统,界面比较整洁,推荐给大家,命令执行后稍稍等待一会~
version: '3.1' services: zoo1: image: confluentinc/cp-zookeeper:7.3.2 hostname: zoo1 container_name: zoo1 ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_SERVER_ID: 1 ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888 volumes: - ./data/zookeeper/zoo1/data:/data - ./data/zookeeper/zoo1/datalog:/datalog zoo2: image: confluentinc/cp-zookeeper:7.3.2 hostname: zoo2 container_name: zoo2 ports: - "2182:2182" environment: ZOOKEEPER_CLIENT_PORT: 2182 ZOOKEEPER_SERVER_ID: 2 ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888 volumes: - ./data/zookeeper/zoo2/data:/data - ./data/zookeeper/zoo2/datalog:/datalog zoo3: image: confluentinc/cp-zookeeper:7.3.2 hostname: zoo3 container_name: zoo3 ports: - "2183:2183" environment: ZOOKEEPER_CLIENT_PORT: 2183 ZOOKEEPER_SERVER_ID: 3 ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888 volumes: - ./data/zookeeper/zoo3/data:/data - ./data/zookeeper/zoo3/datalog:/datalog kafka1: image: confluentinc/cp-kafka:7.3.2 hostname: kafka1 container_name: kafka1 ports: - "9092:9092" - "29092:29092" environment: KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183" KAFKA_BROKER_ID: 1 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" volumes: - ./data/kafka_data1:/kafka/data depends_on: - zoo1 - zoo2 - zoo3 kafka2: image: confluentinc/cp-kafka:7.3.2 hostname: kafka2 container_name: kafka2 ports: - "9093:9093" - "29093:29093" environment: KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183" KAFKA_BROKER_ID: 2 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" volumes: - ./data/kafka_data2:/kafka/data depends_on: - zoo1 - zoo2 - zoo3 kafka3: image: confluentinc/cp-kafka:7.3.2 hostname: kafka3 container_name: kafka3 ports: - "9094:9094" - "29094:29094" environment: KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183" KAFKA_BROKER_ID: 3 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" volumes: - ./data/kafka_data3:/kafka/data depends_on: - zoo1 - zoo2 - zoo3 kafka-ui: container_name: kafka-ui image: provectuslabs/kafka-ui:latest ports: - 9999:8080 depends_on: - kafka1 - kafka2 - kafka3 environment: KAFKA_CLUSTERS_0_NAME: k1 KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:29092 KAFKA_CLUSTERS_1_NAME: k2 KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka2:29093 KAFKA_CLUSTERS_2_NAME: k3 KAFKA_CLUSTERS_2_BOOTSTRAPSERVERS: kafka3:29094
然后浏览器打开localhost:9999
可以访问UI后台
, 我们可以通过后台新建topic
来验证集群是否工作。
这里再给大家推荐一个GUI
工具,Kafka Assistant
这个工具方便我们日常开发测试使用,界面也很简洁,直接桌面端安装,利用它发送几条消息。
然后我们到后台页面,观察三个节点,发现topic
里都有test
,并且消息都是存在的
结束语
本节到这里就结束, 概念有点多,需要好好理理,后边会结合实际案例给大家继续讲它的概念和工作原理。下节带大家看下Springboot整合Kafka
实战
本着把自己知道的都告诉大家,如果本文对您有所帮助,点赞+关注
鼓励一下呗~
ElasticSearch 专题学习
- 利用docker搭建es集群
- 一起来学ElasticSearch(一)
- 一起来学ElasticSearch(二)
- 一起来学ElasticSearch(三)
- 一起来学ElasticSearch(四)
- 一起来学ElasticSearch(五)
- 一起来学ElasticSearch(六)
- 一起来学ElasticSearch(七)
- 一起来学ElasticSearch(八)
- 一起来学ElasticSearch(九)
- 一起来学ElasticSearch(十)
- 一起来学ElasticSearch之整合SpringBoot(一)
- 一起来学ElasticSearch之整合SpringBoot(二)
- 一起来学ElasticSearch之整合SpringBoot(三)
项目源码(源码已更新 欢迎star⭐️)
往期并发编程内容推荐
- Java多线程专题之线程与进程概述
- Java多线程专题之线程类和接口入门
- Java多线程专题之进阶学习Thread(含源码分析)
- Java多线程专题之Callable、Future与FutureTask(含源码分析)
- 面试官: 有了解过线程组和线程优先级吗
- 面试官: 说一下线程的生命周期过程
- 面试官: 说一下线程间的通信
- 面试官: 说一下Java的共享内存模型
- 面试官: 有了解过指令重排吗,什么是happens-before
- 面试官: 有了解过volatile关键字吗 说说看
- 面试官: 有了解过Synchronized吗 说说看
- Java多线程专题之Lock锁的使用
- 面试官: 有了解过ReentrantLock的底层实现吗?说说看
- 面试官: 有了解过CAS和原子操作吗?说说看
- Java多线程专题之线程池的基本使用
- 面试官: 有了解过线程池的工作原理吗?说说看
- 面试官: 线程池是如何做到线程复用的?有了解过吗,说说看
- 面试官: 阻塞队列有了解过吗?说说看
- 面试官: 阻塞队列的底层实现有了解过吗? 说说看
- 面试官: 同步容器和并发容器有用过吗? 说说看
- 面试官: CopyOnWrite容器有了解过吗? 说说看
- 面试官: Semaphore在项目中有使用过吗?说说看(源码剖析)
- 面试官: Exchanger在项目中有使用过吗?说说看(源码剖析)
- 面试官: CountDownLatch有了解过吗?说说看(源码剖析)
- 面试官: CyclicBarrier有了解过吗?说说看(源码剖析)
- 面试官: Phaser有了解过吗?说说看
- 面试官: Fork/Join 有了解过吗?说说看(含源码分析)
- 面试官: Stream并行流有了解过吗?说说看
推荐 SpringBoot & SpringCloud (源码已更新 欢迎star⭐️)
- springboot-all
地址
: github.com/qiuChenglei…- SpringBoot系列教程合集
- 一起来学SpringCloud合集
- SpringCloud整合 Oauth2+Gateway+Jwt+Nacos 实现授权码模式的服务认证(一)
- SpringCloud整合 Oauth2+Gateway+Jwt+Nacos 实现授权码模式的服务认证(二)