RabbitMQ简介与springboot整合

简介: RabbitMQ简介与springboot整合


image.png

1.什么是消息队列

消息队列,主要解决异步消息的管理(注册后,短信发送不是必须,可以使用队列)。实现系统之间的双向解耦,同时也能起到消息缓冲,消息分发的作用。当生产者产生大量数据,而消费者无法快速消费,(秒杀数据量过大使系统崩溃,队列可以废弃多余请求),或者是消费者异常了(服务挂掉后使请求丢失,队列可以保存请求)。

说白话讲,主要作用就是异步,削峰与解耦。

1.rabbitMQ简介

1.运行流程

image.png

rabbitmq是消息队列的一种,通过上图可以看到工作流程。生产者把请求给交换机  ,交换机把请求按照一定绑定关系发送给队列(平均发送),然后队列在把请求给消费者。其中交换机只负责转发并不负责保存,然后通过绑定关系与队列相绑定。交换器按照路由键绑定队列。当多消费者消费一个队列时,队列会均匀的发送到多个消费者之中。

其中

  1. 生产者为发送消息的服务/类。
  2. 消费者是接收消息的服务/类。
  3. 交换机将接收到的消息按照交换机类型发送给队列。
  4. 未被消费的消息都被存放在队列中。

2.交换机类型

rabbitmq提供了四种交换机。

  1. fanout:发送给所有绑定该交换机的队列。
  2. Direct:默认的交换方法,按照提供的key去寻找队列。如果key为A,数据只能发送到A的队列中。
  3. Topic:模糊匹配,只要符合该格式就可以。可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)。如*.C.# 可以匹配A.C.B.不能匹配A.B.C.(其中以banding key关联)
  4. head:根据消息内容中的headers属性进行匹配。

3.使用的工具类

ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。ConnectionFactory为Connection的制造工厂。

Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。

4.spring boot 整合rabbitmq

代码思路:在配置文件中定义队列(queue),交换机(exchange),然后队列与交换器以路由键名称相对应(路由键和队列名相匹配,既以路由键寻找对列名),然后生产者可以通过交换器和队列名称确定要发送的队列,而消费者选择监控队列,来获取消息。

在整合之前需要安装rabbitmq,然后启动和搭建框架。

1.Direct交换机

1.新建队列与绑定关系
@Configuration
public class RabbitMQConfig {
  // -------------------------topic队列
  // 创建队列
  @Bean
  public Queue topicQueue() {
    return new Queue("topic.mess");
  }
}
2.生产者

直接配置一个队列,然后调用API发送消息就可以了。

public class ProducerController {
  @Autowired
  private RabbitTemplate rabbitTemplate;
  @GetMapping("/sendMessage")
  public Object sendMessage() {
    new Thread(() -> {
      //for (int i = 0; i < 100; i++) {
        Date date = new Date();
        String value = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(date);
        System.out.println("send message {}" + value);
        City city= new City();
        city.setCityName("aaaa");
        city.setDescription("bbb");
        city.setProvinceId((long)111);
        rabbitTemplate.convertAndSend("topic.mess", city); //使用默认的队列
      //}
    }).start();
    return "ok";
  }
}
3.消费者

消费者直接使用就可以了(可以传对象 基本类型)。需要@RabbitListener注解。

@Component
@RabbitListener(queues = "topic.mess") //topic交换机
public class Consumer2 {
  @RabbitHandler
  public void consumeMessage(City city) {
    System.out.println("consume message {} 2222222:" + city);
  }
}

2.topic交换机

1.新建队列与绑定关系

声名两个队列和一个topic交换器,然后通过路由键绑定他们之间的关系,路由键和队列名相同就能匹配,但是topic可以模糊匹配 #可以代替一段字符。

@Configuration
public class RabbitMQConfig {
  // -------------------------topic队列
  // 创建队列
  @Bean
  public Queue topicQueue() {
    return new Queue("topic.mess");
  }
  @Bean
  public Queue topicQueue2() {
    return new Queue("topic.mess2");
  }
  // 创建 topic 类型的交换器
  @Bean
  public TopicExchange topicExchange() {
    return new TopicExchange("topic");
  }
  // 使用路由键(routingKey)把队列(Queue)绑定到交换器(Exchange) Topic交换器通过routingKey与队列绑定
  @Bean
  public Binding bindingA(Queue topicQueue, TopicExchange topicExchange) {
    return BindingBuilder.bind(topicQueue).to(topicExchange).with("topic.mess");
  }
  @Bean
  public Binding bindingB(Queue topicQueue2, TopicExchange topicExchange) {
    return BindingBuilder.bind(topicQueue2).to(topicExchange).with("topic.#");
  }
}
2.生产者

