springcloud:RabbitMQ快速上手(二)

简介: 上一章咱们讲解了什么是消息队列,已经为什么使用消息队列。并且阐述了我们入门选用RabbitMQ的原因。同时为了践行我们“先讲核心,快速入门,循环学习,深入原理”的原则,我们先将RabbitMQ核心操作讲解,让大家能够快速上手RabbitMQ,能够在工作中直接应用上,后续我们再来补讲其中的原理和常见面试题

0. 引言

上一章咱们讲解了什么是消息队列,已经为什么使用消息队列。并且阐述了我们入门选用RabbitMQ的原因。

同时为了践行我们“先讲核心,快速入门,循环学习,深入原理”的原则,我们先将RabbitMQ核心操作讲解,让大家能够快速上手RabbitMQ,能够在工作中直接应用上,后续我们再来补讲其中的原理和常见面试题

1. RabbitMQ介绍

rabbitmq是实现了高级消息队列协议(AMQP)的开源消息代理软件,使用erlang语言编写。erlang语言是一种面向并发的编程语言,所以其并发量远远大于java,因此rabbitmq也是所有的mq中延迟最低的,延迟在毫秒级,这也是其最大的一个特点。

所谓AMQP协议,就是生产者先奖消息发送到交换机上,通过路由规则再发送到指定的队列上,这个队列被一个或者多个消费者监听,队列中出现消息就会被消费者监听到并且消费掉。这样的一个流程就是AMQP协议

如果不清楚什么是生产者,消费者的可以先看看上一篇博客。
在这里插入图片描述

看到这里我们会发现,和我们上一章中讲到的消息队列结构,其实呢就多了一个交换机。消息不再是直接发送到队列中,而是先发送到交换机再根据设定的规则转发到对应的队列上。
在这里插入图片描述
所以AMQP的核心思想就是生产者并不直接将消息发送到队列中,而是由交换机来做消息到队列的投递。通过引入交换机,且支持不同的交换机类型,从而来支撑更加复杂的业务场景,rabbitmq一共有四种交换机类型

1.1 RabbitMQ的四种交换机类型

我们在介绍4种交换机类型前,先要将队列与交换机绑定的流程梳理清楚

我们上述也讲到了交换机通过指定的路由规则将消息转发到对应的队列上。这个所谓的路由规则是什么呢?

实际上就是一个routingKey,我们在创建队列的时候,会将队列和交换机绑定在一起,同时指定bindingKey,比如队列A与交换机Exchange绑定,绑定时指定好bindingKey是xxx。

后续发送消息时,会声明routingKey,这样交换机就会将消息发送到bindingKey对应routingKey的队列上。这里的对应可以是bindingKey等于routingKey,也可以是bindingKey包含routingKey,这个后续会详细说明。

1.1.1 直连交换机 Direct exchange

直连交换机就是将消息推送到bindingKey等于routingKey的队列上的交换机。

如下图所示,绑定我们设置了queue1的bindingKey为xxx,queue2的bindingKey为yyy和zzz。当routingKey为xxx时会发送到queue1上,当routingKey为yyy或者zzz时,发送到queue2上。

需要注意的是,一个queue不是只能设置一个bindingKey。
在这里插入图片描述

1.1.2 主题交换机 Topic exchange

主题交换机是将消息发送到bindingKey匹配routingKey的队列上的交换机

而主题模式中,bindingKey支持*#正则匹配,其中

*表示可以匹配一个词,比如xxx.*,那么就可以匹配到xxx.yyy,但不能匹配到xxx.yyy.zzz
#表示可以匹配到多个词,比如xxx.#,那么就可以匹配到xxx.yyy以及xxx.yyy.zzz
不同分词之间用 .隔开

如下图所示,当routingKey为aaa.xxx.bbb时会转发到queue1上,当routingKey为aaa-yyy或者z.xxx.yy.zz时消息会转发到queue2上
在这里插入图片描述

