SpringBoot中使用RabbitMQ

简介: 🍅程序员小王的博客:程序员小王的博客🍅 欢迎点赞 👍 收藏 ⭐留言 📝🍅 如有编辑错误联系作者,如果有比较好的文章欢迎分享给我,我会取其精华去其糟粕🍅java自学的学习路线:java自学的学习路线

0.png

一、搭建环境

1.png


1、引入环境依赖

 

   

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.5.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

 


2、配置配置文件(properties)

server.port=9090
server.servlet.context-path=/rabbitMQ
#设置RabbitMQ的IP地址
spring.rabbitmq.host=192.168.5.128
#设置rabbitmq服务器连接端口
spring.rabbitmq.port=5672
#设置rabbitmq服务器虚拟主机
spring.rabbitmq.virtual-host=yingxue
#设置rabbitmq服务器用户名
spring.rabbitmq.username=whj
#设置rabbitmq服务器密码
spring.rabbitmq.password=563135


3、测试rabbitTemplate

  @Test
    public void testConnection(){
        //结果: org.springframework.amqp.rabbit.core.RabbitTemplate@2dfe5525
        System.out.println(rabbitTemplate);
    }
}


二、hello world模型

2.png


在上图的模型中,有以下概念:


P(Provider):生产者,也就是要发送消息的程序

C(Consumer):消费者:消息的接收者,会一直等待消息到来。

queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。


1、开发生产者

/**
 * @author 王恒杰
 * @date 2022/1/23 14:10
 * @Description:
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitMqApplication.class)
public class Provider {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * Hello模型的生产者
     */
    @Test
    public void TestProviderHello(){
    //    发送消息 参数:             队列名称   发送内容
        rabbitTemplate.convertAndSend("hello_王恒杰","Hello模型发送信息");
        System.out.println("发送成功");
    }
}


2、开发消费者

@Component
//               Declare:声明              队列名               队列持久化 durable:持久耐用  自动删除
@RabbitListener(queuesToDeclare = @Queue(value = "hello_王恒杰",durable = "true",autoDelete = "true"))
public class HelloConsumer {
    /**
     * Handler:处理者
     * @param message
     */
    @RabbitHandler
    public void HelloConsumer(String message){
        System.out.println("消费者:"+message);
    }
}


4、测试结果

3.png


三、work模型

work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会


被重复执行的。


4.png


角色:


P:生产者:任务的发布者

C1:消费者-1,领取任务并且完成任务,睡眠1000毫秒

C2:消费者-2:领取任务并完成任务


1、开发生产者

 

  /**
     * Work模型的生产者
     */
    @Test
    public void  TestProviderWork(){
        for (int i = 1; i <= 21; i++) {
        //    发送消息 参数:队列名称 发送内容
            rabbitTemplate.convertAndSend("work_王恒杰"," Work模型发送王恒杰这个消息"+i+"次!");
        }
        System.out.println("发送成功");
    }


2、开发消费者

  /**
     * Work模型的生产者
     */
    @Test
    public void  TestProviderWork(){
        for (int i = 1; i <= 21; i++) {
        //    发送消息 参数:队列名称 发送内容
            rabbitTemplate.convertAndSend("work_王恒杰"," Work模型发送王恒杰这个消息"+i+"次!");
        }
        System.out.println("发送成功");
    }


3、测试结果公平调度

说明:默认在Spring AMQP实现中Work这种方式就是公平调度,如果需要实现能者多劳需要额外配置


5.png


4、能者多劳实现

能者多劳配置以下配置


#能者多劳配置以下配置    每个消费者每次可以消费一个
spring.rabbitmq.listener.simple.prefetch=1


6.png


四、Fanout 广播模型

7.png


在广播模式下,消息发送流程是这样的:


可以有多个消费者

每个消费者有自己的queue(队列)

每个队列都要绑定到Exchange(交换机)

生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。

交换机把消息发送给绑定过的所有队列

队列的消费者都能拿到消息。实现一条消息被多个消费者消费


