窥探:消息中间件消息不丢不重奥秘

简介: 窥探:消息中间件消息不丢不重奥秘

前言

消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ。

比如在网约车系统,可以用它来对峰值写流量做削峰填谷,对次要的业务逻辑做异步处理,对不同的系统模块做解耦合。因为业务逻辑从同步代码中移除了,所以,我们也要有相应的队列处理程序来处理消息、执行业务逻辑。

比如用户在完成网约车订单,系统会给用户发一个抵扣红包,鼓励用户后续打车消费。由于发放红包的过程不在主流程之内,所以你考虑使用消息队列来异步处理。这时,你发现了一个问题:如果消息在投递的过程中发生丢失,那么用户就会因为没有得到红包而投诉。相反,如果消息在投递的过程中出现了重复,那么你的系统就会因为发送两个红包而损失。

那么如何保证,产生的消息一定会被消费到,并且只被消费一次呢?

问题虽然听起来很好理解,但是实际解决过程中却存在很多细节问题, 本文将详细讲解。

消息为啥会丢失

要想保证队列消息仅仅被消费一次,首先就要保证消息不会丢失。那么消息从被写入到消息队列,到被消费者消费完成,这个链路上会有哪些地方存在丢失消息的可能呢?其实,主要存在三个场景:

  • 消息从生产者写入到消息队列的过程。
  • 消息在消息队列中的存储场景。
  • 消息被消费者消费的过程。

下面针对每一个场景,详细地剖析一下,这样就可以针对不同的场景选择合适的,减少消息丢失的解决方案。

在消息生产的过程中丢失消息

在这个环节中主要有两种情况:

  • 网络传输中丢失消息,消息的生产者一般是我们的业务服务器,消息队列是独立部署在单独的服务器上的。两者之间的网络虽然是内网,但是也会存在抖动的可能,而一旦发生抖动,消息就有可能因为网络的错误而丢失。
  • MQ 发生异常未成功接收消息,消息队列因软硬件原因未接收到消息。

解决办法:

  • 消息重传:也就是当你发现发送超时后你就将消息重新发一次,但是你也不能无限制地重传消息。一般来说,如果不是消息队列发生故障,或者是到消息队列的网络断开了,重试 2~3 次就可以了。但是有可能会造成消息的重复,从而导致在消费的时候会重复消费同样的消息。
  • 确认或事务机制:主流的MQ都支持该机制,可以保证生产者将消息送达到 MQ。如 RabbitMQ 就有事务模式和 confirm 模式。

在消息队列中丢失消息

MQ成功接收消息后可能会出现内部处理出错、宕机等情况

拿Kafka 举例,消息在 Kafka 中是存储在本地磁盘上的,而为了减少消息存储时对磁盘的随机 I/O,我们一般会将消息先写入到操作系统的 Page Cache 中,然后再找合适的时机刷新到磁盘上。比如,Kafka 可以配置当达到某一时间间隔,或者累积一定的消息数量的时候再刷盘,也就是所说的异步刷盘。

不过,如果发生机器掉电或者机器异常重启,那么PageCache中还没有来得及刷盘的消息就会丢失了。那么怎么解决呢?

如果把刷盘的间隔设置很短,或者设置累积一条消息就就刷盘,这样频繁刷盘会对性能有比较大的影响,而且从经验来看,出现机器宕机或者掉电的几率也不高,所以不建议你这样做。

如果系统对消息丢失的容忍度很低,那么可以考虑以集群方式部署MQ服务,通过部署多个副本备份数据,保证消息尽量不丢失。

所以如果需要确保消息一条都不能丢失,那么建议不要开启消息队列的同步刷盘,而是需要使用集群的方式来解决,可以配置当所有同步中的从节点都接收到消息才返回成功。如果对消息的丢失有一定的容忍度,也建议配置只发送给一个Follower成功存储就可以返回成功了。

当然实际业务系统对于消息的丢失有一定的容忍度,比如说以上面的发红包为例,如果红包消息丢失了,我们可以兜底给一直没有收到红包的顾客补发红包。

在消费的过程中存在消息丢失的可能

还是以 Kafka 为例来说明。一个消费者消费消息的进度是记录在消息队列集群中的,而消费的过程分为三步:接收消息、处理消息、更新消费进度。

其中接收消息和处理消息的过程都可能会发生异常或者失败,比如说,消息接收时网络发生抖动,导致消息并没有被正确的接收到;处理消息时可能发生一些业务的异常导致处理流程未执行完成,这时如果更新消费进度,那么这条失败的消息就永远不会被处理了,也可以认为是丢失了。

所以一定要等到消息接收和处理完成后才能更新消费进度,但是这也会造成消息重复的问题,比方说某一条消息在处理之后,消费者恰好宕机了,那么因为没有更新消费进度,所以当这个消费者重启之后,还会重复地消费这条消息。

主流的解决方案是客户端消费消息改为手动确认模式,在业务处理完成后再确认消息。

如何保证消息只被消费一次

基于上面的分析中,为了避免消息丢失,系统需要付出两方面的代价:一方面是性能的损耗;一方面可能造成消息重复消费。

性能的损耗一般可以接受,因为大部分业务系统只有在写请求时才会有发送消息队列的操作,而一般系统的写请求的量级并不高,但是消息一旦被重复消费,就会造成业务逻辑处理的错误。那么要如何避免消息的重复呢?

想要完全的避免消息重复的发生是很难做到的,因为网络的抖动、机器的宕机和处理的异常都是比较难以避免的,在工业上并没有成熟的方法,可以把要求放宽,只要保证即使消费到了重复的消息,从消费的最终结果来看和只消费一次是等同的就好了,也就是保证在消息的生产和消费的过程是“幂等”的。

