RabbitMQ学习笔记 03、交换机模式(4种)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: RabbitMQ学习笔记 03、交换机模式(4种)

交换机工作模式(四种)


fanout:广播,这种模式只需要将队列绑定到交换机上即可,是不需要设置路由键的。(广播,直接绑定队列)


我们之前的哪里就是fanout形式。

direct:根据RoutingKey匹配消息路由到指定的队列。(绝对匹配)


topic:生产者指定RoutingKey消息根据消费端指定的队列通过模糊匹配方式进行相应转发。(模糊匹配)


headers:根据发送消息内容中的headers属性来匹配。


有了交换机之后就有了更强大的能力,可以根据交换机的模式来完成更加复杂的功能。



一、fanout模式


1.1、基本概念


[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JBGMtaQg-1651541063735)(C:\Users\93997\AppData\Roaming\Typora\typora-user-images\image-20210925070020024.png)]


应用场景:例如我们要通过rabbitmq来进行日志记录,一般会以两种形式,控制台以及存储到磁盘方式进行落库存盘。无论哪种方式,其内容本质是相同的,此时我们可以使用fanout方式。


特点:


使用fanout的特点就是只有当消费者连接之后生产者发送的消息才会生效,在此之前生产者产生的会默认直接丢弃,这也就解决了消息积压的问题。(对于之前的一些日志记录丢弃掉也没有关系)

若是运行多个服务,那么每一个服务都能够收到同一份消息,而不像之前案例一样均匀分配任务了!!!


1.2、代码实操


目的(实现效果):发送的多个日志信息,每个服务都能够收到相同数量且同样内容的信息。


生产者


import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
/**
 * @ClassName EmitLog
 * @Author ChangLu
 * @Date 2021/9/25 7:10
 * @Description TODO
 */
public class EmitLog {
    //核心:指定交换机名称
    private static String EXCHANGE_NAME = "LOGS";
    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.118.128");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("password");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //绑定一个交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            //向指定的交换机发送消息
            String msg = "this is a log info!";
            //第二个参数是routingkey,第三个参数是基本属性
            channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("消息发送成功!");
        }catch (Exception e){
            System.out.println("消息发送失败,失败原因:"+e.getMessage());
        }
        Optional.ofNullable(channel).ifPresent(EmitLog::close);
        Optional.ofNullable(connection).ifPresent(EmitLog::close);
    }
    //反射进行资源关闭
    public static <T> void close(T t){
        Class<?> aClass = t.getClass();
        Optional.ofNullable(t).ifPresent(c-> {
            try {
                Method closeMethod = aClass.getDeclaredMethod("close");
                closeMethod.setAccessible(true);
                closeMethod.invoke(t, null);
            } catch (Exception e){
                e.printStackTrace();
            }
        });
    }
}




消费者


import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * @ClassName ReceiveLog
 * @Author ChangLu
 * @Date 2021/9/25 7:23
 * @Description TODO
 */
public class ReceiveLog {
    private static String EXCHANGE_NAME = "LOGS";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.118.128");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("password");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //绑定指定的交换机类型
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        //生成一个临时的随机的queue,并绑定交换机与队列
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName,EXCHANGE_NAME,"");//第三个参数为routingkey
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("接收到消息:"+msg);
            }
        };
        //进行消费,需要指定队列名称
        channel.basicConsume(queueName,true,consumer);
    }
}


演示:生产者连发两条消息,启动的两个消费者服务都能够接收到相同的两份信息




二、direct模式


2.1、基本概念


概念图


可以看到下图,相同的队列可以指定相同的routingkeys,这是不受影响的。





说明


场景:有些情况下,对于日志而言我们进行升级,可能不需要把所有的日志都存储到磁盘上,只需要在磁盘中存储错误日志,例如error类型的,日志分为不同等级嘛。


在磁盘中只存储error,而在控制台里把所有消息都打印出来。涉及到不同的消费者接收消息不一致的情况此时非常建议使用direct模式。

实现效果:生产者服务在消费者服务启动前发送的消息,不会放入到队列中存储起来,只有在消费者服务启动后发送的消息才会被进行分发处理。


实现方式:设置交换机为direct,并且需要额外设置routingkey,可以将key理解为对指定信息感兴趣。



2.2、代码实操


目的(实现效果):不同类型的日志交由不同的服务处理,例如info、debug、warning交由消费者服务1处理;error交由消费者服务2处理。



生产者:


import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
/**
 * @ClassName EmitLogs
 * @Author ChangLu
 * @Date 2021/9/25 9:44
 * @Description 生产者:绑定交换机direct_LOGS,发送四个消息,每个消息匹配一个routingkeys(info、debug、error、warning)
 */
