生产者
1、 Kafka 中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?
分区器
- 消息经过序列化之后就需要确定它发往的分区,如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分区器的作用,因为 partition 代表的就是所要发往的分区号。
- 如果消息 ProducerRecord 中没有指定 partition 字段,那么就需要依赖分区器,根据 key 这个字段来计算 partition 的值。分区器的作用就是为消息分配分区。
序列化器
- 生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给 Kafka。而在对侧,消费者需要用反序列化器(Deserializer)把从 Kafka 中收到的字节数组转换成相应的对象。
- 生产者使用的序列化器和消费者使用的反序列化器是需要一一对应的。
拦截器
- 拦截器(Interceptor)是早在 Kafka 0.10.0.0 中就已经引入的一个功能,Kafka 一共有两种拦截器:生产者拦截器和消费者拦截器。
- 生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。
处理顺序
- 一条消息在通过 send() 方法发往 broker 的过程中,需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往 broker。
2、 Kafka 生产者客户端的整体结构是什么样子的?
生产者客户端的整体架构:
3、 Kafka 生产者客户端中使用了几个线程来处理?分别是什么?
两个: 分别为主线程和 Sender 线程(发送线程)。
- 在主线程中由 KafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。
- Sender 线程负责从 RecordAccumulator 中获取消息并将其发送到 Kafka 中。
4、 kafka的acks参数的三种机制?
acks参数用来指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。
acks 参数有3种类型的值(都是字符串类型):
- acks = 1:默认值即为1。生产者发送消息之后,只要分区的 leader 副本成功写入消息,那么它就会收到来自服务端的成功响应。
- acks = 0:生产者发送消息之后不需要等待任何服务端的响应。
- acks = -1 或 acks = all:生产者在消息发送之后,需要等待 ISR 中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。
消费者
1、消费者和消费者组有什么关系?
消费者(Consumer)负责订阅 Kafka 中的主题(Topic),并且从订阅的主题上拉取消息。
每个消费者都有一个对应的消费组 ( Consumer Group) , 当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。
每个消费者只能消费所分配到的分区中的消息。换言之,每一个分区只能被一个消费组中的一个消费者所消费。
如下图所示,某个主题中共有4个分区(Partition):P0、P1、P2、P3
2、消息队列模型知道吗?Kafka 是怎么做到支持这两种模型的?
对于消息中间件而言,一般有两种消息投递模式:点对点(P2P,Point-to-Point)模式和发布/订阅(Pub/Sub)模式。
- 点对点模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息。
- 发布订阅模式定义了如何向一个主题(Topic)发布和订阅消息,主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者从主题中订阅消息。
Kafka 同时支持两种消息投递模式 :
- 如果所有的消费者都隶属于同一个消费组,那么所有的消息都会被均衡地投递给每一个消费者,即每条消息只会被一个消费者处理,这就相当于点对点模式的应用。
- 如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者,即每条消息会被所有的消费者处理,这就相当于发布/订阅模式的应用。
3、消费者是否可以消费指定分区消息?
可以的, 在 KafkaConsumer 中还提供了一个 assign() 方法来实现这些功能,此方法的具体定义如下:
public void assign(Collection<TopicPartition> partitions)
这个方法只接受一个参数 partitions,用来指定需要订阅的分区集合。
4、Kafka消费是采用Pull( 拉)模式,还是Push(推)模式?
Kafka 中的消费是基于拉(Pull)模式的。消息的消费一般有两种模式:推模式和拉模式。
- 推(Push)模式是服务端主动将消息推送给消费者;
- 拉(Pull)模式是消费者主动向服务端发起请求来拉取消息。
拉(Pull)模式优缺点:
- 优点:在Pull模式下,消费者可以根据自身速率选择如何拉取数据,避免了低速率的消费者发生崩溃的问题
- 缺点:消费者需要不断的轮询broker是否有新数据,容易发生死循环,内存溢出。
5、 消费者提交消费位移时提交的是当前消费到的最新消息的 offset 还是 offset+1?
消费者使用 offset 来表示消费到分区中某个消息所在的位置。
如下图所示: x表示某一次拉取操作中此分区消息的最大偏移量,假设当前消费者已经消费了x位置的消息,那么我们就可以说消费者的消费位移为x 。
当前消费者需要提交的消费位移并不是x,而是x+1,对应于下图中的 position,它表示下一条需要拉取的消息的位置。
因此消费者提交消费位移时提交的是当前消费到的最新消息的 offset+1。
6、 Kafka 消费位移自动提交方式和手动提交方式了解多少?
自动提交
Kafka 中默认的消费位移的提交方式是自动提交,这个由消费者客户端参数 enable.auto.commit 配置,默认值为 true。
优点
- 自动提交消费位移的方式非常简便,它免去了复杂的位移提交逻辑,让编码更简洁。
缺点
会有重复消费和消息丢失的问题
- 重复消费: 假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象 。
- 消息丢失: 假设拉取线程不断地拉取消息并提交消费位移 ,但是处理线程还没来得及处理新拉取的消息就发生了异常, 待其恢复之后会从提交消费位移处进行处理,这样中间的部分消息就没有得到相应的处理,便发生消息丢失的现象。
手动提交
在 Kafka 中还提供了手动位移提交的方式,这样可以使得开发人员对消费位移的管理控制更加灵活。
优点
- 很多时候并不是说拉取到消息就算消费完成,而是需要将消息写入数据库、写入本地缓存,或者是更加复杂的业务处理。在这些场景下,所有的业务处理完成才能认为消息被成功消费,手动的提交方式可以让开发人员根据程序的逻辑在合适的地方进行位移提交。
如何开启?
开启手动提交功能的前提是消费者客户端参数 enable.auto.commit 配置为 false,示例如下:
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
7、 KafkaConsumer 是非线程安全的,那么怎么样实现多线程消费?
KafkaProducer 是线程安全的,然而 KafkaConsumer 却是非线程安全的。 KafkaConsumer 中定义了一个 acquire() 方法,用来检测当前是否只有一个线程在操作,若有其他线程正在操作则会抛出 ConcurrentModifcationException 异常:
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access.
KafkaConsumer 非线程安全并不意味着我们在消费消息的时候只能以单线程的方式执行。 我们可以通过多线程的方式来实现消息消费,多线程的目的就是为了提高整体的消费能力。
第一种实现方式: 线程封闭,即为每个线程实例化一个 KafkaConsumer 对象 。
- 一个线程对应一个 KafkaConsumer 实例,我们可以称之为消费线程。一个消费线程可以消费一个或多个分区中的消息,所有的消费线程都隶属于同一个消费组。
- 缺点: 这种实现方式的并发度受限于分区的实际个数 ,当消费线程的个数大于分区数时,就有部分消费线程一直处于空闲的状态。
第二种实现方式:是多个消费线程同时消费同一个分区, 这个通过 assign()、seek() 等方法实现 。
- 优点: 可以打破原有的消费线程的个数不能超过分区数的限制,进一步提高了消费的能力 ;
- 缺点: 对于位移提交和顺序控制的处理就会变得非常复杂,实际应用中使用得极少 。
第三种实现方式:将处理消息模块改成多线程的实现方式
- 优点: 可以横向扩展,通过开启多个 KafkaConsumerThread 实例来进一步提升整体的消费能力。 还可以减少TCP连接对系统资源的消耗 。
- 缺点: 对于消息的顺序处理就比较困难了 。
8、Kafka 怎么保证顺序消费?
对于 Kafka 来说,一个 topic 下同一个 partition 中的消息肯定是有序的,生产者在写的时候可以指定一个 key,通过我们会用订单号作为 key,这个 key 对应的消息都会发送到同一个 partition 中,所以消费者消费到的消息也一定是有序的。
那么为什么 Kafka 还会存在消息错乱的问题呢?
- Kafka 从生产者到消费者消费消息这一整个过程其实都是可以保证有序的,导致最终乱序是由于消费者端需要使用多线程并发处理消息来提高吞吐量,比如消费者消费到了消息以后,开启 32 个线程处理消息,每个线程线程处理消息的快慢是不一致的,所以才会导致最终消息有可能不一致。
- 所以对于 Kafka 的消息顺序性保证,其实我们只需要保证同一个订单号的消息只被同一个线程处理的就可以了。由此我们可以在线程处理前增加个内存队列,每个线程只负责处理其中一个内存队列的消息,同一个订单号的消息发送到同一个内存队列中即可。
9、Kafka 有哪些情形会造成重复消费?
对于消费者而言,有一个 offset 的概念,消费者使用 offset 来表示消费到分区中某个消息所在的位置。
消费者消费该数据之后,隔一段时间,会把自己消费过的消息的offset提交一下,代表我已经消费过了。下次我要是重启,就会继续从上次消费到的offset来继续消费。
2种情形会造成重复消费
- 位移提交的动作是在消费完所有拉取到的消息之后才执行的
- 假设x+2 代表上一次提交的消费位移 , 当消费 x+5 的时候遇到了异常,在故障恢复之后,我们重新拉取的消息是从 x+2 开始的。也就是说,x+2 至 x+4 之间的消息又重新消费了一遍,故而又发生了重复消费的现象。
- 自动提交消费位移会导致重复消费
- 假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象 。
10、Kafka 怎么保证不重复消费?
其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性。
幂等(Idempotence):
- 一个幂等操作的特点是,其任意多次执行所产生的影响均与一次执行的影响相同。
- 一个幂等的方法,使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。所以,对于幂等的方法,不用担心重复执行会对系统造成任何改变。
常用的设计幂等操作的方法:
- 利用数据库的唯一约束实现幂等: 建立唯一约束,相同的插入操作只能执行一次, 后续重复的插入操作都会失败, 样就实现了一个幂等的操作。
- 为更新的数据设置前置条件:如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。 这样,重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件中需要判断的数据,不满足前置条件,则不会重复执行更新数据操作。
- 记录并检查操作: 具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。
10、Kafka在什么情况下会出现消息丢失?
Kafka可能会在三个阶段丢失消息:
1、生产者发送数据:
导致 Producer 端消息没有发送成功有以下原因:
- 网络原因:由于网络抖动导致数据根本就没发送到 Broker 端。
- 数据原因:消息体太大超出 Broker 承受范围而导致 Broker 拒收消息。
Kafka Producer 端可以通过配置acks来确认消息是否生产成功,那么这里就可能会丢数据:
- 如果acks配置为0,发生网络抖动消息丢了,生产者不校验ACK就无从得知消息丢失了。
- 如果acks配置为1,但是leader 副本在被其他 follower 副本拉取之前崩溃了 , 新选举的 leader 副本中并没有这条对应的消息 ,此时发生了消息丢失。
- 如果acks配置为all: 生产者在消息发送之后,需要等待 ISR 中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。 但这并不意味着消息就一定可靠,因为ISR中可能只有 leader 副本,这样就退化成了 acks=1 的情况。
2、Kafka Broker 存储数据
Kafka Broker 集群接收到数据后会将数据进行持久化存储到磁盘,为了提高吞吐量和性能,采用的是异步批量刷盘的策略,那么这里就可能会丢数据:
- 由于 Kafka 中并没有提供同步刷盘的方式,所以说从单个 Broker 来看还是很有可能丢失数据的。
- kafka 通过多 Partition (分区)多 Replica(副本)机制已经可以最大限度的保证数据不丢失,如果数据已经写入 PageCache 中但是还没来得及刷写到磁盘,此时如果所在 Broker 突然宕机挂掉或者停电,极端情况还是会造成数据丢失。
3、消费者消费消息
消费者消费主要分为两个阶段:
- 标识消息已被消费,commit offset坐标;
- 处理消息。
那么这里就可能会丢数据:
- 先commit再处理消息。如果在处理消息的时候异常了,但是offset 已经提交了,这条消息对于该消费者来说就是丢失了,再也不会消费到了。
- 先处理消息再commit。如果在commit之前发生异常,下次还会消费到该消息,重复消费的问题可以通过业务保证消息幂等性来解决。
11、Kafka 怎么保证不消息丢失?
1、生产者发送数据:
更换调用方式:使用带回调通知函数的方法进行发送消息
- 网络抖动导致消息丢失,Producer 端可以进行重试。
- 消息大小不合格,可以进行适当调整,符合 Broker 承受范围再发送。
acks配置为all:all有多少个副本 Broker 全部收到消息,才认为是消息提交成功的标识。
将 retries 设置为大于0的数, 在 Kafka 2.4 版本中默认设置为Integer.MAX_VALUE。
2、Kafka Broker 存储数据
replication.factor:
- 该参数表示分区副本的个数。建议设置 replication.factor >=3, 这样如果 Leader 副本异常 Crash 掉,Follower 副本会被选举为新的 Leader 副本继续提供服务。
min.insync.replicas:
- 该参数表示消息至少要被写入成功到 ISR 多少个副本才算"已提交",建议设置min.insync.replicas > 1, 这样才可以提升消息持久性,保证数据不丢失。
3、消费者消费消息
为了不丢数据,正确的做法是:先拉取数据、然后业务逻辑处理、最后提交消费 Offset 位移信息。
设置参数 enable.auto.commit = false , 采用手动提交位移的方式。
另外对于消费消息重复的情况,业务自己保证幂等性, 保证只成功消费一次即可。
12、Kafka 的再均衡?
- 再均衡是指分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以既方便又安全地删除消费组内的消费者或往消费组内添加消费者。
- 在再均衡发生期间,消费组内的消费者是无法读取消息的。也就是说,在再均衡发生期间的这一小段时间内,消费组会变得不可用。
- 另外,当一个分区被重新分配给另一个消费者时,消费者当前的状态也会丢失。 一般情况下,应尽量避免不必要的再均衡的发生。
13、Kafka 的消费流程?
- 消费者(Consumer)负责订阅 Kafka 中的主题(Topic),并且从订阅的主题上拉取消息。
- 在 Kafka 的消费理念中还有一层消费组(Consumer Group)的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。
- 每一个分区只能被一个消费组中的一个消费者所消费。
- Kafka有两种消息投递模式:点对点(P2P,Point-to-Point)模式和发布/订阅(Pub/Sub)模式:
- 如果所有的消费者都隶属于同一个消费组,那么所有的消息都会被均衡地投递给每一个消费者,即每条消息只会被一个消费者处理,这就相当于点对点模式的应用。
- 如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者,即每条消息会被所有的消费者处理,这就相当于发布/订阅模式的应用。