【MQ】Kafka如何保证消息不丢失

简介: 【MQ】Kafka如何保证消息不丢失

没时间的朋友建议直接看总结

Kafka存在丢消息的问题,消息丢失会发生在Broker,Producer和Consumer三种。

Producer端保证消息不丢失

为了提升效率,减少IO,producer在发送数据时可以将多个请求进行合并后发送。被合并的请求发送之前缓存在本地buffer中。


但是,buffer中的数据是危险的。一旦producer被非法的停止了,那么buffer中的数据将丢失,broker将无法收到该部分数据。


建议配置下参数

#每当多个记录被发送到同一分区时,生产者将尝试将记录一起批量处理为更少的请求,
#这有助于提升客户端和服务器上的性能,此配置控制默认批量大小(以字节为单位),默认值为16384
spring.kafka.producer.batch-size=16384
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
spring.kafka.producer.properties.linger.ms=0

生产者(Producer) 调用send方法发送消息之后,消息可能因为网络问题并没有发送过去。


所以,我们不能默认在调用send方法发送消息之后消息发送成功了。为了确定消息是否发送成功,我们要判断消息发送的结果。但是要注意的是 Kafka 生产者(Producer) 使用 send 方法发送消息实际上是异步的操作,我们可以通过 get()方法获取调用结果,但是这样也让它变为了同步操作,示例代码如下:

SendResult<String, Object> sendResult = kafkaTemplate.send(topic, o).get();
if (sendResult.getRecordMetadata() != null) {
  logger.info("生产者成功发送消息到" + sendResult.getProducerRecord().topic() + "-> " + sendRe
              sult.getProducerRecord().value().toString());
}

