聊聊 Kafka:Kafka 消息丢失的场景以及最佳实践

简介: 聊聊 Kafka:Kafka 消息丢失的场景以及最佳实践

一、前言

大家好,我是老周,有快二十多天没有更新文章了,很多小伙伴一直在催更。先说明下最近的情况,最近项目上线很忙,没有时间写,并且组里有个同事使用 Kafka 不当,导致线上消息丢失,在修复一些线上的数据,人都麻了。事情是这样,有个 Kafka 消费者实例,部署到线上去,消费到了线上的数据,而新版本做了新的逻辑,新版本的业务逻辑与老版本的业务逻辑不兼容,直接导致消费失败,没有进行重试操作,关键还提交了 offset。直接这部分数据没有被业务处理,导致消息丢失,然后紧急修复线上数据。

刚好这些天忙完了有空,所以记录一下,同时看是否对大家能起到避免踩坑的作用,能有一些作用,那我写的也就值了。

我们下面会从以下三个方面来说一下 Kafka 消息丢失的场景以及最佳实践。

  • 生产者丢失消息
  • Kafka Broker 服务端丢失消息
  • 消费者丢失消息

二、Kafka 的三种消息语义

先说 Kafka 消息丢失的场景之前,我们先来说下 Kafka 的三种消息语义,不会还有人不知道吧?这个不应该了,消息系统基本上抽象成这以下三种消息语义了:

  • 最多传递一次
  • 最少传递一次
  • 仅有一次传递

三、Kafka 消息丢失的场景

3.1 生产者丢失消息

  • 目前 Kafka Producer 是异步发送消息的,如果你的 Producer 客户端使用了 producer.send(msg) 方法来发送消息,方法会立即返回,但此时并不能代表消息已经发送成功了。
  • 如果消息在发送的过程中发生了网络抖动,那么消息可能没有传递到 Broker,那么消息可能会丢失。
  • 如果发送的消息本身不符合,如大小超过了 Broker 的承受能力等。

3.2 Kafka Broker 服务端丢失消息

  • Leader Broker 宕机了,触发选举过程,集群选举了一个落后 Leader 太多的 Broker 作为 Leader,那么落后的那些消息就会丢失了。
  • Kafka 为了提升性能,使用页缓存机制,将消息写入页缓存而非直接持久化至磁盘,采用了异步批量刷盘机制,也就是说,按照一定的消息量和时间间隔去刷盘,刷盘的动作由操作系统来调度的,如果刷盘之前,Broker 宕机了,重启后在页缓存的这部分消息则会丢失。

3.3 消费者丢失消息

  • 消费者拉取了消息,并处理了消息,但处理消息异常了导致失败,并且提交了偏移量,消费者重启后,会从之前已提交的位移的下一个位置重新开始消费,消费失败的那些消息不会再次处理,即相当于消费者丢失了消息。
  • 消费者拉取了消息,并提交了消费位移,但是在消息处理结束之前突然发生了宕机等故障,消费者重启后,会从之前已提交的位移的下一个位置重新开始消费,之前未处理完成的消息不会再次处理,即相当于消费者丢失了消息。

四、最佳实践

4.1 生产端

  • 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。带有回调通知的 send 方法可以针对发送失败的消息进行重试处理。
  • acks = all代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。


  • 设置 retries = 3,当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。


    如果重试达到设定的次数,那么生产者就会放弃重试并返回异常。不过并不是所有的异常都是可以通过重试来解决的,比如消息太大,超过max.request.size参数配置的值时,这种方式就不可行了。
  • 设置 retry.backoff.ms = 300,合理估算重试的时间间隔,可以避免无效的频繁重试。

    它用来设定两次重试之间的时间间隔,避免无效的频繁重试。在配置 retriesretry.backoff.ms之前,最好先估算一下可能的异常恢复时间,这样可以设定总的重试时间大于这个异常恢复时间,以此来避免生产者过早地放弃重试。

4.2 Broker 端

  • 设置 unclean.leader.election.enable = false。它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。


  • 设置 replication.factor >= 3。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。


  • 设置 min.insync.replicas > 1。这控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。


  • 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1