1.1.3 扇形交换机 Fanout exchange

扇形交换机就比较好理解了,说的通俗点就是广播模式,只要绑定到交换机上的队列,交换机就会将消息转发到所有绑定的队列上。

也就是扇形交换机是不看bindingKey的,只要有消息就广播全发,因为不用考虑路由规则了,所以扇形交换机转发消息是最快的。
在这里插入图片描述

1.1.4 请求头交换机 Headers exchange

请求头交换机与直接交换机类似,只是请求头交换机不使用bindingKey与routingKey匹配。那么它怎么找到对应的队列呢?

所谓请求头交换机,就体现在这个请求头上,它通过匹配请求头header来实现消息转发。

请求头交换绑定的队列需要声明至少一个key-value,这个value可以是字符串也可以是对象,另外还需要声明一个x-match属性,该属性有两种值:all和any

all表示消息header中的key-value要与队列中声明的所有key-value匹配才转发
any表示消息header中的key-value要与队列中声明的任意一个key-value匹配就转发

如下图所示,当消息中的headers中声明了key为key1,value为xxx。所以只能匹配队列中key有key1的,下图案例中虽然两个队列都有key1,但是queue1要求必须要匹配key1-xxx,key3-Object才能转发,而消息中没有key3-Object,所以无法匹配queue1(这里的Object表示自定义的对象)

queue2中的规则是key1-xxx,key2-Object匹配到一个即可转发,所以消息会转发到queue2中
在这里插入图片描述

1.2 RabbitMQ提供的七种通讯方式

RabbitMQ中提供了七种通讯方式(消息模型),可以在官方文档中查看到

1.2.1 测试模式 Hello World

第一种是最简单的一种模式,也是官方用来给大家学习测试用的,所以其模型名称也够直接--'Hello World'。就像我们学习一门语言时的第一句话是hello world,我们学习RabbitMQ也可以通过Hello World模型来入门
在这里插入图片描述

这种模型只需要创建一个队列,然后生产者向该队列发送消息即可,这个过程不需要我们创建交换机,但是不代表着就不需要交换机了,RabbitMQ提供了一个默认交换机,当我们使用Hello World模式时,其消息就会发送到默认交换机中

hello world模式的bindingKey会设置为队列名,routingKey也会声明为队列名,也就意味着默认交换器类型其实是直接交换机Direct Exchange

下面我们演示下springboot项目下实现该模式的代码片段,后续我们会从零带大家实现这些代码

以下代码需要引入amqp依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 声明队列:在配置类中声明queue bean,并在该bean中创建队列,可以看到直接队列是不需要绑定交换机的,后续消息会先发送到默认交换机中,然后再转发到该消息队列上
/**
 * RabbitMQ常量池
 * 为了统一管理队列名称,交换机名称等,将rabbitmq的相关名称放到接口类中
 *
 * @author whx
 * @date 20220414
 */
public interface RabbitConstant {

    String HELLO_WORLD_QUEUE = "hello.world.queue";

}

@Configuration
public class RabbitConfig {

    @Bean
    public Queue helloWorldQueue() {
       //  创建队列
        return new Queue(RabbitConstant.HELLO_WORLD_QUEUE); 
    } 
}
  • 生产消息:通过调用RabbitTemplate的convertAndSend方法来发送消息到指定队列中
   @GetMapping("sendHelloWorld")
    public String sendHelloWorld(){
        rabbitTemplate.convertAndSend(RabbitConstant.HELLO_WORLD_QUEUE,"test message");
        return "发送成功";
    }
  • 消费消息:

(1)创建消息处理类,添加注解@Component将类声明为Bean
(2)创建消息处理方法,该方法一般有三个参数,也可以只保留第一个参数:

 >第一个参数:Object data,自定义的对象,也就是我们传递的消息,比如上述传递的是个字符串那么这里的对象类型就是String
  >第二个参数:Message message,消息的原型对象,利用它还可以获取到消息发送时设置的相关参数,这个后续我们会在代码中演示
  >第三个参数:Channel channel,接收消息所在的channel,所谓channel,及信道,就是消费者与队列之间的通道。

