RabbitMQ灵活运用,怎么理解五种消息模型

简介: RabbitMQ灵活运用,怎么理解五种消息模型

简介

上次我们介绍了,为什么rabbitMQ会被很多人中意选型,从而成为火热的MQ组件,今天就先来说一说MQ的基础使用 ———— 其五种消息模型


一、AMQP协议

我们都知道,RabbitMQ是一个使用Erlang语言,基于AMQP协议的MQ组件,那什么是AMQP协议呢,我们就从这开始今天的学习。


AMQP全称为 Advanced Message Queuing Protocol(高级消息队列协议),是一个面向消息的中间件传输协议,用于在应用程序之间进行异步消息通信。


AMQP协议定义了多种角色和服务,包括生产者、消费者、交换器、队列等。其中生产者负责生成消息,消费者负责接收和处理消息,交换器则负责将消息路由到队列中。如下图


1e51310928f14a0984b7ea8e3f622647.png

AMQP协议的消息传输基于消息队列,支持消息的确认、持久化、事务等功能,同时支持不同的消息传输模式,如点对点(point-to-point)和发布-订阅(publish-subscribe)模式。


我们今天要介绍的RabbitMQ的五种模型,其实都是基于AMQP的基础模型或其演变而来。


二、交换机类型与默认交换机

1. 交换机的四种类型

在进一步讲解之前,我们需要知道RabbitMQ,交换机有四种类型,分别是:Direct、Fanout、Topic和Headers。


direct:

按照消息的路由键(routing key)完全匹配来投递消息。直接匹配模式,将消息发送到与路由键完全匹配的队列中。direct模式可以使用RabbitMQ自带的默认交换机,所以不需要将交换机进行任何绑定操作


topic:

使用通配符进行模糊匹配,消息会带有一个路由键(routing key),而队列绑定到交换机上时,也可以指定主题,而且还能使用一定的正则形式,只要主题匹配上消息的路由键,该消息就会发送至该队列

符号“#”匹配一个或多个词 eg:" log.# "能够匹配到‘ log.info.oa ’

符号“ * ” 匹配不多不少一个词 eg:“ log.* ”只能匹配到“log.erro”


headers:

按照消息头(header)匹配来投递消息。根据消息头的键值对匹配,当消息头与某个绑定的headers完全匹配时,才会将消息发送到该队列中。


fanout :

将接收到的所有消息广播到它知道的所有队列中,如果routing_key 有指定也不会生效


不难发现,这四种类型本质上是告诉交换机应该把消息发送给哪些那些队列的,四种类别对应着四种判断角度。fanout —— 相当于广播,不作任何选择,发送给所有连接的队列;direct —— 消息发送时都附带一个字段叫routing_key,direct 模式的交换机就会直接把该字段理解成队列名,找到对应的队列并发送;topic —— 交换机会把routing_key理解成一个主题,恰好,队列绑定交换机时也可以以缩略形式指定主题,所以找到匹配主题的队列就发送;headers —— 这种模式的交换机不再以消息的routing_key作为判断依据,而是在队列绑定交换机时,每个队列需提供一个Map,当消息发送给交换机时,交换机会解析消息头,看看有没有能和各队列Map吻合的属性,有则发给该队列。


2. 默认交换机

RabbitMQ有一个自带的交换机,也被称为AMQP default exchange。当消息发送到RabbitMQ时,如果没有指定交换机,就会被发送到默认交换机。默认交换机的类型为direct类型,路由键与队列名相同。


如果消息的路由键和某个队列的名称一致,那么消息就会被发送到这个队列中。如果消息的路由键和任何一个队列的名称都不一致,那么消息会被丢弃。默认交换机可以通过设置routing_key来指定消息的目的地,例如:


//  将消息发送到名称为test_queue的队列中,空字符串代表默认交换机
channel.basic_publish(exchange="", routing_key="test_queue", body="Hello, RabbitMQ!")

但是,建议应用程序在发送消息时显式地指定交换机,以避免不必要的麻烦或错误。默认交换机只是一个简单的机制,不应被用于复杂的应用程序。