1、开发生产者

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitMqApplication.class)
public class Provider {
    private static final Logger log = LoggerFactory.getLogger(Provider.class);
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * Fanout广播模型的生产者
     */
    @Test
    public void  TestProviderFanout(){
        //发送消息 参数:交换机名字,路由key 发送内容
        rabbitTemplate.convertAndSend("fount2022","","SpringBoot fanout 广播类发送信息");
        log.warn("发送成功");
    }



2、开发消费者

@Component
public class FanoutConsumer {
    private static final Logger log = LoggerFactory.getLogger(FanoutConsumer.class);
    /**
     * 消费者1:
     * receive:接收
     * bindings配置绑定 @QueueBinding(
     * value队列@Queue,
     * exchange交换机@Exchange(name交换机名 key路由key type交换机类型)
     * durable="是否持久化"
     * autoDelete="是否自动删除")
     *
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,//创建临时队列
            exchange = @Exchange(name = "fount2022", type = "fanout")))
    public void receive1(String message) {
        log.warn("message1=" + message);
    }
    /**
     * 消费者2
     *
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(name = "fount2022", type = "fanout")))
    public void receive2(String message) {
        log.warn("message2=" + message);
    }
}


3、测试结果

8.png


五、Route 路由模型

9.png


图解:


P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。

X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列

C1:消费者,其所在队列指定了需要routing key 为 error 的消息

C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息


1、开发生产者

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitMqApplication.class)
public class Provider {
    private static final Logger log = LoggerFactory.getLogger(Provider.class);
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * Route 路由模型的生产者  direct直接的
     */
    @Test
    public void testProviderRoute() {
        rabbitTemplate.convertAndSend("route_direct_王恒杰",
                                      "error",
                                   "王恒杰通过路由直接发送发送错误的日志信息error");
        log.warn("发送成功");
    }
    }

2、开发消费者

@Component
public class RedirectConsumer {
    private static final Logger log = LoggerFactory.getLogger(RedirectConsumer.class);
    /**
     * 消费者1
     * @param message
     */
    @RabbitListener(bindings = {@QueueBinding(
            value = @Queue,
            key = { "info","warn"},
            exchange = @Exchange(type = "direct", name = "route_direct_王恒杰"))})
    public void RedirectConsumer1(String message) {
        log.warn("message1=" + message);
    }
    /**
     * 消费者2
     * @param message
     */
    @RabbitListener(bindings = {@QueueBinding(
            value = @Queue,
            key = {"error", "warn"},
            exchange = @Exchange(type = "direct", name = "route_direct_王恒杰"))})
    public void RedirectConsumer2(String message) {
        log.warn("message2=" + message);
    }
}

3、测试结果

消费者1和消费者2都含有error,则都打印信息

10.png


消费者1没有error,消费者2含有error,则消费者2打印信息

11.png


六、Topic 订阅模型(动态路由模型)

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!这种模型Routingkey一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert


13.png


通配符

# 通配符
  * (star) can substitute for exactly one word.    匹配不多不少恰好1个词
  # (hash) can substitute for zero or more words.  匹配一个或多个词
audit.*   只能匹配 audit.irs
audit.#   可匹配audit.irs.corporate 或者 audit.irs 等


1、开发生产者

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitMqApplication.class)
public class Provider {
    private static final Logger log = LoggerFactory.getLogger(Provider.class);
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testProviderTopic(){
        rabbitTemplate.convertAndSend("topics","user.save.findAll","用户查询所有");
        log.warn("发送成功");
    }
}


2、开发消费者

@Component
public class TopicConsumer {
    private static final Logger log = LoggerFactory.getLogger(TopicConsumer.class);
    @RabbitListener(bindings = {@QueueBinding(
            value = @Queue,
            //key = {"user.*"},   如果是user.* 则消费者1不能打印信息
            key = {"user.save.*"},  //如果是user.save.* 这可以打印
            exchange = @Exchange(type = "topic",name = "topics")
    )})
    public void TopicConsumer1(String message) {
        log.warn("message1=" + message);
    }
    @RabbitListener(bindings = {@QueueBinding(
            value = @Queue,
            key = {"user.#"},  //如果是user.# 可以匹配user.save 也可以匹配user.save.findAll
            exchange = @Exchange(type = "topic",name = "topics")
    )})
    public void TopicConsumer2(String message) {
        log.warn("message2=" + message);
    }
}


3、测试结果

如果是user.* 则消费者1不能打印信息 如果是user.save.* 这可以打印

如果是user.# 可以匹配user.save 也可以匹配user.save.findAll

14.png

如果是user.save.* 这可以打印

15.png


七、MQ的应用场景

注:MQ的应用场景参考CSDN某博主


1、异步处理

场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种 1.串行的方式 2.并行的方式


串行方式: 将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西.

20.png


并行方式:将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间。

21.png


消息队列:假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并行已经提高的处理时间,但是,前面说过,邮件和短信对我正常的使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,应该是写入数据库后就返回.

引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理

22.png

由此可以看出,引入消息队列后,用户的响应时间就等于写入数据库的时间+写入消息队列的时间(可以忽略不计),引入消息队列后处理后,响应时间是串行的3倍,是并行的2倍。


2、应用解耦

场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口.


23.png


这种做法有一个缺点:


当库存系统出现故障时,订单就会失败。 订单系统和库存系统高耦合. 引入消息队列


24.png


订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。

库存系统:订阅下单的消息,获取下单消息,进行减库存操作。 就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失.


3、流量削峰

场景: 秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。


作用:

1.可以控制活动人数,超过此一定阀值的订单直接丢弃(我为什么秒杀一次都没有成功过呢^^)


2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)


25.png


1.用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面.


2.秒杀业务根据消息队列中的请求信息,再做后续处理.


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 Java 网络架构
|
1月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
36 6
|
5月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
5月前
|
网络协议 Java 物联网
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
1044 3
|
5月前
|
消息中间件 Java Maven
|
6月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
412 1
|
6月前
|
消息中间件 Java 数据安全/隐私保护
Spring Boot与RabbitMQ的集成
Spring Boot与RabbitMQ的集成
|
6月前
|
消息中间件 Java RocketMQ
Spring Boot与RocketMQ的集成
Spring Boot与RocketMQ的集成
|
消息中间件 Linux
centos7 yum快速安装rabbitmq服务
centos7 yum快速安装rabbitmq服务
243 0
|
消息中间件 中间件 微服务
RabbitMQ 入门简介及安装
RabbitMQ 入门简介及安装
130 0