1、什么是SpringAMQP
SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。
2、简单队列模型
1、在父工程pom.xml中引入依赖
<!--AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2、首先在生产者端配置MQ地址,在application.yml中添加配置:
spring: rabbitmq: host: 192.168.58.131 #rabbitMQ的ip地址 port: 5672 #端口 username: jie #用户名 password: 1234 #密码 virtual-host: / #虚拟主机
2、在生产者端编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:
package com.jie.mq.helloworld.spring; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.HashMap; import java.util.Map; @RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void test() { //队列名称 String queueName = "simple.queue"; //消息 String message = "hello, spring amqp!"; //发送消息 rabbitTemplate.convertAndSend(queueName, message); } }
点击运行后可以到浏览器看看。
3、在消费者端配置MQ地址,在application.yml中添加配置(和在生产者端一致)
4、然后在生产者服务新建一个类,代码如下:
package com.jie.mq.listener; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenSimpleQueue(String msg){ System.out.println("spring 消费者接收到消息 :【" + msg + "】"); } }
5、启动消费者,查看控制台和浏览器。
3、WorkQueue 工作队列
Work queues,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。
此时就可以使用work 模型,多个消费者共同处理消息处理,速度就能大大提高了。
3.1 模拟WorkQueue,实现一个队列绑定多个消费者
1、消息发送
这次我们循环发送,模拟大量消息堆积现象。
在生产者服务中添加一个测试方法:
/** * @description:向队列中不停发送消息,模拟消息堆积。 * @author: jie * @time: 2022/2/22 9:23 */ @Test public void test2() throws InterruptedException { //队列名称 String queueName = "simple.queue"; //消息 String message = "hello, spring amqp!--"; for (int i = 1; i < 50; i++) { //发送消息 rabbitTemplate.convertAndSend(queueName, message+i); //休眠 Thread.sleep(20); } }
2、消息接收
要模拟多个消费者绑定同一个队列,我们在消费者服务的中添加2个新的方法:
@RabbitListener(queues = "simple.queue") public void listenWorkQueue1(String msg) throws InterruptedException { System.out.println("spring 消费者1接收到消息 :【" + msg + "】"+ LocalTime.now()); //休眠 Thread.sleep(20); } @RabbitListener(queues = "simple.queue") public void listenWorkQueue2(String msg) throws InterruptedException { System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now()); //休眠 Thread.sleep(200); }
3、测试
启动消费者后,在执行生产者服务中刚刚编写的发送测试方法InterruptedException。
可以看到消费者1很快完成了自己的25条消息。消费者2却在缓慢的处理自己的25条消息。
也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。这样显然是有问题的。
4、能者多劳
在spring中有一个简单的配置,可以解决这个问题。我们修改消费者服务的application.yml文件,添加配置:
重启消费者,再重新发送消息。
5、小结
Work模型的使用:
- 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
- 通过设置prefetch来控制消费者预取的消息数量
4、发布/订阅
发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。
发布订阅的模型如图:
可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:
- Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给exchange(交换机)
- Exchange:交换机。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:路由,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
- Consumer:消费者,与以前一样,订阅队列,没有变化
- Queue:消息队列也与以前一样,接收消息、缓存消息。
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!