(3)在方法上声明监听的队列名称@RabbitListener(queues = "queue_name")

@Component
public class HelloWorldQueueListener {
    
    @RabbitListener(queues = RabbitConstant.HELLO_WORLD_QUEUE)
    public void handler(String messageInfo, Message message, Channel channel) {
      
        System.out.println("接收的消息:"+messageInfo);

    }
    
}

1.2.2 工作队列模式 Work queues

工作队列模式,我将其称之为轮询模式,默认情况下,队列会将消息以轮询的方式发送给不同的消费者,一个消息只会被一个消费者成功的消费。

那么队列如何保证这一点呢?消费者拿到消息之后,需要给rabbitmq一个ack,所谓ack大家先简单的将其理解为一个回执信息,mq在接收到ack后,rabbitmq会认为消费者已经成功收到消息了。

需要注意的是,这里Wrok模式使用的也是默认交换机
在这里插入图片描述

  • 声明队列
String WORK_QUEUE = "work.queue";

@Bean
public Queue workQueue(){
        return new Queue(RabbitConstant.WORK_QUEUE);
}
  • 生产消息
    @GetMapping("sendWork")
    public String sendWork(){
        rabbitTemplate.convertAndSend(RabbitConstant.WORK_QUEUE,"test message");
        return "发送成功";
    }
  • 消费消息:声明了两个消费者
    @RabbitListener(queues = RabbitConstant.WORK_QUEUE)
    public void workListener1(String messageInfo, Message message, Channel channel){
        System.out.println("消费者1接收的消息:"+messageInfo);
    }

    @RabbitListener(queues = RabbitConstant.WORK_QUEUE)
    public void workListener2(String messageInfo, Message message, Channel channel){
        System.out.println("消费者2接收的消息:"+messageInfo);
    }

测试发现其打印是轮询形式的
在这里插入图片描述

1.2.3 发布订阅模式 Publish/Subscribe

发布订阅模式,看到这个名称是不是想到点什么?我们上述讲解了一种扇形交换机(Fanout Exchange)就是用来做广播的,发布订阅不就是广播模式转换过来嘛。

消费者订阅指定的队列,有消息时Fanout Exchange会发布到所有的队列中,这样订阅的消费者就都能收到消息了。

如工作原理如下图所示。因为Fanout交换机,会将所有消息转发到所有绑定的队列上,所以也不用指定routingKey了
在这里插入图片描述

  • 声明队列

(1)创建队列A,B
(2)创建Fanout交换机
(3)将队列A,B绑定到交换机

String FANOUT_QUEUE_A = "fanout.queue.a";

String FANOUT_QUEUE_B = "fanout.queue.b";

String FANOUT_EXCHANGE = "fanout.exchange";

 @Bean
    public Queue fanoutQueueA(){
        return new Queue(RabbitConstant.FANOUT_QUEUE_A);
    }

    @Bean
    public Queue fanoutQueueB(){
        return new Queue(RabbitConstant.FANOUT_QUEUE_B);
    }

    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(RabbitConstant.FANOUT_EXCHANGE);
    }

    @Bean
    public Binding fanoutQueueABing(Queue fanoutQueueA, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueueA).to(fanoutExchange);
    }

    @Bean
    public Binding fanoutQueueBBing(Queue fanoutQueueB, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueueB).to(fanoutExchange);
    }
  • 生产消息:因为是广播模式,我们就需要向Exchange发送,但是不用指定routingKey,所以我们直接指定为空字符串
