RocketMQ在项目中的使用总结

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: RocketMQ在项目中如何使用,什么场景下需要用到RocketMQ以及使用RocketMQ如何保证消息的准备性,都需要我们思考?

1、RocketMQ 基础

Apache RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。


  • Topic:消息主题,用于将一类的消息进行归类,比如订单主题,就是所有订单相关的消息都可以由这个主题去承载,生产者向这个主题发送消息。
  • 生产者:负责生产消息并发送消息到 Topic 的角色。
  • 消费者:负责从 Topic 接收并消费消息的角色。
  • 消息:生产者向 Topic 发送的内容,会被消费者消费。
  • 消息属性:生产者发送的时候可以为消息自定义一些业务相关的属性,比如 Message Key 和 Tag 等。
  • Group:一类生产者或消费者,这类生产者或消费者通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。


RocketMQ 主要由 Producer、Broker、Consumer、Namesvr 等组件组成,其中 Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息,Namesvr 负责存储元数据,各组件的主要功能如下:

  • Producer:消息发布的角色,支持分布式集群方式部署。Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以 push 推,pull 拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
  • NameServer:NameServer 是一个非常简单的 Topic 路由注册中心,支持分布式集群方式部署,其角色类似 Dubbo 中的 zookeeper,支持 Broker 的动态注册与发现。
  • BrokerServer:Broker 主要负责消息的存储、投递和查询以及服务高可用保证,支持分布式集群方式部署。


2、为什么要使用 RocketMQ?


2.1、异步解耦

随着微服务架构的流行,服务之间的关系梳理非常重要。异步解耦可以降低服务之间的耦合程度,同时也能提高服务的吞吐量。使用异步解耦的业务场景非常多,因为每个行业的业务都会不太一样,以一些比较通用的业务来说明相信大家都能理解。比如电商行业的下单业务场景,以最简单的下单流程来说,下单流程如下:

  1. 锁库存
  2. 创建订单
  3. 用户支付
  4. 扣减库存
  5. 给用户发送购买短信通知
  6. 给用户增加积分
  7. 通知商家发货

我们以下单成功后,用户进行支付,支付完成会有个逻辑叫支付回调,在回调里面需要去做一些业务逻辑。上面的下单流程从 3 到 5 都是可以采用异步流程进行处理,对于用户来说,支付完成后他就不需要关注后面的流程了。后台慢慢处理就行了,这样就能简化三个步骤,提高回调的处理时间。

2.2、削峰填谷

削峰填谷指的是在大流量的冲击下,利用 RocketMQ 可以抗住瞬时的大流量,保护系统的稳定性,提升用户体验。在电商行业,最常见的流量冲击就是秒杀活动了,利用 RocketMQ 来实现一个完整的秒杀业务还是与很多需要做的工作,利用RocketMQ 来扛住高并发,前提是业务场景支持异步处理。

2.3、分布式事务最终一致性

众所周知,分布式事务有 2PC,TCC,最终一致性等方案。其中使用消息队列来做最终一致性方案是比较常用的。在电商的业务场景中,交易相关的核心业务一定要确保数据的一致性。通过引入消息队列 RocketMQ 版的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。


2.4、数据分发

数据分发指的是可以将原始数据分发到多个需要使用这份数据的系统中,实现数据异构的需求。最常见的有将数据分发到 ES, Redis 中为业务提供搜索,缓存等服务。除了手动通过消息机制进行数据分发,还可以订阅 Mysql 的 binlog 来分发,在分发这个场景,需要使用 RocketMQ 的顺序消息来保证数据的一致性。


3、RocketMQ发布订阅大体流程

a、Producer生产者连接Nameserver,产生数据放入不同的Topic;

b、对于RocketMQ,一个Topic可以分布在各个Broker上,我们可以把一个Topic分布在一个Broker上的子集定义为一个Topic分片;

c、将Topic分片再切分为若干等分,其中的一份就是一个Queue。每个Topic分片等分的Queue的数量可以不同,由用户在创建Topic时指定。

d、Consumer消费者连接Nameserver,根据Broker分配的Queue来消费数据。

285763-20190823162946324-1549109758.png

4、RocketMQ 消息类型

RocketMQ 支持丰富的消息类型,可以满足多场景的业务需求。不同的消息有不同的应用场景,常用的四种消息类型。


4.1、普通消息

