架构图
一个kafka集群中包含一个或多个Producer、一个或多个broker、一个或多个ConsumerGrop以及一个Zookeeper集群。kafka通过Zookeeper管理kafka集群配置、leader副本的选举、生产者的负载均衡等。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
专业术语
- kafkaCluster : kafka集群,由一个或多个Broker节点组成。
- Broker : 一个Kafka集群包括一个或多个服务器,一台服务器就是一个Broker节点。Broker用于保存Producer发送的消息。
- Producer :生产者,用来发送指定的Topic的消息到Broker。生产者可以是代码,还可以是命令行工具。本质上是一个进程或者线程。
- Consumer :消费者,用来接收/消费Kafka集群中的消息。每个Consumer属于一个ConsumerGroup(如果在创建消费者时没有指定Consumer,系统会默认分配一个ConsumerGroup),消费者可以是代码,还可以是命令行工具,本质上就是一个进程/线程。
- ConsumerGroup :消费者组,由一个或多个Consumer组成(在同一个消费者组的消费者具有相同的
group.id
),便于管理Consumer。 - Zookeeper :在Kafka集群中用来存储元数据,如:有Broker节点信息、分区的信息、分区与Broker的对应关系、生产者的负载均衡等等。
- Topic :主题,主题用于区分业务,比如订单主题业务,购物车主题业务,物流主题业务……方便对消息进行分类管理
- Partition :分区,一个Topic的消息由一个或多个Partition存储。分区的作用是提高读写并行度/读写效率。
- Segment :分段,发送到kafka集群的消息会先存到内存中,然后划分文件夹、划分文件存入磁盘中
备注: Kafka中有分区和分段的概念,分区就是分文件夹,分段就是分文件。这个思想在Hive中也有:Hive中的分区就是分文件夹,Hive中的分桶就是分文件。
Replication :副本,副本的作用是保证数据的安全性,副本分为Leader(主副本)和Follower(从副本),Leader只有一个,Follower可以有多个,但是副本数一般都为1-3个(副本数过多会占用大量的存储空间)。
注意:读写都只能从Leader进行,Follower在Leader宕机后自动选举出新的Leader。
扩展: 为什么读写都只能从Leader进行?
答:保证数据的一致性,只在Leader中进行写入数据,Follow同步Leader中的数据,在写过程中避免了多个副本中存储的数据不同的问题。Leader 和 Follow之间同步数据存在延时,所以读操作也需要在Leader中进行。
- ISR : 表示目前Alive(活着的)且与Leader能够 “Catch-up”(跟得上)的Replicas(Follower)集合。
- Record :记录,就是发送到Kafka集群的消息。一条消息就是一条记录。
- offset : 偏移量,用于记录消息的序号,各个分区的偏移量都是从0开始。
分区原理
在Kafka生产者代码演示(具体看上一篇代码帖子)中,我们将发送到Kafka的消息封装为record对象,即:、
//将需要发送到Kafka的消息封装为record对象 ProducerRecord<String, String> record = new ProducerRecord<>("test_topic", "key_" + i, "value_" + i);
在new ProducerRecord<>()有多个不同的构造方法,可以指定传入多个不同的参数。如下图所示:
这里我们思考一个问题:key有什么作用?如果不指定key如何分区的?指定了key不指定partition又是如何分区的?
默认分区策略各种分区现象
- 当我们不指定partition(分区)时,仅仅指定topic、key、value时,即
ProducerRecord<String, String> record = new ProducerRecord<>("order", "key_", "value_" + i);
现象为:
key如果相同,会将所有消息发送到同一个分区下。那么key如果不同呢?
ProducerRecord<String, String> record = new ProducerRecord<>("order", "key_" + i, "value_" + i);
现象为:
消息会被发送到多个不同的分区下
- 不指定partition(分区),不指定key时,仅指定topic、value时,即
ProducerRecord<String, String> record = new ProducerRecord<>("order", "value_" + i);
现象为:
消息被发送到不同分区下
- 指定topic、partition、key、value时,即
ProducerRecord<String, String> record = new ProducerRecord<>("order", 0,"key_" + i, "value_" + i);
现象为:
消息被全部发送到指定的分区下。
默认分区策略下分区现象原因
- 没有key
默认使用轮询的方式将消息发送到各个分区
new ProducerRecord<>("order", "value_" + i);
- 有key,没有指定分区
使用Hash取余的方法将消息发送到各个分区。公式为:分区编号 = key的hash % 分区数
new ProducerRecord<>("order","key_"+i, "value_" + i);//key不一样,分区编号结果不一样 new ProducerRecord<>("order","key_", "value_" + i);//key一样,分区编号结果一样
- 有key,有指定的分区
将消息发送到指定的分区下
new ProducerRecord<>("order", 0,"key_"+i, "value_" + i);//只要指定了分区就发送到指定的分区
回答之前留下的问题:key的作用
- 如果没有指定分区,可以根据key将数据发送到各个分区,让数据均匀分布!
- 如果指定了分区,那么key就不起到分区的作用,但是可以进一步区分业务,如order主题下的,不同地区,可以用key来表示。
自定义分区策略
这是系统默认的分区策略,我们可以参考public class DefaultPartitioner implements Partitioner
编写自定义的分区策略。自定分区策略需要实现Partitioner
接口。
架构详解
1.生产者
- 生产者采用push(推模式)向集群发送消息,并且消息是被顺序写磁盘追加到分区中,提高了kafka的写效率(吞吐量)
备注:顺序写效率>>随机写效率 - 生产者只需要连接上任意一个活着的Broker就可以连接上Kafka集群
- 生产者发送消息时可以指定Topic、分区编号、key、value
- 分区编号和key都可以决定消息或者说是记录进入到哪个分区,具体规则如下:
- 没有key,默认轮询方式写入到分区
- 有key,没有分区编号,使用key的hash % 分区数得到分区编号
- 有key,有分区编号,直接使用指定的分区编号
- 也可以使用自定义分区策略,可以参考DefaultPartitioner实现Partitioner接口即可
- 分区的作用:
- 提高读写效率/并行度
- 方便集群扩展,业务扩张,数据增加的时候,可以增加机器,并增加分区数,以提升Kafka处理能力
- 分区的目的是为了提高并行度,数据的安全由副本保证,且副本是以分区来备份的!所以就有了:partition0的leader副本,partition0的follow副本! 注意:只能从leader读写,follow只负责备份
- 消息是局部有序(分区内有序)
生产者发送消息到Kafka的各个分区中,根据消息发送的分区策略,不能保证发送的分区是有序的,但是在分区内按照offset的顺序追加写的。每个主题的每个分区中offset都是从0开始。 - 消息确认机制
acks=0,意思就是KafkaProducer客户端,只要把消息发送出去,不管那条数据有没有在Partition Leader上落到磁盘,都不管他了,直接就认为这个消息发送成功了
acks=1,只要Partition Leader接收到消息,就认为成功了,不管他其他的Follower有没有同步过去这条消息了。
acks=all/-1,意思就是说Partition Leader接收到消息之后,还必须要求ISR列表里跟Leader保持同步的那些Follower都要把消息同步过去,才能认为这条消息是写入成功了。
all即所有副本都同步到数据时send方法才返回, 以此来完全判断数据是否发送成功, 理论上来讲数据不会丢失
2.Broker节点
- 分区体现在分文件夹,分段体现在分文件
数据发送到Kafka集群,最终会存储在分区下的分段中,也就是partition下的segment文件中,而kafka是一个消息系统,并不是一个存储系统,所以这些消息/数据/记录是有生命周期
默认配置如下所示:
1)基于时间:log.retention.hours=168 # 7天
2)基于大小:log.retention.bytes=1073741824 #单个segment达到1个G
满足上面任意一个就会被删除!
数据存储在在ZK中
注意:新版本中逐渐的将部分数据存在Kafka的自己主题中,如consumer_offsets中
如何根据offset从分段文件中找到需要读取的数据
1)根据segment的index文件后缀,使用offset去二分查找,确定文件。
2)根据index文件中记录的offset=3对应的.log文件的756(文件字节/便宜),去对应的.log文件中从756开始读offset为3的message-3
3.消费者细节
1.消费者从Kafka消费消息,使用的pull拉取模式
注意:Kafka为什么生产者是push推模式,消费者是pull拉模式,因为这样Kafka的压力较小,性能较高!类似与日常生活中的快递柜!
2.消费者只需要连接上任意一个活着的Broker就可以连接上整个Kafka集群
3.消费者消费消息的时候可以指定从哪个offset开始消费,如果有记录则从记录的位置开始消费,如果没有记录,取决于auto.offset.reset,值为earliest表示从最开始的数据,latest表示从最新的数据,none报错
4.offset可以自动提交也可以手动提交:
自动提交
//是否自动提交offset,true表示自动提交 props.put("enable.auto.commit", "true"); //自动提交偏移量时的时间间隔ms值 props.put("auto.commit.interval.ms", "1000");
手动提交
//是否自动提交offset,true表示自动提交,false表示使用手动提交 props.put("enable.auto.commit", "false"); kafkaConsumer.commitSync();//每消费5条就提交一次!//同步 //kafkaConsumer.commitAsync();//每消费5条就提交一次!//异步
5.老版本offset提交到zk中,新版本提交到默认主题__consumer_offsets中
6.消费者消费消息的时候,可以指定订阅主题–用的较多!
kafkaConsumer.subscribe(Arrays.asList("order"));
也可以指定订阅主题和分区
String topic = "foo"; TopicPartition partition0 = new TopicPartition(topic, 0); TopicPartition partition1 = new TopicPartition(topic, 1); kafkaConsumer.assign(Arrays.asList(partition0, partition1));
7.消费者可以指定消费者组名,方便对消费者进行管理
注意:
- 1.一个消费者组中可以有1~n个消费者
- 2.一个主题可以被多个消费者组订阅
- 3.一个消费者组可以订阅多个主题
- 4.同一个主题的同一个分区下的某一条消息,只能被同一个消费者组中的一个消费者消费!
- 5.同一个消费者组消费某个主题的多个分区时是无序的
- 6.如果要有序消费怎么办?–只设置一个分区,但是需要注意:一般不这么做!因为分区是为了提升性能,单分区性能较差!
- 7.分区数最好==消费者组中的消费者数!