什么是幂等: 幂等是一个数学上的概念,它的含义是多次执行同一个操作和执行一次操作,最终得到的结果是相同的。

比较典型的反面例子就是支付。用户购买商品支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了。用户再次点击按钮,假设这里的请求没有做幂等处理,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。而实际期望的是,一笔订单只进行一次扣款。所以一件事儿无论做多少次都和做一次产生的结果是一样的,那么这件事儿就具有幂等性。

消息在生产和消费的过程中都可能会产生重复,所以要做的是在生产过程和消费过程中增加消息幂等性的保证,这样就可以认为从最终结果上来看,消息实际上是只被消费了一次的。

在消息生产过程中,在Kafka和Pulsar中都支持”producer idempotency”的特性,翻译过来就是生产过程的幂等性,这种特性保证消息虽然可能在生产端产生重复,但是最终在消息队列存储时只会存储一份。

它的做法是给每一个生产者一个唯一的ID,并且为生产的每一条消息赋予一个唯一ID,消息队列的服务端会存储<生产者ID,最后一条消息ID> 的映射。当某一个生产者产生新的消息时,消息队列服务端会比对消息ID是否与存储的最后一条ID一致,如果一致,就认为是重复的消息,服务端会自动丢弃。

而在消费端,幂等性的保证会稍微复杂一些,你可以从通用层和业务层两个层面来考虑。在通用层面,你可以在消息被生产的时候,给它生成一个全局唯一的消息ID, 并将消息存储在数据库中,消费者在消费每条消息前,先从数据库里面查询这个全局ID是否被消费过,如果被消费过就放弃消费。

你可以看到,无论是生产端的幂等性保证方式,还是消费端通用的幂等性保证方式,它们的共同特点都是为每一个消息生成一个唯一的 ID,然后在使用这个消息的时候,先比对这个ID 是否已经存在,如果存在,则认为消息已经被使用过。所以这种方式是一种标准的实现幂等的方式。

小结

本文主要介绍了在消息队列中,消息可能会发生丢失的场景和应对方法,以及在消息重复的场景下,要如何保证,尽量不影响消息最终的处理结果。

重点是:虽然很多应对消息丢失的方法,但并不是说消息丢失一定不能被接受,毕竟在允许消息丢失的情况下,消息队列的性能更好,方案实现的复杂度也最低。

比如像是日志处理的场景,日志存在的意义在于排查系统的问题,而系统出现问题的几率不高,偶发的丢失几条日志是可以接受的。

所以方案设计看场景,这是一切设计的原则,不能把所有的消息队列都配置成防止消息丢失的方式,也不能要求所有的业务处理逻辑都要支持幂等性,这样会给开发和运维带来额外的负担。

相关文章
|
7月前
|
消息中间件 负载均衡 Java
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
111 0
|
4月前
|
消息中间件 运维 Java
【揭秘RabbitMQ背后的秘密!】如何确保消息正确发送及消费?深入剖析与实战指南!
【8月更文挑战第24天】本文通过一个电商平台订单确认消息的案例,深入探讨了如何确保消息准确无误地发送到 RabbitMQ 以及如何保证消息被正确处理。为确保消息成功发送,文中介绍了使用发布确认、设置重试机制及事务处理等策略;并通过 Java 代码示例展示了如何实施这些策略。此外,还讨论了确保消息正确消费的方法,包括使用确认机制、设置超时及异常处理等,并提供了相应的 Java 示例代码。这些技术和策略有助于提升系统的稳定性和可靠性,对日常运维和性能优化具有重要意义。
68 1
|
7月前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
108 0
|
4月前
|
消息中间件 缓存 Java
被怼了:acks=all消息也会丢失?
被虐了:acks=all消息也会丢失?
48 4
|
5月前
|
消息中间件 存储 Kafka
消息中间件MetaQ中的ConsumerGroup是指什么
消息中间件MetaQ中的ConsumerGroup是指什么
|
6月前
|
消息中间件 缓存 Java
避免消息积压的终极指南:四个关键技巧
本文作者小米分享了避免消息积压的四个策略:1) 提高消费并行度,可通过增加消费者实例和利用分区机制;2) 批量消费,利用消息中间件的批量API或自定义批量处理逻辑;3) 减少组件IO交互次数,如使用本地缓存和合并IO操作;4) 优先级消费,设置消息优先级并使用优先级队列。通过这些方法,可以优化消息处理效率,防止消息积压,确保关键业务的顺利进行。
94 5
|
存储 XML Java
何为消息持久化?
持久化(Persistence),即把数据(如内存中的对象)保存到可永久保存的存储设备中(如磁盘)。持久化的主要应用是将内存中的对象存储在关系型的数据库中,当然也可以存储在磁盘文件中、XML数据文件中等等。
125 0
|
消息中间件 网络协议 Java
到底什么才是面向消息的分布式架构呢?看完之后我终于明白了
在SOA或者微服务架构中,普遍会采用HTTP作为通信协议。HTTP具有平台无关性、语言中立性等特点,在分布式系统中被广泛应用。特别是微服务架构的流行,遵循一致的REST风格的HTTP,更能在各个微服务之间实现低沟通成本的通信。
|
消息中间件 监控
面试常考的问题:如何保证消息不丢失
使用MQ的目的:对系统进行解耦,流量控制(高可用和高性能问题)
137 0
|
消息中间件 存储 缓存
不看损失大了,刨根问底,Kafka消息中间件到底会不会丢消息
不看损失大了,刨根问底,Kafka消息中间件到底会不会丢消息
642 11
不看损失大了,刨根问底,Kafka消息中间件到底会不会丢消息