消息队列浅析

简介: 消息队列在大型分布式应用中非常常见,目前还停留在能熟练使用基础功能的阶段,对其高级功能以及背后的原理了解甚少,比如事务消息、有序消息等。学习之前需要先带着几个问题,为什么会诞生消息队列?消息队列的原理是什么?使用消息队列需要注意什么?

背景

消息队列在大型分布式应用中非常常见,目前还停留在能熟练使用基础功能的阶段,对其高级功能以及背后的原理了解甚少,比如事务消息、有序消息等。

学习之前需要先带着几个问题,为什么会诞生消息队列?消息队列的原理是什么?使用消息队列需要注意什么?

假设我们自己开发了一套“极其简陋”的电商系统,包含浏览商品、下单、付款三个功能,系统涉及的服务有商品服务、订单服务、扣款服务。

我们来举例一下用户常用的操作流程:

下单:

1.用户看到自己喜欢的商品,点击提交订单,请求到订单服务,系统生成订单记录,核心数据为:状态“待支付”、商品名称、商品数量、过期时间等

2.生成订单后请求到商品服务,对应商品数量-1(并非真的-1,毕竟用户没有付款)

支付:

1.用户确定没问题,发起支付请求,请求到订单服务,状态改为“正在支付”

2.请求到扣款服务,系统通过支付系统对用户银行卡发卡扣款

3.扣款成功,回调订单服务更新订单状态为“支付成功”

4.支付成功后商品服务将商品数量真正-1

在了解了常见场景后我们来看当前系统存在什么问题?这里的例子只是假设,并非真实电商场景,只看部分问题即可。

  1. 容错率低。一次请求涉及多个系统,一个系统故障整个操作失败
  2. RT高实效性低。一次请求涉及多个系统,总RT = 各个系统RT之和
  3. 吞吐量低。服务器连接资源有限,每个同步调用都要占用一个连接,根据木桶效应系统能力取决于最低的系统的表现,大促等活动场景下流量是正常时候的几倍甚至几十倍,这种架构根本无法支持

抽象一下问题就是,同步调用、耦合严重、流量洪峰。这也就是消息队列要解决的三个问题。

  1. 异步,阶段性处理请求,处理完立刻返回
  2. 解耦,系统间不再强耦合,没有了木桶效应,能跑多快全凭系统自身
  3. 削峰,通过堆积系统处理不了的请求来达到消除流量高峰问题

通过消息队列改造后,整体架构如下图:

下单:

  1. 用户提交订单,消息分发器产生“下单”消息
  2. 商品服务、订单服务分别订阅该主题,自行消费处理

支付:

  1. 用户发起支付请求,消息分发器产生“支付”消息
  2. 订单服务、扣款服务分别订阅,自行处理消息
  3. 扣款服务处理成功,产生“支付成功”消息
  4. 商品服务、订单服务分别订阅,自行处理消息

可以看到通过消息队列改造后,各个系统间不再同步调用强耦合,系统不再直接对接用户请求,而是消费消息队列里堆积的请求。

消息队列也是一把双刃剑,感觉通过消息队列改造后整个系统的性能得到了质的提升,低延迟、高并发,看似很完美,但是也存在许多问题。

  1. 系统的可用性所有降低,这样的架构必须要保证消息队列不出问题
  2. 系统复杂性提高,引入消息队列后整个系统的链路变得复杂,系统理解、运维难度都有所提升
  3. 一致性问题,异步处理的场景下订单系统处理成功就成功了,无法保证商品服务不出问题,这要就会出现“多卖”,“少卖”的数据不一致问题

接下来我们一起看看消息队列是如何处理和解决这些问题的。

RocketMQ

简介

阿里内部使用的是 Metaq,外部又称之为 RocketMQ,两者可以说是等价的,RocketMQ github 地址:https://github.com/apache/rocketmq

RocketMQ 是一个队列模型的消息中间件,具备高性能、高可靠、高实时、分布式特点。Producer 轮流向一些队列发送消息,队列集合称为 Topic,Consumer 根据消费模式做集群/广播消息。

  • Producer、Consumer、队列都可以是分布式
  • 可以保证严格的消息顺序
  • 亿级消息堆积能力