@GetMapping("sendFanout")
    public String sendFanout(){
        rabbitTemplate.convertAndSend(RabbitConstant.FANOUT_EXCHANGE,"","test message");
        return "发送成功";
    }
  • 消费消息:两个消费者分别监听两个队列
    @RabbitListener(queues = RabbitConstant.FANOUT_QUEUE_A)
    public void fanoutListener1(String messageInfo, Message message, Channel channel){
        System.out.println("消费者A接收的消息:"+messageInfo);
    }

    @RabbitListener(queues = RabbitConstant.FANOUT_QUEUE_B)
    public void fanoutListener2(String messageInfo, Message message, Channel channel){
        System.out.println("消费者B接收的消息:"+messageInfo);
    }

我们测试发送消息后,发现两个队列都同时收到消息了
在这里插入图片描述

1.2.4 路由模式 Routing

路由模式,与发布订阅模式不同的是,路由模式使用的交换机是Direct类型的,并且通过设置不同的bindingKey,同时在发送消息时要指定routingKey,以此通过routingKey与bindingKey匹配起来,从而将消息路由到指定的队列上
在这里插入图片描述

  • 声明队列

(1)创建队列A,B
(2)创建Fanout交换机
(3)将队列A,B绑定到交换机,并且声明队列A的bindingKey为'routing.key.a',队列B的bindingKey为'routing.key.b'和'routing.key.c'

    String ROUTING_QUEUE_A = "routing.queue.a";

    String ROUTING_QUEUE_B = "routing.queue.b";

    String ROUTING_EXCHANGE = "routing.exchange";
    
    String ROUTING_KEY_A = "routing.key.a";
    
    String ROUTING_KEY_B = "routing.key.b";
    
    String ROUTING_KEY_C = "routing.key.c";

    @Bean
    public Queue routingQueueA(){
        return new Queue(RabbitConstant.ROUTING_QUEUE_A);
    }

    @Bean
    public Queue routingQueueB(){
        return new Queue(RabbitConstant.ROUTING_QUEUE_B);
    }

    @Bean
    public DirectExchange routingExchange(){
        return new DirectExchange(RabbitConstant.ROUTING_EXCHANGE);
    }

    @Bean
    public Binding routingQueueABing(Queue routingQueueA, DirectExchange routingExchange){
        return BindingBuilder.bind(routingQueueA).to(routingExchange).with(RabbitConstant.ROUTING_KEY_A);
    }

    @Bean
    public Binding routingQueueBBing(Queue routingQueueB, DirectExchange routingExchange){
        return BindingBuilder.bind(routingQueueB).to(routingExchange).with(RabbitConstant.ROUTING_KEY_B);
    }

    @Bean
    public Binding routingQueueCBing(Queue routingQueueB, DirectExchange routingExchange){
        return BindingBuilder.bind(routingQueueB).to(routingExchange).with(RabbitConstant.ROUTING_KEY_C);
    }
  • 生产消息
    @GetMapping("sendRouting")
    public String sendRouting(){
        rabbitTemplate.convertAndSend(RabbitConstant.ROUTING_EXCHANGE,RabbitConstant.ROUTING_KEY_A,"发送给队列A的消息");
        rabbitTemplate.convertAndSend(RabbitConstant.ROUTING_EXCHANGE,RabbitConstant.ROUTING_KEY_B,"发送给队列B的消息");
        rabbitTemplate.convertAndSend(RabbitConstant.ROUTING_EXCHANGE,RabbitConstant.ROUTING_KEY_C,"还是发送给队列B的消息");
        return "发送成功";
    }
  • 消费消息
@RabbitListener(queues = RabbitConstant.ROUTING_QUEUE_A)
    public void routingListener1(String messageInfo, Message message, Channel channel){
        System.out.println("消费者A接收的消息:"+messageInfo);
    }

    @RabbitListener(queues = RabbitConstant.ROUTING_QUEUE_B)
    public void routingListener2(String messageInfo, Message message, Channel channel){
        System.out.println("消费者B接收的消息:"+messageInfo);
    }

测试发送消息,从测试结果来看消息路由成功
在这里插入图片描述

