分布式消息队列RocketMQ概念详解

本文涉及的产品
传统型负载均衡 CLB,每月750个小时 15LCU
网络型负载均衡 NLB,每月750个小时 15LCU
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 分布式消息队列RocketMQ概念详解

1.MQ概述

1.1 RocketMQ简介

RocketMQ 是阿里开源的分布式消息中间件,跟其它中间件相比,RocketMQ 的特点是纯JAVA实现,是一套提供了消息生产,存储,消费全过程API的软件系统。

1.2 MQ用途

限流削峰


MQ可以将系统的超量请求暂存其中,以便系统后期可以慢慢进行处理,从而避免了请求的丢失或系统被压垮。


39b3f707f7604e1abb975ca7262476a3.png


异步解耦


上游系统对下游系统的调用若为同步调用,则会大大降低系统的吞吐量与并发度,且系统耦合度太高、而异步调用则会解决这些问题。所以两层之间若要实现由同步到异步的转化,一般性做法就是,在这两层间添加一个MQ层。


9089d887b4ca4ed7b35365bdabc3b953.png


数据收集


分布式系统会产生海量级数据流,如:业务日志、监控数据、用户行为等。针对这些数据流进行实时或批量采集汇总,然后对这些数据流进行大数据分析,这是当前互联网平台的必备技术。通过MQ完成此类数据收集是最好的选择。


1.3 常见MQ产品


RabbitMQ

RabbitMQ是使用ErLang语言开发的一款MQ产品。其吞吐量较Kafka与RocketMQ要低,且由于其不是Java语言开发,所以公司内部对其实现定制化开发难度较大。

Kafka

Kafka是使用Scala/Java语言开发的一款MQ产品。其最大的特点就是高吞吐量,常用于大数据领域的实时计算、日志采集等场景。其没有遵循任何常见的MQ协议,而是使用自研协议。

RocketMQ

RocketMQ是使用Java语言开发的一款MQ产品。经过数年阿里双11的考验,性能与稳定性非常高。其没有遵循任何常见的MQ协议,而是使用自研协议。


对比


d8ef3d1a476a478c89fb998a4ed47485.png

2.RocketMQ 基本概念

2.1 消息

消息是指,消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。单个消息所占空间不会很大。

RocketMQ中每个消息拥有唯一的MessageId,且可以携带具有业务标识的Key,以方便对消息的查询。不过需要注意的是,MessageId有两个:在生产者send()消息时会自动生成一个MessageId(msgId),当消息到达Broker后,Broker也会自动生成一个MessageId(offsetMsgId)。msgId、offsetMsgId与key都称为消息标识。


msgId:由producer端生成,其生成规则为: producerIp + 进程pid + MessageClientIDSetter类的ClassLoader的hashCode + 当前时间 + AutomicInteger自增计数器

offsetMsgId:由broker端生成,其生成规则为:brokerIp + 物理分区的offset(Queue中的偏移量)

key:由用户指定的业务相关的唯一标识

2.2 主题

Topic表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。 一个生产者可以同时发送多种Topic的消息;而一个消费者只对某种特定的Topic感兴趣,即只可以订阅和消费一种Topic的消息。

2.3 标签

标签为消息设置的标签,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。 标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。 Topic是消息的一级分类,Tag是消息的二级分类。Topic相当于货物,Tag相当于上海山东等地区。


2.4 队列

存储消息的物理实体。 一个Topic中可以包含多个Queue,每个Queue中存放的就是该Topic的消息。 一个Topic的Queue也被称为一个Topic中消息的分区(Partition)。 一个Topic的Queue中的消息只能被一个消费者组中的一个消费者消费。 一个Queue中的消息不允许同一个消费者组中的多个消费者同时消费。


分片不同于分区。在RocketMQ中,分片指的是存放相应Topic的Broker。每个分片中会创建出相应数量的分区,即Queue,每个Queue的大小都是相同的。


c1a12cb825b64d9fbb8aed7b346f1412.png


2.5 Producer

消息生产者,负责生产消息。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。  例如:用户提交的请求写入到MQ的过程,就是消息生产的过程,在这里用户就是生产者 。