架构图

Name Server

用于服务发现,专为 RocketMQ 设计的轻量级名称服务。

Name Server 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

Broker

消息中转角色,负责存储消息,转发消息,一般也称为server。

Broker 部署相对复杂,Broker 分为 Master 与 Slave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master,Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId 为 0 表示 Master,非 0 表示 Slave。Master 也可以部署多个。每个 Broker 与 Name Server 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 Name Server。

Producer

消息生产者,负责生产消息,一般由业务系统负责产生消息。

Producer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态,可集群部署。

Consumer

消息消费者,负责消费消息,一般是由后台应用负责消费。

Consumer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定。

消费模式

广播消息

consumer group中的每一个consumer都会消费一次消息,广播消费中的consumer group概念可以认为在消息划分上没有意义。

集群消息

一个consumer group中的consumer平均分摊消费消息。假设topic中有6条消息,consumer group有两个consumer实例(可能是两个进程或者两台机器),那么每个consumer只消费3条消息。

消息类型

普通消息

消息队列RocketMQ版中无特性的消息,区别于有特性的定时和延时消息、顺序消息和事务消息。

定时和延时消息

定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不 可避免的产生巨大性能开销。RocketMQ 支持定时消息,但是不支持任意时间精度,支持特定的 level,例如定时 5s,10s,1m 等。

顺序消息

消费消息的顺序要同发送消息的顺序一致,在 RocketMQ 中,主要指的是局部顺序,即一类消息为满足顺序性,必须 Producer 单线程顺序发送,且发送到同一个队列,这样 Consumer 就可以按照 Producer 发送的顺序去消费消息。

普通顺序消息

顺序消息的一种,正常情况下可以保证完全的顺序消息,但是一旦发生通信异常,Broker 重启,由于队列总数发生变化,哈希取模后定位的队列会变化,产生短暂的消息顺序不一致。如果业务能容忍在集群异常情况(如某个 Broker 宕机或者重启)下,消息短暂的乱序,使用普通顺序方 式比较合适。

严格顺序消息

顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式 Failover 特性,即 Broker 集群中只要有一台机器不可用,则整个集群都不可用,服务可用性大大降低。 如果服务器部署为同步双写模式,此缺陷可通过备机自动切换为主避免,不过仍然会存在几分钟的服务不 可用。(依赖同步双写,主备自动切换,自动切换功能目前还未实现),目前已知的应用只有数据库 binlog 同步强依赖严格顺序消息,其他应用绝大部分都可以容忍短暂乱序,推荐使用普通的顺序消息。

事务消息

RocketMQ 采用二阶段提交实现事务消息,保证本地事务与消息发送的原子性,达到事务最终一致性状态。

功能模块

消息优先级

支持根据消息优先级高低进行投递。

由于 RocketMQ 所有消息都是持久化的,所以如果按照优先级来排序,开销会非常大,因此 RocketMQ 没有特意支持消息优先级,但是可以通过变通的方式实现类似功能,即单独配置一个优先级高的队列,和一个普通优先级的队列, 将不同优先级发送到不同队列即可。

消息过滤

Broker 端消息过滤

在 Broker 中按照 Consumer 的要求过滤,优点是减少了对于 Consumer 无用消息的网络传输,缺点是增加了 Broker 的负担,实现相对复杂。

RocketMQ 支持按照简单的 Message tag 进行过滤,也支持按照 Message Header, Message Body 过滤。

Demo:

// Don't forget to set enablePropertyFilter=true in broker
consumer.subscribe("SqlFilterTest",
   MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
       "and (a is not null and a between 0 and 3)"));

Consumer 端过滤

这种过滤方式完全由 Consumer 端自己控制,缺点是很多无用的消息需要传送到 Consumer 端。

消息可靠性保证

影响消息可靠性的几种情况:

  1. Broker 正常关闭
  2. Broker 异常 Crash
  3. OS 死机
  4. 机器掉电,但是能立即恢复供电情况。
  5. 机器无法开机(可能是cpu、主板、内存等关键设备损坏)
  6. 磁盘设备损坏。

