RabbitMQ工作模式4 Routing路由模式
模式说明
1 队列与交换机的绑定,不能是任意绑定了,而是要指定一个routingKey(路由键)
2 消息的发送在向Exchange(交换机)发送消息时,也必须指定消息的routingKey(路由键)
3 Exchange(交换机)不再把消息交给每一个绑定的队列,而是根据消息的routingKey进行判断,只有队列的routingKey与消息的routingKey完全一致,才会接收到消息
根据以上需求,如果日志信息为info就存到数据库,那么数据库压力是巨大的,修改需求,日志级别为info的输出到控制台,error级别的保存到数据库
生产者
package com.wyh.producer; /** * @program: SpringBoot-RabbitMQ * @description: RabbitMQ生产者 RoutingKey路由模式 * @author: 魏一鹤 * @createDate: 2022-03-23 22:40 **/ import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * producer主要用来发送消息 * * **/ public class Routing_producer { public static void main(String[] args) throws IOException, TimeoutException { //1.建立工厂 一般连接都是通过连接工厂进行连接 所以要创建连接工厂 ConnectionFactory factory=new ConnectionFactory(); //2 设置参数 比如说虚拟机 用户名 ip 密码 端口等 当然不设置也有默认参数 factory.setHost("127.0.0.1");//设置主机ip 172.16.98.133是远程的服务 就是rabbitMQ服务页面的ip 如果不设置默认值为localhost (127.0.0.1) factory.setPort(5672);//设置端口 默认值也是5672 factory.setVirtualHost("/itcast_wyh");//设置虚拟机 默认值/ 杠 factory.setUsername("weiyihe");//设置用户名 默认值 guest 游客 factory.setPassword("weiyihe");//设置密码 默认值 guest 游客 //3 创建连接Connection Connection connection = factory.newConnection(); //4 创建channel Channel channel = connection.createChannel(); //5 创建交换机 Exchange exchangeDeclare有很多参数 下面一一说明 //参数1 String exchange, 交换机名称 //参数2 BuiltinExchangeType type, 交换机类型 是一个枚举类型 共有四个值 下面一一介绍 //枚举1 DIRECT("direct"), 定向 //枚举2 FANOUT("fanout"), 扇形(广播) 发送消息到每一个与之绑定队列 //枚举3 TOPIC("topic"), 通配符 //枚举4 HEADERS("headers"); 参数匹配 //参数3 boolean durable, 释放持久化 //参数4 boolean autoDelete, 自动删除 //参数5 boolean internal, 内部使用 一般false //参数6 Map<String, Object> arguments 参数 //交换机名称 String exChangeName="test_exchange_wyh_direct"; channel.exchangeDeclare(exChangeName, BuiltinExchangeType.DIRECT,true,false,false,null); //6 创建队列 //队列名称 String queueName1="test_queueName1_direct"; String queueName2="test_queueName2_direct "; channel.queueDeclare(queueName1,true,false,false,null); channel.queueDeclare(queueName2,true,false,false,null); //7 绑定队列和交换机的关系 让它们两个组合起来 //queueBind有三个参数 //参数1 String queue 队列名称 //参数2 String exchange 交换机名称 //参数3 String routingKey 路由键 绑定规则 //如果交换机的类型为fanout(广播) 那么routingKey设置为空字符串"" //队列1的级别绑定为error 和 test channel.queueBind(queueName1,exChangeName,"error"); channel.queueBind(queueName1,exChangeName,"test"); //队列2的绑定分别为info warning error channel.queueBind(queueName2,exChangeName,"info"); channel.queueBind(queueName2,exChangeName,"warning"); channel.queueBind(queueName2,exChangeName,"error"); //8 发送消息 //定义消息信息 String body="日志信息:张三调用了findAll方法,日志级别为:info"; channel.basicPublish(exChangeName,"error",null,body.getBytes()); //9 释放资源 channel.close(); connection.close(); } }
由于我们设置了info只能发送到队列2中,所以队列2有消息,队列1没有消息
再次发送error级别的消息,由于队列1和队列2都有error这个级别,所以都接收到了
如果发送的消息的路由key队列都适配,那么会同时进行接受
查看交换机的队列关系
消费者1
package com.wyh.consumer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @program: SpringBoot-RabbitMQ * @description: RabbitMQ消费者Consumer 路由模式 * @author: 魏一鹤 * @createDate: 2022-03-24 22:08 **/ /** * consumer主要用来消费消息 * * **/ public class Routing_consumer1 { public static void main(String[] args) throws IOException, TimeoutException { //1.建立工厂 一般连接都是通过连接工厂进行连接 所以要创建连接工厂 ConnectionFactory factory=new ConnectionFactory(); //2 设置参数 比如说虚拟机 用户名 ip 密码 端口等 当然不设置也有默认参数 factory.setHost("127.0.0.1");//设置主机ip 172.16.98.133是远程的服务 就是rabbitMQ服务页面的ip 如果不设置默认值为localhost (127.0.0.1) factory.setPort(5672);//设置端口 默认值也是5672 factory.setVirtualHost("/itcast_wyh");//设置虚拟机 默认值/ 杠 factory.setUsername("weiyihe");//设置用户名 默认值 guest 游客 factory.setPassword("weiyihe");//设置密码 默认值 guest 游客 //3 创建连接Connection Connection connection = factory.newConnection(); //4 创建channel Channel channel = connection.createChannel(); //队列名称 String queueName1="test_routing_Name1"; //接收消息 它的参数比较多 下面一一说明 // String queue, 队列名称 // boolean autoAck, 是否自动确认 消费者收到消息会自动告诉MQ它收到了消息 // Consumer callback 回调对象 可以监听一些方法 //consumer本质是一个接口 需要创建它的实现类 Consumer consumer=new DefaultConsumer(channel){ //匿名内部类 重写它的方法 //回调方法 当收到消息后,会自动执行该方法 它有一些参数 //String consumerTag 标识 //Envelope envelope 可以获取一些信息 比如交换机 路由key //AMQP.BasicProperties properties 配置信息 // byte[] body 数据 // @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //System.out.println("consumerTag = " + consumerTag); //System.out.println("exchange = " + envelope.getExchange()); //System.out.println("routingKey = " + envelope.getRoutingKey()); //System.out.println("properties = " + properties); System.out.println("订阅模式消费者1接收的消息 = " + new String(body)); System.out.println("将日志打印在控制台"); } /** 打印的信息 **/ }; channel.basicConsume(queueName1,true,consumer); //消费者本质是一个监听 所以不要去关闭资源 } }
消费者2
package com.wyh.consumer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @program: SpringBoot-RabbitMQ * @description: RabbitMQ消费者Consumer 路由模式 * @author: 魏一鹤 * @createDate: 2022-03-24 22:08 **/ /** * consumer主要用来消费消息 * * **/ public class Routing_consumer2 { public static void main(String[] args) throws IOException, TimeoutException { //1.建立工厂 一般连接都是通过连接工厂进行连接 所以要创建连接工厂 ConnectionFactory factory=new ConnectionFactory(); //2 设置参数 比如说虚拟机 用户名 ip 密码 端口等 当然不设置也有默认参数 factory.setHost("127.0.0.1");//设置主机ip 172.16.98.133是远程的服务 就是rabbitMQ服务页面的ip 如果不设置默认值为localhost (127.0.0.1) factory.setPort(5672);//设置端口 默认值也是5672 factory.setVirtualHost("/itcast_wyh");//设置虚拟机 默认值/ 杠 factory.setUsername("weiyihe");//设置用户名 默认值 guest 游客 factory.setPassword("weiyihe");//设置密码 默认值 guest 游客 //3 创建连接Connection Connection connection = factory.newConnection(); //4 创建channel Channel channel = connection.createChannel(); //队列名称 String queueName2="test_routing_Name2"; //接收消息 它的参数比较多 下面一一说明 // String queue, 队列名称 // boolean autoAck, 是否自动确认 消费者收到消息会自动告诉MQ它收到了消息 // Consumer callback 回调对象 可以监听一些方法 //consumer本质是一个接口 需要创建它的实现类 Consumer consumer=new DefaultConsumer(channel){ //匿名内部类 重写它的方法 //回调方法 当收到消息后,会自动执行该方法 它有一些参数 //String consumerTag 标识 //Envelope envelope 可以获取一些信息 比如交换机 路由key //AMQP.BasicProperties properties 配置信息 // byte[] body 数据 // @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //System.out.println("consumerTag = " + consumerTag); //System.out.println("exchange = " + envelope.getExchange()); //System.out.println("routingKey = " + envelope.getRoutingKey()); //System.out.println("properties = " + properties); System.out.println("订阅模式消费者1接收的消息 = " + new String(body)); System.out.println("将日志存储到数据库"); } /** 打印的信息 **/ }; channel.basicConsume(queueName2,true,consumer); //消费者本质是一个监听 所以不要去关闭资源 } }