【RabbitMQ五】——RabbitMQ路由模式(Routing)

简介: 【RabbitMQ五】——RabbitMQ路由模式(Routing)

RabbitMQ路由模式

前言

通过本篇博客能够简单使用RabbitMQ的路由模式

本篇博客主要是博主通过官网以及学习他人的博客总结出的RabbitMQ路由模式。其中如果有误欢迎大家及时指正。

RabbitMQ模式的基本概念

路由模式是根据Routing Key有条件的将消息筛选后发送给消费者,消费者只接受筛选之后的消息

路由模式的核心是:配置一个类型为direct的交换机,并且需要指定不同的路由键(routing key),把对应的消息从交换机路由到不同的消息队列进行存储,再由对应的消费者进行消费

为什么要使用Rabbitmq 路由模式

由于发布订阅模式是无条件将所有消息分发给所有消费者,路由模式可以根据条件(Routing Key)将消息筛选之后发送给消费者。
应用场景:

例如:有一个股票分析机构,每天都会有一些独家的股票分析报告。对于其他一些应用平台,想要每天都到这家股票分析机构提供的百度的独家股票分析报告,对于另外一些应用平台想要收到谷歌的独家股票分析报告,就可以使用路由模式。

RabbitMQ路由模式组成元素


P:生产者,向交换机发送消息的是否需要指定routing key

X:交换机,接收生产者发送的消息,需要指定交换机的类型为direct,并且将消息发送给与routing key匹配的队列
C1:消费者1,它所在队列指定了需要routing key为error的信息

C2:消费者2,其所在队列指定了需要routing key 为 info、error、warning 的消息

路由模式完整代码

**业务场景:**生产者为日志分发平台,分发info、warning、error级别的日志,消费者1只接受日志级别为error的日志,消费者2接收全部日志。

Pom文件引入RabbtiMQ依赖

<dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.10.0</version>
        </dependency>

RabbitMQ工具类

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : RabbitMQUtils
 * @description : [rabbitmq工具类]
 * @createTime : [2023/1/17 8:49]
 * @updateUser : [WangWei]
 * @updateTime : [2023/1/17 8:49]
 * @updateRemark : [描述说明本次修改内容]
 */
public class RabbitMQUtils {
    /*
     * @version V1.0
     * Title: getConnection
     * @author Wangwei
     * @description 创建rabbitmq连接
     * @createTime  2023/1/17 8:52
     * @param []
     * @return com.rabbitmq.client.Connection
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setPort(5672);
        factory.setVirtualHost("虚拟主机");
        factory.setUsername("用户名");
        factory.setPassword("密码");
        //创建连接
        Connection connection=factory.newConnection();
        return  connection;
    }
    /*
     * @version V1.0
     * Title: getChannel
     * @author Wangwei
     * @description 创建信道
     * @createTime  2023/1/17 8:55
     * @param []
     * @return com.rabbitmq.client.Channel
     */
    public static Channel getChannel() throws IOException, TimeoutException {
        Connection connection=getConnection();
        Channel channel=connection.createChannel();
        return channel;
    }
}

生产者

import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : Producer
 * @description : [生产者]
 * @createTime : [2023/2/1 9:38]
 * @updateUser : [WangWei]
 * @updateTime : [2023/2/1 9:38]
 * @updateRemark : [描述说明本次修改内容]
 */