三、五种模式速览

1. 一对一简单模式


f73b9f3c58424b9885c862bbe64feb3f.png

概念

生产者将消息发送到“ hello”队列。使用者从该队列接收消息。


图解

“ P”是我们的生产者

“ C”是我们的消费者

中间的框是一个队列-RabbitMQ代表使用者保留的消息缓冲区。需要注意的是,虽然看起来生产者直接连接了队列,但是实际上,它连接的是rabbitMQ的默认交换机


916f309f4bf848e18111b527e6e7388a.png


2. work模式(轮询)

1843914b5b8b4ce58ae42447828f5611.png


概念

work模式是一个生产者,一个队列,多个消费者,每个消费者获取到的消息唯一,多个消费者只有一个队列,C1\C2是竞争关系,一个消息被C1消费,C2将没有消息


优点及作用

一个生产者一个队列多个消费者模式能够并行化工作,解决了消息积压问题


工作原理

默认情况下,RabbitMQ将每个消息按顺序发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。


注意

轮询分发采用平均,导致机器性能的浪费,可以将消费者信道设置如下,代表不公平分发


int preFetchCount = 1;

// 该参数的意思是消费者在接收到队列里的消息但没有返回确认结果之前,队列不会将新的消息分发给该消费者,该设置需要和手动ACK配合使用

channel.basicQos(preFetchCount )


3. 发布/订阅模式

前两种发布模式,在图中都没有画出 Exchange交换机,属于是一种模型的简化,实际上上两种模式,都将消息发布给了rabbitMQz自带的默认交换机,然后默认交换机再将消息转发给消息队列

(PS:每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同)


fd59b1ba7e50492b8625ac08db3525de.png

概念

和模式二不同的是,模式三是没有路由规则,多个队列,多个消费者。生产者将消息不是直接发送到队列,而是发送到X交换机,然后由交换机发送给两个队列,两个消费者各自监听一个队列,因此一个消息可以被多个消费者消费

4. 路由模式(自称direct模式)

cc6e532e4c194cad9d2729d1402f4610.png


概念

交换机连接接队列发送消息需要指定routing_key,所以模式四就是生产者发送消息到交换机并且要指定routing_key,消费者将队列绑定到交换机时需要指定路由key

算法

背后的路由算法很简单:消息自带一个routing_key(亦称路由键),交换机会把该消息路由给与其routing_key完全匹配的队列 。


在direct模式中,第一个队列以 orange为 key 绑定至交换机 x,第二个队列以black和green绑定。注意:如果有消息没有使用上述key,比如某个消息的key 是 red ,那么该消息将会被丢弃。


另外,用相同的路由键可以绑定多个队列

1c9ff5c825db4cba8d9b67e82b316fd7.png


5. Topic模式

d9e35312e3344d92a79def3950f5cc6c.png


概念

topic模式的routing_key使用通配符组成进行模糊匹配

符号“#”匹配一个或多个词 eg:" log.# "能够匹配到‘ log.info.oa ’

符号“ * ” 只匹配一个词 eg:“ log.* ”只能匹配到“log.erro”


举例说明,当使用下列 routing_key 发送消息时:


  1. “quick.orange.rabbit”的消息将传递到两个队列;
  2. “lazy.orange.elephant ”也将发送给他们两个;
  3. “quick.orange.fox ”只会进入Q1,而“ lazy.brown.fox ”只会进入Q2;
  4. “lazy.pink.rabbit ”将被传递到Q2只有一次,即使两个绑定都匹配;
  5. “quick.brown.fox ”与任何绑定都不匹配,因此将被丢弃;
  6. “orange“ 和“”quick.orange.male.rabbit“,因为单词个数对不上,则不会被匹配
  7. “lazy.orange.male.rabbit ”即使有四个单词,也将匹配最后一个绑定,并将其传送到Q2队列。

四、实例

我们以最复杂的Topic模式为例,实际写一段java代码