普通消息是指 RocketMQ 中无特性的消息。当没有特殊的业务场景,使用普通消息就够了。如果有特殊的场景,就可以使用特殊的消息类型,比如顺序,事务等。

同步发送:消息发送方发送出去一条消息,会同步得到服务端返回的结果。

异步发送:消息发送方发出去一条消息,不用等待服务端返回结果,可以接着发送下一条消息。发送方可以通过回调接口接收服务端响应,并处理响应结果。

单向发送:消息发送方只负责发送消息,发送出去后就不管了,这种方式发送速度非常快,存在丢失消息的风险。


4.2、顺序消息

顺序消息是指生产者按照一定的先后顺序发布消息;消费者按照既定的先后顺序订阅消息,即先发布的消息一定会先被消费者接收到。比如数据分发的场景,如果我们订阅了 Mysql 的 binlog 来进行数据异构。消息要是没有顺序,就会出现数据错乱问题。比如新增一条 id=1 的数据,然后马上删除。这样就产生了两条消息。正常的消费顺序是先新增,然后删除,此时数据是没有的。如果消息没有顺序,删除的先被消费了,然后消费新增的,此时数据还在,没被删除掉,就会导致不一致。


4.3、定时消息

定时消息是指消息具备定时发送的功能,当消息发送到服务端后,不会立即投递给消费者。而是要等到消息指定的时间后才会投递给消费者进行消费。延迟消息也就是定时消息,定时消息是定在某个时间点进行发送,比如 2020-11-11 12:00:00 发送。延迟消息一般是在当前发送时间的基础上延迟多久进行发送,比如当前时间是 2020-09-10 12:00:00,延迟 10 分钟,那么消息发送成功后将在 2020-09-10 12:10:00 进行投递给消费者。定时消息可以在订单超时未支付自动取消等场景使用。


4.4、事务消息

RocketMQ 提供类似 X/Open XA 的分布式事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致。

交互流程:

  1. 发送方首先发送半事务消息到 RocketMQ 服务端。
  2. RocketMQ 服务端接收到消息,然后将消息持久化成功之后,向发送方返回 Ack 确认消息已经发送成功,此时消息为半事务消息,不会投递给消费方。
  3. 收到半事务消息的 Ack 后,发送方开始执行本地事务逻辑。
  4. 发送方根据本地事务执行结果向服务端提交二次确认,如果本地事务执行成则进行消息的 Commit,如果执行失败则进行消息的 Rollback,服务端收到 Commit 状态则将半事务消息标记为可投递,消费方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,消费方将不会收到该消息。
  5. 如果出现意外情况,步骤 4 没有进行消息的二次确认,等待固定时间后服务端将对该消息发起消息回查。
  6. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半事务消息进行操作。


5、最佳实践


5.1、消息重试

消息在消费方消费失败后,RocketMQ 服务端会重新进行消息的投递,知道消费者成功消费消息,当然重试有次数限制,默认 16 次。消息重试在一定程度上保证了消息不丢失,通过重试来达到最终被消费的目的。需要注意的是消费者在消费的时候一定要等本地业务成功后才能进行 ACK (消费确认),不然就会出现消费失败,但是已经 ACK,消息将不会重复投递。如果采取异步消费的方式,需要进行异步转同步,等异步操作完才进行 ACK。最后需要做好对应的监控,如果重试了 4,5 次还是失败的,基本上后面重试也是失败的。这个时候需要让开发人员知道,该人工处理的就人工介入。或者直接监控死信队列。


5.2、消息过滤

消息主题,一般用于一类消息的统一分类。比如订单主题,但是订单下的消息会分为很多种。比如创建订单,取消订单等。不同类型的消息有不同的业务处理,我们可以统一定义消息格式,然后通过一个字段去区分消息类型来做不同的业务逻辑。不好的点在于所有消息都会推送到消费方,不能按需消费。在 RocketMQ 中可以给消息指定 tag,通过 tag 来区分消息类型。消费者可以根据 Tag 在 RocketMQ 服务端完成消息过滤,以确保消费者最终只消费到其关注的消息类型。所有消息都在一个主题中,测试环境消费测试环境的 tag,线上消费线上的 tag。这种方式的问题在于消息没做隔离,线上线下的消息都在一起。另一个就是 tag 被固定成了环境的区分,无法用于消息类型场景,导致只能建多个 topic 来承载多个业务消息类型。

