RabbitMQ工作模式4 Routing路由模式

简介: RabbitMQ工作模式4 Routing路由模式

RabbitMQ工作模式4 Routing路由模式


模式说明

image.png1 队列与交换机的绑定,不能是任意绑定了,而是要指定一个routingKey(路由键)

2 消息的发送在向Exchange(交换机)发送消息时,也必须指定消息的routingKey(路由键)

3 Exchange(交换机)不再把消息交给每一个绑定的队列,而是根据消息的routingKey进行判断,只有队列的routingKey与消息的routingKey完全一致,才会接收到消息

根据以上需求,如果日志信息为info就存到数据库,那么数据库压力是巨大的,修改需求,日志级别为info的输出到控制台,error级别的保存到数据库

生产者

package com.wyh.producer;
/**
 * @program: SpringBoot-RabbitMQ
 * @description: RabbitMQ生产者 RoutingKey路由模式
 * @author: 魏一鹤
 * @createDate: 2022-03-23 22:40
 **/
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 *  producer主要用来发送消息
 *
 *
**/
public class Routing_producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立工厂  一般连接都是通过连接工厂进行连接 所以要创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
//2 设置参数 比如说虚拟机 用户名 ip 密码 端口等 当然不设置也有默认参数
        factory.setHost("127.0.0.1");//设置主机ip  172.16.98.133是远程的服务 就是rabbitMQ服务页面的ip 如果不设置默认值为localhost (127.0.0.1)
        factory.setPort(5672);//设置端口 默认值也是5672
        factory.setVirtualHost("/itcast_wyh");//设置虚拟机 默认值/ 杠
        factory.setUsername("weiyihe");//设置用户名 默认值 guest 游客
        factory.setPassword("weiyihe");//设置密码 默认值 guest 游客
        //3 创建连接Connection
        Connection connection = factory.newConnection();
//4 创建channel
        Channel channel = connection.createChannel();
//5 创建交换机 Exchange  exchangeDeclare有很多参数 下面一一说明
        //参数1 String exchange,  交换机名称
        //参数2 BuiltinExchangeType type,      交换机类型 是一个枚举类型 共有四个值 下面一一介绍
        //枚举1 DIRECT("direct"), 定向
        //枚举2 FANOUT("fanout"), 扇形(广播) 发送消息到每一个与之绑定队列
        //枚举3 TOPIC("topic"),  通配符
        //枚举4 HEADERS("headers"); 参数匹配
        //参数3 boolean durable, 释放持久化
        //参数4 boolean autoDelete,   自动删除
        //参数5 boolean internal,     内部使用 一般false
        //参数6 Map<String, Object> arguments  参数
        //交换机名称
        String exChangeName="test_exchange_wyh_direct";
        channel.exchangeDeclare(exChangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
//6 创建队列
        //队列名称
        String queueName1="test_queueName1_direct";
        String queueName2="test_queueName2_direct ";
        channel.queueDeclare(queueName1,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
//7 绑定队列和交换机的关系 让它们两个组合起来
        //queueBind有三个参数
        //参数1 String queue 队列名称
        //参数2 String exchange   交换机名称
        //参数3 String routingKey  路由键 绑定规则
        //如果交换机的类型为fanout(广播) 那么routingKey设置为空字符串""
        //队列1的级别绑定为error 和 test
        channel.queueBind(queueName1,exChangeName,"error");
        channel.queueBind(queueName1,exChangeName,"test");
//队列2的绑定分别为info warning error
        channel.queueBind(queueName2,exChangeName,"info");
        channel.queueBind(queueName2,exChangeName,"warning");
        channel.queueBind(queueName2,exChangeName,"error");
//8 发送消息
        //定义消息信息
        String body="日志信息:张三调用了findAll方法,日志级别为:info";
        channel.basicPublish(exChangeName,"error",null,body.getBytes());
//9 释放资源
        channel.close();
        connection.close();
    }
}

由于我们设置了info只能发送到队列2中,所以队列2有消息,队列1没有消息

image.png

再次发送error级别的消息,由于队列1和队列2都有error这个级别,所以都接收到了

image.png

如果发送的消息的路由key队列都适配,那么会同时进行接受

image.png


查看交换机的队列关系


image.png

消费者1

package com.wyh.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @program: SpringBoot-RabbitMQ
 * @description: RabbitMQ消费者Consumer 路由模式
 * @author: 魏一鹤
 * @createDate: 2022-03-24 22:08
 **/
/**
 *  consumer主要用来消费消息
 *
 *
 **/
public class Routing_consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立工厂  一般连接都是通过连接工厂进行连接 所以要创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
//2 设置参数 比如说虚拟机 用户名 ip 密码 端口等 当然不设置也有默认参数
        factory.setHost("127.0.0.1");//设置主机ip  172.16.98.133是远程的服务 就是rabbitMQ服务页面的ip 如果不设置默认值为localhost (127.0.0.1)
        factory.setPort(5672);//设置端口 默认值也是5672
        factory.setVirtualHost("/itcast_wyh");//设置虚拟机 默认值/ 杠
        factory.setUsername("weiyihe");//设置用户名 默认值 guest 游客
        factory.setPassword("weiyihe");//设置密码 默认值 guest 游客
        //3 创建连接Connection
        Connection connection = factory.newConnection();
//4 创建channel
        Channel channel = connection.createChannel();
//队列名称
        String queueName1="test_routing_Name1";
