1.简述kafka的序列化?
kafka自带的序列化器都实现了org.apache.kafka.common.serialization.Serializer接口
2.简述kafka事务?
kafka 的事务是从0.11 版本开始支持的,kafka 的事务是基于 Exactly Once 语义的,它能保证生产或消费消息在跨分区和会话的情况下要么全部成功要么全部失败
生产者事务
当生产者投递一条事务性的消息时,会先获取一个 transactionID ,并将Producer 获得的PID 和 transactionID 绑定,当 Producer 重启,Producer 会根据当前事务的 transactionID 获取对应的PID。kafka 管理事务是通过其组件 Transaction Coordinator 来实现的,这个组件管理每个事务的状态,Producer 可以通过transactionID 从这个组件中获得 对应事务的状态,该组件还会将事务状态持久化到kafka一个内部的 Topic 中。生产者事务的场景:一批消息写入 a、b、c 三个分区,如果 ab写入成功而c失败,那么kafka就会根据事务的状态对消息进行回滚,将ab写入的消息剔除掉并通知 Producer 投递消息失败。
消费者事务
消费者事务的一致性比较弱,只能够保证消费者消费消息是精准一次的(有且只有一次)。消费者有一个参数 islation.level,这个参数指定的是事务的隔离级别。它的默认值是 read_uncommitted(未提交读),意思是消费者可以消费未commit的消息。当参数设置为 read_committed,则消费者不能消费到未commit的消息。
3.数据采集的过程中,为什么使用kafka?
采集层 主要可以使用 Flume, Kafka 等技术。
Flume:Flume 是管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展 API.
Kafka:Kafka 是一个可持久化的分布式的消息队列。 Kafka 是一个非常通用的系统。你可以有许多生产者和很多的消费者共享多个主题 Topics。
相比之下,Flume 是一个专用工具被设计为旨在往 HDFS,HBase 发送数据。它对HDFS 有特殊的优化,并且集成了 Hadoop 的安全特性。
所以,Cloudera 建议如果数据被多个系统消费的话,使用 kafka;如果数据被设计给Hadoop 使用,使用 Flume。
4.Kafka生产者消费者怎么做数据传递的?
发送确认(ACK应答机制)
producer在向kafka写⼊消息的时候,可以设置参数来确定是否确认kafka接收到数据,下面提供了三个可选的值
①acks=0:
意味着producer向集群中发送数据之后不需要等到集群的响应直接返回,不确保消息发送成功
在这种情况下还是有可能发生错误, 比如发送的对象无法被序列化或者网卡发生故障,但如果是分区离线或整个集群长时间不可用,那就不会收到任何错误。即使是在发生完全首领选举的情况下,这种模式仍然会丢失消息,因为在新首 领选举过程中它并不知道首领已经不可用了
在acks=0模式下的运行速度是非常快的(这就是为什么很多基准测试都是基于这个模式),你可以得到惊人的吞吐量和带宽利用率,不过如果选择了这 种模式,一定会丢失一些消息
②acks=1:
意味着producer往集群发送数据只要leader应答就可以发送下⼀条,只确保leader发送成功,意味着首领在收到消息并把它写入到分区数据文件(不一 定同步到磁盘上)时会返回确认或错误响应
在这个模式下,如果发生正常的首领选举,生产者会在选举时收到一个 LeaderNotAvailableException 异常,如果生产者能恰当地处理 这个错误(参考下面"配置生产者的重试参数"),它会重试发送消息,最终消息会安全 到达新的首领那里。不过在这个模式下仍然有可能丢失数据,比如 消息已经成功写入首领,但在消息被复制到跟随者副本之前首领发 生崩溃。
③acks=all:
意味着producer往集群发送数据需要所有的follower都完成从leader的同步才会发送下⼀条,确保leader发送成功和所有的副本都完成备份(会等待所有同步副本都收到消息)
如果和 min.insync.replicas 参数结合起来,就可以决定在返回确认前至少有多少个副本能够收到消息。这 是最保险的做法——生产者会一直重试直到消息被成功提交。不过 这也是最慢的做法,生产者在继续发送其他消息之前需要等待所有 副本都收到当前的消息。可以通过使用异步模式和更大的批次来加快速度,但这样做通常会降低吞吐量
5.Kafka如何进行分区?
1.默认的分区策略:Range Startegy(根据范围消费)
Range startegy是对每个主题而言的 , 首先对同一个主题里面的分区按照序号进行排序,并对消费者按照字母进行排序。在对十个分区排序的话是0-9;消费者线程排完序是C1-0,C2-0,C2-1。然后用partitions的总数除以消费者的总数来决定每个消费者线程消费几个分区。如果有余数,那么前面的几个消费者线程将会多消费一个分区。在我们的例子里面,我们有十个分区,三个消费者线程,10/3=3---1,那么消费者线程C1-0 将会多消费一个分区,所以最后分区分配的结构看起来是这样的:
C1-0将消费0,1,2,3分区
C2-0将消费4,5,6分区
C2-1将消费7,8,9分区
如果有第十一个分区的话,那么分区是这样的
C1-0将消费0,1,2,3分区
C2-0将消费4,5,6,7分区
C2-1将消费8,9,10分区
如果我们有2个主题(T1和T2),分别都有十个分区,那么最后的分配结果是:
C1-0将消费T1主题中的0,1,2,3分区以及T2主题中0,1,2,3分区
C2-0将消费T1主题中的4,5,6分区以及T2主题中的4,5,6,分区
C2-1将消费T1主题中的7,8,9分区以及T2主题中的7,8,9分区
这就是消费的策略! 就是用总的分区数/消费者线程总数=每个消费者线程应该消费的分区数。当还有余数的时候就将余数分别分发到另外的消费组线程中。
在这里我们不难看出来。C1-0消费者线程比其他消费者线程多消费了两个分区,这就是Range Strategy的一个明显的弊端。 当分区很多的时候,会有个别的线程压力巨大!
2.第二个默认的分区策略:RoundRobin strategy(轮询的消费策略)
在使用RoundRobin Starategy的时候我们必须满足两个条件:
1、同一个consumer Group里面的所有消费者的num.streams必须相等;
2、每个消费者订阅的 主题必须相同
在这里我们假设2个消费者的num.streams=2. RoundRobin starategy的工作原理: 将所有主题的分区组成TopicAndPartition列表,然后对TopAndPartition列表按照hashcode进行排序,
最后按照round-robin风格将分区分别分配给不同的消费者线程。
在我们的例子中,假如按照hashcode排序完的topic-partition组依次为T1-5,T1-3,T1-0.T1-8.T1-2,T1-1,T1-4,T1-6,T1-9,我们的消费者线程排序为C1-0, C1-1 ,C2-0,C2-1,最后分区分配结果为:
C1-0将消费T1-5 , T1-2 , T1-6 分区;
C1-1将消费T1-3 , T1-1 , T1-9 分区;
C2-0将消费T1-0 , T1-4分区;
C2-1将消费T1-8 , T1-7分区;
遗憾的是,目前我们还不能自定义分区分配策略,只能通过partition.assignment.strategy参数选择range或roundrobin。 partition.assignment.strategy参数默认的值是range。
6.Kafka中有多少个topic?
通常情况:多少个日志类型就多少个Topic。也有对日志类型进行合并的。
7.Kafka如何保证精准消费?
kafka中如何实现精准写入数据?
A:Producer 端写入数据的时候保证幂等性操作:
幂等性:对于同一个数据无论操作多少次都只写入一条数据,如果重复写入,则执行不成功
B:broker写入数据的时候,保证原子性操作, 要么写入成功,要么写入失败。(不成功不断 进行重试)
8.Kafka作为分布式消息系统,优势是什么?
可伸缩性: Kafka的两个重要特性造就了它的可伸缩性。
1、Kafka集群在运行期间可以轻松地扩展或收缩(可以添加或册删除代理),而不会宕机。
2、可以扩展一个Kafka主题来包含更多的分区。由于一个分区无法扩展到多个代理,所以它 的容量受到代理磁盘空间的限制。能够增加分区和代理的数量意味着单个主题可以存储的数 据量是没有限制的。
容错性和可靠性:Kafka的设计方式使某个代理的故障能够被集群中的其他代理检测到。由于 每个主题都可以在多个代理上复制,所以集群可以在不中断服务的情况下从此类故障中恢复 并继续运行。
吞吐量:代理能够以超快的速度有效地存储和检索数据。
9.Kafka如何保证服务器的负载均衡?
分区器:
分区器是生产者层面的负载均衡。Kafka 生产者生产消息时,根据分区器将消息投递到 指定的分区中,所以 Kafka 的负载均衡很大程度上依赖于分区器。
Kafka 默认的分区器是 Kafka 提供的 DefaultPartitioner。它的分区策略是根据 Key 值进行分区分配的:
如果 key 不为 null:对 Key 值进行 Hash 计算,从所有分区中根据 Key 的 Hash 值计算出一个分区号;拥有相同 Key 值的消息被写入同一个分区;
如果 key 为 null:消息将以轮询的方式,在所有可用分区中分别写入消息。
如果不想使用 Kafka 默认的分区器,用户可以实现 Partitioner 接口,自行实现分区方法。
注:
分区器的负载均衡与顺序性有着一定程度上的矛盾。
负载均衡的目的是将消息尽可能平均分配,对于 Kafka 而言,就是尽可能将消息平均分配给所有分区;
如果使用 Kafka 保证顺序性,则需要利用到 Kafka 的分区顺序性的特性。对于需要保 证顺序性的场景,通常会利用 Key 值实现分区顺序性,那么所有 Key 值相同的消 息就会进入同一个分区。这样的情况下,对于大量拥有相同 Key 值的消息,会涌 入同一个分区,导致一个分区消息过多,其他分区没有消息的情况,即与负载均衡 的思想相悖。
并非分区数量越多,效率越高:
Topic 每个 partition 在 Kafka 路径下都有一个自己的目录,该目录下有两个主要的文件:base_offset.log 和 base_offset.index。Kafka 服务端的 ReplicaManager 会为每个 Broker 节点保存每个分区的这两个文件的文件句柄。所以如果分区过多, ReplicaManager 需要保持打开状态的文件句柄数也就会很多。
每个 Producer, Consumer 进程都会为分区缓存消息,如果分区过多,缓存的消息越多, 占用的内存就越大;
n 个分区有 1 个 Leader,(n-1) 个 Follower,如果运行过程中 Leader 挂了,则会从剩余 (n-1) 个 Followers 中选举新 Leader;如果有成千上万个分区,那么需要很长时 间的选举,消耗较大的性能。
再均衡
再均衡是消费者层面的负载均衡
10.Kakfa如何保证消息的一致性?
使用ISR,ISR是一组与leader同步的消息副本集合,包括leader,一旦leader宕机,那么这个集合中的任何一个节点随时都可以被选为leader。Kafka中的follower从leader复制数据时,充分利用了磁盘的顺序读写和零拷贝技术sendfile(in,out)。
ISR的伸缩性:follower从leader同步数据有一些延迟,如果超过阈值就会把这个follower剔除出ISR, 存入OSR(超时连接列表),新加入的follower也会先存放在OSR中。
AR: 所有副本
11.Kakfa的压测你知道吗?
Kafka官方自带压力测试脚本(kafka-consumer-perf-test.sh、kafka-producer-perf-test.sh)。Kafka压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。一般都是网络IO达到瓶颈。
12.如何保证kafka丢不丢失数据?
Ack=0,相当于异步发送,消息发送完毕即offset增加,继续生产。
Ack=1,leader收到leader replica 对一个消息的接受ack才增加offset,然后继续生产。
Ack=-1,leader收到所有replica 对一个消息的接受ack才增加offset,然后继续生产。
13.简述一下kafka的架构?
一个典型的Kafka集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息
Kafka 架构分为以下几个部分
Producer :消息生产者,就是向 kafka broker 发消息的客户端。
Consumer :消息消费者,向 kafka broker 取消息的客户端。
Topic :可以理解为一个队列,一个 Topic 又分为一个或多个分区。
Consumer Group:这是 kafka 用来实现一个 topic 消息的广播(发给所有的 consumer)和单播(发给任意一个 consumer)的手段。一个 topic 可以有多个 Consumer Group。
Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。
Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker上,每个 partition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的id(offset)。将消息发给 consumer,kafka 只保证按一个 partition 中的消息的顺序,不保证一个 topic 的整体(多个 partition 间)的顺序。
Offset:kafka 的存储文件都是按照 offset.kafka 来命名,用 offset 做名字的好处是方便查找。例如你想找位于 2049 的位置,只要找到 2048.kafka 的文件即可。当然 the first offset 就是 00000000000.kafka。
14.简述kafka中topic,consumer,cunsumer group的关系?
本质上kafka只支持Topic;
每个group中可以有多个consumer,每个consumer属于一个consumer group;
通常情况下,一个group中会包含多个consumer,这样不仅可以提高topic中消息的并发消费能力,而且还能提高"故障容错"性,如果group中的某个consumer失效那么其消费的partitions将会有其他consumer自动接管。
对于Topic中的一条特定的消息,只会被订阅此Topic的每个group中的其中一个consumer消费,此消息不会发送给一个group的多个consumer;
那么一个group中所有的consumer将会交错的消费整个Topic,每个group中consumer消息消费互相独立,我们可以认为一个group是一个"订阅"者。
在kafka中,一个partition中的消息只会被group中的一个consumer消费(同一时刻);
一个Topic中的每个partions,只会被一个"订阅者"中的一个consumer消费,不过一个consumer可以同时消费多个partitions中的消息。
kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息。
kafka只能保证一个partition中的消息被某个consumer消费时是顺序的;事实上,从Topic角度来说,当有多个partitions时,消息仍不是全局有序的。
15.一个topic可以被两个cunsumer group消费吗?一个consumer可以消费两个分区吗?
消费方式
pull模式从broker中读取数据,避免push方式造成的consumer来不及处理消息的问题。但是会造成空轮询(为了避免此种情况,当放回空消息时,消费者会等待一个timeout后再去轮询)。
分区分配策略
一个consumer group中有多个consumer,一个topic中有多个partition,相同消费群组内不可以消费相同的partition,涉及到partition如何分配。由参数partition.assignment.strategy指定,默认是range策略。
分类
RoundRobin———按组
RoundRobin策略的工作原理:将所有主题的分区组成 TopicAndPartition 列表,然后对 TopicAndPartition 列表按照 hashCode 进行排序。
使用RoundRobin策略有两个前提条件必须满足:
同一个Consumer Group里面的所有消费者的num.streams必须相等;
每个消费者订阅的主题必须相同。
Range———按主题
Range策略是对当前主题而言的,首先对同一个主题里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。
策略触发
当以下事件发生时,Kafka 将会进行一次分区分配:
同一个 Consumer Group 内新增消费者
消费者离开当前所属的Consumer Group,包括shuts down 或 crashes
订阅的主题新增分区
offset存储
kafka消费者在会保存其消费的进度,也就是offset,存储的位置根据选用的kafka-api不同而 不同。
zookeeper存储
首先来说说消费者如果是根据java-api来消费,也就是 kafka.javaapi.consumer.ConsumerConnector,通过配置参数zookeeper.connect来消费。这种 情况下,消费者的offset会更新到zookeeper的consumers/{group}/offsets/{topic}/{partition} 目录下。
2.kafka topic存储
如果是根据kafka默认的api来消费,即org.apache.kafka.clients.consumer.KafkaConsumer, 我们会配置参数bootstrap.servers来消费。而其消费者的offset会更新到一个kafka自带的 topic:__consumer_offsets下面,查看当前group的消费进度,则要依靠kafka自带的工具 kafka-consumer-offset-checker
3.offset更新方式
offset更新的方式,不区分是用的哪种api,大致分为两类:
自动提交,设置enable.auto.commit=true,这种方式称为【at most once】,fetch到消息后就 可以更新offset,无论是否消费成功。
手动提交,设置enable.auto.commit=false,这种方式称为【at least once】。fetch到消息后, 等消费完成再调用方法【consumer.commitSync()】,手动更新offset;如果消费失败,则offset 也不会更新,此条消息会被重复消费一次
16.Kafka中master挂了如何解决?
先考虑业务是否受到影响
kafka 宕机了,首先我们考虑的问题应该是所提供的服务是否因为宕机的机器而受到影响,如果服务提供没问题,如果实现做好了集群的容灾机制,那么这块就不用担心了。
节点排错与恢复
想要恢复集群的节点,主要的步骤就是通过日志分析来查看节点宕机的原因,从而解决,重新恢复节点。
17.Kafka出现数据重复消费如何解决?
幂等性+ack-1+事务
Kafka数据重复,可以再下一级:SparkStreaming、redis或者hive中dwd层去重,去重的手段:分组、按照id开窗只取第一个值;
18.简述一下kafka的分区策略?
在 Kafka内部存在两种默认的分区分配策略:Range和 RoundRobin。
Range是默认策略。Range是对每个Topic而言的(即一个Topic一个Topic分),首先对同一个Topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。然后用Partitions分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。
例如:我们有10个分区,两个消费者(C1,C2),3个消费者线程,10 / 3 = 3而且除不尽。
C1-0 将消费 0, 1, 2, 3 分区
C2-0 将消费 4, 5, 6 分区
C2-1 将消费 7, 8, 9 分区
第一步:将所有主题分区组成TopicAndPartition列表,然后对TopicAndPartition列表按照hashCode进行排序,最后按照轮询的方式发给每一个消费线程。
19.hw,isr,leo分别代表什么?
分区中的所有副本统称为AR(Assigned Repllicas)。所有与leader副本保持一定程度同步的副本(包括Leader)组成ISR(In-Sync Replicas),ISR集合是AR集合中的一个子集。
HW是High Watermak的缩写, 俗称高水位,它表示了一个特定消息的偏移量(offset),消费之只能拉取到这个offset之前的消息。
20.Kafka 消费过的消息如何再消费?
kafka 消费消息的 offset 是定义在 zookeeper 中的, 如果想重复消费 kafka 的消息, 可以在 redis 中自己记录 offset 的 checkpoint 点(n 个),当想重复消费消息时,通过读取 redis 中的 checkpoint 点进行 zookeeper 的 offset 重设,这样就可以达到重
复消费消息的目的了
21.kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
kafka 使用的是磁盘存储。速度快是因为:
顺序写入:因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是耗时的。所以硬盘 “讨厌”随机 I/O, 喜欢顺序 I/O。为了提高读写硬盘的速度,Kafka 就是使用顺序 I/O。Memory Mapped Files(内存映射文件):64 位操作系统中一般可以表示 20G 的数据文件,它的工作原理是直接利用操作系统的Page 来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上。Kafka 高效文件存储设计: Kafka 把 topic 中一个 parition 大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。通过索引信息可以快速定位
message 和确定 response 的 大 小。通过 index 元数据全部映射到 memory(内存
映射文件),
可以避免 segment file 的 IO 磁盘操作。通过索引文件稀疏存储,可以大幅降低index 文件元数据占用空间大小。
22.Kafka 数据怎么保障不丢失?
分三个点说,一个是生产者端,一个消费者端,一个 broker 端。
生产者数据的不丢失
kafka 的 ack 机制:在 kafka 发送数据的时候,每次发送消息都会有一个确认反馈机制,确保消息正常的能够被收到,其中状态有 0,1,-1。
如果是同步模式:
ack 设置为 0,风险很大,一般不建议设置为 0。即使设置为 1,也会随着 leader 宕机丢失数据。所以如果要严格保证生产端数据不丢失,可设置为-1。
如果是异步模式:
也会考虑 ack 的状态,除此之外,异步模式下的有个 buffer,通过 buffer 来进行控制数据的发送,有两个值来进行控制,时间阈值与消息的数量阈值,如果 buffer 满了数
据还没有发送出去,有个选项是配置是否立即清空 buffer。可以设置为-1,永久阻塞, 也就数据不再生产。异步模式下,即使设置为-1。也可能因为程序员的不科学操作,
操作数据丢失,比如 kill -9,但这是特别的例外情况。注:
ack=0:producer 不等待 broker 同步完成的确认,继续发送下一条(批)信息。
ack=1(默认):producer 要等待 leader 成功收到数据并得到确认,才发送下一条
message。
ack=-1:producer 得到 follwer 确认,才发送下一条数据。
消费者数据的不丢失
通过 offset commit 来保证数据的不丢失,kafka 自己记录了每次消费的 offset 数值, 下次继续消费的时候,会接着上次的 offset 进行消费。
而 offset 的信息在 kafka0.8 版本之前保存在 zookeeper 中,在 0.8 版本之后保存到topic 中,即使消费者在运行过程中挂掉了,再次启动的时候会找到 offset 的值,找到之前消费消息的位置,接着消费,由于 offset 的信息写入的时候并不是每条消息消费完成后都写入的,所以这种情况有可能会造成重复消费,但是不会丢失消息。
唯一例外的情况是,我们在程序中给原本做不同功能的两个 consumer 组设置KafkaSpoutConfig.bulider.setGroupid 的时候设置成了一样的 groupid,这种情况会导致这两个组共享同一份数据,就会产生组 A 消费 partition1,partition2 中的消息,组 B 消费 partition3 的消息,这样每个组消费的消息都会丢失,都是不完整的。为了保证每个组都独享一份消息数据,groupid 一定不要重复才行。
kafka 集群中的 broker 的数据不丢失
每个 broker 中的 partition 我们一般都会设置有 replication(副本)的个数,生产者写入的时候首先根据分发策略(有 partition 按 partition,有 key 按 key,都没有轮询)写入到 leader 中,follower(副本)再跟 leader 同步数据,这样有了备份,也可以保证消息数据的不丢失。
23.采集数据为什么选择 kafka?
采集层 主要可以使用 Flume, Kafka 等技术。
Flume:Flume 是管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展 API.
Kafka:Kafka 是一个可持久化的分布式的消息队列。 Kafka 是一个非常通用的系统。你可以有许多生产者和很多的消费者共享多个主题 Topics。
相比之下,Flume 是一个专用工具被设计为旨在往 HDFS,HBase 发送数据。它对HDFS 有特殊的优化,并且集成了 Hadoop 的安全特性。
所以,Cloudera 建议如果数据被多个系统消费的话,使用 kafka;如果数据被设计给Hadoop 使用,使用 Flume。
24.kafka 重启是否会导致数据丢失?
kafka 是将数据写到磁盘的,一般数据不会丢失。但是在重启 kafka 过程中,如果有消费者消费消息,那么 kafka 如果来不及提交 offset,可能会造成数据的不准确(丢失或者重复消费)。
25.kafka 宕机了如何解决?
先考虑业务是否受到影响
kafka 宕机了,首先我们考虑的问题应该是所提供的服务是否因为宕机的机器而受到影响,如果服务提供没问题,如果实现做好了集群的容灾机制,那么这块就不用担心了。
节点排错与恢复
想要恢复集群的节点,主要的步骤就是通过日志分析来查看节点宕机的原因,从而解决,重新恢复节点。
26.为什么 Kafka 不支持读写分离?
在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的,从 而实现的是一种主写主读的生产消费模型。
Kafka 并不支持主写从读,因为主写从读有 2 个很明显的缺点:
数据一致性问题:数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间窗口会导致主从节点之间的数据不一致。某一时刻,在主节点和从节点中 A 数据的值都为 X, 之后将主节点中 A 的值修改为 Y,那么在这个变更通知到从节点之前,应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。
延时问题:类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经历 网络→主节点内存→网络→从节点内存 这几个阶段,整个过程会耗费一定的时间。而在Kafka 中,主从同步会比 Redis 更加耗时,它需要经历 网络→主节点内存→主节点磁盘→网络→从节 点内存→从节点磁盘 这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。
而 kafka 的主写主读的优点就很多了:
可以简化代码的实现逻辑,减少出错的可能; 将负载粒度细化均摊,与主写从读相比, 不仅负载效能更好,而且对用户可控;没有延时的影响;在副本稳定的情况下,不会出现数据不一致的情况。
27.kafka 数据分区和消费者的关系?
每个分区只能由同一个消费组内的一个消费者(consumer)来消费,可以由不同的消费组的消费者来消费,同组的消费者则起到并发的效果。
28.kafka 的数据 offset 读取流程
连接 ZK 集群,从 ZK 中拿到对应 topic 的 partition 信息和 partition 的 Leader 的相关信息连接到对应 Leader 对应的 brokerconsumer 将⾃自自⼰己己保存的 offset 发送给LeaderLeader 根据 offset 等信息定位到 segment(索引⽂文文件和⽇日日志⽂文文件)根据索引⽂文文件中的内容,定位到⽇日日志⽂文文件中该偏移量量对应的开始位置读取相应
⻓长长度的数据并返回给 consumer
29.kafka 内部如何保证顺序,结合外部组件如何保证消费者的顺序?
kafka 只能保证 partition 内是有序的,但是 partition 间的有序是没办法的。爱奇艺的搜索架构,是从业务上把需要有序的打到同⼀一个 partition。
30.Kafka 消息数据积压,Kafka 消费能力不足怎么处理?
如果是 Kafka 消费能力不足,则可以考虑增加 Topic 的分区数,并且同时提升消费组
的消费者数量,消费者数=分区数。(两者缺一不可)如果是下游的数据处理不及时: 提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间<生产速度),使处
理的数据小于生产的数据,也会造成数据积压。
31.Kafka 单条日志传输大小
kafka 对于消息体的大小默认为单条最大值是 1M 但是在我们应用场景中, 常常会出现一条消息大于 1M,如果不对 kafka 进行配置。则会出现生产者无法将消息推送到kafka 或消费者无法去消费 kafka 里面的数据, 这时我们就要对 kafka 进行以下配置: server.properties
1replica.fetch.max.bytes: 1048576 broker 可复制的消息的最大字节数, 默认为 1M 2message.max.bytes: 1000012 kafka 会接收单个消息 size 的最大限制, 默认为1M 左右
注意:message.max.bytes 必须小于等于 replica.fetch.max.bytes,否则就会导致 replica 之间数据同步失败
32.副本数设定
一般我们设置成2个或3个,很多企业设置为2个。
副本的优势:提高可靠性;副本劣势:增加了网络IO传输
33.Kafka压测
Kafka官方自带压力测试脚本(kafka-consumer-perf-test.sh、kafka-producer-perf-test.sh)。Kafka压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。一般都是网络IO达到瓶颈。
34.Kafka日志保存时间
默认保存7天;生产环境建议3天
- Kafka的ISR副本同步队列
ISR(In-Sync Replicas),副本同步队列。ISR中包括Leader和Follower。如果Leader进程挂掉,会在ISR队列中选择一个服务作为新的Leader。有replica.lag.max.messages(延迟条数)和replica.lag.time.max.ms(延迟时间)两个参数决定一台服务是否可以加入ISR副本队列,在0.10版本移除了replica.lag.max.messages参数,防止服务频繁的进去队列。
任意一个维度超过阈值都会把Follower剔除出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的Follower也会先存放在OSR中。
36.Kafka分区分配策略
在 Kafka内部存在两种默认的分区分配策略:Range和 RoundRobin。
Range是默认策略。Range是对每个Topic而言的(即一个Topic一个Topic分),首先对同一个Topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。然后用Partitions分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。
例如:我们有10个分区,两个消费者(C1,C2),3个消费者线程,10 / 3 = 3而且除不尽。
C1-0 将消费 0, 1, 2, 3 分区
C2-0 将消费 4, 5, 6 分区
C2-1 将消费 7, 8, 9 分区
第一步:将所有主题分区组成TopicAndPartition列表,然后对TopicAndPartition列表按照hashCode进行排序,最后按照轮询的方式发给每一个消费线程。
37.Kafka挂掉
1)Flume记录
2)日志有记录
3)短期没事
38.Kafka丢不丢数据
Ack=0,相当于异步发送,消息发送完毕即offset增加,继续生产。
Ack=1,leader收到leader replica 对一个消息的接受ack才增加offset,然后继续生产。
Ack=-1,leader收到所有replica 对一个消息的接受ack才增加offset,然后继续生产。
39.Kafka出现数据重复如何解决?
幂等性+ack-1+事务
Kafka数据重复,可以再下一级:SparkStreaming、redis或者hive中dwd层去重,去重的手段:分组、按照id开窗只取第一个值;
40.Kafka消息数据积压,Kafka消费能力不足怎么处理?
1)如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。(两者缺一不可)
2)如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据,也会造成数据积压。
41.Kafka参数优化你都知道哪些?
1)Broker参数配置(server.properties)
1、日志保留策略配置
保留三天,也可以更短 (log.cleaner.delete.retention.ms)
log.retention.hours=72
2、Replica相关配置
default.replication.factor:1 默认副本1个
3、网络通信延时
replica.socket.timeout.ms:30000 #当集群之间网络不稳定时,调大该参数
replica.lag.time.max.ms= 600000# 如果网络不好,或者kafka集群压力较大,会出现副本丢失,然后会频繁复制副本,导致集群压力更大,此时可以调大该参数
2)Producer优化(producer.properties)
compression.type:none
默认发送不进行压缩,推荐配置一种适合的压缩算法,可以大幅度的减缓网络压力和Broker的存储压力。
GZIP、Snappy和LZ4。从2.1.0开始,kafka正式支持Zstandard算法(简写zstd)。它是Facebook开源的一个压缩算法,能够提供超高的压缩比。对于kafka测试而言,在吞吐方面:LZ4>Snappy> zstd、GZIP;在压缩比方面:zstd>lz4>gzip>snappy。具体到物理资源,使用snappy算法占用的网络带宽资源最多,zstd最少,这是合理的,毕竟zstd就是要提供超高的压缩比;在CPU使用率方面,各个算法表现得差不多,只是在压缩时snappy使用的CPU较多一些,而在解压缩时gzip算法则可能使用更多的CPU。
3)Kafka内存调整(kafka-server-start.sh)
默认内存1个G,生产环境尽量不要超过6个G。
export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g"
42.Kafka高效读写数据怎么做?
1)Kafka本身是分布式集群,同时采用分区技术,并发度高。
2)顺序写磁盘
Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s。
3)零复制技术
43.Kafka单条日志传输大小怎么设置?
kafka对于消息体的大小默认为单条最大值是1M但是在我们应用场景中, 常常会出现一条消息大于1M,如果不对kafka进行配置。则会出现生产者无法将消息推送到kafka或消费者无法去消费kafka里面的数据, 这时我们就要对kafka进行以下配置:server.properties
replica.fetch.max.bytes: 1048576 broker可复制的消息的最大字节数, 默认为1M
message.max.bytes: 1000012 kafka 会接收单个消息size的最大限制, 默认为1M左右
注意:message.max.bytes必须小于等于replica.fetch.max.bytes,否则就会导致replica之间数据同步失败。
44.Kafka过期数据清理如何做?
保证数据没有被引用(没人消费他)
日志清理保存的策略只有delete和compact两种
log.cleanup.policy=delete启用删除策略
log.cleanup.policy=compact启用压缩策略
https://www.jianshu.com/p/fa6adeae8eb5
45.Kafka中的数据是有序的吗
单分区内有序;多分区,分区与分区间无序;
46.Kafka 判断一个节点是否还活着有那两个条件?
(1)节点必须可以维护和 ZooKeeper 的连接,Zookeeper 通过心跳机制检查每个节点的连接
(2)如果节点是个 follower,他必须能及时的同步 leader 的写操作,延时不能太久
47.Kafka 都有哪些特点?
高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
可扩展性:kafka集群支持热扩展
持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
高并发:支持数千个客户端同时读写
48.请简述下你在哪些场景下会选择 Kafka?
日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务 的方式开放给各种consumer,例如hadoop、HBase、Solr等。
消息系统:解耦和生产者和消费者、缓存消息等。
用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、 搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中, 然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、 数据仓库中做离线分析和挖掘。
运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生 产各种操作的集中反馈,比如报警和报告。
流式处理:比如spark streaming和 Flink
49.Kafka创建Topic时如何将分区放置到不同的Broker中?
副本因子不能大于 Broker 的个数;
第一个分区(编号为0)的第一个副本放置位置是随机从 brokerList 选择的;
其他分区的第一个副本放置位置相对于第0个分区依次往后移。也就是如果我们有5个 Broker,5个分区,假设第一个分区放在第四个 Broker 上,那么第二个分区将会放在第五个 Broker 上;第三个分区将会放在第一个 Broker 上;第四个分区将会放在第二个 Broker 上,依次类推;
剩余的副本相对于第一个副本放置位置其实是由 nextReplicaShift 决定的,而这个数也是随机产生的;
50.Kafka 分区数可以增加或减少吗?为什么?
我们可以使用 bin/kafka-topics.sh 命令对 Kafka 增加 Kafka 的分区数据,但是 Kafka 不支持减少分区数。Kafka 分区数据不支持减少是由很多原因的,比如减少的分区其数据放到哪里去?是删除,还是保留?删除的话,那么这些没消费的消息不就丢了。如果保留这些消息如何放到其他分区里面?追加到其他分区后面的话那么就破坏了 Kafka 单个分区的有序性。如果要保证删除分区数据插入到其他分区保证有序性,那么实现起来逻辑就会非常复杂。
51.为什么要使用 kafka?
缓冲和削峰:上游数据时有突发流量,下游可能扛不住,或者下游没有足够多的机器来保证冗余,kafka 在中间可以起到一个缓冲的作用,把消息暂存在 kafka 中,下游服务就可以按照自己的节奏进行慢慢处理。解耦和扩展性:项目开始的时候,并不能确定具体需求。消息队列可以作为一个接口层,解耦重要的业务流程。只需要遵守约定,针对数据编程即可获取扩展能力。冗余:可以采用一对多的方式,一个生产者发
布消息,可以被多个订阅 topic 的服务消费到,供多个毫无关联的业务使用。健壮性: 消息队列可以堆积请求,所以消费端业务即使短时间死掉,也不会影响主要业务的正
常进行。异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
52.Kafka 消费过的消息如何再消费?
kafka 消费消息的 offset 是定义在 zookeeper 中的, 如果想重复消费 kafka 的消息, 可以在 redis 中自己记录 offset 的 checkpoint 点(n 个),当想重复消费消息时,通过读取 redis 中的 checkpoint 点进行 zookeeper 的 offset 重设,这样就可以达到重
复消费消息的目的了
53.kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
kafka 使用的是磁盘存储。速度快是因为:
顺序写入:因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是耗时的。所以硬盘 “讨厌”随机 I/O, 喜欢顺序 I/O。为了提高读写硬盘的速度,Kafka 就是使用顺序 I/O。Memory Mapped Files(内存映射文件):64 位操作系统中一般可以表示 20G 的数据文件,它的工作原理是直接利用操作系统的Page 来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上。Kafka 高效文件存储设计: Kafka 把 topic 中一个 parition 大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。通过索引信息可以快速定位
message 和确定 response 的 大 小。通过 index 元数据全部映射到 memory(内存
映射文件),
可以避免 segment file 的 IO 磁盘操作。通过索引文件稀疏存储,可以大幅降低index 文件元数据占用空间大小。
54.Kafka 数据怎么保障不丢失?
分三个点说,一个是生产者端,一个消费者端,一个 broker 端。
生产者数据的不丢失
kafka 的 ack 机制:在 kafka 发送数据的时候,每次发送消息都会有一个确认反馈机制,确保消息正常的能够被收到,其中状态有 0,1,-1。
如果是同步模式:
ack 设置为 0,风险很大,一般不建议设置为 0。即使设置为 1,也会随着 leader 宕机丢失数据。所以如果要严格保证生产端数据不丢失,可设置为-1。
如果是异步模式:
也会考虑 ack 的状态,除此之外,异步模式下的有个 buffer,通过 buffer 来进行控制数据的发送,有两个值来进行控制,时间阈值与消息的数量阈值,如果 buffer 满了数
据还没有发送出去,有个选项是配置是否立即清空 buffer。可以设置为-1,永久阻塞, 也就数据不再生产。异步模式下,即使设置为-1。也可能因为程序员的不科学操作,
操作数据丢失,比如 kill -9,但这是特别的例外情况。注:
ack=0:producer 不等待 broker 同步完成的确认,继续发送下一条(批)信息。
ack=1(默认):producer 要等待 leader 成功收到数据并得到确认,才发送下一条
message。
ack=-1:producer 得到 follwer 确认,才发送下一条数据。
消费者数据的不丢失
通过 offset commit 来保证数据的不丢失,kafka 自己记录了每次消费的 offset 数值, 下次继续消费的时候,会接着上次的 offset 进行消费。
而 offset 的信息在 kafka0.8 版本之前保存在 zookeeper 中,在 0.8 版本之后保存到topic 中,即使消费者在运行过程中挂掉了,再次启动的时候会找到 offset 的值,找到之前消费消息的位置,接着消费,由于 offset 的信息写入的时候并不是每条消息消费完成后都写入的,所以这种情况有可能会造成重复消费,但是不会丢失消息。
唯一例外的情况是,我们在程序中给原本做不同功能的两个 consumer 组设置KafkaSpoutConfig.bulider.setGroupid 的时候设置成了一样的 groupid,这种情况会导致这两个组共享同一份数据,就会产生组 A 消费 partition1,partition2 中的消息,组 B 消费 partition3 的消息,这样每个组消费的消息都会丢失,都是不完整的。为了保证每个组都独享一份消息数据,groupid 一定不要重复才行。
kafka 集群中的 broker 的数据不丢失
每个 broker 中的 partition 我们一般都会设置有 replication(副本)的个数,生产者写入的时候首先根据分发策略(有 partition 按 partition,有 key 按 key,都没有轮询)写入到 leader 中,follower(副本)再跟 leader 同步数据,这样有了备份,也可以保证消息数据的不丢失。
55.采集数据为什么选择 kafka?
采集层 主要可以使用 Flume, Kafka 等技术。
Flume:Flume 是管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展 API.
Kafka:Kafka 是一个可持久化的分布式的消息队列。 Kafka 是一个非常通用的系统。你可以有许多生产者和很多的消费者共享多个主题 Topics。
相比之下,Flume 是一个专用工具被设计为旨在往 HDFS,HBase 发送数据。它对HDFS 有特殊的优化,并且集成了 Hadoop 的安全特性。
所以,Cloudera 建议如果数据被多个系统消费的话,使用 kafka;如果数据被设计给Hadoop 使用,使用 Flume。
55.kafka 重启是否会导致数据丢失?
kafka 是将数据写到磁盘的,一般数据不会丢失。但是在重启 kafka 过程中,如果有消费者消费消息,那么 kafka 如果来不及提交 offset,可能会造成数据的不准确(丢失或者重复消费)。
56.kafka 宕机了如何解决?
先考虑业务是否受到影响
kafka 宕机了,首先我们考虑的问题应该是所提供的服务是否因为宕机的机器而受到影响,如果服务提供没问题,如果实现做好了集群的容灾机制,那么这块就不用担心了。
节点排错与恢复
想要恢复集群的节点,主要的步骤就是通过日志分析来查看节点宕机的原因,从而解决,重新恢复节点。
57.为什么 Kafka 不支持读写分离?
在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的,从 而实现的是一种主写主读的生产消费模型。
Kafka 并不支持主写从读,因为主写从读有 2 个很明显的缺点:
数据一致性问题:数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间窗口会导致主从节点之间的数据不一致。某一时刻,在主节点和从节点中 A 数据的值都为 X, 之后将主节点中 A 的值修改为 Y,那么在这个变更通知到从节点之前,应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。延时问题:类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经历 网络→主节点内存→网络→从节点内存 这几个阶段,整个过程会耗费一定的时间。而在Kafka 中,主从同步会比 Redis 更加耗时,它需要经历 网络→主节点内存→主节点磁盘→网络→从节 点内存→从节点磁盘 这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。
而 kafka 的主写主读的优点就很多了:
可以简化代码的实现逻辑,减少出错的可能; 将负载粒度细化均摊,与主写从读相比, 不仅负载效能更好,而且对用户可控;没有延时的影响;在副本稳定的情况下,不会出现数据不一致的情况。
58.kafka 数据分区和消费者的关系?
每个分区只能由同一个消费组内的一个消费者(consumer)来消费,可以由不同的消费组的消费者来消费,同组的消费者则起到并发的效果。
59.kafka 的数据 offset 读取流程
连接 ZK 集群,从 ZK 中拿到对应 topic 的 partition 信息和 partition 的 Leader 的相关信息连接到对应 Leader 对应的 brokerconsumer 将⾃自自⼰己己保存的 offset 发送给LeaderLeader 根据 offset 等信息定位到 segment(索引⽂文文件和⽇日日志⽂文文件)根据索引⽂文文件中的内容,定位到⽇日日志⽂文文件中该偏移量量对应的开始位置读取相应
⻓长长度的数据并返回给 consumer
60.kafka 内部如何保证顺序,结合外部组件如何保证消费者的顺序?
kafka 只能保证 partition 内是有序的,但是 partition 间的有序是没办法的。爱奇艺的搜索架构,是从业务上把需要有序的打到同⼀一个 partition。
61.Kafka 消息数据积压,Kafka 消费能力不足怎么处理?
如果是 Kafka 消费能力不足,则可以考虑增加 Topic 的分区数,并且同时提升消费组
的消费者数量,消费者数=分区数。(两者缺一不可)如果是下游的数据处理不及时: 提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间<生产速度),使处
理的数据小于生产的数据,也会造成数据积压。
62.Kafka 单条日志传输大小
kafka 对于消息体的大小默认为单条最大值是 1M 但是在我们应用场景中, 常常会出现一条消息大于 1M,如果不对 kafka 进行配置。则会出现生产者无法将消息推送到kafka 或消费者无法去消费 kafka 里面的数据, 这时我们就要对 kafka 进行以下配置: server.properties
1replica.fetch.max.bytes: 1048576 broker 可复制的消息的最大字节数, 默认为 1M 2message.max.bytes: 1000012 kafka 会接收单个消息 size 的最大限制, 默认为1M 左右
注意:message.max.bytes 必须小于等于 replica.fetch.max.bytes,否则就会导致 replica 之间数据同步失败。