一个6年工作经验的小伙伴,被问到这样一个问题,说Kafka是如何避免消息重复消费的?面试完之后,这位小伙伴来找到我,希望我能给一个思路。今天,我给大家分享一下我的思路。
另外,我花了1个多星期,准备了一份500页的PDF面试题解析配套文档,
如何获取? :
扫描文章底部二维码领取!
1、原因分析
我认为,导致Kafka消息重复消费有以下两个原因:
第1个原因是:Kafka消费端重复提交导致消息重复消费。
如图所示,在Broker上存储的消息,都有一个Offset标记,用来记录消费者消费消息的位置。Kafka的消费者是通过offSet标记来维护当 前已经消费的数据,每消费一批数据,Broker就会更新offSet的值,避免重复消费。
而默认情况下,消息消费完以后,会自动提交Offset的值,避免重复消费。
但是Kafka消费端的自动提交,会有一个默认的5秒间隔,也就是说在5秒之后的下一次向Broker拉取消息的时候才提交上一批消费的offset。
所以在消费者消费的过程中,如果遇到应用程序被强制kill掉或者宕机的情况,可能会导致Offset没有及时提交,从而产生重复提交的问题。
第2个原因是:Kafka服务端的Partition再均衡机制导致消息重复消费。
如图所示,在Kafka中有一个Partition Balance机制,就是把多个Partition均衡的分配给多个消费者。消费端会从分配到的Partition里面去消费消息,如果消费者在默认的5分钟内没有处理完这一批消息。就会触发Kafka的Rebalance机制,从而导致offset自动提交失败。而Rebalance之后,消费者还是会从之前没提交的offset位置开始消费,从而导致消息重复消费。
2、解决方案
基于对Kafka消息重复消费的原因分析,我认为可以通过以下两个方法来解决这个问题:
基于这样的背景下,我认为解决重复消费消息问题的方法有几个。
- 提高消费端的处理性能避免触发Balance,比如可以用多线程的方式来处理消息,缩短单个消息消费的时长。或者还可以调整消息处理的超时时间,也还可以减少一次性从Broker上拉取数据的条数。
- 使用ConsumerRebalanceListener,再均衡监听器,它可以用来设定发生再均衡动作前后的一些准备或者收尾工作。
- 开启Kafka的冥等性功能prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
或者将消息生成md5然后保存到MySQL或者Redis中,在处理消息之前先查MySQL或者Redis,进行判断看是否已经消费过。
以上就是我对Kafka避免消息重复消费的解决思路,小伙伴如果更好的思路也可以在评论区讨论一下。