mq如何保证消息顺序性

简介: mq如何保证消息顺序性

背景

面试的时候,经常会有面试官问道这个问题,发送顺序消息。

讨论

顺序性其实有两方面,一方面要保证Producer发送时是有序的,Consumer接受和处理消息的有序性。

另一面来说,我们也要考虑是需要全局有序还是局部有序就可以。

kafka

kafka的topic是分Partition的,当有多个Partition的时候,消息可能会按照/或者不按照规则被发送到不同的Partition。

Kafka 中的消费是基于拉模式的,即消费者主动向服务端发起请求来拉取消息。 Kakfa 中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用 poll () 方法,而 poll () 方法返回的是所订阅主题(或分区)上的一组消息。

全局有序

所以如果想做到全局有序的话,我们可以只设置一个Partition,这样保证发送的有序性,同时,Consumer端也应该保证接收有序并且不要用多线程,单线程处理。

局部有序

刚才说到了一个topic可以有多个Partition,kafka确保每个Partition只能同一个group中的同一个Consumer消费,所以就Consumer来说,可以保证一个Partition的消息顺序消费,然后kafka的Producer端可以根据要发送消息内容,指定Partition Key,Kafka对其进行Hash计算,根据计算结果决定放入哪个Partition。这样Partition Key相同的消息会放在同一个Partition。此时,Partition的数量仍然可以设置多个,提升Topic的整体吞吐量。

消息重试对顺序消息的影响

对于一个有着先后顺序的消息A、B,正常情况下应该是A先发送完成后再发送B,但是在异常情况下,在A发送失败的情况下,B发送成功,而A由于重试机制在B发送完成之后重试发送成功了。这时对于本身顺序为AB的消息顺序变成了BA。

针对这种问题,严格的顺序消费还需要max.in.flight.requests.per.connection参数的支持。

该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,同时也会提升吞吐量。把它设为1就可以保证消息是按照发送的顺序写入服务器的。

此外,对于某些业务场景,设置max.in.flight.requests.per.connection=1会严重降低吞吐量,如果放弃使用这种同步重试机制,则可以考虑在消费端增加失败标记的记录,然后用定时任务轮询去重试这些失败的消息并做好监控报警。

rocketmq

rocketmq的消息模式其实和kafka大差不差,rocketmq的Topic分Queue,当有多个Queue的时候,消息可能会按照/或者不按照规则被发送到不同的Queue。一个Queue最多只能分配给一个Consumer,一个Cosumer可以分配得到多个Queue。

Kafka 中的消费是基于拉模式的,即消费者主动向服务端发起请求来拉取消息。 Kakfa 中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用 poll () 方法,而 poll () 方法返回的是所订阅主题(或分区)上的一组消息。

全局有序

同上,一个Topic只设置一个Queue,Producer保证发送有序。Consumer保证接收有序并且不要用多线程,单线程处理。或者使用可以保证处理顺序的线程模型。

局部有序

rockemt的Consumer消费消息有两种形式:Push和Pull,大多数场景使用的是Push模式,在源码中这两种模式分别对应的是DefaultMQPushConsumer类和DefaultMQPullConsumer类。Push模式实际上在内部还是使用的Pull方式实现的,通过Pull不断地轮询Broker获取消息,当不存在新消息时,Broker端会挂起Pull请求,直到有新消息产生才取消挂起,返回新消息。

那我们就可以通过Producer发送有序消息,通过MessageQueueSelector()来指定消息发送到哪个队列,保证局部有序,消费方启动顺序消费,通过new MessageListenerOrderly()来控制。

消息重试对顺序消息的影响

对于Rocketmq来说,一个Consumer只能注册一个Listener,所以一个consumer要么按顺序消息的方式来消费,要么按普通消息的方式来消费。所以,如果一个用户进程要收两种消息,最好使用两个Consumer实例。

从实例代码可以看出,顺序消息处理消息后返回状态跟普通消息也有所不同,失败的话是返回的SUSPEND_CURRENT_QUEUE_A_MOMENT,而不是RECONSUME_LATER。这是因为对于顺序消息,消费失败是不会返回给broker重新投递的(其实即使重发也还是发到这个consumer上,没必要多此一举),而是会放到本地的缓存队列中重新处理。另外两个状态ROLLBACKCOMMIT已经被设置成deprecated了,我们就不关心了。

参考

一文理解Kafka如何保证消息顺序性

RocketMQ源码解析(十二)-顺序消息

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 存储 canal
3分钟白话RocketMQ系列—— 如何保证消息不丢失
3分钟白话RocketMQ系列—— 如何保证消息不丢失
3802 1
|
6月前
|
消息中间件 存储 运维
|
5月前
|
消息中间件 SQL 数据处理
实时计算 Flink版产品使用问题之sink多个并行度写入rabbit mq会导致顺序性问题吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
消息中间件 存储 负载均衡
消息队列 MQ产品使用合集之POP消费模式是否可以保证消息顺序性
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
消息中间件 存储 Kafka
如何保证MQ中消息的可靠性传输?
如何保证MQ中消息的可靠性传输?
107 1
|
6月前
|
消息中间件 存储 缓存
【面试问题】MQ 如何保证消息的顺序性?
【1月更文挑战第27天】【面试问题】MQ 如何保证消息的顺序性?
|
6月前
|
消息中间件 存储 程序员
RabbitMQ消息丢失的场景,如何保证消息不丢失?(详细讲解,一文看懂)
RabbitMQ消息丢失的场景,如何保证消息不丢失?(详细讲解,一文看懂)
325 0
RabbitMQ消息丢失的场景,如何保证消息不丢失?(详细讲解,一文看懂)
|
消息中间件 存储 canal
3分钟白话RocketMQ系列—— 如何保证消息顺序性
3分钟白话RocketMQ系列—— 如何保证消息顺序性
1661 1
3分钟白话RocketMQ系列—— 如何保证消息顺序性
|
消息中间件 关系型数据库 MySQL
如何保证MQ中消息的顺序性?
如何保证MQ中消息的顺序性?
96 1
|
21天前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
61 6