前四种情况都属于硬件资源可立即恢复情况,RocketMQ 在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。

5 和 6 属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ 在这两种情况下,通过异步复制,可保证 99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点, 同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与 Money 相关的应用。

RocketMQ 从 3.0 版本开始支持同步双写。

消息堆积


消息持久化

RocketMQ 的所有消息都是持久化的,先写入系统 PAGECACHE,然后刷盘,可以保证内存与磁盘都有一份数据, 访问时,直接从内存读取。

Linux 异步刷盘

了解消息持久化前需要先了解 Linux 是如何实现异步刷盘的。因为硬盘和内存的读写性能差距巨大,Linux 默认情况是以异步方式读写文件的,比如调用系统函数 open() 打开或创建文件时缺省情况下是带有 O_ASYNC flag 的。Linux 借助于内核的 page cache 来实现这种异步操作。

异步刷盘

broker 将 producer 发送的消息写入到 Linux page cache 后直接返回,并不等待文件内容写入到硬盘才返回。

  • 由于服务器硬盘读写速度一般高于网卡速度(千兆网卡),所以刷盘速度跟得上消息写入速度,硬盘写操作不会是性能瓶颈。
  • 万一此时系统压力过大,可能堆积消息,除了写入 IO,还有读取 IO,万一出现磁盘读取落后情况,会不会导致系统内存溢出?答案是否定的。
  • 写入消息到 page cache 时,如果内存不足,则尝试丢弃干净的 page,腾出内存供新消息使用,策略是 LRU
  • 如果干净页不足,此时写入 page cache 会被阻塞,系统尝试刷盘部分数据,大约每次尝试32个 page 来找出更多干净 page

同步刷盘

同步刷盘与异步刷盘的唯一区别是,在把消息写入 page cache 后异步刷盘会直接返回,而同步刷盘需要等刷盘完成后才返回。

  • 消息写入 page cache,线程等待,通知刷盘线程刷盘
  • 刷盘线程完成后,通知前端等待线程(可能是一批线程)
  • 前端等待线程向用户返回成功

事务消息

主要流程

这里分两种场景来讲,正常场景下生产者根据本地事务直接返回 commit / rollback,异常情况下返回 unknown(抛异常、服务器重启等)

commit / rollback

流程简述:

  1. Producer 发送 Half 消息到 Broker。topic: RMQ_SYS_TRANS_HALF_TOPIC,Consumer 此时不能拉取到消息,需要等后续 Half 消息状态变更。
  2. Producer 执行本地事务,根据执行结果返回 commit 或 rollback 给 Broker。
  3. Broker 收到 Producer 事务状态,如果是 rollback,则丢弃 Half 消息
  4. 如果是 commit,则将 Half 消息放入对应的 topic 中
  5. Consumer 接收到消息,执行本地事务
  6. Consumer 根据执行结果返回消费状态给 Broker
  7. 如果消费失败,则根据 Consumer 配置决定是否重试

unknown