1.2.5 主题模式 Topics

主题模式,与路由模式相同的是主题模式也需要指定bindingKey,但是不同的是其bindingKey支持*,#的正则匹配,使用的也正是我们上述讲到的Topic交换机

其工作原理如下图所示
在这里插入图片描述

  • 声明队列
    String TOPIC_QUEUE_A = "topic.queue.a";

    String TOPIC_QUEUE_B = "topic.queue.b";

    String TOPIC_EXCHANGE = "topic.exchange";

    String TOPIC_KEY_A = "*.xxx.*";

    String TOPIC_KEY_B = "yyy.#";

    String TOPIC_KEY_C = "zzz.*";

    @Bean
    public Queue topicQueueA(){
        return new Queue(RabbitConstant.TOPIC_QUEUE_A);
    }

    @Bean
    public Queue topicQueueB(){
        return new Queue(RabbitConstant.TOPIC_QUEUE_B);
    }

    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(RabbitConstant.TOPIC_EXCHANGE);
    }

    @Bean
    public Binding topicQueueABing(Queue topicQueueA, TopicExchange topicExchange){
        return BindingBuilder.bind(topicQueueA).to(topicExchange).with(RabbitConstant.TOPIC_KEY_A);
    }

    @Bean
    public Binding topicQueueBBing(Queue topicQueueB, TopicExchange topicExchange){
        return BindingBuilder.bind(topicQueueB).to(topicExchange).with(RabbitConstant.TOPIC_KEY_B);
    }

    @Bean
    public Binding topicQueueCBing(Queue topicQueueB, TopicExchange topicExchange){
        return BindingBuilder.bind(topicQueueB).to(topicExchange).with(RabbitConstant.TOPIC_KEY_C);
    }
  • 生产消息:这里创建各种形式的routingKey,大家可以体会这之间的区别
    @GetMapping("sendTopic")
    public String sendTopic(){
        rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE,"aaa.xxx.bbb","发送给队列A的消息1");
        rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE,"xxx.bbb","发送给队列A的消息2");
        rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE,"aaa.xxx","发送给队列A的消息3");
        rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE,"aaa.aaa.xxx","发送给队列A的消息4");
        rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE,"aaa@xxx@bbb","发送给队列A的消息5");
        rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE,"aaa!xxx!bbb","发送给队列A的消息6");
        
        rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE,"yyy.aaa","发送给队列B的消息1");
        rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE,"yyy.aaa.bbb","发送给队列B的消息2");
        rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE,"bbb.yyy.aaa","发送给队列B的消息3");
        rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE,"bbb.yyy","发送给队列B的消息4"); 
        
        rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE,"zzz.aaa","还是发送给队列B的消息5");
        rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE,"zzz.aaa.bbb","还是发送给队列B的消息6");
        rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE,"aaa.zzz","还是发送给队列B的消息7");
        return "发送成功";
    }
  • 消费消息
    @RabbitListener(queues = RabbitConstant.TOPIC_QUEUE_A)
    public void topicListener1(String messageInfo, Message message, Channel channel){
        System.out.println("消费者A接收的消息:"+messageInfo);
    }

    @RabbitListener(queues = RabbitConstant.TOPIC_QUEUE_B)
    public void topicListener2(String messageInfo, Message message, Channel channel){
        System.out.println("消费者B接收的消息:"+messageInfo);
    }

测试发送消息,通过接收结果也验证了我们之前的说明:

*表示可以匹配一个词,比如xxx.*,那么就可以匹配到xxx.yyy,但不能匹配到xxx.yyy.zzz
#表示可以匹配到多个词,比如xxx.#,那么就可以匹配到xxx.yyy以及xxx.yyy.zzz
不同分词之间用 .隔开

在这里插入图片描述

1.2.6 组间调用模式 RPC

组间调用模式,这个模式下就没有生产者消费者的概念了,只有服务端和客户端,或者我们理解为调用端和被调用端

