RocketMQ 顺序消息解析——图解、源码级解析

简介: RocketMQ 顺序消息解析——图解、源码级解析

# 顺序消息

说到<font color = "#33aaff">顺序</font>,我们经常会将它和现实里的时间关联起来,即按照时间顺序表示事件的先后关系。比如发生在10:00的消息A就要早于发生在11:00的消息B。


上面例子之所以成立的原因是他们有相同的参考系,倘若A的时间是北京时间,而B的时间是纽约时间,这个先后顺序就不一定成立了。


<br/>


当然除了时间以外,A和B之间的因果关系也可以断定他们的顺序,例如退款一定发生于付款之后。


综上所述,我们所讲的<font color = "#33aaff">顺序</font>,实际上的意思是:

1. 有统一的时间参考下,事件发生的先后关系

2. 没有统一的时间参考下的happen-before关系


<br/>


## 分布式环境下的顺序

设想一个分布式的环境:


1. 同一线程上的事件是有明确顺序关系的,发生的先后顺序就是

2. 不同线程的事件只能通过因果关系去推断


![在这里插入图片描述](https://ucc.alicdn.com/images/user-upload-01/56b3620371ac4b248595a60e7d0ef5c6.png#pic_center)

例如针对上图的两个线程A和B,进程A中的事件有明显的先后顺序(A1 -> A2 -> A3 -> A4),又因为A1给B2发了消息,所以A1一定在B2之前……


<br/>


## 消息中间件中的顺序消息

RocketMQ支持顺序消息的功能,既有<font color="#ffaa33">顺序发送</font>又有<font color="#ffaa33">顺序消费</font>。


而顺序消息又包含了两种类型:

- **分区顺序**:一个Partition内所有的消息按照先进先出的顺序进行发布和消费

- **全局顺序**:一个Topic内所有的消息按照先进先出的顺序进行发布和消费


对于顺序消费,需要明确哪些来自同一个发送线程的消息在消费时是按照相同的发送顺序来进行消费的。


<br/>


**在MQ里,顺序在不同的阶段里都需要得到保障:**

1. 发送消息是保证顺序


在同一个线程内应该采取同步的方式发送;


2. 消息按照顺序存储


按照A、B顺序发送的消息,在空间上A也要保存在B之前;


3. 按照存储的顺序消费消息


消息A、B到达后,Consumer先消费A后消费B


![在这里插入图片描述](https://ucc.alicdn.com/images/user-upload-01/10b6929d82124ec8b7c08c453492ad70.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCP546L5pu-5piv5bCR5bm0,size_20,color_FFFFFF,t_70,g_se,x_16#pic_center)

如上图所示,假设有两个订单A、B,消息的原始数据为a1、b1、b2、a2、a3、b3


- **发送阶段:** A订单的消息要保证a1、a2、a3的顺序,B订单的消息要保证b1、b2、b3的顺序,但是两个订单之间的消息没有先后顺序要求,所以可以由两个线程分别发送


- **存储阶段:**  A订单的消息要保证a1、a2、a3的顺序,B订单的消息要保证b1、b2、b3的顺序,但是两个订单之间的消息没有先后顺序要求


- **消费阶段:** 可以由一个线程按照接收到的顺序进行消费,也可以用两个线程分别消费订单A和订单B的数据


<br/><br/>


# RocketMQ中顺序消息的实现

在RocketMQ里顺序消息的实现如下图所示:

![在这里插入图片描述](https://ucc.alicdn.com/images/user-upload-01/f389280a784e44989a2ab7f11576bd25.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCP546L5pu-5piv5bCR5bm0,size_19,color_FFFFFF,t_70,g_se,x_16#pic_center)

假设创建订单,订单付款,完成订单是三个必须的顺序消息,通过他们相同的订单ID将其路由到不同的分区中,Consumer消费时一个分区只对应一个线程来消费,从而保障消息的顺序性。


<br/><br/>

# Producer顺序发送

Producer要确保消息有序性唯一要做的就是将消息路由到特定的分区,在RocketMQ中,通过`MessageQueueSelector`来实现分区的选择。


```java

public interface MessageQueueSelector {

   MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);

}

```


1. `List<MessageQueue> mqs`:消息要发送到的Topic下的所有队列

2. `Message msg`:消息对象

3. `Object arg`:用户自定义的参数


例如下面的代码就可以将相同订单ID的消息路由到相同的分区:


```java

long orderId = order.getOrderId;

return mqs.get(orderId % mqs.size());

```

**完整的示例Demo如下:**

```java

public class Main {

   public static void main(String[] args) throws Exception {

       DefaultMQProducer producer = new DefaultMQProducer("Group A");

       producer.setNamesrvAddr("localhost");

       producer.start();


       for (int i = 0; i < 100; i++) {

           // 流水号

           int orderId = i % 10;

           // 构造消息对象

           Message msg = new Message("Topic A", "TagA", ("Hello RocketMQ " + i).getBytes(StandardCharsets.UTF_8));

           // 发送消息,将相同订单id的消息路由到同一个MQ里

           SendResult sendResult = producer.send(msg, new MessageQueueSelector() {

               @Override

               public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

                   Integer id = (Integer) arg;

                   int index = id % mqs.size();

                   return mqs.get(index);

               }

           }, orderId);

           System.out.println(sendResult);

       }

       producer.shutdown();

   }

}

```

<br/><br/>

# Consumer顺序消费

RocketMQ消费消息有两种形式:拉模式和推模式,分别对应`MQPullConsumer`和`MQPushConsumer`。


- `MQPullConsumer`由用户线程控制,主动从服务端获取MQ中的一条消息,所以拿到的消息也是天然有顺序的,Consumer在消费时也要保证自己的消费顺序

![在这里插入图片描述](https://ucc.alicdn.com/images/user-upload-01/26523021c33d4fd98f39becfde62bd20.png#pic_center)


- `MQPushConsumer`由消息中间件主动推送消息给Consumer,由用户注册`MessageListener`来消费消息


<br/>


**以`MQPullConsumer`为例,保证消息顺序的流程如下:**

1. `PullMessageService`以单线程从Broker中拿消息

2. 拿到消息后将其放入`ProcessQueue`中(可以看做是消息的缓存)

3. `ConsumeMessageService`以多线程的形式尝试获取锁,拿到锁之后再从`ProcessQueue`中获取消息


**示例代码如下:**


```java

public class Consumer {


   public static void main(String[] args) throws MQClientException {

       DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Group A");

       consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

       consumer.subscribe("Topic A", "Tag A");

       consumer.setNamesrvAddr("localhost");

     

       consumer.registerMessageListener(new MessageListenerOrderly() {

           @Override

           public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {

               System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);

               return ConsumeOrderlyStatus.SUCCESS;

           }

       });

     

       consumer.start();

       System.out.println("Consumer Started.");

   }

}

```


**如何保证消费消息的顺序?**

1. 消息添加至`ProcessQueue`的过程是单线程执行的,所以`ProcessQueue`中的消息也是顺序的

2. 消费时是从`ProcessQueue`中读取消息进行消费,并且使用锁进行了并发控制,所以也是有序的



# 顺序消息带来的缺陷

聊完了什么是顺序消息以及他们的实现方式,接下来就看一看为了保证消息的一致,系统所引入的一些缺陷。


1. 当出现热点数据时,可能某些`MessageQueue`的数据量会很大

2. 发送顺序消息不能使用分布式系统的容错性,因为针对同一条数据只能被发送到某一个`MessageQueue`里

3. 某一串顺序消息里,即使有某条消息消费失败,也不能跳过



**一些尝试性的解决方案:**

目前对于热点数据没有什么好的解决方法,只能通过优化路由策略或拆分`MessageQueue`来将消息尽可能均匀地发送给不同的`MessageQueue`。


对于同一个`MessageQueue`,也可以有其副本,这些`MessageQueue`之间有自己的路由规则。


对于消费失败的消息,可以提供重试机制来重新消费这条消息,前提是要满足系统的幂等性。

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
10月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
267 2
|
6月前
|
算法 测试技术 C语言
深入理解HTTP/2:nghttp2库源码解析及客户端实现示例
通过解析nghttp2库的源码和实现一个简单的HTTP/2客户端示例,本文详细介绍了HTTP/2的关键特性和nghttp2的核心实现。了解这些内容可以帮助开发者更好地理解HTTP/2协议,提高Web应用的性能和用户体验。对于实际开发中的应用,可以根据需要进一步优化和扩展代码,以满足具体需求。
637 29
|
6月前
|
前端开发 数据安全/隐私保护 CDN
二次元聚合短视频解析去水印系统源码
二次元聚合短视频解析去水印系统源码
184 4
|
6月前
|
JavaScript 算法 前端开发
JS数组操作方法全景图,全网最全构建完整知识网络!js数组操作方法全集(实现筛选转换、随机排序洗牌算法、复杂数据处理统计等情景详解,附大量源码和易错点解析)
这些方法提供了对数组的全面操作,包括搜索、遍历、转换和聚合等。通过分为原地操作方法、非原地操作方法和其他方法便于您理解和记忆,并熟悉他们各自的使用方法与使用范围。详细的案例与进阶使用,方便您理解数组操作的底层原理。链式调用的几个案例,让您玩转数组操作。 只有锻炼思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
6月前
|
移动开发 前端开发 JavaScript
从入门到精通:H5游戏源码开发技术全解析与未来趋势洞察
H5游戏凭借其跨平台、易传播和开发成本低的优势,近年来发展迅猛。接下来,让我们深入了解 H5 游戏源码开发的技术教程以及未来的发展趋势。
|
6月前
|
存储 前端开发 JavaScript
在线教育网课系统源码开发指南:功能设计与技术实现深度解析
在线教育网课系统是近年来发展迅猛的教育形式的核心载体,具备用户管理、课程管理、教学互动、学习评估等功能。本文从功能和技术两方面解析其源码开发,涵盖前端(HTML5、CSS3、JavaScript等)、后端(Java、Python等)、流媒体及云计算技术,并强调安全性、稳定性和用户体验的重要性。
|
6月前
|
负载均衡 JavaScript 前端开发
分片上传技术全解析:原理、优势与应用(含简单实现源码)
分片上传通过将大文件分割成多个小的片段或块,然后并行或顺序地上传这些片段,从而提高上传效率和可靠性,特别适用于大文件的上传场景,尤其是在网络环境不佳时,分片上传能有效提高上传体验。 博客不应该只有代码和解决方案,重点应该在于给出解决方案的同时分享思维模式,只有思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
9月前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
9月前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
9月前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析

推荐镜像

更多
  • DNS