Direct exchange

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 我们构建了一个简单的日志记录系统。我们能够向许多接收者广播日志消息。在本节我们将向其中添加一些特别的功能-比方说我们只让某个消费者订阅发布的部分消息。例如我们只把严重错误消息定向存储到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。

前言



我们构建了一个简单的日志记录系统。我们能够向许多接收者广播日志消息。在本节我们将向其中添加一些特别的功能-比方说我们只让某个消费者订阅发布的部分消息。例如我们只把严重错误消息定向存储到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。


绑定是交换机和队列之间的桥梁关系。也可以这么理解:


队列只对它绑定的交换机的消息感兴趣。绑定用参数:routingKey来表示也可称该参数为binding key,创建绑定我们用代码:channel.queueBind(queueName, EXCHANGE_NAME, "routingKey");绑定之后的意义由其交换类型决定。


Direct exchange介绍



我们希望将日志消息写入磁盘的程序仅接收严重错误(errros),而不存储哪些警告(warning)或信息(info)日志消息避免浪费磁盘空间。Fanout这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的广播,在这里我们将使用direct这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的routingKey队列中去。


adb7915be8dc4ccebbd8b38824bc7901.png


在上面这张图中,我们可以看到X绑定了两个队列,绑定类型是direct。队列Q1绑定键为orange,队列Q2绑定键有两个:一个绑定键为black,另一个绑定键为green.


在这种绑定情况下,生产者发布消息到exchange上,绑定键为orange的消息会被发布到队列


Q1。绑定键为blackgreen和的消息会被发布到队列Q2,其他消息类型的消息将被丢弃。


多重绑定


当然如果exchange的绑定类型是direct,但是它绑定的多个队列的key如果都相同,在这种情况下虽然绑定类型是direct但是它表现的就和fanout有点类似了,就跟广播差不多,如下图所示。


58f5e8d9dc2a4743ae929aec126dee35.png


实战代码



477c8c9069944221b59290571b192e98.png


工具类:


public class untils {
    public static Channel getChannel() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.231.132");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}


消费者1:


public class ReceiveLogsDirect01 {
    private  static  final  String EXCHANG_NAME="direct_logs";
    public static void main(String[] args) throws  Exception{
        Channel channel = untils.getChannel();
        /**
         * 声明交换机
         */
        channel.exchangeDeclare(EXCHANG_NAME, BuiltinExchangeType.DIRECT);
        String queueName="disk";
        channel.queueDeclare(queueName,false,false,false,null);
        channel.queueBind(queueName,EXCHANG_NAME,"error");
        System.out.println("正在准备接收消息....");
        DeliverCallback deliverCallback=(Consumer,delivert)->
        {
            String s = new String(delivert.getBody());
            System.out.println("错误消息--->:"+s);
        };
        channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{} );
    }
}


 消费者2:


public class ReceiveLogsDirect02 {
        private  static  final  String EXCHANG_NAME="direct_logs";
        public static void main(String[] args) throws  Exception{
            Channel channel = untils.getChannel();
            /**
             * 声明交换机
             */
            channel.exchangeDeclare(EXCHANG_NAME, BuiltinExchangeType.DIRECT);
            String queueName="console";
            channel.queueDeclare(queueName,false,false,false,null);
            channel.queueBind(queueName,EXCHANG_NAME,"info");
            channel.queueBind(queueName,EXCHANG_NAME,"warning");
            System.out.println("正在准备接收消息....");
            DeliverCallback deliverCallback=(Consumer, delivert)->
            {
                String s = new String(delivert.getBody());
                System.out.println("info warning消息--->:"+s);
            };
            channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{} );
        }
}


生产者:


public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";
    public static void main(String[] args)throws  Exception {
        Channel channel = untils.getChannel();
        //创建多个bindingKeyMap
        Map<String,String> bindingKeyMap=new HashMap<>();
        bindingKeyMap.put("info","普通info消息");
        bindingKeyMap.put("warning","警告warning消息");
        bindingKeyMap.put("error","错误error消息");
        //debug没有的消费者接收这个消息,所以就失去
        bindingKeyMap.put("debug","调试debug消息");
        for (Map.Entry<String,String> bindingKeyEntry:bindingKeyMap.entrySet())
        {
            String bindingKey=bindingKeyEntry.getKey();
            String message=bindingKeyEntry.getValue();
            channel.basicPublish(EXCHANGE_NAME,bindingKey,null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生产者发出消息"+message);
        }
    }
}


结果:


6039fd7318794affb32eb5d0acac8881.jpgf5948bef9f2946cab4e5a7aa050cc799.jpg2e6f1bf287f94db5990338fc4c118420.jpg


相关实践学习
通过日志服务实现云资源OSS的安全审计
本实验介绍如何通过日志服务实现云资源OSS的安全审计。
相关文章
|
存储 数据建模 数据库
IOS开发数据存储:什么是 UserDefaults?有哪些替代方案?
IOS开发数据存储:什么是 UserDefaults?有哪些替代方案?
245 0
|
Web App开发 Java Linux
Nexus【部署 02】最新版本 nexus-3.35.0-02-unix.tar.gz 安装配置启动及测试(JDK版本+虚拟机参数配置说明)
Nexus【部署 02】最新版本 nexus-3.35.0-02-unix.tar.gz 安装配置启动及测试(JDK版本+虚拟机参数配置说明)
980 0
|
移动开发 监控 前端开发
如何从0-1的建设云上稳定性?
本文将从前后端的视角整体看下我们在云上稳定性治理的一些路径和经验。首先从平台的系统架构模型出发,站在全局视角看下整个平台的风险。
|
12月前
|
边缘计算 自动驾驶 物联网
5G技术的低延迟目标及其对4G的显著改进
5G技术的低延迟目标及其对4G的显著改进
693 0
|
存储 前端开发 Java
(二)JVM成神路之剖析Java类加载子系统、双亲委派机制及线程上下文类加载器
上篇《初识Java虚拟机》文章中曾提及到:我们所编写的Java代码经过编译之后,会生成对应的class字节码文件,而在程序启动时会通过类加载子系统将这些字节码文件先装载进内存,然后再交由执行引擎执行。本文中则会对Java虚拟机的类加载机制以及执行引擎进行全面分析。
249 0
|
存储 安全 Linux
Podman入门全指南:安装、配置与运行容器
Podman入门全指南:安装、配置与运行容器
7619 1
|
机器学习/深度学习 PyTorch TensorFlow
TensorFlow、Keras 和 Python 构建神经网络分析鸢尾花iris数据集|代码数据分享
TensorFlow、Keras 和 Python 构建神经网络分析鸢尾花iris数据集|代码数据分享
|
XML 人工智能 JSON
【AI大模型应用开发】【LangChain系列】5. 实战LangChain的智能体Agents模块
【AI大模型应用开发】【LangChain系列】5. 实战LangChain的智能体Agents模块
901 0
|
存储 JavaScript 前端开发
Vue 面试题汇总(一)
Vue 面试题汇总(一)
215 0
|
小程序
uniapp悬浮图标支持拖动支持微信小程序
uniapp悬浮图标支持拖动支持微信小程序
273 0