1. 生产者代码

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TopicProducer {
    private static final String EXCHANGE_NAME = "topic_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 创建Topic类型的交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // 待发送的消息
        String routingKey = "topic.key1";
        String message = "hello rabbitmq, this is topic message";
        // 发送消息
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
        System.out.println("Sent message: " + message + ", routingKey: " + routingKey);
        channel.close();
        connection.close();
    }
}

2. 消费者代码

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TopicConsumer {
    private static final String EXCHANGE_NAME = "topic_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 创建Topic类型的交换机,由于发送者已创建,此步其实可省略,一般由发送方建立交换机
        // channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // 创建一个非持久、独占、自动删除的队列
        String queueName = channel.queueDeclare().getQueue();
        // 绑定路由键为 "topic.#" 的消息
        channel.queueBind(queueName, EXCHANGE_NAME, "topic.#");
        System.out.println("Waiting for messages. To exit press CTRL+C");
        // 消费消息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String routingKey = envelope.getRoutingKey();
                String message = new String(body, "UTF-8");
                System.out.println("Received message: " + message + ", routingKey: " + routingKey);
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

五、总结

rabbitMQ的五种模式就介绍完了,其实后三种模式非常相像,我们可以这么理解Topic模式


当队列用‘#’绑定时它将接收所有消息,与routing_key无关,此时就像fanout模式一样

当队列绑定中不使用‘*’和‘#’时,主题交换就像direct模式一样。


而至于前两种模式,由于模型中不带有交换机,所以生产者和消费者都是直接连接的Queue,则就是直肠子的模式


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
6月前
|
消息中间件
RabbitMQ消息模型之Work Queues
RabbitMQ消息模型之Work Queues
49 1
RabbitMQ消息模型之Work Queues
|
6月前
|
消息中间件
RabbitMQ消息模型之Routing-Topic
RabbitMQ消息模型之Routing-Topic
45 0
|
6月前
|
消息中间件 缓存
RabbitMQ消息模型之Sample
RabbitMQ消息模型之Sample
37 0
|
6月前
|
消息中间件
RabbitMQ消息模型之Routing-Direct
RabbitMQ消息模型之Routing-Direct
98 1
|
6月前
|
消息中间件
RabbitMQ消息模型之发布订阅Publish-Subscribe
RabbitMQ消息模型之发布订阅Publish-Subscribe
85 0
RabbitMQ消息模型之发布订阅Publish-Subscribe
|
6月前
|
消息中间件 JSON 缓存
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(SpringBoot整合)
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(SpringBoot整合)
162 0
|
3月前
|
消息中间件 测试技术 Kafka
Apache RocketMQ 批处理模型演进之路
RocketMQ 早期批处理模型存在一定的约束条件,为进一步提升性能,RocketMQ 进行了索引构建流水线改造,同时 BatchCQ 模型和 AutoBatch 模型也优化了批处理流程,提供了更简便的使用体验,快点击本文查看详情及配置展示~
19768 78
|
2月前
|
消息中间件 存储 缓存
RabbitMQ:WorkQueues模型
RabbitMQ:WorkQueues模型
47 8
RabbitMQ:WorkQueues模型
|
6月前
|
消息中间件 存储 Apache
RocketMQ实战教程之常见概念和模型
Apache RocketMQ 实战教程介绍了其核心概念和模型。消息是基本的数据传输单元,主题是消息的分类容器,支持字节、数字和短划线命名,最长64个字符。消息类型包括普通、顺序、事务和定时/延时消息。消息队列是实际存储和传输消息的容器,是主题的分区。消费者分组是一组行为一致的消费者的逻辑集合,也有命名限制。此外,文档还提到了一些使用约束和建议,如主题和消费者组名的命名规则,消息大小限制,请求超时时间等。RocketMQ 提供了多种消息模型,包括发布/订阅模型,有助于理解和优化消息处理。
|
6月前
|
消息中间件 Java Kafka
RabbitMQ安装和5种不同的消息模型(BasicQueue,WorkQueue,Fanout Exchange,Direct Exchange,Topic Exchange)与SpringAMQP
RabbitMQ安装和5种不同的消息模型(BasicQueue,WorkQueue,Fanout Exchange,Direct Exchange,Topic Exchange)与SpringAMQP