RocketMQ(一):消息中间件缘起,一览整体架构及核心组件

本文涉及的产品
网络型负载均衡 NLB,每月750个小时 15LCU
传统型负载均衡 CLB,每月750个小时 15LCU
应用型负载均衡 ALB,每月750个小时 15LCU
简介: 【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。

RocketMQ(一):消息中间件缘起,一览整体架构及核心组件

消息队列MessageQueue,简称MQ

在队列的基础上,加入生产者与消费者模型,使用队列作为载体就能够组成简单的消息队列,在队列中“运输”的数据被称为消息

image.png

消息队列可以在单节点内存中使用,也可以作为分布式存储的中间件来使用

由于项目的架构组织,目前常接触的消息队列往往是作为分布式存储的消息中间件来使用,比如:RabbitMQ、RocketMQ、Kafka等

内存队列相比于消息中间件往往有轻量、低延迟(无需网络通信)、简单易用的特点,但也存在不能持久化(消息丢了怎么办?)、无法扩展(消息量太大怎么办?)的缺陷

消息中间件的特点较多如:持久化、高可用、集群扩展、负载均衡、系统解耦等特点,但同时也会增加调用链路、提升系统复杂度,因此常用于分布式系统中

特点

异步通信:MQ提供异步通信,无需同步等待,适合需要异步场景

持久化:消息会进行持久化,持久化后无需担心异步通信的消息会丢失

削峰填谷:面对突发流量,MQ相当于缓冲区,防止后端服务短时间内接收过多请求导致服务崩溃

系统解耦:松耦合,生产者(调用方)、消费者(被调用方)可以独立升级/扩展

集群:与其他中间件集群类似,方便水平/垂直扩展,提高系统吞吐量/可用性

消息中间件除了这些特点外,还有它们独有的功能与特点,本文就从RocketMQ开始,快速入门消息中间件专栏

在专栏中一步步解析消息中间件的架构、流程、原理、源码等,再分析各种消息中间件的优势以及适用场景

RocketMQ架构概念

  • Message:消息为MQ中运输的载体,存储需要传输的数据与其他元数据

  • MessageQueue:消息队列用于存储消息,内部通过偏移量能够找到消息

    • 分为读写队列用于消费时读和持久化消息时写,通常队列数量相同

    • 队列ID使用数量0开始并逐步进行自增,比如分配3个读写队列,那么id分别为0、1、2

  • Topic:主题(类似Kafka中的分区),生产者需要将消息发送到对应的Topic上,消费者需要订阅对应的Topic进行消费

    • 充当消息的分类,过滤消息,比如不同业务(量级)的消息分发到对应的Topic中(order、pay、cart、user...)
    • Topic中存在多个队列(MQ)用于存储消息,增加topic下的队列能够提高消息的水平写入能力
    • Topic可以存在不同的broker上,保证高可用
  • Tag:主题下的二级分类,过滤消息

  • Broker:存储消息、MessageQueue、Topic等元数据的服务端,用于接收消息(处理生产)、持久化消息、查找消息(处理消费)

    • 接收消息只能由主节点接收并持久化,从节点只用于同步对应主节点消息
    • 消费消息既可以通过主节点拉取消息,也可以通过从节点

通过下面两个主节点的Broker图,很容易的可以理解它们的关系:

  1. Broker为存储消息的服务端,其中包含多个Topic
  2. Topic是用于生产、消费订阅的主题,其中为了能够水平扩展写入性能可以设置多个MessageQueue,MessageQueue的ID从0开始进行自增
  3. 为了保证高可用,不同的Broker也会存在相同的Topic,只是其中的队列不同,防止broker意外宕机时服务不可用,如图中的TopicA,0、1队列在Broker A中,而2、3队列在Broker B中

    image.png

  • NameServer:存储broker路由信息、类似注册中心

    • 与broker通信,存储其数据如:Topic、MessageQueue等数据,顺便进行心跳判断broker是否离线
    • 与producer、consumer通信,将broker元数据进行传输,这样无论是生产还是消费都能根据数据找到对应的Broker、Topic、MQ等
  • Product:生产者,用于生产消息,并把消息发送到消息队列

    • 相同配置的生产者成组Group可以协调工作
    • 通过NameServer通信获取到的路由信息,根据负载均衡算法选择对应的Topic以及队列ID发送到对应Broker中进行持久化
  • Comsumer:消费者,用于消费消息,从消息队列拉取消息(长轮询)进行消费消息

    • 队列中使用偏移量确认消息,消费时同理也使用偏移量标识消费到的位置
    • 消费模式分为广播、集群模式,广播模式就是发布订阅模型,集群模式为点对点消费
    • 拉取消息利用长轮询机制弥补实时性差的特点,但大量长连接会导致开销大(后文详细描述长轮询机制)
    • 通过NameServer通信获取到的路由信息,消费者根据消费模式(广播/集群)选择对应的Topic,根据推送/拉取的方式获取消息
    • Group 同组消费者协调工作均衡消费消息,集群模式下一个队列最多对应一个消费者,如果消费者数量超过队列数量则无效