但是一般不推荐这么做!可以采用为其添加回调函数的形式,示例代码如下:

        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
        future.addCallback(result -> logger.info("生产者成功发送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),
                ex -> logger.error("生产者发送消失败,原因:{}", ex.getMessage()));

如果消息发送失败的话,我们检查失败的原因之后重新发送即可。


另外这里推荐为 Producer 的retries(重试次数)设置一个比较合理的值,一般是 3 ,但是为了保证消息不丢失的话一般会设置比较大一点。设置完成之后,当出现网络问题之后能够自动重试消息发送,避免消息丢失。另外,建议还要设置重试间隔,因为间隔太小的话重试的效果就不明显了,网络波动一次你3次一下子就重试完了

Consumer端保证消息不丢失

消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移(offset)。偏移量(offset)表示 Consumer 当前消费到的 Partition(分区)的所在的位置。Kafka 通过偏移量(offset)可以保证消息在分区内的顺序性。


当消费者拉取到了分区的某个消息之后,消费者会自动提交了 offset。自动提交的话会有一个问题,试想一下,当消费者刚拿到这个消息准备进行真正消费的时候,突然挂掉了,消息实际上并没有被消费,但是 offset 却被自动提交了。


**解决办法也比较粗暴,我们手动关闭自动提交 offset,每次在真正消费完消息之后之后再自己手动提交 offset 。**这样会带来消息被重新消费的问题。比如你刚刚消费完消息之后,还没提交 offset,结果自己挂掉了,那么这个消息理论上就会被消费两次。


所以开启手动提交的时候消费端需要去保证幂等性。

Broker端保证消息不丢失

除了生产端和消费端,kafka本身也可能会丢失消息。


kafka为了得到更高的性能和吞吐量,将数据异步批量的存储在磁盘中。消息的刷盘过程,为了提高性能,减少刷盘次数,kafka采用了批量刷盘的做法。即,按照一定的消息量,和时间间隔进行刷盘。这种机制也是由于linux操作系统决定的。将数据存储到linux操作系统种,会先存储到页缓存(Page cache)中,按照时间或者其他条件进行刷盘(从page cache到file),或者通过fsync命令强制刷盘。数据在page cache中时,如果系统挂掉,数据会丢失。


Kafka没有提供同步刷盘的方式。同步刷盘在RocketMQ中有实现


理论上,要完全让kafka保证单个broker不丢失消息是做不到的,只能通过调整刷盘机制的参数缓解该情况。比如,减少刷盘间隔,减少刷盘数据量大小。时间越短,性能越差,可靠性越好(尽可能可靠)。


我们知道 Kafka 为分区(Partition)引入了多副本(Replica)机制。分区(Partition)中的多个副本之间会有一个 leader,其他副本称为 follower。我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。生产者和消费者只与 leader 副本交互。你可以理解为其他副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。


试想一种情况:假如 leader 副本所在的 broker 突然挂掉,那么就要从 follower 副本重新选出一个 leader ,但是 leader 的数据还有一些没有被 follower 副本的同步的话,就会造成消息丢失。

解决办法就是我们设置 acks = all。acks 是 Kafka 生产者(Producer) 很重要的一个参数。

acks 的默认值即为1,代表我们的消息被leader副本接收之后就算被成功发送。当我们配置 acks = all 代表则所有副本都要接收到该消息之后该消息才算真正成功被发送。


一般情况下我们还需要设置 min.insync.replicas> 1 ,这样配置代表消息至少要被写入到 2 个副本才算是被成功发送。min.insync.replicas 的默认值为 1 ,在实际生产中应尽量避免默认值 1。


为了保证整个 Kafka 服务的高可用性,你还需要确保 replication.factor > min.insync.replicas 。设想一下假如两者相等的话,只要是有一个副本挂掉,整个分区就无法正常工作了。这明显违反高可用性!一般推荐设置成 replication.factor = min.insync.replicas + 1。


设置 unclean.leader.election.enable = false


Kafka 0.11.0.0版本开始 unclean.leader.election.enable 参数的默认值由原来的true改为false


我们最开始也说了我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。多个 follower 副本之间的消息同步情况不一样,当我们配置了 unclean.leader.election.enable = false 的话,当 leader 副本发生故障时就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,即只从ISR中选择leader,这样降低了消息丢失的可能性。


总结

Producer端

  1. 设置batch-size和linger.ms
  2. send方法添加回调函数
  3. 设置retries(重试次数)次数和重试间隔

Consume端

  1. 关闭自动提交 offset,手动去提交(注意保证幂等性).

Broker端

  1. 设置 acks = all
  2. 设置min.insync.replicas> 1
  3. 确保replication.factor > min.insync.replicas
  4. 设置 unclean.leader.election.enable = false


目录
相关文章
|
4天前
|
消息中间件 运维 Java
招行面试:RocketMQ、Kafka、RabbitMQ,如何选型?
45岁资深架构师尼恩针对一线互联网企业面试题,特别是招商银行的高阶Java后端面试题,进行了系统化梳理。本文重点讲解如何根据应用场景选择合适的消息中间件(如RabbitMQ、RocketMQ和Kafka),并对比三者的性能、功能、可靠性和运维复杂度,帮助求职者在面试中充分展示技术实力,实现“offer直提”。此外,尼恩还提供了《尼恩Java面试宝典PDF》等资源,助力求职者提升架构、设计、开发水平,应对高并发、分布式系统的挑战。更多内容及技术圣经系列PDF,请关注【技术自由圈】获取。
|
5月前
|
消息中间件 Java Kafka
消息传递新纪元:探索RabbitMQ、RocketMQ和Kafka的魅力所在
【8月更文挑战第29天】这段内容介绍了在分布式系统中起到异步通信与解耦作用的消息队列,并详细探讨了三种流行的消息队列产品:RabbitMQ、RocketMQ 和 Kafka。其中,RabbitMQ 是一个基于 AMQP 协议的开源消息队列系统,支持多种消息模型;RocketMQ 则是由阿里巴巴开源的具备高性能、高可用性和高可靠性的分布式消息队列,支持事务消息等多种特性;而 Kafka 作为一个由 LinkedIn 开源的分布式流处理平台,以高吞吐量和良好的可扩展性著称。此外,还提供了使用这三种消息队列发送和接收消息的代码示例。总之,这三种消息队列各有优势,适用于不同的业务场景。
81 3
|
2月前
|
消息中间件 大数据 Kafka
大厂面试高频:Kafka、RocketMQ、RabbitMQ 的优劣势比较
本文深入探讨了消息队列的核心概念、应用场景及Kafka、RocketMQ、RabbitMQ的优劣势比较,大厂面试高频,必知必会,建议收藏。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:Kafka、RocketMQ、RabbitMQ 的优劣势比较
|
2月前
|
消息中间件 存储 监控
ActiveMQ、RocketMQ、RabbitMQ、Kafka 的区别
【10月更文挑战第24天】ActiveMQ、RocketMQ、RabbitMQ 和 Kafka 都有各自的特点和优势,在不同的应用场景中发挥着重要作用。在选择消息队列时,需要根据具体的需求、性能要求、扩展性要求等因素进行综合考虑,选择最适合的消息队列技术。同时,随着技术的不断发展和演进,这些消息队列也在不断地更新和完善,以适应不断变化的应用需求。
131 1
|
3月前
|
消息中间件 存储 监控
说说如何解决RocketMq消息积压?为什么Kafka性能比RocketMq高?它们区别是什么?
【10月更文挑战第8天】在分布式系统中,消息队列扮演着至关重要的角色,它不仅能够解耦系统组件,还能提供异步处理、流量削峰和消息持久化等功能。在众多的消息队列产品中,RocketMQ和Kafka无疑是其中的佼佼者。本文将围绕如何解决RocketMQ消息积压、为什么Kafka性能比RocketMQ高以及它们之间的区别进行深入探讨。
126 1
|
7月前
|
消息中间件 存储 Kafka
Kafka 如何保证消息顺序及其实现示例
Kafka 如何保证消息顺序及其实现示例
175 0
|
5月前
|
消息中间件 存储 监控
RabbitMQ、Kafka对比(超详细),Kafka、RabbitMQ、RocketMQ的区别
RabbitMQ、Kafka对比(超详细),Kafka、RabbitMQ、RocketMQ的区别,设计目标、适用场景、吞吐量、消息存储和持久化、可靠性、集群负载均衡
RabbitMQ、Kafka对比(超详细),Kafka、RabbitMQ、RocketMQ的区别
|
5月前
|
消息中间件 存储 关系型数据库
Kafka 与 RabbitMQ 如何选择使用哪个?
Kafka 与 RabbitMQ 如何选择使用哪个?
62 1
|
5月前
|
消息中间件 Kafka Apache
kafka vs rocketmq: 不要只顾着吞吐量而忘了延迟这个指标
这篇文章讨论了Apache RocketMQ和Kafka的对比,强调RocketMQ在低延迟、消息重试与追踪、海量Topic、多租户等方面进行了优化,特别是在小包非批量和大量分区场景下的吞吐量超越Kafka,适合电商和金融领域等高并发、高可靠和高可用场景。
171 0
|
6月前
|
消息中间件 Kafka API
面试题Kafka问题之RabbitMQ的扩展和二次开发如何解决
面试题Kafka问题之RabbitMQ的扩展和二次开发如何解决
51 1