准备环境
使用IDEA创建一个MAVEN的SpringBoot项目。并勾选如下使用的依赖等
一、fanout模式
(一)生产者
1、目录结构
2、application.yml文件
#服务端口
server:
  port: 8989
#配置rabbitmq
spring:
  rabbitmq:
    port: 5672
    username: admin
    password: 123456
    host: 你的rabbitmq的IP或者域名
    virtual-host: /
#配置其他 这些是自定义的,方便之后使用
mq:
  fannout:
    exchangName: order.fanout.ex
3、Producer.java
package com.zh.srp.mq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.xml.crypto.Data; import java.util.Date; import java.util.UUID; @Component public class Producer { @Autowired private RabbitTemplate rabbitTemplate; //1、定义交换机 @Value("${mq.fannout.exchangName}") private String exchangName; //2、路由Key private String routeKey = ""; public void sendMessage(int i){ //订单信息 String orderId = UUID.randomUUID().toString(); //消息 String message = "你的订单第【"+i+"】个信息是:" + orderId + new Date().toString(); System.out.println("正在发送----->:"+message); rabbitTemplate.convertAndSend(exchangName,routeKey,message); } }
4、SRCApplicationTests.java
package com.zh.srp;

import com.zh.srp.mq.Producer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * Spring Boot test that drives {@link Producer} to publish a batch of messages.
 */
@SpringBootTest
class SRCApplicationTests {

    @Autowired
    private Producer producer;

    /**
     * Sends 100 messages (sequence numbers 0..99), pausing 2 ms after each one.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @Test
    void contextLoads() throws InterruptedException {
        int sequence = 0;
        while (sequence < 100) {
            producer.sendMessage(sequence);
            Thread.sleep(2);
            sequence++;
        }
    }
}
(二)消费者
1、目录结构
2、application.yml文件
#服务端口
server:
  port: 8081
#配置rabbitmq
spring:
  rabbitmq:
    port: 5672
    username: admin
    password: 123456
    host: ip
    virtual-host: /
#配置其他
mq:
  fannout:
    exchangName: order.fanout.ex #交换机
    log: #日志队列
      queue: order.fanout.log.queue #C1队列
    email:
      queue: order.fanout.email.queue #C2队列
3、EmailService.java
package com.zh.srp.mq; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; @Component //会把申请的队列和交换机进行绑定 //确定消息的模式 fanout direct topic //确定队列queue的持久性 @RabbitListener( bindings = @QueueBinding(value = @Queue(value = "${mq.fannout.email.queue}", autoDelete = "true"), exchange = @Exchange(value = "${mq.fannout.exchangName}",type = ExchangeTypes.FANOUT))) public class EmailService { @RabbitHandler public void sendMessage(String Message){ System.out.println("Email-log--------->"+Message); } }
4、LogService.java
package com.zh.srp.mq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.xml.crypto.Data; import java.util.Date; import java.util.UUID; @Component //会把申请的队列和交换机进行绑定 //确定消息的模式 fanout direct topic //确定队列queue的持久性 @RabbitListener( bindings = @QueueBinding(value = @Queue(value = "${mq.fannout.log.queue}", autoDelete = "true"), exchange = @Exchange(value = "${mq.fannout.exchangName}",type = ExchangeTypes.FANOUT))) public class LogService { @RabbitHandler public void sendMessage(String Message){ System.out.println("log--------->"+Message); } }
(三)fanout模式测试
1、启动消费者的SRApplication.java
2、启动生产者的Test.java中的测试方法
生产:
消费:
二、direct模式
(一)生产者
1、目录结构
2、application.yml文件
#服务端口
server:
  port: 8989
#配置rabbitmq
spring:
  rabbitmq:
    port: 5672
    username: admin
    password: 123456
    host: IP
    virtual-host: /
#配置其他
mq:
  direct:
    exchangName: order.direct.ex
3、OrderService.java
package com.zh.srp.mq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.xml.crypto.Data; import java.util.Date; import java.util.UUID; @Component public class OrderService { @Autowired private RabbitTemplate rabbitTemplate; // 这个是一个接口,最终也会找到实现类RabbitTemplate // @Autowired // private AmqpTemplate amqpTemplate; //1、定义交换机 @Value("${mq.direct.exchangName}") private String exchangName; // 保存用户 public void saveUser(int i){ //订单信息 String orderId = UUID.randomUUID().toString(); //消息 String message = "保存用户:" + orderId + new Date().toString(); System.out.println("正在发送user----->:"+message); rabbitTemplate.convertAndSend(exchangName,"email",message); rabbitTemplate.convertAndSend(exchangName,"log",message); rabbitTemplate.convertAndSend(exchangName,"wx",message); } // 保存用户 public void WX(int i){ //订单信息 String orderId = UUID.randomUUID().toString(); //消息 String message = "你的微信第【"+i+"】个信息是:" + orderId + new Date().toString(); System.out.println("正在发送WX----->:"+message); rabbitTemplate.convertAndSend(exchangName,"email",message); rabbitTemplate.convertAndSend(exchangName,"log",message); } }
4、DirectApplicationTests.java
package com.zh.srp;

import com.zh.srp.mq.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * Spring Boot test that triggers both publishing methods of {@link OrderService}.
 */
@SpringBootTest
class DirectApplicationTests {

    @Autowired
    private OrderService producer;

    /**
     * Runs a single round: calls {@code saveUser} and {@code WX} with sequence
     * number 0, then sleeps for 2 ms.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @Test
    void contextLoads() throws InterruptedException {
        final int rounds = 1;
        int round = 0;
        while (round < rounds) {
            producer.saveUser(round);
            producer.WX(round);
            Thread.sleep(2);
            round++;
        }
    }
}
(二)消费者
1、目录结构
2、application.yml文件
#服务端口
server:
  port: 8081
#配置rabbitmq
spring:
  rabbitmq:
    port: 5672
    username: admin
    password: 123456
    host: IP
    virtual-host: /
#配置其他
mq:
  direct:
    exchangName: order.direct.ex
    log: #日志队列
      queue: order.direct.log.queue #C1队列
    email:
      queue: order.direct.email.queue #C2队列
    wx:
      queue: order.direct.wx.queue #C3队列
3、EMailService.java
package com.zh.srp.mq; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; @Component public class EMailService { //会把申请的队列和交换机进行绑定 //确定消息的模式 fanout direct topic //确定队列queue的持久性 autoDelete = "false":代表持久化 @RabbitListener( bindings = @QueueBinding(value = @Queue(value = "${mq.direct.email.queue}", autoDelete = "false"), exchange = @Exchange(value = "${mq.direct.exchangName}",type = ExchangeTypes.DIRECT), key = "email" //路由Key )) @RabbitHandler public void sendMessage(String Message){ System.out.println("email-log--------->"+Message); } }
4、LOGService.java
package com.zh.srp.mq; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; @Component //会把申请的队列和交换机进行绑定 //确定消息的模式 fanout direct topic //确定队列queue的持久性 public class LOGService { @RabbitListener( bindings = @QueueBinding(value = @Queue(value = "${mq.direct.log.queue}", autoDelete = "false"), exchange = @Exchange(value = "${mq.direct.exchangName}",type = ExchangeTypes.DIRECT), key = "log" //路由Key )) @RabbitHandler public void sendMessage(String Message){ System.out.println("log-log--------->"+Message); } }
5、WXService.java
package com.zh.srp.mq; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; @Component //会把申请的队列和交换机进行绑定 //确定消息的模式 fanout direct topic //确定队列queue的持久性 public class WXService { @RabbitListener( bindings = @QueueBinding(value = @Queue(value = "${mq.direct.wx.queue}", autoDelete = "false"), exchange = @Exchange(value = "${mq.direct.exchangName}",type = ExchangeTypes.DIRECT), key = "wx" //路由Key )) @RabbitHandler public void sendMessage(String Message){ System.out.println("WX-log--------->"+Message); } }
(三)测试
1、启动消费者DirectCApplication.java
2、启动生产者的Test类的测试方法
消费
生产
三、topic模式
(一)生产者
1、目录结构
2、application.yml文件
#服务端口
server:
  port: 8989
#配置rabbitmq
spring:
  rabbitmq:
    port: 5672
    username: admin
    password: 123456
    host: ip
    virtual-host: /
#配置其他
mq:
  topic:
    exchangName: linux.topic
3、OrderService.java
package com.zh.srp.mq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.xml.crypto.Data; import java.util.Date; import java.util.UUID; @Component public class OrderService { @Autowired private RabbitTemplate rabbitTemplate; //1、定义交换机 @Value("${mq.topic.exchangName}") private String exchangName; //WX public void QQ(int i){ //订单信息 String orderId = UUID.randomUUID().toString(); //消息 String message = "你的WX第【"+i+"】个信息是:" + orderId + new Date().toString(); System.out.println("正在发送WX----->:"+message); //通配符topic rabbitTemplate.convertAndSend(exchangName,"qq.log",message); } //QQ public void WX(int i){ //订单信息 String orderId = UUID.randomUUID().toString(); //消息 String message = "你的QQ第【"+i+"】个信息是:" + orderId + new Date().toString(); System.out.println("正在发送QQ----->:"+message); //通配符topic rabbitTemplate.convertAndSend(exchangName,"wx.log",message); } }
4、TopicApplicationTests.java
package com.zh.srp;

import com.zh.srp.mq.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * Spring Boot test that triggers both publishing methods of {@link OrderService}.
 */
@SpringBootTest
class TopicApplicationTests {

    @Autowired
    private OrderService producer;

    /**
     * Runs a single round: calls {@code QQ} and {@code WX} with sequence
     * number 0, then sleeps for 2 ms.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @Test
    void contextLoads() throws InterruptedException {
        final int rounds = 1;
        int round = 0;
        while (round < rounds) {
            producer.QQ(round);
            producer.WX(round);
            Thread.sleep(2);
            round++;
        }
    }
}
(二)消费者
1、目录结构
2、application.yml文件
#服务端口
server:
  port: 8081
#配置rabbitmq
spring:
  rabbitmq:
    port: 5672
    username: admin
    password: 123456
    host: ip
    virtual-host: /
#配置其他
mq:
  topic:
    exchangName: linux.topic
    qq: #日志队列
      queue: linux.topic.qq.queue #C1队列
    wx:
      queue: linux.topic.wx.queue #C3队列
3、QQService.java
package com.zh.srp.mq; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; @Component public class QQService { @RabbitListener( bindings = @QueueBinding(value = @Queue(value = "${mq.topic.qq.queue}", autoDelete = "true"), exchange = @Exchange(value = "${mq.topic.exchangName}",type = ExchangeTypes.TOPIC), key = "qq.*" //路由Key )) @RabbitHandler public void sendMessage(String Message){ System.out.println("QQ-log--------->"+Message); } }
4、WXService.java
package com.zh.srp.mq; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; @Component public class WXService { @RabbitListener( bindings = @QueueBinding(value = @Queue(value = "${mq.topic.wx.queue}", autoDelete = "true"), exchange = @Exchange(value = "${mq.topic.exchangName}",type = ExchangeTypes.TOPIC), key = "wx.*" //路由Key )) @RabbitHandler public void sendMessage(String Message){ System.out.println("WX-log--------->"+Message); } }
(三)测试
1、启动消费者
2、启动生产者
生产者
消费者