通过以下的架构图,能够容易理解NameServer、Broker、Product、Consumer集群之间的关系:

  1. NameServer集群启动
  2. Broker集群配置NameServer集群地址并启动,向每个NameServer节点心跳的同时携带自身broker中的Topic、MessageQueue等路由信息(routing info)
  3. Product、Consumer客户端配置NameServer集群地址启动后,定时任务获取broker信息(broker上下线等操作也会即时更新)
  4. Product根据负载均衡算法、Topic、MessageQueue和Broker信息找到要通信的Broker发送消息
  5. Broker收到Product消息后进行持久化
  6. Consumer根据再平衡算法得到自己要消费的队列,再通过Broker信息通信获取消息进行消费

    image.png

在这个流程中,小菜有个疑问:为什么Product、Consumer获取Broker数据要通过NameServer通信?

  1. NameServer在架构中的作用如注册中心,管理服务注册与发现
  2. 架构解耦,将心跳/交互数据/判断状态等功能交给NameServer(注册中心)去做
  3. 在broker集群下,如果没有NameServer这种broker间需要互相进行心跳和同步汇总的数据,当节点繁多时增加带宽压力,另外broker宕机时还要增加机制来进行判断是否下线,并且product与consumer需要配置broker信息会非常多(需要网络通信)
  4. NameServer集群间节点无状态互不通信,提供高可用集群

Spring Boot 快速上手

RocketMQ的broker作为服务端,NameServer作为注册中心,与编写代码的接触比较少,较多的还是生产者与消费者(客户端)

经过大量的理论知识,我们知道MQ的大致流程,接下来使用SpringBoot编写代码实现Product和Consumer客户端

原生RocketMQ提供的生产者与消费者API繁多并且使用时需要try catch、使用起来麻烦,企业级开发通常会在其基础上进行封装常用的API

Spring Boot 框架作为脚手架,整合RocketMQ会非常快,并且还提供对应的RocketMQTemplate对原生API进行封装简化开发

  1. 导入maven依赖
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>
  1. 封装原生Product API

企业级开发常常会对原生API进行封装,而其中的ServerProduct是自定义的类,组合原生默认的生产者DefaultMQProducer来封装API简化开发

在这个过程中,通常会用配置文件的方式配置有关生产者的参数如:组名、nameserver地址、发送消息失败重试次数、发送消息超时时间等...

设置完参数后,启动生产者 producer.start()

public class ServerProduct {
   

    private DefaultMQProducer producer;

    public ServerProduct(String producerGroup) {
   
        producer = new DefaultMQProducer(producerGroup);
        init();
    }

    public ServerProduct() {
   
        producer = new DefaultMQProducer("Default_Server_Producer_Group");
        init();
    }

    private void init() {
   
        //初始化 主要通过配置文件的值进行set 最后启动生产者
        producer.setNamesrvAddr("127.0.0.1:9876");
        //...

        try {
   
            producer.start();
        } catch (MQClientException e) {
   
            throw new RuntimeException(e);
        }
    }
}

启动生产者主要会去启动定时任务同步nameserver、broker数据,并初始化一些组件,后续用于客户端网络通信、负载均衡等

这些原理放到后文源码解析再具体聊聊~

然后再封装一个发送消息的API:

sendSyncMsg 发送同步消息API中第一个参数为topic(一级分类),第二个传输为tag(二级分类),第三个传输为消息体

内部会通过参数构建Message并使用原生API发送给Broker

public SendResult sendSyncMsg(String topic, String tag, String jsonBody) {
   
    Message message = new Message(topic, tag, jsonBody.getBytes(StandardCharsets.UTF_8));
    SendResult sendResult;
    try {
   
        sendResult = producer.send(message);
    } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
   
        throw new RuntimeException(e);
    }
    return sendResult;
}
  1. 编写controller类,使用生产者调用API,发送到broker
@RequestMapping("/warn")
@RestController
@Slf4j
public class WarnController {
   

    private static final String topic = "TopicTest";

    @Autowired
    private ServerProduct producer;

    @GetMapping("/syncSend")
    public SendResult syncSend() {
   
        return producer.sendSyncMsg(topic, "tag", "sync hello world!");
    }
}
  1. 消费者订阅Topic

发送完消息后,消息会持久化到broker中,因此我们需要使用消费者获取消息并进行消费

企业级开发时通常会使用注解的方式标识consumer需要订阅的信息,再通过解析注解的方式将数据注入的消费者中,我们这里直接使用spring提供的注解@RocketMQMessageListener

@Component
@RocketMQMessageListener(topic = "TopicTest", consumerGroup = "warn_consumer_group")
public class WarnConsumer implements RocketMQListener<String> {
   

    @Override
    public void onMessage(String message) {
   
        // 处理消息
        System.out.println("Received message: " + message);
    }
}
  1. 手动创建Topic:TopicTest 不要自动创建Topic 项目太大可能导致遗忘