5.3、消费模式

RocketMQ 消费模式有两种,集群消费和广播消费。

集群消费:消费者部署了多个实例我们称之为一个集群,集群消费只会被其中的某一个实例进行消费。适合大部分的业务场景,大部分的场景我们的消息只允许被消费一次,而且只能有一个消费者去消费,比如支付回调场景,如果一个消息被多个实例同时消费,那么就会出现同时去修改订单状态,同时去扣减库存的情况。

广播消费:广播消费会让集群中每个实例都消费一次。

比如我们使用了本地缓存,当数据变更的时候,我们需要刷新每个节点本地的缓存,所以每个节点都需要收到消息。


5.4、消费幂等

幂等问题,无论是在 API 请求场景还是在消息消费场景,都会遇到。一条消息不能重复消费多次这个肯定是要保证的,因为我们不能保证消息发送方不发送多次,也不能保证消息不重复投递。RocketMQ 的 Exactly-Once 投递语义,就是用于解决幂等问题。Exactly-Once 是指发送到消息系统的消息只能被消费端处理且仅处理一次,即使生产端重试消息发送导致某消息重复投递,该消息在消费端也只被消费一次。最佳的幂等处理方式还是需要有一个唯一的业务标识,虽然每条消息都有 MessageId,但是不建议用 MessageId 来做幂等判断,在发送消息的时候,可以为每条消息设置一个 MessageKey,这个 MessageKey 就可以用来做业务的唯一标识。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 存储 传感器
RabbitMQ 在物联网 (IoT) 项目中的应用案例
【8月更文第28天】随着物联网技术的发展,越来越多的设备被连接到互联网上以收集和传输数据。这些设备可以是传感器、执行器或其他类型的硬件。为了有效地管理这些设备并处理它们产生的大量数据,需要一个可靠的消息传递系统。RabbitMQ 是一个流行的开源消息中间件,它提供了一种灵活的方式来处理和转发消息,非常适合用于物联网环境。
107 1
|
消息中间件 存储 网络协议
从原理到实战,手把手教你在项目中使用RabbitMQ
RabbitMQ 的文章之前写过,但是当时给的示例是 Demo 版的,这篇文章主要是结合之前写的理论知识,将 RabbitMQ 集成到技术派项目中。 话不多说,上文章目录: 下面我们先回顾一下理论知识,如果对这块知识已经清楚的同学,可以直接跳到实战部分。 1. 消息队列 1.1 消息队列模式 消息队列目前主要 2 种模式,分别为“点对点模式”和“发布/订阅模式”。 点对点模式 一个具体的消息只能由一个消费者消费,多个生产者可以向同一个消息队列发送消息,但是一个消息在被一个消息者处理的时候,这个消息在队列上会被锁住或者被移除并且其他消费者无法处理该消息。 需要额外注意的是,如果消费者
578 5
|
3月前
|
消息中间件 物联网 API
消息队列 MQ使用问题之如何在物联网项目中搭配使用 MQTT、AMQP 与 RabbitMQ
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 JSON Java
RabbitMQ的springboot项目集成使用-01
RabbitMQ的springboot项目集成使用-01
|
5月前
|
消息中间件 Java 数据安全/隐私保护
Spring Cloud 项目中实现推送消息到 RabbitMQ 消息中间件
Spring Cloud 项目中实现推送消息到 RabbitMQ 消息中间件
|
11月前
|
消息中间件 Kafka 测试技术
微服务轮子项目(33) -RocketMQ特点、安装部署、异常处理、事务消息原理
微服务轮子项目(33) -RocketMQ特点、安装部署、异常处理、事务消息原理
147 0
|
消息中间件 缓存 NoSQL
|
10月前
|
消息中间件 存储 NoSQL
SpringBoot项目整合Redis,Rabbitmq发送、消费、存储邮件
SpringBoot项目整合Redis,Rabbitmq发送、消费、存储邮件
634 9
|
5月前
|
网络协议 Linux 网络安全
Linux MQTT智能家居项目(网络基础知识)
Linux MQTT智能家居项目(网络基础知识)
132 0
Linux MQTT智能家居项目(网络基础知识)
|
5月前
|
消息中间件 缓存 Java
RabbitMQ在项目中做什么用?怎么消费消息?具体怎么使用的?
RabbitMQ在项目中做什么用?怎么消费消息?具体怎么使用的?