这个模式下有两个队列,一个队列用来盛装server对client的调用,一个队列用来盛装client对server的调用

但是这种模式的性能并不佳,实际工作中很少使用,所以我们这里也不多讲解,大家了解即可,现在组间调用多用feign组件来实现,如果不了解这个组件的可以查看专栏往期博客
在这里插入图片描述

1.2.7 可靠模式 Publisher Confirms

可靠模式,所谓可靠模式就是保证消息可靠性的实现方法,主要是通过实现队列持久化、消息持久化、重试机制等来实现的

这个模式的代码实现,我们将在下一节的RabbitMQ讲解中带大家一起书写

因为篇幅关系,本期讲解我们就到这里了,下期我们将结合springboot来向大家展示RabbitMQ的使用。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4月前
|
消息中间件
SpringCloud Stream集成RabbitMQ
SpringCloud Stream集成RabbitMQ
66 0
|
6月前
|
消息中间件 Java Maven
微服务技术系列教程(34) - SpringCloud-使用RabbitMQ实现消息驱动
微服务技术系列教程(34) - SpringCloud-使用RabbitMQ实现消息驱动
171 0
|
12天前
|
消息中间件 Java RocketMQ
Spring Cloud RocketMQ:构建可靠消息驱动的微服务架构
【4月更文挑战第28天】消息队列在微服务架构中扮演着至关重要的角色,能够实现服务之间的解耦、异步通信以及数据分发。Spring Cloud RocketMQ作为Apache RocketMQ的Spring Cloud集成,为微服务架构提供了可靠的消息传输机制。
27 1
|
2月前
|
消息中间件 存储 NoSQL
【Redis项目实战】使用Springcloud整合Redis分布式锁+RabbitMQ技术实现高并发预约管理处理系统
【Redis项目实战】使用Springcloud整合Redis分布式锁+RabbitMQ技术实现高并发预约管理处理系统
|
5月前
|
消息中间件 监控 Java
【Spring Cloud + RabbitMQ 实现分布式消息总线】—— 每天一点小知识
【Spring Cloud + RabbitMQ 实现分布式消息总线】—— 每天一点小知识
104 0
|
7月前
|
消息中间件 安全 Java
24SpringCloud - Spring Cloud Bus 消息总线集成(RabbitMQ)
24SpringCloud - Spring Cloud Bus 消息总线集成(RabbitMQ)
54 0
|
10月前
java初中级面试题(SSM+Mysql+微服务(SpringCloud+Dubbo)+消息队列(RocketMQ)+缓存(Redis+MongoDB)+设计模式+搜索引擎(ES)+JVM
java初中级面试题(SSM+Mysql+微服务(SpringCloud+Dubbo)+消息队列(RocketMQ)+缓存(Redis+MongoDB)+设计模式+搜索引擎(ES)+JVM
461 0
|
10月前
java初中级面试题(SSM+Mysql+微服务(SpringCloud+Dubbo)+消息队列(RocketMQ)+缓存(Redis+MongoDB)+设计模式+搜索引擎(ES)+JVM
java初中级面试题(SSM+Mysql+微服务(SpringCloud+Dubbo)+消息队列(RocketMQ)+缓存(Redis+MongoDB)+设计模式+搜索引擎(ES)+JVM
597 0
|
10月前
java初中级面试题(SSM+Mysql+微服务(SpringCloud+Dubbo)+消息队列(RocketMQ)+缓存(Redis+MongoDB)+设计模式+搜索引擎(ES)+JVM
java初中级面试题(SSM+Mysql+微服务(SpringCloud+Dubbo)+消息队列(RocketMQ)+缓存(Redis+MongoDB)+设计模式+搜索引擎(ES)+JVM
656 0
|
11月前
|
消息中间件 RocketMQ 微服务
SpringCloud基于RocketMQ实现分布式事务(文末送书)
SpringCloud基于RocketMQ实现分布式事务(文末送书)
512 0