//接收消息  它的参数比较多 下面一一说明
        //  String queue, 队列名称
        //  boolean autoAck, 是否自动确认  消费者收到消息会自动告诉MQ它收到了消息
        //  Consumer callback  回调对象 可以监听一些方法
        //consumer本质是一个接口 需要创建它的实现类
        Consumer consumer=new DefaultConsumer(channel){
//匿名内部类 重写它的方法
            //回调方法 当收到消息后,会自动执行该方法 它有一些参数
            //String consumerTag 标识
            //Envelope envelope 可以获取一些信息 比如交换机 路由key
            //AMQP.BasicProperties properties 配置信息
            // byte[] body 数据
            //
            @Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//System.out.println("consumerTag = " + consumerTag);
                //System.out.println("exchange = " + envelope.getExchange());
                //System.out.println("routingKey = " + envelope.getRoutingKey());
                //System.out.println("properties = " + properties);
                System.out.println("订阅模式消费者1接收的消息 = " + new String(body));
                System.out.println("将日志打印在控制台");
            }
/**   打印的信息
            **/
        };
        channel.basicConsume(queueName1,true,consumer);
//消费者本质是一个监听 所以不要去关闭资源
    }
}

消费者2

package com.wyh.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @program: SpringBoot-RabbitMQ
 * @description: RabbitMQ消费者Consumer 路由模式
 * @author: 魏一鹤
 * @createDate: 2022-03-24 22:08
 **/
/**
 *  consumer主要用来消费消息
 *
 *
 **/
public class Routing_consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立工厂  一般连接都是通过连接工厂进行连接 所以要创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
//2 设置参数 比如说虚拟机 用户名 ip 密码 端口等 当然不设置也有默认参数
        factory.setHost("127.0.0.1");//设置主机ip  172.16.98.133是远程的服务 就是rabbitMQ服务页面的ip 如果不设置默认值为localhost (127.0.0.1)
        factory.setPort(5672);//设置端口 默认值也是5672
        factory.setVirtualHost("/itcast_wyh");//设置虚拟机 默认值/ 杠
        factory.setUsername("weiyihe");//设置用户名 默认值 guest 游客
        factory.setPassword("weiyihe");//设置密码 默认值 guest 游客
        //3 创建连接Connection
        Connection connection = factory.newConnection();
//4 创建channel
        Channel channel = connection.createChannel();
//队列名称
        String queueName2="test_routing_Name2";
//接收消息  它的参数比较多 下面一一说明
        //  String queue, 队列名称
        //  boolean autoAck, 是否自动确认  消费者收到消息会自动告诉MQ它收到了消息
        //  Consumer callback  回调对象 可以监听一些方法
        //consumer本质是一个接口 需要创建它的实现类
        Consumer consumer=new DefaultConsumer(channel){
//匿名内部类 重写它的方法
            //回调方法 当收到消息后,会自动执行该方法 它有一些参数
            //String consumerTag 标识
            //Envelope envelope 可以获取一些信息 比如交换机 路由key
            //AMQP.BasicProperties properties 配置信息
            // byte[] body 数据
            //
            @Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//System.out.println("consumerTag = " + consumerTag);
                //System.out.println("exchange = " + envelope.getExchange());
                //System.out.println("routingKey = " + envelope.getRoutingKey());
                //System.out.println("properties = " + properties);
                System.out.println("订阅模式消费者1接收的消息 = " + new String(body));
                System.out.println("将日志存储到数据库");
            }
/**   打印的信息
            **/
        };
        channel.basicConsume(queueName2,true,consumer);
//消费者本质是一个监听 所以不要去关闭资源
    }
}

路由模式小结

1 Routing模式要求队列在绑定交换机时指定routingKey,消息会转发给符合routingKey条件的队列

2 交换机类型要设置为direci(定向)

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
20天前
|
消息中间件 Java API
MQ产品使用合集之RocketMQ dledger集群模式的dledgerpeers端口是集群之间通讯吗
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
20天前
|
消息中间件 Java BI
RabbitMQ的四种消息传递模式与演示代码
RabbitMQ的四种消息传递模式与演示代码
43 0
|
20天前
|
消息中间件 Java Maven
springboot 使用注解的方式创建rabbitmq的交换机、路由key、以及监听队列的名称
springboot 使用注解的方式创建rabbitmq的交换机、路由key、以及监听队列的名称
|
20天前
|
消息中间件 Java 调度
【深度挖掘RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)
【深度挖掘RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)
22 1
|
20天前
|
消息中间件 Java RocketMQ
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
15 1
|
20天前
|
消息中间件 存储 NoSQL
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
30 1
|
20天前
|
传感器 监控 网络协议
MQTT 发布、订阅模式介绍
【2月更文挑战第17天】
128 6
MQTT 发布、订阅模式介绍
|
20天前
|
消息中间件 网络架构
【面试问题】什么是 MQ topic 交换器(模式匹配) ?
【1月更文挑战第27天】【面试问题】什么是 MQ topic 交换器(模式匹配) ?
|
20天前
|
消息中间件
【面试问题】MQ 消息怎么路由?
【1月更文挑战第27天】【面试问题】MQ 消息怎么路由?
|
20天前
|
物联网 Go 网络性能优化
MQTT协议本身支持多种消息收发模式
MQTT协议本身支持多种消息收发模式【1月更文挑战第24天】【1月更文挑战第120篇】
43 3