RocketMQ中的消息生产者都是以生产者组(Producer Group)的形式出现的。生产者组是同一类生产者的集合,这类Producer发送相同Topic类型的消息。一个生产者组可以同时发送多个主题的消息。如果主题中有多个队列,生产者组只有一个生产者,生产者会采取轮询的方式进行发送消息。


生产者代码如下:


导入依赖

       <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>

生产者代码

  public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer order = new DefaultMQProducer("order");
        order.setNamesrvAddr("localhost:9876");
        order.start();
        Message message = new Message("myTopic", "myTag", ("test").getBytes());
        SendResult result = order.send(message);
        System.out.println(result);
        order.shutdown();
    }

2.6 Consumer

消息消费者,负责消费消息。一个消息消费者会从Broker服务器中获取到消息,并对消息进行相关业务处理。  例如:系统从MQ中读取到请求,并对请求进行处理的过程就是消息消费的过程,在这里系统就是消费者。

RocketMQ中的消息消费者都是以消费者组(Consumer Group)的形式出现的。消费者组是同一类消费者的集合,这类Consumer消费的是同一个Topic类型的消息。 消费者组使得在消息消费方面,实现负载均衡(将一个Topic中的不同的Queue平均分配给同一个Consumer Group的不同的Consumer,注意,并不是将消息负载均衡)和容错(一个Consmer挂了,该Consumer Group中的其它Consumer可以接着消费原Consumer消费的Queue)的目标变得非常容易。


消费者代码

  public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("myTopic","*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.println("收到的消息"+list);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }

负载均衡策略


queue 个数大于 Consumer个数, 那么 Consumer 会平均分配 queue,不够平均,会根据clientId排序来拿取余数

queue个数小于Consumer个数,那么会有Consumer闲置,就是浪费掉了,其余Consumer平均分配到queue

消费者组中Consumer的数量应该小于等于订阅Topic的Queue数量。如果超出Queue数量,则多出的Consumer将不能消费消息。

2.7 NameServer

NameServer是一个Broker与Topic路由的注册中心,支持Broker的动态注册与发现。

主要包括两个功能:

Broker管理:接受Broker集群的注册信息并且保存下来作为路由信息的基本数据;提供心跳检测机制,检查Broker是否还存活。


路由信息管理:每个NameServer中都保存着Broker集群的整个路由信息和用于客户端查询的队列信息。Producer和Conumser通过NameServer可以获取整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer可以获取整个Broker集群的路由信息,从而进行消息的投递和消费。



路由注册

Name Server既然是注册中心,那么是如何完成注册的呢? NameServer通常也是以集群的方式部署,不过,NameServer是无状态的,即NameServer集群中的各个节点间是无差异的,各节点间相互不进行信息通讯。 那各节点中的数据是如何进行数据同步的呢?在Broker节点启动时,轮询NameServer列表,与每个NameServer节点建立长连接,发起注册请求。在NameServer内部维护着⼀个Broker列表,用来动态存储Broker的信息。

Broker节点为了证明自己是活着的,为了维护与NameServer间的长连接,会将最新的信息以心跳包的方式上报给NameServer,每30秒发送一次心跳。心跳包中包含 BrokerId、Broker地址(IP+Port)、Broker名称、Broker所属集群名称等等。NameServer在接收到心跳包后,会更新心跳时间戳,记录这个Broker的最新存活时间。



路由剔除

由于Broker关机、宕机或网络抖动等原因,NameServer没有收到Broker的心跳,NameServer可能会将其从Broker列表中剔除。 NameServer中有⼀个定时任务,每隔10秒就会扫描⼀次Broker表,查看每一个Broker的最新心跳时间戳距离当前时间是否超过120秒,如果超过,则会判定Broker失效,然后将其从Broker列表中剔除。



路由发现

RocketMQ的路由发现采用的是Pull模型。当Topic路由信息出现变化时,NameServer不会主动推送给客户端,而是客户端定时拉取Topic最新的路由。 默认客户端每30秒会拉取一次最新的路由。


2.8 Broker

Broker充当着消息中转角色,负责存储消息、转发消息。

Broker在RocketMQ系统中负责接收并存储从生产者发送来的消息,同时为消费者的拉取请求作准备。Broker同时也存储着消息相关的元数据,包括消费者组消费进度偏移offset、主题、队列等。


模块如下图:


379a9636b0ba4de5bbfb606422d78fb0.png

Remoting Module:整个Broker的实体,负责处理来自clients端的请求。而这个Broker实体则由以下模块构成。


Client Manager:客户端管理器。负责接收、解析客户端(Producer/Consumer)请求,管理客户端。例如,维护Consumer的Topic订阅信息


Store Service:存储服务。提供方便简单的API接口,处理消息存储到物理硬盘和消息查询功能。


HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。


Index Service:索引服务。根据特定的Message key,对投递到Broker的消息进行索引服务,同时也提供根据Message Key对消息进行快速查询的功能。


2.9 RocketMQ 工作流程

工作流程如下图:


a725047ef7784c48ab60dd2f23b94e9a.png

1)启动NameServer,NameServer启动后开始监听端口,等待Broker、Producer、Consumer连接。