使用Dashboard手动创建Topic

  1. 启动NameServer、Broker再启动SpringBoot服务,访问 /warn/syncSend 测试发送消息、消费

NameServer、Broker的部署可以查看官方文档

至此我们经历过消息的生产与消费,但在消息的“一生”中还可能出现各种各样的情况:

  1. 发送消息的方式:普通(同步、异步、单向)/顺序/延迟/批量/事务消息等...
  2. 消费消息的方式:push/pull消费、集群/广播模式...
  3. 如何保证消息不丢失?
  4. 消息是如何高效持久化的?
  5. 如何保证消费幂等?如何解决消息堆积、延时?

这些情况后文都会由浅入深一一解决,查看RocketMQ实现原理,并从原理中体验出设计的思想,再去查看其他的消息中间件~

总结

消息中间件通常有削峰填谷、异步通信、架构解耦、高性能、高可用、集群扩展、负载均衡等相关特点,同时项目中引入消息中间件也会增加调用链路、系统复杂度

RocketMQ由NameServer、Broker、Product、Consumer等集群组成

其中Broker作为服务端,负责接收消息、对消息进行高效持久化、消费消息时高效查询

定义Topic对消息进行分类,为了提升水平扩展写入能力,Topic下可以设置MessageQueue队列,消息作为数据载体存储在队列中,等待被消费

为了满足高可用,相同的Topic会被放到不同的master broker,避免”所有坤蛋都在同一个篮子“中

NameServer集群作为“注册中心”,节点无状态之间互不通信,只与Broker集群心跳同时更新路由信息,等到Product、Consumer定时通信时再将Broker信息进行传输

Product为消息生产方,通过与NameServer获取的Broker中Topic、队列ID等信息,使用负载均衡算法后找到对应Broker进行通信

Consumer为消息消费者,根据再平衡负载均衡得到自己负责消费的队列,再通过Broker获取消息进行消费

最后(点赞、收藏、关注求求啦~)

本篇文章被收入专栏 消息中间件,感兴趣的同学可以持续关注喔

本篇文章笔记以及案例被收入 Gitee-CaiCaiJavaGithub-CaiCaiJava,除此之外还有更多Java进阶相关知识,感兴趣的同学可以starred持续关注喔~

有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~

关注菜菜,分享更多技术干货,公众号:菜菜的后端私房菜

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
监控 前端开发 数据可视化
3D架构图软件 iCraft Editor 正式发布 @icraft/player-react 前端组件, 轻松嵌入3D架构图到您的项目,实现数字孪生
@icraft/player-react 是 iCraft Editor 推出的 React 组件库,旨在简化3D数字孪生场景的前端集成。它支持零配置快速接入、自定义插件、丰富的事件和方法、动画控制及实时数据接入,帮助开发者轻松实现3D场景与React项目的无缝融合。
197 8
3D架构图软件 iCraft Editor 正式发布 @icraft/player-react 前端组件, 轻松嵌入3D架构图到您的项目,实现数字孪生
|
2月前
|
SQL 数据采集 分布式计算
【赵渝强老师】基于大数据组件的平台架构
本文介绍了大数据平台的总体架构及各层的功能。大数据平台架构分为五层:数据源层、数据采集层、大数据平台层、数据仓库层和应用层。其中,大数据平台层为核心,负责数据的存储和计算,支持离线和实时数据处理。数据仓库层则基于大数据平台构建数据模型,应用层则利用这些模型实现具体的应用场景。文中还提供了Lambda和Kappa架构的视频讲解。
256 3
【赵渝强老师】基于大数据组件的平台架构
|
2月前
|
消息中间件 存储 Apache
探索 RocketMQ:企业级消息中间件的选择与应用
RocketMQ 是一个高性能、高可靠、可扩展的分布式消息中间件,它是由阿里巴巴开发并贡献给 Apache 软件基金会的一个开源项目。RocketMQ 主要用于处理大规模、高吞吐量、低延迟的消息传递,它是一个轻量级的、功能强大的消息队列系统,广泛应用于金融、电商、日志系统、数据分析等领域。
121 0
探索 RocketMQ:企业级消息中间件的选择与应用
|
2月前
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!
|
3月前
|
消息中间件 运维 NoSQL
基础架构组件选型及服务化
【10月更文挑战第15天】本文概述了分布式系统中常见的基础架构组件及其选型与服务化的重要性。
|
8月前
|
消息中间件 存储 负载均衡
消息中间件的选择:RabbitMQ是一个明智的选择
消息中间件的选择:RabbitMQ是一个明智的选择
123 0
|
7月前
|
消息中间件 存储 中间件
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
1888 0
|
6月前
|
消息中间件 编解码 Docker
Docker部署RabbitMQ消息中间件
【7月更文挑战第4天】Docker部署RabbitMQ消息中间件
288 3
|
3月前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
141 1
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
5月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
下一篇
开通oss服务