一、前言
大家好,我是老周,有快二十多天没有更新文章了,很多小伙伴一直在催更。先说明下最近的情况,最近项目上线很忙,没有时间写,并且组里有个同事使用 Kafka 不当,导致线上消息丢失,在修复一些线上的数据,人都麻了。事情是这样,有个 Kafka 消费者实例,部署到线上去,消费到了线上的数据,而新版本做了新的逻辑,新版本的业务逻辑与老版本的业务逻辑不兼容,直接导致消费失败,没有进行重试操作,关键还提交了 offset。直接这部分数据没有被业务处理,导致消息丢失,然后紧急修复线上数据。
刚好这些天忙完了有空,所以记录一下,同时看是否对大家能起到避免踩坑的作用,能有一些作用,那我写的也就值了。
我们下面会从以下三个方面来说一下 Kafka 消息丢失的场景以及最佳实践。
- 生产者丢失消息
- Kafka Broker 服务端丢失消息
- 消费者丢失消息
二、Kafka 的三种消息语义
先说 Kafka 消息丢失的场景之前,我们先来说下 Kafka 的三种消息语义,不会还有人不知道吧?这个不应该了,消息系统基本上抽象成这以下三种消息语义了:
- 最多传递一次
- 最少传递一次
- 仅有一次传递
三、Kafka 消息丢失的场景
3.1 生产者丢失消息
- 目前 Kafka Producer 是异步发送消息的,如果你的 Producer 客户端使用了
producer.send(msg)
方法来发送消息,方法会立即返回,但此时并不能代表消息已经发送成功了。 - 如果消息在发送的过程中发生了网络抖动,那么消息可能没有传递到 Broker,那么消息可能会丢失。
- 如果发送的消息本身不符合,如大小超过了 Broker 的承受能力等。
3.2 Kafka Broker 服务端丢失消息
- Leader Broker 宕机了,触发选举过程,集群选举了一个落后 Leader 太多的 Broker 作为 Leader,那么落后的那些消息就会丢失了。
- Kafka 为了提升性能,使用页缓存机制,将消息写入页缓存而非直接持久化至磁盘,采用了异步批量刷盘机制,也就是说,按照一定的消息量和时间间隔去刷盘,刷盘的动作由操作系统来调度的,如果刷盘之前,Broker 宕机了,重启后在页缓存的这部分消息则会丢失。
3.3 消费者丢失消息
- 消费者拉取了消息,并处理了消息,但处理消息异常了导致失败,并且提交了偏移量,消费者重启后,会从之前已提交的位移的下一个位置重新开始消费,消费失败的那些消息不会再次处理,即相当于消费者丢失了消息。
- 消费者拉取了消息,并提交了消费位移,但是在消息处理结束之前突然发生了宕机等故障,消费者重启后,会从之前已提交的位移的下一个位置重新开始消费,之前未处理完成的消息不会再次处理,即相当于消费者丢失了消息。
四、最佳实践
4.1 生产端
- 不要使用
producer.send(msg)
,而要使用producer.send(msg, callback)
。带有回调通知的 send 方法可以针对发送失败的消息进行重试处理。 - 设置
acks = all
。代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。 - 设置
retries = 3
,当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
如果重试达到设定的次数,那么生产者就会放弃重试并返回异常。不过并不是所有的异常都是可以通过重试来解决的,比如消息太大,超过max.request.size
参数配置的值时,这种方式就不可行了。 - 设置
retry.backoff.ms = 300
,合理估算重试的时间间隔,可以避免无效的频繁重试。
它用来设定两次重试之间的时间间隔,避免无效的频繁重试。在配置retries
和retry.backoff.ms
之前,最好先估算一下可能的异常恢复时间,这样可以设定总的重试时间大于这个异常恢复时间,以此来避免生产者过早地放弃重试。
4.2 Broker 端
- 设置
unclean.leader.election.enable = false
。它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。 - 设置
replication.factor >= 3
。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。 - 设置
min.insync.replicas > 1
。这控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。 - 确保
replication.factor > min.insync.replicas
。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成replication.factor = min.insync.replicas + 1
。
4.3 消费端
- 确保消息消费完成再提交。最好把它设置成
enable.auto.commit = false
,并采用手动提交位移的方式。这对于单 Consumer 多线程处理的场景而言是至关重要的。
虽然采用手动提交位移的方式可以解决消费端消息丢失的场景,但同时会存在重复消费问题,关于重复消费问题我们下一篇再讲。 - 像我们上面说的那个线上问题,即使你设置了手动提交,消费异常了同时也提交了位移,还是会存在消息丢失。
Kafka 没有重试机制不支持消息重试,也没有死信队列,因此使用 Kafka 做消息队列时,需要自己
实现消息重试的功能。这里我先说下大致的思路,后续有时间再分享代码出来:
- 创建一个 Topic 作为重试 Topic,用于接收等待重试的消息。
- 普通 Topic 消费者设置待重试消息的下一个重试 Topic。
- 从重试 Topic 获取待重试消息存储到 Redis 的 ZSet 中,并以下一次消费时间排序。
- 定时任务从 Redis 获取到达消费时间的消息,并把消息发送到对应的 Topic。
- 同一个消息重试次数过多则不再重试。
欢迎大家关注我的公众号【老周聊架构】,Java后端主流技术栈的原理、源码分析、架构以及各种互联网高并发、高性能、高可用的解决方案。