前言
各位清明 快乐呀,近期博主也是学习了一下kafka,以下是博主的一些学习笔记,希望对你有所帮助
前置知识
线程中的数据交互以及进程中的数据交互
我们知道线程之间可以使用堆空间进行数据交互的
但是如果发送方和接收方处理数据的效率差距过大,这里就会造成消息积压的问题,怎么处理呢?
存入文件显然是不可取的,因为这里文件的大小也是有上限的,所以我们加上一个中间件,也就是kafka
这里进程呢?
进程之间肯定是不可以使用共用内存进行交互的,这里就采用网络传输的方式进行交互,因为他们的内存都是独立存在的,使用socket网络传输即可
我们知道一个一般处理消息的不止一个消费者,这样直接让消费者和生产者进行交互耦合度也就太高了,我们也引入了消息中间件来降低消息的耦合度吧
JMS Java Message Service
JMS包含了p2p和消息订阅发布模型,基本上很多mq都是遵循这个模型的
我们kafka没有加上mq的的后缀,他其实不是完全遵循这个模型
p2p 点对点模型
这里指的是一条消息只能被消费者消费一次,然后消费者会给生产者一个反馈
sub/pub订阅发布模型
生产者生产的消息会发送到对应的topic,订阅了这个topic的消费者都可以消费数据,同样的数据可以被不同的消费者进行消费
注:本文基于的Windows的kafka进行演示学习,kafka一般部署在linux操作系统上
kafka的生产者消费者模型
kafka在底层大量的使用生产者消费者模型
并且为了保证数据的安全性,其还使用了日志文件进行了数据的保存
下面我们通过一个简单的helloworld程序来感受一下
我们先启动zookeeper 再启动kafka即可
注意,先进行修改 两个配置文件,存放对应的data
注:记得进入对应的文件夹,使用对应的bat脚本文件
先演示一下单机
开启zookeeper脚本
call bin/windows/zookeeper-server-start.bat config/zookeeper.properties
开启kafka脚本
call bin/windows/kafka-server-start.bat config/server.properties
创建主题
查看主题
执行经典helloworld
注:启动完一定要先创建主题,主题是kafka一个基本的逻辑分类单位,先开启zookeeper再开启kafka
如果这里kafka客户端一闪而过启动失败的情况,直接删除data文件即可
maven项目简单搭建
引入依赖
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.6.1</version> </dependency> </dependencies>
注:在kafka中提供服务的节点就称之为broker
producer代码
package kafka.producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.HashMap; import java.util.Map; public class testProducer { public static void main(String[] args) { //TODO 创建配置对象 Map<String, Object> configMap = new HashMap<>(); configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); //TODO 对生产的KV操作进序列化 configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //TODO 创建生产者对象 //生产者对象需要确定泛型,是kv类型的 KafkaProducer<String, String> producer = new KafkaProducer<>(configMap); //TODO 创建数据 //构建数据时,需要传入三个参数,主题,key,value for (int i = 0; i < 10; i++) { ProducerRecord<String, String> record = new ProducerRecord<>("test", "key"+i,"hello kafka"+i); //TODO 通过生产者对象将数据发送给kafka producer.send(record); } //TODO 关闭生产者对象 producer.close(); } }
consumer代码
package kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Collections; import java.util.HashMap; import java.util.Map; public class testConsumer { public static void main(String[] args) { //TODO 创建消费者对象 //消费者也需要相应的配置 Map<String,Object> consumerConfig = new HashMap<>(); consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); //反序列化 consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); //配置groupID consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig); //TODO 订阅主题 consumer.subscribe(Collections.singletonList("test")); //TODO 从kafka的主题中获取数据 //消费者从kafka拉取数据 不是推送的概念 while(true) { ConsumerRecords<String, String> dates = consumer.poll(100); for (ConsumerRecord<String, String> date : dates) { System.out.println(date); } } //TODO 关闭消费者对象 //consumer.close(); } }
先启动consumer再启动producer,让我们在可视化工具上查看一下信息是否存在了
这里使用的是kafkatool
执行完成就可以发现数据已经存在了
Kafka系统架构以及核心组件
我们都知道kafka肯定不是只有一个生产者和一个消费者呀
当这里的数据频繁生产消费就可能造成IO热点问题
最后这个节点可能就成为分布式系统的性能瓶颈,一旦节点挂了,就可能造成数据丢失
tips:这里的挂了可能只是网络不稳定,资源耗尽等问题导致的长时间连接不上
解决方案
横向扩展和纵向扩展
横向扩展:使用更快的网络,更大的磁盘...无法根本解决问题
纵向扩展:使用集群的方式,也是kafka的解决方案
这还没有结束,光增加节点是没用的,生产和消费请求还是指向同一个节点,我们需要将这里的数据分散到各个节点,实现同一个主题在不同的broker中
区分不同的数据,加上编号就称之为分区,这也是kafka物理上的存储单位
例如 partition-0
一个主题可以有多个分区,分散在不同的broker中
但是每个消费者也不能只消费一部分数据呀,每个消费者向每个分区发送请求,这里效率也是很低的,所以又提出了消费组的概念
并且为了数据的安全考虑,也提出了将数据进行备份的方案,但是并不在自己的节点进行备份,因为在自己的节点进行备份的话,自己挂了,备份也没了,所以这里是在其他节点保存备份文件,称之为foller副本,kafka中备份统称为副本,主文件叫做leader副本,只有leader副本可以读写,foller副本只负责备份.
基础组件
每一个kafka节点中都包含很多个组件,下面我们来介绍一下经典的几个组件
首先就是我们的Controller了,在多个节点中我们得选举出一个管理者
这里的选举也很简单粗暴,哪个节点先和他建立连接,他就是Controller
Controller的备份
1.采用备份的方式
2.升级,每个节点都能做备份
这里假设一个节点挂了,可以通过Zookeeper的一个选举功能在选举出新的Controller
Controller的备份
1.采用备份的方式
2.升级,每个节点都能做备份
这里假设一个节点挂了,可以通过Zookeeper的一个选举功能在选举出新的Controller
broker架构
为啥生产者和消费者指向同一个broker呢
因为数据是有主题的,主题是有分区的,分区是有副本的,一个叫leader,一个叫foller
指向同一个broker是因为他对应的分区是leader副本,分区管理器会将其同步到文件
集群部署
我们知道kafka一般是以集群方式出现
为了模拟,我们也部署一下集群
这时候解压三个kafka到不同文件夹,修改data配置以及端口即可
可以设置为9091 9092 9093
注意Zookeeper也得配置
可以写批处理脚本,这样运行起来更加方便
出现以下问题就将其放在根目录下或者将文件夹名改短
可视化工具创建主题
注:这里副本数量超过节点数量不会创建成功,因为一个节点放多个副本是无意义的
Zookeeper的作用
我们简述一下Zookeeper的作用
1.Controller的选举
选举规则就是比较随意,第一个建立和zookeeper建立连接的broker节点就是 controller 然后其他的节点来建立连接的时候也想创建,但是controller已经有了,之后的节点就是放一个监听器,假设现在的Controller挂了,这个监听器就起作用了,从其余的broker中选举出新的broker2.对节点的监听
Znode节点有个监听功能 可以使用kafka对节点进行监听到节点的变化 数据的变化 连接超时... 监听到以后马上通知kafka进行对应的处理
Controller和Broker之间的通信
第一个broker启动的流程 1.注册broker节点 监听controller节点 2.注册controller节点 选举成为controller,监听/broker/ids节点 因为broker启动就会创建ids,所以这里的监听主要就是看看ids的变化,是否有新的节点创建了 第二个节点加入之后监听器就知道了,会通知broker1集群的变化 然后在第二个broker进来之后还会和第一个节点连接 传输一些集群的信息等等 但是第三个节点连接上来之后,controller会给两个broker都发送相关的集群信息
这也就是说,每当有节点连上了之后,controller就会向各个节点发送对应的集群信息
Broker组件
主要是包含日志组件 网络客户端 副本管理器 controller信息 kafka apis(负责处理数据) Zookeeper的客户端等等
手动创建主题
我们之前的主题使用的都是默认参数自动创建的,我们如果想修改其中的参数就得手动创建对应的admin管理员对象 从而对他的副本信息分区信息进行设置
import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import java.util.Arrays; import java.util.HashMap; import java.util.Map; public class AdminTopicTest { public static void main(String[] args) { Map<String, Object> configs = new HashMap<>(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); //TODO 创建管理员 Admin admin = Admin.create(configs); //TODO 创建主题 //第一个参数是主题名 //第二个参数是分区的数量 int //第三个参数是副本的因子(本质是数量) short String topicName = "test1"; int partitionCount = 1; short replicationCount = 1; NewTopic topic = new NewTopic(topicName,partitionCount,replicationCount); String topicName2 = "test2"; int partitionCount2 = 2; short replicationCount2 = 2; NewTopic topic2 = new NewTopic(topicName2,partitionCount2,replicationCount2); CreateTopicsResult result = admin.createTopics( Arrays.asList(topic, topic2) ); //TODO 关闭管理者对象 admin.close(); } }
副本分配策略
主题只是逻辑上的分类,只有分区才能在物理文件上以及存储中有所体现
我们知道我们是使用多个副本冗余来提高数据的可靠性的
那么副本在节点中又是如何分配的呢,咱们接下来慢慢说
先说理想的情况
我们知道副本也分为leader和follower
我们这里的均衡指的是leader的分布应该是均匀的
我们先说理想情况下
我们希望每个节点的leader数量都是相近的
实际上kafka并不是这样的
因为副本的创建是有顺序的,我们无法再一开始就预测浩好这里的副本分配
kafka是采用一个简单的分配算法来进行的副本分配
例子
注:我们也可以自己手动分配
一个重要的名词 ISR in-sync-Replication 就是同步副本列表的意思
主题创建流程
大概就是先问一下controller在哪,通过controller来创建topic
但是底层有很多生产者消费者模型
这里的具体操作由apis接口来实现
生产数据
一般主题都是提前创建好的,如果使用自动创建的话很可能导致IO热点问题
因为副本的leader都在同一个节点上
具体流程如下图
生产者数据先通过拦截器的拦截,然后去元数据区获取controller的信息,然后进行序列化(因为是通过网络传输的数据),在通过分区器确定分区,最后加入缓冲区等待发送
注:拦截器是对数据进行了一些规范化的处理,但是出现错误之后不会导致程序的停止,不影响数据的发送,捕捉到异常也不会进行处理
然后通过发送线程继续发送数据,这里的在途请求缓冲区的大小表示一个节点在同一时间最多处理的请求数量,默认是5,这是经过压力测试的,这样性能最优
分区器
我们刚刚看到数据会经过分区器处理来知道放到哪个分区,下面我们介绍一下分区器是怎么工作的,分区器是从元数据区获取到主题信息再开始计算分区的,注意这里根据的主题信息直接指定分区的话是不会做校验而是直接使用的
算法
分区算法,将key使用散列算法后和分区数进行一次取模运算,在写入数据收集器的时候,就需要进行处理了,如果当前主题分区是未知分区,就会根据当前主题分区的负载情况动态进行分区(粘性分区策略)如果当前发送的时候没有分区负载情况,这时候就是随机选择的,选择以后就尽可能向这里添加,超过阈值就会切换另外一个分区,阈值默认是16k,后面不为空之后就会根据每个分区的负载情况生成一个随机的权重,然后通过一个二分查找找到一个和这个值相近的,然后算出来分区编号
缓冲区
缓冲区中对数据的追加是只要批次大小足够,没到达阈值,直接向后追加即可
批次对象空间不足 将满了的批次对象锁定并关闭,等着sender线程来拿,然后重新开一个批次对象来追加这里的数据 数据是可以超过16k的,比如60k的数据,直接装,然后关闭准备发车,是不可以拆开的
sender发送线程
会将符合发送条件的数据重新进行整合,前面是因为相同主题的不同分区可能在不同的broker中,但是不同主题的分区可能在相同的broker中,用topic进行区分效率更高一点
批次对象到达大小或者是时间阈值之后就会被发送
应答机制 ACKS
本质上是使用异步的方式
发送数据无需等待应答以后再继续发送
这样数据的发送效率高了,但是安全性无法保证
Kafka就面对不同的场景给出了三个ACKS处理等级
分别是 0 1 all
0就是优先考虑效率
all就是优先考虑数据的安全性
1就是两者之间的折中考虑
ACKS = 0
表示只是将数据放到网络中了,根本不关心其是否发送完成,直到放到网络中就给main线程发送一个应答
ACKS等级为1的时候是需要数据在leader中进行保存到文件中之后才能应答
all等级是等待数据进行备份之后才进行对应的应答
retry重试机制
我们知道数据既然是在网络中传输的,那么数据丢包是很正常的,假设网络不稳定等等情况就很容易导致数据的丢包等等
我们这时候和tcp协议一样定义了数据的重传机制
只要主线程没有收到acks,到达一定的超时时间,这时候就会将数据再次放回缓冲区重新进行一次发送
但是这也会导致一定的问题,比如数据重复多次或者是数据乱序问题
好处是让数据更安全,但是也有坏处
数据重复:
假设这里leader写入了磁盘了,但是传ack的时候网络不稳定,没发成,这里就会再传一次 这里数据就在文件中放了两次
数据乱序:
还有一个问题就是数据的顺序问题,发生顺序是 a b c 但是可能 a发送失败重发了 结果就是b c a 的顺序了
这在某种情况下不是我们想看到的,于是我们又引入了幂等性操作和事务的概念
幂等性
幂等性要求数据的ACKS等级一定是all或者-1 (这俩等级一个意思),并且必须开启retry ,并且要求在途请求缓冲区的数量必须小于等于5
实现?
就是给数据标上生产者编号,标上数据序号,但是注意这里的幂等性不可以跨越客户端或者是跨越分区来起作用
这里的幂等性只是在同一个分区内的幂等
数据在发送给Kafka的时候,kafka会记录生产者的状态
重复是靠在加入在途数据缓冲区的时候判断一下contains
有序是按照序号来的,下一个加入的数据必须大于当前的最后一个数据
缺陷:
只能保证一个分区的数据是有序且不重复的
但是如果这时候生产者重启了,此时仍然会导致数据的重复
事务
这里的事务是基于幂等性的,和数据库中的事务完全不是一个意思
基本原理是保证生产者id重启前后不会改变
执行顺序如下
注:事务这里的发送数据不是通过send方法进行发送,而是commit才会发送,send只是将数据放到缓冲区