通过实例理解 RabbitMQ 的基本概念

简介:

先说下自己开发的实例。

最近在使用 Spring Cloud Config 做分布式配置中心(基于 SVN/Git),当所有服务启动后,SVN/Git 中的配置文件更改后,客户端服务读取的还是旧的配置,并不能实时读取(配置信息会缓存在客户端),Spring Boot 提供了一种方式进行更新(通过spring-boot-starter-actuator监控模块),然后 Post 访问客户端服务的/refresh接口(也可以命令执行curl -X POST http://worker2:8115/refresh),这样客户端会重新从配置中心获取新的配置信息,请求命令可以写在 Git 的 Webhooks 脚本中(修改提交 Push 后执行)。

如果客户端服务比较少的话,这样的解决方式没问题,如果客户端服务多的话,执行的请求脚本就会非常多,而且单个服务的解决方式,也不利于后期的维护(点对点的方式),那该怎么解决上面的问题呢?答案就是通过 Spring Cloud Bus

Spring Cloud Bus 翻译为消息总线,使用轻量级的消息代理来构建一个共用的消息主题让系统中所有微服务实例都能连接上来,由于该主题中产生的消息会被所有实例监听和消费,所以我们称它为消息总线。在总线上的各个实例都可以方便地广播一些需要让其他连接在该主题上的实例都知道的消息,例如配置信息的变更或者其他一些管理操作等。

架构示意图(引用来源):

下面,我们需要利用 Spring Cloud Bus 来改造 Spring Cloud Config 的服务端和客户端,其实非常简单。

添加下面的依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

然后在bootstrap.yml中添加下面配置:

spring:
  rabbitmq:
    host: manager1
    port: 5672
    username: admin
    password: admin123
management:
  security:
    enabled: false

上面的配置信息都是新增的,并且都需要配置在服务端和客户端,通过上面的示例图可以看到,配置信息更新后请求的是服务端,那么客户端我们就不需要配置management.security.enabled(也不需要配置spring-boot-starter-actuator监控模块)。

服务端和客户端的任何 Java 代码都不需要编写,重新启动服务,当配置信息更新后,通过 Git 的 Webhooks 执行请求脚本:curl -X POST http://manager1:port/bus/refresh服务端接受到请求之后,会通过 Spring Cloud Bus 通知所有的客户端(通过 RabbitMQ),重新从配置中心获取配置信息,达到实时更新配置的目的。

上面的实例描述就到这里。

RabbitMQ 的基本概念

RabbitMQ,是一个使用 erlang 编写的 AMQP(高级消息队列协议)的服务实现,简单来说,就是一个功能强大的消息队列服务。

RabbitMQ 最基本模型

RabbitMQ 的基本概念

  • Producer:消息生产者。
  • Consumer:消息消费者。
  • Connection(连接):Producer 和 Consumer 通过TCP 连接到 RabbitMQ Server。
  • Channel(信道):基于 Connection 创建,数据流动都是在 Channel 中进行。
  • Exchange(交换器):生产者将消息发送到 Exchange(交换器),由 Exchange 将消息路由到一个或多个 Queue 中(或者丢弃);Exchange 并不存储消息;Exchange Types 常用有 Fanout、Direct、Topic 三种类型,每种类型对应不同的路由规则。
  • Queue(队列):是 RabbitMQ 的内部对象,用于存储消息;消息消费者就是通过订阅队列来获取消息的,RabbitMQ 中的消息都只能存储在 Queue 中,生产者生产消息并最终投递到 Queue 中,消费者可以从 Queue 中获取消息并消费;多个消费者可以订阅同一个 Queue,这时 Queue 中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。
  • Binding(绑定):是 Exchange(交换器)将消息路由给 Queue 所需遵循的规则。
  • Routing Key(路由键):消息发送给 Exchange(交换器)时,消息将拥有一个路由键(默认为空), Exchange(交换器)根据这个路由键将消息发送到匹配的队列中。
  • Binding Key(绑定键):指定当前 Exchange(交换器)下,什么样的 Routing Key(路由键)会被下派到当前绑定的 Queue 中。

另外,再说下 Exchange Types(交换器类型)的三种常用类型

  • Direct:完全匹配,消息路由到那些 Routing Key 与 Binding Key 完全匹配的 Queue 中。比如 Routing Key 为cleint-key,只会转发cleint-key,不会转发cleint-key.1,也不会转发cleint-key.1.2
  • Topic:模式匹配,Exchange 会把消息发送到一个或者多个满足通配符规则的 routing-key 的 Queue。其中*表号匹配一个 word,#匹配多个 word 和路径,路径之间通过.隔开。如满足a.*.c的 routing-key 有a.hello.c;满足#.hello的 routing-key 有a.b.c.helo
  • Fanout:忽略匹配,把所有发送到该 Exchange 的消息路由到所有与它绑定 的Queue 中。

下面通过一段代码,理解一下消息发布的流程代码引用):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='first', type='topic')
channel.queue_declare(queue='A')
channel.queue_declare(queue='B')

channel.queue_bind(exchange='first', queue='A', routing_key='a.*.*')
channel.queue_bind(exchange='first', queue='B', routing_key='a.#')

channel.basic_publish(exchange='first',
                      routing_key='a',
                      body='Hello World!')

channel.basic_publish(exchange='first',
                      routing_key='a.b.c',
                      body='Hello World!')

