开发者社区> 蘑菇街隐修> 正文

聊一聊顺序消息(RocketMQ顺序消息的实现机制)

简介: 当我们说顺序时,我们在说什么? 日常思维中,顺序大部分情况会和时间关联起来,即时间的先后表示事件的顺序关系。 比如事件A发生在下午3点一刻,而事件B发生在下午4点,那么我们认为事件A发生在事件B之前,他们的顺序关系为先A后B。
+关注继续查看

当我们说顺序时,我们在说什么?

日常思维中,顺序大部分情况会和时间关联起来,即时间的先后表示事件的顺序关系。

比如事件A发生在下午3点一刻,而事件B发生在下午4点,那么我们认为事件A发生在事件B之前,他们的顺序关系为先A后B。

上面的例子之所以成立是因为他们有相同的参考系,即他们的时间是对应的同一个物理时钟的时间。如果A发生的时间是北京时间,而B依赖的时间是东京时间,那么先A后B的顺序关系还成立吗?

如果没有一个绝对的时间参考,那么A和B之间还有顺序吗,或者说怎么断定A和B的顺序?

显而易见的,如果A、B两个事件之间如果是有因果关系的,那么A一定发生在B之前(前因后果,有因才有果)。相反,在没有一个绝对的时间的参考的情况下,若A、B之间没有因果关系,那么A、B之间就没有顺序关系。

那么,我们在说顺序时,其实说的是:

  • 有绝对时间参考的情况下,事件的发生时间的关系;

  • 和没有时间参考下的,一种由因果关系推断出来的happening before的关系;

在分布式环境中讨论顺序

当把顺序放到分布式环境(多线程、多进程都可以认为是一个分布式的环境)中去讨论时:

  • 同一线程上的事件顺序是确定的,可以认为他们有相同的时间作为参考

  • 不同线程间的顺序只能通过因果关系去推断

 

(点表示事件,波浪线箭头表示事件间的消息)

上图中,进程P中的事件顺序为p1->p2->p3->p4(时间推断)。而因为p1给进程Q的q2发了消息,那么p1一定在q2之前(因果推断)。但是无法确定p1和q1之间的顺序关系。

推荐阅读《Time, Clocks, and the Ordering of Events in a Distributed System》,会透彻的分析分布式系统中的顺序问题。

消息中间件中的顺序消息

什么是顺序消息

有了上述的基础之后,我们回到本篇文章的主题中,聊一聊消息中间件中的顺序消息。

顺序消息(FIFO 消息)是 MQ 提供的一种严格按照顺序进行发布和消费的消息类型。顺序消息由两个部分组成:顺序发布和顺序消费。

顺序消息包含两种类型:

分区顺序:一个Partition内所有的消息按照先进先出的顺序进行发布和消费

全局顺序:一个Topic内所有的消息按照先进先出的顺序进行发布和消费

这是阿里云上对顺序消息的定义,把顺序消息拆分成了顺序发布和顺序消费。那么多线程中发送消息算不算顺序发布?

如上一部分介绍的,多线程中若没有因果关系则没有顺序。那么用户在多线程中去发消息就意味着用户不关心那些在不同线程中被发送的消息的顺序。即多线程发送的消息,不同线程间的消息不是顺序发布的,同一线程的消息是顺序发布的。这是需要用户自己去保障的。

而对于顺序消费,则需要保证哪些来自同一个发送线程的消息在消费时是按照相同的顺序被处理的(为什么不说他们应该在一个线程中被消费呢?)。

全局顺序其实是分区顺序的一个特例,即使Topic只有一个分区(以下不在讨论全局顺序,因为全局顺序将面临性能的问题,而且绝大多数场景都不需要全局顺序)。

如何保证顺序

在MQ的模型中,顺序需要由3个阶段去保障:

  1. 消息被发送时保持顺序

  2. 消息被存储时保持和发送的顺序一致

  3. 消息被消费时保持和存储的顺序一致

发送时保持顺序意味着对于有顺序要求的消息,用户应该在同一个线程中采用同步的方式发送。存储保持和发送的顺序一致则要求在同一线程中被发送出来的消息A和B,存储时在空间上A一定在B之前。而消费保持和存储一致则要求消息A、B到达Consumer之后必须按照先A后B的顺序被处理。

如下图所示:

对于两个订单的消息的原始数据:a1、b1、b2、a2、a3、b3(绝对时间下发生的顺序):

  • 在发送时,a订单的消息需要保持a1、a2、a3的顺序,b订单的消息也相同,但是a、b订单之间的消息没有顺序关系,这意味着a、b订单的消息可以在不同的线程中被发送出去

  • 在存储时,需要分别保证a、b订单的消息的顺序,但是a、b订单之间的消息的顺序可以不保证

    • a1、b1、b2、a2、a3、b3是可以接受的

    • a1、a2、b1、b2、a3、b3也是可以接受的

    • a1、a3、b1、b2、a2、b3是不能接受的

  • 消费时保证顺序的简单方式就是“什么都不做”,不对收到的消息的顺序进行调整,即只要一个分区的消息只由一个线程处理即可;当然,如果a、b在一个分区中,在收到消息后也可以将他们拆分到不同线程中处理,不过要权衡一下收益

开源RocketMQ中顺序的实现

上图是RocketMQ顺序消息原理的介绍,将不同订单的消息路由到不同的分区中。文档只是给出了Producer顺序的处理,Consumer消费时通过一个分区只能有一个线程消费的方式来保证消息顺序,具体实现如下。

Producer端