流程简述:

  1. Producer 发送 Half 消息到 Broker。topic: RMQ_SYS_TRANS_HALF_TOPIC,Consumer 此时不能拉取到消息,需要等后续 Half 消息状态变更。
  2. Producer 执行本地事务,此时遇到未捕获的异常,服务器返回 unknown 状态,或者超过时间限制(默认6s),Broker 也会当作 unknown 状态来处理。
  3. Broker 收到 Producer unknown 的事务状态,不处理。
  4. 针对 unknown 状态的消息,Broker 会定期(30s左右,根据实际情况调整)发送 checkLocalTransaction 到 Producer 集群(相同 producerGroup)的任意一台机器,以此来检查对应的事务状态(默认最多检查10次,直到 commit 或者 rollback,超过次数限制则当作 unknown 来处理。
  5. Broker 根据最终状态来决定是丢弃 Half 消息,还是放入对应的 topic。

测试demo

TransactionProducerTest.java

public class TransactionProducerTest {


   public static void main(String[] args) throws InterruptedException {

       TransactionMQProducer producer = new TransactionMQProducer("CID_PC_TRAN_QUICK_START_PICHENG_TEST");

       ExecutorService executorService = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {

           @Override

           public Thread newThread(@NotNull Runnable r) {

               Thread thread = new Thread(r);

               thread.setName("client-transaction-msg-check-thread");

               thread.setDaemon(false);

               return thread;

           }

       });

       producer.setExecutorService(executorService);

       producer.setTransactionListener(new TransactionCheckListener());

       // 设置重试次数

       producer.setRetryTimesWhenSendFailed(1);

       try {

           producer.start();

           print("-> producer started success");

       } catch (Exception e) {

           print("-> producer started failed");

           e.printStackTrace();

       }


       final String topic = "PC_TRAN_QUICK_START";

       final String tag = "dkangel";

       final String keys = "PC_TRAN_QUICK_START_keys";

       String content = "Rocketmq transaction test";

       Message message = new Message(topic, tag, keys, content.getBytes());

       try {

           TransactionSendResult result = producer.sendMessageInTransaction(message, null);

           print(String.format("-> msgId:%s, content:%s transaction result:%s", result.getMsgId(),

               content, result.getLocalTransactionState()));

           if (result.getLocalTransactionState() != LocalTransactionState.COMMIT_MESSAGE) {

               print(String.format("-> halfMsg sendResult:%s, transaction result:%s", result.getSendStatus(),

                   result.getLocalTransactionState()));

           } else {

               print(String.format("-> halfMsg sendResult:%s, transaction result:%s", result.getSendStatus(),

                   result.getLocalTransactionState()));

           }

           Thread.sleep(100);

       } catch (MQClientException | InterruptedException e) {

           e.printStackTrace();

       }


       for (int i = 0; i < 100000; i++) {

           Thread.sleep(1000);

       }

       print("-> shutdown producer");

       producer.shutdown();

   }


}

TransactionCheckListener.java

public class TransactionCheckListener implements TransactionListener {

   private final AtomicInteger atomicInteger = new AtomicInteger(0);

   private final AtomicLong last = new AtomicLong(System.currentTimeMillis());


   @Override

   public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {

       int value = atomicInteger.get();

       if (value == 0) {

           //注意:如果抛出异常,等于设置UNKNOW

           PrintUtils.print(String.format("-> execute transaction exception, index: %d", value));

           throw new RuntimeException("Could not find db");

       } else if ((value % 3) == 0) {

           PrintUtils.print(String.format("-> execute transaction rollback, index: %d", value));

           return LocalTransactionState.ROLLBACK_MESSAGE;

       } else if ((value % 2) == 0) {

           PrintUtils.print(String.format("-> execute transaction commit, index: %d", value));

           return LocalTransactionState.COMMIT_MESSAGE;

       }


       PrintUtils.print(String.format("-> execute transaction unknown, index: %d", value));

       return LocalTransactionState.UNKNOW;

   }


   @Override

   public LocalTransactionState checkLocalTransaction(MessageExt msg) {

       return process();

   }


   private LocalTransactionState process() {

       long time = (System.currentTimeMillis() - last.get()) / 1000;

       if (this.atomicInteger.get() != 0 && this.atomicInteger.get() % 10 == 0) {

           PrintUtils.print(String.format("-> check transaction multi-commit, index: %d, cost(s): %d", atomicInteger.get(), time));

           return LocalTransactionState.COMMIT_MESSAGE;

       }

       this.atomicInteger.getAndIncrement();

       int value = atomicInteger.get();

       last.compareAndSet(last.get(), System.currentTimeMillis());

       if ((value % 11) == 0) {

           //注意:如果抛出异常,等于设置UNKNOW

           PrintUtils.print(String.format("-> check transaction exception, index: %d, cost(s): %d", value, time));

           throw new RuntimeException("Could not find db");

       } else if ((value % 12) == 0) {

           PrintUtils.print(String.format("-> check transaction rollback, index: %d, cost(s): %d", value, time));

           return LocalTransactionState.ROLLBACK_MESSAGE;

       } else if ((value % 10) == 0) {

           PrintUtils.print(String.format("-> check transaction commit, index: %d, cost(s): %d", value, time));

           return LocalTransactionState.COMMIT_MESSAGE;

       }


       PrintUtils.print(String.format("-> check transaction unknown, index: %d, cost(s): %d", value, time));

       return LocalTransactionState.UNKNOW;

   }

}