大致步骤

  • 先获取一个 Connection(连接)。
  • 从 Connection(连接)上获取一个 Channel(信道)。
  • 声明一个 Exchange(交换器),只会创建一次。
  • 声明两个 Queue,只会创建一次。
  • 把 Queue 绑定到 Exchange(交换器)上.
  • 向指定的 Exchange(交换器)发送一条消息.

因为基于 Exchage Topic 模式,在上面发出的两条消息当中,消息a只会被a.#匹配到,而a.b.c会被两个都匹配到。所以,最终的结果会是队列 A 中有一条消息,队列 B 中有两条消息。

从队列取出消息代码:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='A')

def callback(ch, method, properties, body):
    print body

channel.basic_consume(callback, queue='A', no_ack=True)
channel.start_consuming()

服务消费者取出消息,需要重新创建 Connection(连接)和 Exchange(交换器),但 Queue 并不会创建,只需要从 Channel 中获取对应的 Queue 消息即可。

通过实例理解 RabbitMQ 基本概念

上面实例服务部署的情况是:三台管理服务器(config-server-git/config-server-svn)和一台工作服务器(config-client-git/config-server-svn),因为做了集群,服务的具体情况

  • config-server-git:3 个服务。
  • config-server-svn:3 个服务。
  • config-client-git:1 个服务。
  • config-client-svn:1 个服务。

所以,总的部署服务有 8 个

我们通过 RabbitMQ Server 管理界面中的内容,说下 Connection(连接)、Channel(信道)、Exchange(交换器)和 Queue(队列)的具体使用情况(根据数量理解)。

1. Connection(连接)

为什么 Connection(连接)数量为 16 个?因为部署的 8 个服务,各自发布和接受消息(即作为小心发布者,也作为消息接受者),计算公式:16 = 8 * 2

2. Channel(信道)

为什么 Channel(信道)数量为 16 个?因为 Connection(连接)数量为 16 个,Channel(信道)是在 Connection(连接)基础上创建的。

3. Exchange(交换器)

为什么 Exchange(交换器)数量为 1 个?因为都是使用的同一个 Exchange(交换器),名字为springCloudBus,Exchange Type 为topic,Routing Key 为#

4. Queue(队列)

为什么 Queue(队列)数量为 8 个?因为部署的 8 个服务,各自发布和接受的 Queue 是同一个,一个服务对应一个 Queue。

参考资料:

作者:田园里的蟋蟀 
微信公众号: 你好架构 
出处: http://www.cnblogs.com/xishuai/ 
公众号会不定时的分享有关架构的方方面面,包含并不局限于:Microservices(微服务)、Service Mesh(服务网格)、DDD/TDD、Spring Cloud、Dubbo、Service Fabric、Linkerd、Envoy、Istio、Conduit、Kubernetes、Docker、MacOS/Linux、Java、.NET Core/ASP.NET Core、Redis、RabbitMQ、MongoDB、GitLab、CI/CD(持续集成/持续部署)、DevOps等等。 
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接。
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
8月前
|
消息中间件 中间件 数据安全/隐私保护
RabbitMQ 的核心概念
RabbitMQ 的核心概念
50 2
|
8月前
|
物联网
MQTT常见问题之用单片机接入阿里MQTT实例失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
5月前
|
消息中间件 存储 网络协议
消息中间件RabbitMQ---概述和概念 【一】
该文章提供了对消息中间件RabbitMQ的全面概述,包括其核心概念、工作原理以及与AMQP和JMS的关系。
消息中间件RabbitMQ---概述和概念 【一】
|
6月前
|
消息中间件 负载均衡 算法
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
188 2
|
6月前
|
消息中间件 NoSQL 关系型数据库
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
193 1
|
6月前
|
消息中间件 存储 RocketMQ
【RocketMQ系列十】RocketMQ的核心概念说明
【RocketMQ系列十】RocketMQ的核心概念说明
89 1
|
7月前
|
消息中间件 存储 中间件
【主流技术】聊一聊消息队列 RocketMQ 的基本结构与概念
2.6Broker 代理服务器(Broker)是消息中转角色,负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。 2.7Pull Consumer 拉取式消费(Pull Consumer)是 Consumer 消费的一种类型,也是默认的类型。下游应用系统通常主动调用 Consumer 的拉消息方法从 Broke r服务器拉消息,即主动权由下游应用控制。一旦获取了批量消息,应用就会启动消费过程。
|
7月前
|
消息中间件 存储 物联网
RocketMQ基础概念
RocketMQ基础概念
73 1
|
8月前
|
消息中间件 存储 Apache
RocketMQ实战教程之常见概念和模型
Apache RocketMQ 实战教程介绍了其核心概念和模型。消息是基本的数据传输单元,主题是消息的分类容器,支持字节、数字和短划线命名,最长64个字符。消息类型包括普通、顺序、事务和定时/延时消息。消息队列是实际存储和传输消息的容器,是主题的分区。消费者分组是一组行为一致的消费者的逻辑集合,也有命名限制。此外,文档还提到了一些使用约束和建议,如主题和消费者组名的命名规则,消息大小限制,请求超时时间等。RocketMQ 提供了多种消息模型,包括发布/订阅模型,有助于理解和优化消息处理。
|
8月前
|
消息中间件 人工智能 Java
Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动
Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动
211 1