public class Producer {
    private static final String EXCHANGE_NAME = "direct_logs";
    public static void main(String[] args) throws IOException, TimeoutException {
        //建立连接
        RabbitMQUtils.getConnection();
        //声明通道
        Channel channel = RabbitMQUtils.getChannel();
        //创建fanout类型交换机并命名为logs
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //声明routingKey
        String severityInfo="info";
        String severityError="error";
        String severityWarning="warning";
        //循环发送2条消息
        for (int i = 0; i <2 ; i++) {
            String msg="路由模式info:"+i;
            /*推送消息
             *交换机命名,不填写使用默认的交换机
             * routingKey -路由键-
             * props:消息的其他属性-路由头等正文
             * msg消息正文
             */
            channel.basicPublish(EXCHANGE_NAME,severityInfo,null,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println(msg);
        }
        //循环发送2条消息
        for (int i = 0; i <2 ; i++) {
            String msg="路由模式error:"+i;
            /*推送消息
             *交换机命名,不填写使用默认的交换机
             * routingKey -路由键-
             * props:消息的其他属性-路由头等正文
             * msg消息正文
             */
            channel.basicPublish(EXCHANGE_NAME,severityError,null,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println(msg);
        }
        //循环发送2条消息
        for (int i = 0; i <2 ; i++) {
            String msg="路由模式warning:"+i;
            /*推送消息
             *交换机命名,不填写使用默认的交换机
             * routingKey -路由键-
             * props:消息的其他属性-路由头等正文
             * msg消息正文
             */
            channel.basicPublish(EXCHANGE_NAME,severityWarning,null,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println(msg);
        }
    }
}

消费者1

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : ConsumerOne
 * @description : [消费者1]
 * @createTime : [2023/2/1 9:39]
 * @updateUser : [WangWei]
 * @updateTime : [2023/2/1 9:39]
 * @updateRemark : [描述说明本次修改内容]
 */
public class ConsumerOne {
    private static final String EXCHANGE_NAME = "direct_logs";
    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMQUtils.getConnection();
        Channel channel = RabbitMQUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        String queueName = channel.queueDeclare().getQueue();
        //声明routingKey (error)
        String severityError="error";
        //交换机与队列进行绑定-如果没有队列与交换机进行绑定,那么消费者接受不到生产者的消息,消息会丢失
        //queueName绑定了direct_logs交换机并且绑定了routingKey
        channel.queueBind(queueName, EXCHANGE_NAME,severityError );
        //因为Rabbitmq服务器将异步地向我们推送消息,所以我们以对象的形式提供了一个回调,该回调将缓冲消息,直到我们准备好使用它们。
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

消费者2

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : ConsumerTwo
 * @description : [消费者2]
 * @createTime : [2023/2/1 9:38]
 * @updateUser : [WangWei]
 * @updateTime : [2023/2/1 9:38]
 * @updateRemark : [描述说明本次修改内容]
 */
public class ConsumerTwo {
    private static final String EXCHANGE_NAME = "direct_logs";
    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMQUtils.getConnection();
        Channel channel = RabbitMQUtils.getChannel();
        //创建fanout类型交换机并命名为logs
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //创建了一个非持久的、排他的、自动删除的队列,并生成了一个名称
        String queueName = channel.queueDeclare().getQueue();
        //声明routingKey (info,error,warning)
        String severityInfo="info";
        String severityError="error";
        String severityWarning="warning";
        //交换机与队列进行绑定-如果没有队列与交换机进行绑定,那么消费者接受不到生产者的消息,消息会丢失
        //queueName绑定了direct_logs交换机并且绑定了3个routingKey
        channel.queueBind(queueName, EXCHANGE_NAME,severityInfo );
        channel.queueBind(queueName, EXCHANGE_NAME,severityError );
        channel.queueBind(queueName, EXCHANGE_NAME,severityWarning );
        //因为Rabbitmq服务器将异步地向我们推送消息,所以我们以对象的形式提供了一个回调,该回调将缓冲消息,直到我们准备好使用它们。
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

运行结果截图



相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
物联网 Go 网络性能优化
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式【1月更文挑战第21天】【1月更文挑战第104篇】
109 1
|
3月前
|
消息中间件 Java
Java操作RabbitMQ单一生产-消费者模式
Java操作RabbitMQ单一生产-消费者模式
32 0
|
1月前
|
消息中间件 Java RocketMQ
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
12 1
|
1月前
|
消息中间件 存储 NoSQL
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
27 1
|
1月前
|
传感器 监控 网络协议
MQTT 发布、订阅模式介绍
【2月更文挑战第17天】
73 6
MQTT 发布、订阅模式介绍
|
3月前
|
消息中间件 网络架构
【面试问题】什么是 MQ topic 交换器(模式匹配) ?
【1月更文挑战第27天】【面试问题】什么是 MQ topic 交换器(模式匹配) ?
|
3月前
|
消息中间件
【面试问题】MQ 消息怎么路由?
【1月更文挑战第27天】【面试问题】MQ 消息怎么路由?
|
3月前
|
物联网 Go 网络性能优化
MQTT协议本身支持多种消息收发模式
MQTT协议本身支持多种消息收发模式【1月更文挑战第24天】【1月更文挑战第120篇】
37 3
|
3月前
|
消息中间件 Java Spring
一文看懂Spring Boot整合Rabbit MQ实现多种模式的生产和消费
一文看懂Spring Boot整合Rabbit MQ实现多种模式的生产和消费
66 0
|
3月前
|
消息中间件 Java Spring
RabbitMQ各种模式的含义与Spring Boot实例详解
RabbitMQ各种模式的含义与Spring Boot实例详解
36 0