PrintUtils

public class PrintUtils {

   public static void print(String msg) {

       System.out.println(Thread.currentThread().getId() + "-" + Thread.currentThread().getName() + ": " + msg);

   }

}

结果展示

maxPoolSize = 2

maxPoolSize = 1


疑问:

broker master、slave同步方式?

所有数据单独存储到一个commitLog,顺序写,随机读。按照什么维度区分?topic?

pageCache?顺序跳跃读?

写消息LRU方式?

messageIdId格式?

message key,slot table 槽表格式?

message queue存储结构?

消费方式:pull 长轮询方式?

producer选messageQueue算法?

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue

附录

消息队列选型分析

kafka, rocketMQ, rabbitMQ

专业术语

Topic:消息主题,由用户定义并在mq-ops进行配置。

Tag:消息主题标签,一个topic可以有多个tag。

Producer:消息生产者,负责生产消息,一般由业务系统负责产生消息。

Producer group:表示一类producer,通常用来发送一类消息,发送时可以指定不同的topic。

Consumer:消息消费者,负责消费消息,一般是由后台应用负责消费。

Consumer group:表示一类consumer,这类consumer消费一类消息,消费逻辑一致。

Push consumer:consumer的一种,向consumer对象注册一个listener接口,一旦接受到消息,consumer对象就立刻回调listener接口。(这里和diamond注册listener类似,都是通过回调方法实现消息通知)

Pull consumer:consumer的一种,应用主动调用consumer的拉消息方法从broker拉消息,主动权由应用控制。

Name server:用于服务发现,专为 RocketMQ 设计的轻量级名称服务。

Broker:消息中转角色,负责存储消息,转发消息,一般也称为server。

关键约束

topicMaxLength = 255

com.alibaba.rocketmq.client.Validators#CHARACTER_MAX_LENGTH

defaultMaxMessageSize = 4M

com.alibaba.rocketmq.client.producer.DefaultMQProducer#maxMessageSize=1024 * 1024 * 4; // 4M

涉及技术点

Netty

Channel

NIO

NioEventLoopGroup

JDK

other

transient

Comparable

ThreadLocal

JUC

CountDownLatch

AtomicBoolean

AtomicInteger

ReentrantLock

ConcurrentHashMap

ThreadPoolExecutor

AQS

Linux

page cache

https://qinglinmao8315.github.io/linux/2018/03/14/linux-page-cache.html

参考文献

事务消息:https://xie.infoq.cn/article/53240651a2ef7c173f50a3194

rocketmq前世今生:https://developer.aliyun.com/article/69647

发展史:https://juejin.cn/post/6844904001452900360

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 Kafka Apache
浅谈消息队列
消息队列的应用场景十分广泛,目前不少公司都在使用,主流的消息中间件有ActiveMQ,RabbitMQ,RocketMQ,ZeroMQ,Kafka等,ActiveMQ是最老牌的MQ,它是Apache的开源项目。
88 0
|
消息中间件 存储 容灾
优秀的消息队列
优秀的消息队列
67 1
|
消息中间件 数据库
|
消息中间件 Java 数据库
消息队列(五)
消息队列(五)
169 0
消息队列(五)
|
消息中间件 存储 缓存
消息队列(六)
消息队列(六)
243 0
消息队列(六)
|
消息中间件 存储 网络协议
消息队列(一)
消息队列(一)
183 0
消息队列(一)
|
消息中间件 SQL 关系型数据库
消息队列
消息队列
239 0
|
消息中间件 存储 数据库
|
消息中间件 存储 缓存
常用消息队列对比
常用消息队列对比
|
消息中间件 数据库
什么时候需要消息队列
什么时候需要消息队列
216 0