一、kafka消息丢失的原因
kafka以其高性能、高吞吐、可扩展等出色能力,被广泛应用在各行各业,是事件流处理平台和消息队列中的佼佼者,但是经常可以看到有人在吐槽kafka消息丢失,但是真的是kafka的锅吗,本文我们就来认真分析一下到底哪些场景可能导致消息丢失以及使用怎么样的方案可以防止消息丢失。
我们知道使用kafka设计到三大模块,kafka producer、kafka broker和kafka consumer,分别负责消息的发送、接收和消费,在这三个节点上都有可能有消息丢失。
1、kafka producer
kafka producer消息丢失最常见的案例是producer将消息发送出去,因为网络抖动等原因,消息实际上没有发送到broker,这种情况下当然是不能怪kafka本身了。
kafka producer发送消息的时候主要有三种模式:发后即忘模式(fire-and-forget)、同步模式(sync)和异步模式(async)。
fire-and-forget模式下producer发送消息以后不管消息是否送达brokder的状态,直接返回,这种场景下是无法保证消息不丢失的。
同步和异步模式下,可以通过回调函数或者Future对象来获取发送的状态,从而决定是否对数据进行重发、数据格式进行修改等操作,保证数据一定送达broker。
kafka保证消息不丢失的前提是消息正常被送到broker进行了持久化,如果连消息都没有送达,这显然是producer编程端的问题,而不是kafka本身的问题。
另外对于kafka broker集群来说,数据是被发送到leader副本,但是leader副本也可能存在宕机的问题,如果这时候副本没有及时同步消息,那么也相当于这部分消息丢失了,所以怎么样算是"发送成功",需要一个稳妥的定义,这个可以靠acks客户端参数来控制,下文详述。
2、kafka broker
broker是一个集群,kafka采用分区副本机制,broker中的数据分片partition都有对应的副本,这些副本分散在不同的broker上,一方面是使得kafka具备扩展性,另外一方面则是数据的副本冗余提供了高可用的能力。副本的引导导致了一些问题,例如副本的同步、重新选主等,一旦处理不好也容易导致消息丢失,最典型的如:replica未及时同步leader的信息,此时leader挂掉,重新选择其他的replica担任leader,此时由于新的leader数据落后,导致consumer无法消费到未同步的部分消息,表现为消息丢失。
3、kafka consumer
kafka consumer消息丢失的原因普遍是因为开启了自动位移提交。如果在消费端开启了多个线程进行消费,consumer程序开启了自动位移提交,假如其中某个线程运行失败了,它所消费的消息没有成功处理,但是位移却正常更新了,那么这个消息无法再次被消费到,从consumer的角度来说就是消息丢失了。
要理解好这一点就需要理解kafka consumer的机制,如下图所示,kafka consumer维护一个offset,代表的是数据在broker中的消费位置,随着消息不断被消费,offset逐渐增加,这种设计带来的好处是可以通过设置offset进行消息的重复消费,这是一般的MQ框架所不具备的。
二、kafka producer防消息丢失方案
针对prodcuer端消息丢失的可能原因,可以进行如下的方案优化:
1、使用producer.send(msg, callback)替代producer.send(msg),使用带有回调函数确认机制的方案替代fire-and-forget的模式。
2、设置 acks = all。acks是producer端一个重要的参数,它涉及消息的可靠性和吞吐量之间的权衡。acks有三种值:
- acks=1。这是默认值,代表只要leader写入成功,就能收到broker端的响应。
- acks=0。代表不等待任何broker端的响应。
- acks=-1或者acks=all,代表producer发送消息以后,需要等待ISR种的所有副本全部都写入成功以后才能接收到来自broker端的响应。
很明显,acks=1和acks=0都是容易造成数据丢失的情况的,所以这个值一定要设置成all,除非是对于消息丢失可接受。
3、设置retries参数,设置一个较大的值,可以在网络抖动等情况下,通过重试完成消息的发送。
三、kafaka broker防消息丢失方案
针对broker落后的副本选主导致消息丢失的情况,可以通过一下配置方案来避免:
- 设置 unclean.leader.election.enable = false,这个参数控制了哪些副本可以参与选主,设置为false代表如果数据落后太多则不能参与选主。
- 设置 min.insync.replicas > 1,控制的是消息至少要被写入到多少个副本才算是“已提交”,设置成大于1,提高可用性。
- 保证 replication.factor > min.insync.replicas,replication.factor是指副本分片数,包括leader副本,一般情况下建议至少为3,replication.factor > min.insync.replicas的原理是,如果两者相等,那么只要有一个副本分片挂掉,那么就无法达成min.insync.replicas的条件,集群将会无法运行。
四、kafka consumer防消息丢失方案
针对consumer自动位移导致的消息丢失情况,可以通过设置consumer参数 enable.auto.commit,将此参数设置成 false,并采用手动提交位移的方式来解决。