直接调用API发送消息。消费者发送到队列,因为有模糊匹配的规则,topic.mess可以匹配 topic.mess和topic.mess2队列 而topic.mess2只能匹配到topic.#。

@RestController
public class ProducerController {
  @Autowired
  private RabbitTemplate rabbitTemplate;
  @GetMapping("/sendMessage")
  public Object sendMessage() {
    new Thread(() -> {
      //for (int i = 0; i < 100; i++) {
        Date date = new Date();
        String value = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(date);
        System.out.println("send message {}" + value);
        City city= new City();
        city.setCityName("aaaa");
        city.setDescription("bbb");
        city.setProvinceId((long)111);
        rabbitTemplate.convertAndSend("topic", "topic.mess", city);
        rabbitTemplate.convertAndSend("topic", "topic.mess2", city);
      //}
    }).start();
    return "ok";
  }
}
3.消费者

消费者直接接收。

@Component
@RabbitListener(queues = "topic.mess") //topic交换机
public class Consumer2 {
  @RabbitHandler
  public void consumeMessage(City city) {
    System.out.println("consume message {} 2222222:" + city);
  }
}

3.Fanout Exchange 广播

1.新建队列与绑定关系

在配置文件中声名队列和交换器,然后绑定。

// --------FanoutExchange绑定
  // -------------------------Fanout 队列
  @Bean
  FanoutExchange fanoutExchange() {
    return new FanoutExchange("fanoutExchange");
  }
  @Bean
  public Queue fanoutQueue() {
    return new Queue("fanoutqueue");
  }
  @Bean
  public Queue fanoutQueue2() {
    return new Queue("fanoutqueue2");
  }
  @Bean
  public Binding bindingC(Queue fanoutQueue, FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
  }
  @Bean
  public Binding bindingD(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
  }
2.生产者

调用API发送消息。

@RestController
public class ProducerController {
  @Autowired
  private RabbitTemplate rabbitTemplate;
  @GetMapping("/sendMessage")
  public Object sendMessage() {
    new Thread(() -> {
      //for (int i = 0; i < 100; i++) {
        Date date = new Date();
        String value = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(date);
        System.out.println("send message {}" + value);
        City obj = new City();
        obj.setCityName("aaaa");
        obj.setDescription("bbb");
        obj.setProvinceId((long)111);
        rabbitTemplate.convertAndSend("fanoutExchange","", value); //使用默认的队列
      //}
    }).start();
    return "ok";
  }
}
3.消费者

然后接收,所有绑定队列的都可以接收到。

@Component
@RabbitListener(queues = "fanoutqueue2")
public class Consumer {
  @RabbitHandler
  public void consumeMessage(String message) {
    System.out.println("consume message {} 1111111:" + message);
  }
}

5.页面新建队列

还可以在http://localhost:15672/#/exchanges中直接配置队列和交换机绑定关系。(略微有点麻烦,不建议使用)

1.新建队列

image.png

2.新建交换机

image.png

3.新增绑定关系

绑定交换器和队列之间的关系 ,然后就可以直接使用了,并不需要java内部声名使用。

image.png


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 Java 网络架构
|
1月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
38 6
|
7月前
|
消息中间件 Java RocketMQ
消息队列 MQ产品使用合集之当SpringBoot应用因网络不通而启动失败时,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
JavaScript NoSQL Java
CC-ADMIN后台简介一个基于 Spring Boot 2.1.3 、SpringBootMybatis plus、JWT、Shiro、Redis、Vue quasar 的前后端分离的后台管理系统
CC-ADMIN后台简介一个基于 Spring Boot 2.1.3 、SpringBootMybatis plus、JWT、Shiro、Redis、Vue quasar 的前后端分离的后台管理系统
62 0
|
5月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
5月前
|
消息中间件 安全 物联网
RabbitMQ的人生简介
8月更文挑战第26天
|
5月前
|
网络协议 Java 物联网
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
1086 3
|
5月前
|
消息中间件 Java Maven
|
6月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
419 1
|
6月前
|
消息中间件 Java 数据安全/隐私保护
Spring Boot与RabbitMQ的集成
Spring Boot与RabbitMQ的集成
下一篇
开通oss服务