4.3 消费端

  • 确保消息消费完成再提交。最好把它设置成 enable.auto.commit = false,并采用手动提交位移的方式。这对于单 Consumer 多线程处理的场景而言是至关重要的。

    虽然采用手动提交位移的方式可以解决消费端消息丢失的场景,但同时会存在重复消费问题,关于重复消费问题我们下一篇再讲。
  • 像我们上面说的那个线上问题,即使你设置了手动提交,消费异常了同时也提交了位移,还是会存在消息丢失。
    Kafka 没有重试机制不支持消息重试,也没有死信队列,因此使用 Kafka 做消息队列时,需要自己
    实现消息重试的功能。这里我先说下大致的思路,后续有时间再分享代码出来:
  • 创建一个 Topic 作为重试 Topic,用于接收等待重试的消息。
  • 普通 Topic 消费者设置待重试消息的下一个重试 Topic。
  • 从重试 Topic 获取待重试消息存储到 Redis 的 ZSet 中,并以下一次消费时间排序。
  • 定时任务从 Redis 获取到达消费时间的消息,并把消息发送到对应的 Topic。
  • 同一个消息重试次数过多则不再重试。


欢迎大家Java栈的原理、源码分析、架构以及各种互联网高并发、高性能、高可用的解决方案。

相关文章
|
21天前
|
消息中间件 监控 大数据
优化Apache Kafka性能:最佳实践与调优策略
【10月更文挑战第24天】作为一名已经对Apache Kafka有所了解并有实际使用经验的开发者,我深知在大数据处理和实时数据流传输中,Kafka的重要性不言而喻。然而,在面对日益增长的数据量和业务需求时,如何保证系统的高性能和稳定性成为了摆在我们面前的一个挑战。本文将从我的个人视角出发,分享一些关于如何通过合理的配置和调优来提高Kafka性能的经验和建议。
51 4
|
6月前
|
消息中间件 监控 大数据
Kafka消息队列架构与应用场景探讨:面试经验与必备知识点解析
【4月更文挑战第9天】本文详尽探讨了Kafka的消息队列架构,包括Broker、Producer、Consumer、Topic和Partition等核心概念,以及消息生产和消费流程。此外,还介绍了Kafka在微服务、实时数据处理、数据管道和数据仓库等场景的应用。针对面试,文章解析了Kafka与传统消息队列的区别、实际项目挑战及解决方案,并展望了Kafka的未来发展趋势。附带Java Producer和Consumer的代码示例,帮助读者巩固技术理解,为面试做好准备。
619 0
|
3月前
|
消息中间件 存储 Java
场景题:如何提升Kafka效率?
场景题:如何提升Kafka效率?
66 0
场景题:如何提升Kafka效率?
|
3月前
|
分布式计算 搜索推荐 物联网
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
|
3月前
|
消息中间件 存储 Kafka
现代消息队列与云存储问题之Kafka在海量队列场景下存在性能的问题如何解决
现代消息队列与云存储问题之Kafka在海量队列场景下存在性能的问题如何解决
|
5月前
|
消息中间件 存储 缓存
高性能、高可靠性!Kafka的技术优势与应用场景全解析
**Kafka** 是一款高吞吐、高性能的消息系统,擅长日志收集、消息传递和用户活动跟踪。其优点包括:零拷贝技术提高传输效率,顺序读写优化磁盘性能,持久化保障数据安全,分布式架构支持扩展,以及客户端状态维护确保可靠性。在实际应用中,Kafka常用于日志聚合、解耦生产者与消费者,以及实时用户行为分析。
189 3
|
5月前
|
消息中间件 Java Kafka
springboot整合kafka消费者最佳实践
springboot整合kafka消费者最佳实践
561 1
|
5月前
|
消息中间件 存储 运维
RocketMQ与Kafka深度对比:特性与适用场景解析
RocketMQ与Kafka深度对比:特性与适用场景解析
|
6月前
|
消息中间件 Kafka
【Kafka系列】Kafka事务一般在什么场景下使用呢
面试官:听说你精通Kafka,那我就考考你吧面试官:不用慌尽管说,错了也没关系😊。。。❤️。
100 2
【Kafka系列】Kafka事务一般在什么场景下使用呢
|
6月前
|
消息中间件 NoSQL Kafka
云原生最佳实践系列 5:基于函数计算 FC 实现阿里云 Kafka 消息内容控制 MongoDB DML 操作
该方案描述了一个大数据ETL流程,其中阿里云Kafka消息根据内容触发函数计算(FC)函数,执行针对MongoDB的增、删、改操作。