public class EmitLogs {
    //核心:指定交换机名称,需要跟之前的不一样
    private static String EXCHANGE_NAME = "direct_LOGS";
    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.118.128");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("password");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //绑定一个Direct类型交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String msg = "sended msg";
        //发送三条消息,每个消息绑定对应的routingkey,发送给指定的一个交换机
        String INFO_MESSAGE = "info-"+msg;
        channel.basicPublish(EXCHANGE_NAME,"info",null,INFO_MESSAGE.getBytes(StandardCharsets.UTF_8));
        System.out.println("成功发送消息:"+INFO_MESSAGE);
        String DEBUG_MESSAGE = "debug-"+msg;
        channel.basicPublish(EXCHANGE_NAME,"debug",null,DEBUG_MESSAGE.getBytes(StandardCharsets.UTF_8));
        System.out.println("成功发送消息:"+DEBUG_MESSAGE);
        String WARNING_MESSAGE = "warning-"+msg;
        channel.basicPublish(EXCHANGE_NAME,"warning",null,WARNING_MESSAGE.getBytes(StandardCharsets.UTF_8));
        System.out.println("成功发送消息:"+WARNING_MESSAGE);
        String ERROR_MESSAGE = "warning-"+msg;
        channel.basicPublish(EXCHANGE_NAME,"error",null,ERROR_MESSAGE.getBytes(StandardCharsets.UTF_8));
        System.out.println("成功发送消息:"+ERROR_MESSAGE);
        channel.close();
        connection.close();
    }
}



消费者1:


import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * @ClassName ReceiveLogs
 * @Author ChangLu
 * @Date 2021/9/25 9:50
 * @Description 消费者:绑定交换机direct_LOGS,并且设置三个routingkeys:info、debug、warning
 */
public class ReceiveLogs1 {
    private static String EXCHANGE_NAME = "direct_LOGS";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.118.128");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("password");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //绑定指定的交换机类型——direct
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //生成一个随机的临时的queue
        String queueName = channel.queueDeclare().getQueue();
        //一个交换机同时绑定三个routingkeys
        channel.queueBind(queueName,EXCHANGE_NAME,"info");
        channel.queueBind(queueName,EXCHANGE_NAME,"debug");
        channel.queueBind(queueName,EXCHANGE_NAME,"warning");
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("接收到消息:"+msg);
            }
        };
        //进行消费,需要指定队列名称
        channel.basicConsume(queueName,true,consumer);
    }
}



消费者2:大部分内容与消费者1相同,只不过就指定的routingkeys不一样而已


//绑定direct类型交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = channel.queueDeclare().getQueue();
//绑定指定routingkeys:error
channel.queueBind(queueName,EXCHANGE_NAME,"error");


测试一下:生产者发送了四个不同routingkeys的消息,两个消费者接受对应routingkeys信息




三、topic模式


3.1、基本概念


我们可对routingkey进行模糊匹配,如使用*或#来进行匹配!


*可以代替一个单词


#可以替代零个或多个单词



我们可以对routingkeys进行.分割的不同类型区分,上面对应着消费者服务来对生产者产生的信息进行过滤,每个队列能够拿到对应交换机可模糊匹配的执行信息,来达到区分的效果!



3.2、实战


目的:有10条记录其中包含了不同类型的内容,生产者发送消息的同时每条记录都附带对应详细的routingkey;消费者则会定义指定的routingkey模糊表达式,用于进行模糊匹配拿到生产者发送过来的信息记录条数。


生产者


这里对方法进行了抽离,实际上基本配置与之前都相同。这里的话准备了一个routingkey数组用于分别描述各组信息与实体信息传递出去:


import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * @ClassName TopicProduce
 * @Author ChangLu
 * @Date 2021/9/25 10:57
 * @Description 生产者:交换机类型为topic
 */
public class TopicProduce {
    private static String EXCHANGE_NAME = "topic_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        Util.doBefore();
        Channel channel = Util.channel;
        //指定交换机为topic
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //定义关键字,不同的消息在发送时都指定routingKeys
        String[] routingKeys = new String[]{
            "quick.orange.rabbit","quick.orange.fox","quick.brown","quick.orange.male.rabbit",
            "lazy.orange.elephant","lazy.brown.fox","lazy.pink.rabbit","lazy.orange.male.rabbit",
            "orange"
        };
        for (String routingKey : routingKeys) {
            String msg = "信息内容:"+routingKey;
            //发送
            channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("已发送"+msg);
        }
        Util.close();
    }
    static class Util{
        private static Connection connection = null;
        public static Channel channel = null;
        public static void  doBefore() throws IOException, TimeoutException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.118.128");
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("password");
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
        }
        public static void close() throws IOException, TimeoutException {
            if(channel!=null){
                channel.close();
            }
            if(connection!=null){
                connection.close();
            }
        }
    }
}




消费者


消费者1:基本配置都相同,不再重复展示,匹配的routingkey为"*.orange.*"


private static String EXCHANGE_NAME = "topic_exchange";
...
//指定交换机为topic
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
//指定routingkeys,可搭配*或#进行匹配
String bindingKey = "*.orange.*";
channel.queueBind(queueName,EXCHANGE_NAME,bindingKey);//绑定队列、交换机以及routingkey
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String msg = new String(body, StandardCharsets.UTF_8);
        System.out.println("收到消息:"+msg);
    }
});
...



消费者2:匹配的routingkey为"quick.*"、"lazy.orange.#"、"quick.orange.fox"