2)启动Broker时,Broker会与所有的NameServer建立并保持长连接,然后每50秒向NameServer定时发送心跳包。



3)发送消息前,可以先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,当然,在创建Topic时也会将Topic与Broker的关系写入到NameServer中。不过,这步是可选的,也可以在发送消息时自动创建Topic。



4) Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取路由信息,即当前发送的Topic消息的Queue与Broker的地址(IP+Port)的映射关系。然后根据算法策略从队选择一个Queue,与队列所在的Broker建立长连接从而向Broker发消息。当然,在获取到路由信息后,Producer会首先将路由信息缓存到本地,再每30秒从NameServer更新一次路由信息。



5)Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取其所订阅Topic的路由信息,然后根据算法策略从路由信息中获取到其所要消费的Queue,然后直接跟Broker建立长连接,开始消费其中的消息。Consumer在获取到路由信息后,同样也会每30秒从NameServer更新一次路由信息。不过不同于Producer的是,Consumer还会向Broker发送心跳,以确保Broker的存活状态。



相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
存储 SQL 分布式数据库
OceanBase 入门:分布式数据库的基础概念
【8月更文第31天】在当今的大数据时代,随着业务规模的不断扩大,传统的单机数据库已经难以满足高并发、大数据量的应用需求。分布式数据库应运而生,成为解决这一问题的有效方案之一。本文将介绍一款由阿里巴巴集团自主研发的分布式数据库——OceanBase,并通过一些基础概念和实际代码示例来帮助读者理解其工作原理。
304 0
|
22天前
|
消息中间件 网络协议 C#
C#使用Socket实现分布式事件总线,不依赖第三方MQ
`CodeWF.EventBus.Socket` 是一个轻量级的、基于Socket的分布式事件总线系统,旨在简化分布式架构中的事件通信。它允许进程之间通过发布/订阅模式进行通信,无需依赖外部消息队列服务。
C#使用Socket实现分布式事件总线,不依赖第三方MQ
|
1月前
|
消息中间件 存储 监控
消息队列系统中的确认机制在分布式系统中如何实现
消息队列系统中的确认机制在分布式系统中如何实现
|
1月前
|
消息中间件 存储 监控
【10月更文挑战第2天】消息队列系统中的确认机制在分布式系统中如何实现
【10月更文挑战第2天】消息队列系统中的确认机制在分布式系统中如何实现
|
28天前
|
消息中间件 存储 监控
消息队列系统中的确认机制在分布式系统中如何实现?
消息队列系统中的确认机制在分布式系统中如何实现?
|
1月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
46 0
|
3月前
|
消息中间件 存储 网络协议
消息中间件RabbitMQ---概述和概念 【一】
该文章提供了对消息中间件RabbitMQ的全面概述,包括其核心概念、工作原理以及与AMQP和JMS的关系。
消息中间件RabbitMQ---概述和概念 【一】
|
3月前
|
消息中间件 存储 监控
消息队列系统中的确认机制在分布式系统中如何实现?
消息队列系统中的确认机制在分布式系统中如何实现?
|
3月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
84 2
|
3月前
|
运维 负载均衡 算法
“分布式基础概念”全面解析,让你秒懂分布式系统!【一】
该博客文章全面解析了分布式系统的基础概念,包括微服务架构、集群与分布式的区别、节点定义、远程调用、负载均衡、服务注册与发现、配置中心、服务熔断与降级以及API网关,帮助读者快速理解分布式系统的关键组成部分和工作原理。
“分布式基础概念”全面解析,让你秒懂分布式系统!【一】