Producer端确保消息顺序唯一要做的事情就是将消息路由到特定的分区,在RocketMQ中,通过MessageQueueSelector来实现分区的选择。

  • List<MessageQueue> mqs:消息要发送的Topic下所有的分区

  • Message msg:消息对象

  • 额外的参数:用户可以传递自己的参数

比如如下实现就可以保证相同的订单的消息被路由到相同的分区:

long orderId = ((Order) object).getOrderId;
return mqs.get(orderId % mqs.size());

Consumer端

RocketMQ消费端有两种类型:MQPullConsumer和MQPushConsumer。

MQPullConsumer由用户控制线程,主动从服务端获取消息,每次获取到的是一个MessageQueue中的消息。PullResult中的List msgFoundList自然和存储顺序一致,用户需要再拿到这批消息后自己保证消费的顺序。

对于PushConsumer,由用户注册MessageListener来消费消息,在客户端中需要保证调用MessageListener时消息的顺序性。RocketMQ中的实现如下:

  1. PullMessageService单线程的从Broker获取消息

  2. PullMessageService将消息添加到ProcessQueue中(ProcessMessage是一个消息的缓存),之后提交一个消费任务到ConsumeMessageOrderService

  3. ConsumeMessageOrderService多线程执行,每个线程在消费消息时需要拿到MessageQueue的锁

  4. 拿到锁之后从ProcessQueue中获取消息

保证消费顺序的核心思想是:

  • 获取到消息后添加到ProcessQueue中,单线程执行,所以ProcessQueue中的消息是顺序的

  • 提交的消费任务时提交的是“对某个MQ进行一次消费”,这次消费请求是从ProcessQueue中获取消息消费,所以也是顺序的(无论哪个线程获取到锁,都是按照ProcessQueue中消息的顺序进行消费)

顺序和异常的关系

顺序消息需要Producer和Consumer都保证顺序。Producer需要保证消息被路由到正确的分区,消息需要保证每个分区的数据只有一个线程消息,那么就会有一些缺陷:

  • 发送顺序消息无法利用集群的Failover特性,因为不能更换MessageQueue进行重试

  • 因为发送的路由策略导致的热点问题,可能某一些MessageQueue的数据量特别大

  • 消费的并行读依赖于分区数量

  • 消费失败时无法跳过

不能更换MessageQueue重试就需要MessageQueue有自己的副本,通过Raft、Paxos之类的算法保证有可用的副本,或者通过其他高可用的存储设备来存储MessageQueue。

热点问题好像没有什么好的解决办法,只能通过拆分MessageQueue和优化路由方法来尽量均衡的将消息分配到不同的MessageQueue。

消费并行度理论上不会有太大问题,因为MessageQueue的数量可以调整。

消费失败的无法跳过是不可避免的,因为跳过可能导致后续的数据处理都是错误的。不过可以提供一些策略,由用户根据错误类型来决定是否跳过,并且提供重试队列之类的功能,在跳过之后用户可以在“其他”地方重新消费到这条消息。

如果本文对您有帮助,点一下右下角的“推荐”

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
消息队列RocketMQ应对双十一流量洪峰的“六大武器”
消息队列 RocketMQ 是如何帮助各企业交易系统扛住瞬间千万级 TPS、万亿级流量洪峰的冲击,并保持各个应用之间的消息通畅的呢?下文将为您介绍消息队列 RocketMQ 应对双十一流量洪峰的“六大武器”。
420 0
阿里云服务器如何登录?阿里云服务器的三种登录方法
购买阿里云ECS云服务器后如何登录?场景不同,阿里云优惠总结大概有三种登录方式: 登录到ECS云服务器控制台 在ECS云服务器控制台用户可以更改密码、更换系.
24973 0
DNS 查看域名解析顺序
/etc/nsswitch.conf 决定是先使用/etc/hosts 还是 /etc/resolv.conf
1115 0
RocketMQ 消息发送system busy、broker busy原因分析与解决方案
1、现象 最近收到很多RocketMQ使用者,反馈生产环境中在消息发送过程中偶尔会出现如下4个错误信息之一:1)[REJECTREQUEST]system busy, start flow control for a while2)too many requests and system thre.
1730 0
云栖发布|阿里云消息队列 RocketMQ 5.0:消息、事件、流融合处理平台
RocketMQ5.0 的发布标志着阿里云消息正式从消息领域正式迈向了“消息、事件、流”场景大融合的新局面。
440 0
RocketMq消息队列使用
最近在看消息队列框架 ,alibaba的RocketMQ单机支持1万以上的持久化队列,支持诸多特性, 目前RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景 比kafka还是有过之无不及,其实kafka文档很丰富 但RocketMQ网上的...
1597 0
阿里云服务器端口号设置
阿里云服务器初级使用者可能面临的问题之一. 使用tomcat或者其他服务器软件设置端口号后,比如 一些不是默认的, mysql的 3306, mssql的1433,有时候打不开网页, 原因是没有在ecs安全组去设置这个端口号. 解决: 点击ecs下网络和安全下的安全组 在弹出的安全组中,如果没有就新建安全组,然后点击配置规则 最后如上图点击添加...或快速创建.   have fun!  将编程看作是一门艺术,而不单单是个技术。
18465 0
阿里云服务器怎么设置密码?怎么停机?怎么重启服务器?
如果在创建实例时没有设置密码,或者密码丢失,您可以在控制台上重新设置实例的登录密码。本文仅描述如何在 ECS 管理控制台上修改实例登录密码。
20196 0
+关注
蘑菇街隐修
专注分布式消息中间件和存储系统
68
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
JS零基础入门教程(上册)
立即下载
性能优化方法论
立即下载
手把手学习日志服务SLS,云启实验室实战指南
立即下载