private static String EXCHANGE_NAME = "topic_exchange";
//指定交换机为topic
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
//指定routingkeys,可搭配*或#进行匹配
String bindingKey = "quick.*";
channel.queueBind(queueName,EXCHANGE_NAME,bindingKey);//绑定队列、交换机以及routingkey
String bindingkey2 = "lazy.orange.#";
channel.queueBind(queueName,EXCHANGE_NAME,bindingkey2);
String bindingkey3 = "quick.orange.fox";
channel.queueBind(queueName,EXCHANGE_NAME,bindingkey3);
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String msg = new String(body, StandardCharsets.UTF_8);
        System.out.println("收到消息:"+msg);
    }
});
...



测试:可以看到对应的消费者服务都能够进行精确匹配拿到对应的信息




精炼总结

直接指定队列形式:消费者服务没有启动前发送的消息,在消费者服务启动后仍然能够收到,因为此前发送的消息被存储到队列中。


就是一、二章节中的案例。

交换机类型:第三章节案例


fanout:①交换机发送的消息都一致的发送给现有的队列中。(每个队列都能够收到相同的消息)②在使用fanout类型交换机的消费者服务不会接收服务启动前生产者发送的信息,也就是说启动前发送的消息都会被丢弃掉!

应用场景:相同数量及内容多个消费者都能够收到,分别对所有内容进行不同的处理。

direct:①交换机在发送的时候带上routingkey,消息队列在接收时也指定routingkey即可拿到对应交换机想要发送给某个队列的信息,来达到直接准确的发送!②消费者服务启动前,生产者发送给指定交换机的信息会被丢弃。

应用场景:不同类型的日志记录进行不同处理,如窗口执行、磁盘存储。

topic模式(与direct是兄弟关系,或升级关系):①根据严重性,源头去考虑。消费者服务可以进行模糊匹配(*号代替一个单词,#替代零个或多个单词)。交换机可以设置多个routingkey,对应的接收队列可以进行模糊匹配。②②消费者服务启动前,生产者发送给指定交换机的信息不会被丢弃。

headers:根据发送消息内容中的headers属性来匹配。(实际基本不会使用)


整理者:长路 时间:2021.9.25

类型交换机的消费者服务不会接收服务启动前生产者发送的信息,也就是说启动前发送的消息都会被丢弃掉!


应用场景:相同数量及内容多个消费者都能够收到,分别对所有内容进行不同的处理。

direct:①交换机在发送的时候带上routingkey,消息队列在接收时也指定routingkey即可拿到对应交换机想要发送给某个队列的信息,来达到直接准确的发送!②消费者服务启动前,生产者发送给指定交换机的信息会被丢弃。

应用场景:不同类型的日志记录进行不同处理,如窗口执行、磁盘存储。

topic模式(与direct是兄弟关系,或升级关系):①根据严重性,源头去考虑。消费者服务可以进行模糊匹配(*号代替一个单词,#替代零个或多个单词)。交换机可以设置多个routingkey,对应的接收队列可以进行模糊匹配。②②消费者服务启动前,生产者发送给指定交换机的信息不会被丢弃。

headers:根据发送消息内容中的headers属性来匹配。(实际基本不会使用)

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
|
13天前
|
消息中间件 网络协议 RocketMQ
RocketMQ Controller 模式 始终更新成本机ip
ontrollerAddr=192.168.24.241:8878 但是日志输出Update controller leader address to 127.0.0.1:8878。导致访问失败
43 3
|
4月前
|
消息中间件 存储 缓存
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
340 7
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
|
5月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
89 2
|
4月前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
113 0
|
5月前
|
安全 网络性能优化 网络安全
别再让网络瓶颈困扰你!掌握这十种交换机接口模式,提升你的网络布局技能
【8月更文挑战第23天】交换机作为网络核心,其接口模式直接影响网络布局与性能。本文介绍了十大常见接口模式及其配置实例,包括基础接入模式、优化布线的干道模式、动态学习相邻交换机VLAN信息的动态中继协议模式、固定分配VLAN的静态接入模式、确保语音优先传输的语音VLAN模式、指定默认VLAN的native模式、增加带宽与可靠性的链路聚合及EtherChannel模式、保障网络安全的端口安全模式以及确保关键业务流畅传输的QoS模式。理解并掌握这些模式对于构建高效稳定的网络至关重要。
175 1
|
5月前
|
消息中间件
RabbitMQ广播模式
RabbitMQ广播模式
88 1
|
4月前
|
消息中间件 存储
RabbitMQ-死信交换机和死信队列
死信队列和死信交换机是RabbitMQ提供的一个非常实用的功能,通过合理使用这一机制,可以大大增强系统的健壮性和可靠性。它们不仅能有效解决消息处理失败的情况,还能为系统的错误追踪、消息延迟处理等提供支持。在设计系统的消息体系时,合理规划和使用死信队列和死信交换机,将会为系统的稳定运行提供一个有力的
83 0
|
5月前
|
消息中间件 应用服务中间件 网络安全
rabbitMQ镜像模式搭建
rabbitMQ镜像模式搭建
|
6月前
|
消息中间件 传感器 负载均衡
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
下一篇
开通oss服务