RabbitMQ中的SpringAMQP(上)

简介: RabbitMQ中的SpringAMQP(上)

1、什么是SpringAMQP

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。

2、简单队列模型

1、在父工程pom.xml中引入依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、首先在生产者端配置MQ地址,在application.yml中添加配置:

image.png

spring:
  rabbitmq:
    host: 192.168.58.131 #rabbitMQ的ip地址
    port: 5672 #端口
    username: jie #用户名
    password: 1234 #密码
    virtual-host: / #虚拟主机

2、在生产者端编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:

package com.jie.mq.helloworld.spring;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.HashMap;
import java.util.Map;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void test() {
        //队列名称
        String queueName = "simple.queue";
        //消息
        String message = "hello, spring amqp!";
        //发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

点击运行后可以到浏览器看看。

image.png

3、在消费者端配置MQ地址,在application.yml中添加配置(和在生产者端一致)

4、然后在生产者服务新建一个类,代码如下:

package com.jie.mq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg){
        System.out.println("spring 消费者接收到消息 :【" + msg + "】");
    }
  }

5、启动消费者,查看控制台和浏览器。


image.png

3、WorkQueue 工作队列

Work queues,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息

image.png

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。

此时就可以使用work 模型,多个消费者共同处理消息处理,速度就能大大提高了。

3.1 模拟WorkQueue,实现一个队列绑定多个消费者

1、消息发送

这次我们循环发送,模拟大量消息堆积现象。

在生产者服务中添加一个测试方法:

/**
     * @description:向队列中不停发送消息,模拟消息堆积。
     * @author: jie 
     * @time: 2022/2/22 9:23
     */
    @Test
    public void test2() throws InterruptedException {
        //队列名称
        String queueName = "simple.queue";
        //消息
        String message = "hello, spring amqp!--";
        for (int i = 1; i < 50; i++) {
            //发送消息
            rabbitTemplate.convertAndSend(queueName, message+i);
            //休眠
            Thread.sleep(20);
        }
    }

2、消息接收

要模拟多个消费者绑定同一个队列,我们在消费者服务的中添加2个新的方法:

@RabbitListener(queues = "simple.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
        System.out.println("spring 消费者1接收到消息 :【" + msg + "】"+ LocalTime.now());
        //休眠
        Thread.sleep(20);
    }
    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue2(String msg) throws InterruptedException {
        System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
        //休眠
        Thread.sleep(200);
    }

3、测试

启动消费者后,在执行生产者服务中刚刚编写的发送测试方法InterruptedException。

可以看到消费者1很快完成了自己的25条消息。消费者2却在缓慢的处理自己的25条消息。

网络异常,图片无法展示
|

也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。这样显然是有问题的。

4、能者多劳

在spring中有一个简单的配置,可以解决这个问题。我们修改消费者服务的application.yml文件,添加配置:

image.png

重启消费者,再重新发送消息。

image.png

5、小结

Work模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量

4、发布/订阅

发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。

发布订阅的模型如图:

image.png

可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给exchange(交换机)
  • Exchange:交换机。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
  • Fanout:广播,将消息交给所有绑定到交换机的队列
  • Direct:路由,把消息交给符合指定routing key 的队列
  • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
  • Consumer:消费者,与以前一样,订阅队列,没有变化
  • Queue:消息队列也与以前一样,接收消息、缓存消息。

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
6天前
|
消息中间件 Java Spring
SpringBoot实现RabbitMQ的广播交换机(SpringAMQP 实现Fanout广播交换机)
SpringBoot实现RabbitMQ的广播交换机(SpringAMQP 实现Fanout广播交换机)
12 2
|
6天前
|
消息中间件 Java
SpringBoot实现RabbitMQ的通配符交换机(SpringAMQP 实现Topic交换机)
SpringBoot实现RabbitMQ的通配符交换机(SpringAMQP 实现Topic交换机)
|
6天前
|
消息中间件 Java Spring
SpringBoot实现RabbitMQ的定向交换机(SpringAMQP 实现Direct定向交换机)
SpringBoot实现RabbitMQ的定向交换机(SpringAMQP 实现Direct定向交换机)
13 1
|
6天前
|
消息中间件 Java Spring
SpringBoot实现RabbitMQ的WorkQueue(SpringAMQP 实现WorkQueue)
SpringBoot实现RabbitMQ的WorkQueue(SpringAMQP 实现WorkQueue)
11 2
|
5月前
|
消息中间件 Java Kafka
RabbitMQ安装和5种不同的消息模型(BasicQueue,WorkQueue,Fanout Exchange,Direct Exchange,Topic Exchange)与SpringAMQP
RabbitMQ安装和5种不同的消息模型(BasicQueue,WorkQueue,Fanout Exchange,Direct Exchange,Topic Exchange)与SpringAMQP
|
12月前
|
消息中间件
五、RabbitMQ 之 SpringAMQP 实现 Fanout 交换机
五、RabbitMQ 之 SpringAMQP 实现 Fanout 交换机
|
11月前
|
消息中间件 JSON 缓存
RabbitMQ中的SpringAMQP(下)
RabbitMQ中的SpringAMQP(下)
105 0
|
12月前
|
消息中间件 Java Spring
八、SpringBoot整合RabbitMQ 之 SpringAMQP 消息转换器
八、SpringBoot整合RabbitMQ 之 SpringAMQP 消息转换器
|
12月前
|
消息中间件
七、RabbitMQ 之 SpringAMQP 实现 Topic 交换机
七、RabbitMQ 之 SpringAMQP 实现 Topic 交换机
|
12月前
|
消息中间件
六、RabbitMQ 之 SpringAMQP 实现 Direct 交换机
六、RabbitMQ 之 SpringAMQP 实现 Direct 交换机