1 RabbitMQ介绍
1.1 各个组件的层级关系
1.2 RabbitMQ的六种模式
2 Spring Boot整合RabbitMQ
2.1 RabbitMQ配置
2.1.1 添加用户
2.1.2 添加虚拟机
2.1.3 分配权限
2.1.4 添加队列
2.2 Spring Boot工程搭建
2.2.1 依赖
<!-- spring boot--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!-- rabbitmq amqp --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!-- web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- test--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency>
2.2.2 配置文件
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: test password: 123 virtual-host: /test
2.3 各个模式测试
2.3.1 HelloWorld
生产者:
@Component public class Test01 { /** * 注入rabbitTemplate */ @Autowired private RabbitTemplate rabbitTemplate; /** * Hello world */ public void testHello() { /** * 参数1: 消息队列的名字 * 参数2: 发送的消息 */ rabbitTemplate.convertAndSend("sp_test", "hello world"); } }
消费者:
@Component public class Consumer { /** * 一个消费者 * * @param message */ @RabbitListener(queuesToDeclare = @Queue("sp_test")) public void receive1(String message) { System.out.println("message1 = " + message); } }
测试:
@SpringBootTest class SpRabbitmqApplicationTests { @Autowired private Test01 test01; @Test void producerTest01() { test01.testHello(); } }
2.3.2 Work Queues
生产者:
@Component public class Test01 { /** * 注入rabbitTemplate */ @Autowired private RabbitTemplate rabbitTemplate; /** * Work */ public void testWork() { for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend("sp_test", "work模型" + i); } } }
消费者:
@Component public class Consumer { /** * 一个消费者 * * @param message */ @RabbitListener(queuesToDeclare = @Queue("sp_test")) public void receive1(String message) { System.out.println("message1 = " + message); } /** * 一个消费者 * * @param message */ @RabbitListener(queuesToDeclare = @Queue("sp_test")) public void receive2(String message) { System.out.println("message2 = " + message); } }
测试:
@Component public class Consumer { /** * 一个消费者 * * @param message */ @RabbitListener(queuesToDeclare = @Queue("sp_test")) public void receive1(String message) { System.out.println("message1 = " + message); } /** * 一个消费者 * * @param message */ @RabbitListener(queuesToDeclare = @Queue("sp_test")) public void receive2(String message) { System.out.println("message2 = " + message); } }
2.3.3 Public
新建路由:
生产者:
@Component public class Test01 { /** * 注入rabbitTemplate */ @Autowired private RabbitTemplate rabbitTemplate; /** * Fanout 广播 */ public void testFanout() { /** * 参数1: 交换机名称 * 参数2: routingKey * 参数3: message * */ rabbitTemplate.convertAndSend("test_fanout", "", "Fanout的模型发送的消息"); } }
消费者:
@Component public class Consumer2 { @RabbitListener(bindings = { @QueueBinding( value = @Queue,//创建临时队列 exchange = @Exchange(value = "test_fanout", type = "fanout") //绑定的交换机 ) }) public void receive1(String message) { System.out.println("message1 = " + message); } @RabbitListener(bindings = { @QueueBinding( value = @Queue,//创建临时队列 exchange = @Exchange(value = "test_fanout", type = "fanout") //绑定的交换机 ) }) public void receive2(String message) { System.out.println("message2 = " + message); } }
测试:
@SpringBootTest class SpRabbitmqApplicationTests { @Autowired private Test01 test01; @Test void producerTest01() { test01.testFanout(); } }
2.3.4 Routing
创建交换机:
生产者:
@Component public class Test01 { /** * 注入rabbitTemplate */ @Autowired private RabbitTemplate rabbitTemplate; /** * Route 路由模式 */ public void testRoute() { /** * 参数1: 交换机名称 * 参数2: 路由key * 参数3: 发送的信息 * */ rabbitTemplate.convertAndSend("test_routing", "info", "发送info的key的路由信息"); } }
消费者:
@Component public class Consumer3 { @RabbitListener(bindings = { @QueueBinding( value = @Queue,//创建临时队列 exchange = @Exchange(value = "test_routing", type = "direct"),//自定交换机名称和类型 key = {"info", "error", "warn"} ) }) public void receive1(String message) { System.out.println("message1 = " + message); } @RabbitListener(bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(value = "test_routing", type = "direct"), key = {"error"} ) }) public void receive2(String message) { System.out.println("message2 = " + message); } }
测试:
@SpringBootTest class SpRabbitmqApplicationTests { @Autowired private Test01 test01; @Test void producerTest01() { test01.testRoute(); } }
2.3.5 Topics
添加交换机:
生产者:
@Component public class Test01 { /** * 注入rabbitTemplate */ @Autowired private RabbitTemplate rabbitTemplate; /** * Topic 动态路由 订阅模式 */ public void testTopic() { /** * 参数1: 交换机名称 * 参数2: 路由key * 参数3: 发送的消息 * */ rabbitTemplate.convertAndSend("test_topic", "ymx.name.Mr_YanMingXin", "ymx路由消息"); } }
消费者:
@Component public class Consumer4 { /** * "*"代表一个单词,"#"代表0到多个单词 * * @param message */ @RabbitListener(bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(type = "topic", name = "test_topic"), key = {"ymx.name", "ymx.name.*"} ) }) public void receive1(String message) { System.out.println("message1 = " + message); } @RabbitListener(bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(type = "topic", name = "test_topic"), key = {"ymx.#", "ymx.name.#"} ) }) public void receive2(String message) { System.out.println("message2 = " + message); } }
测试:
@SpringBootTest class SpRabbitmqApplicationTests { @Autowired private Test01 test01; @Test void producerTest01() { test01.testTopic(); } }
3 小总结下
- 消费者是不需要手动去调用的,只需要写上一个带参数的方法和打上@RabbitListener注解,在项目运行时就可以自己进行消费了
- public中之所以没有指定routingKey是因为这种模式下会向全部的消费者发送消息,写上也没什么意义
- @RabbitListener可以写在方法上也可以写在类上,写在类上时,方法上就要打上@RabbitHandler注解
- 还有其他需要注意的欢迎交流~