消息队列中的direct exchange

简介: 消息队列中的direct exchange

公众号merlinsea


Direct Exchange 直连交换机


1、将⼀个队列绑定到交换机上,要求该消息与⼀个特定的路由键完全匹配

2、例⼦:如果⼀个队列绑定到该交换机上要求路由键“aabb”,则只有被标记为“aabb”的消息才被转发,不会转发aabb.cc,也不会 转发gg.aabb,只会转发aabb 3、会处理路由键,必须【完全匹配】才进行转发


Direct Exchange交换机介绍

640.jpg

生产者代码


生产者在发送消息的时候要指定这个消息的key

public class Send {
    private final static String EXCHANGE_NAME = "exchange_direct";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("39.107.221.166");
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);
        //JDK7语法,自动关闭,创建连接
        try (Connection connection = factory.newConnection();
             //创建信道
             Channel channel = connection.createChannel()) {
            //绑定交换机,直连交换机
            channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
            String error = "lianglin error日志消息";
            String info = "lianglin info日志消息";
            String debug = "lianglin debug日志消息";
            channel.basicPublish(EXCHANGE_NAME,"errorRoutingKey",null,error.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME,"infoRoutingKey",null,info.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME,"debugRoutingKey",null,debug.getBytes(StandardCharsets.UTF_8));
            System.out.println("direct消息发送成功");
        }
    }
}


消费者代码


消费者端会创建队列,创建的队列和exchange绑定的时候需要指定binding key

public class Recv1 {
    private final static String EXCHANGE_NAME = "exchange_direct";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("39.107.221.166");
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //绑定交换机,fanout扇形,即广播
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
        //获取队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和队列, direct交换机需要指定routingkey
        //队列在接收端创建
        channel.queueBind(queueName,EXCHANGE_NAME,"errorRoutingKey");
        channel.queueBind(queueName,EXCHANGE_NAME,"infoRoutingKey");
        channel.queueBind(queueName,EXCHANGE_NAME,"debugRoutingKey");
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body="+new String(body,"utf-8"));
                //手工确认消息消费,不是多条确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //消费,关闭消息消息自动确认,重要
        channel.basicConsume(queueName,false,consumer);
    }
}


说明:

在实际开发的过程中,是不会像上面这样编写的,通常创建交换机,队列等操作要交由管理员来执行,消费者和生产者只负责消费消息和生产消息。



相关文章
|
消息中间件
消息队列中的topic exchange
消息队列中的topic exchange
|
消息中间件
消息队列中的fanout exchange
消息队列中的fanout exchange
|
1天前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
4月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件
手撸MQ消息队列——循环数组
队列是一种常用的数据结构,类似于栈,但采用先进先出(FIFO)的原则。生活中常见的排队场景就是队列的应用实例。在数据结构中,队列通常用数组实现,包括入队(队尾插入元素)和出队(队头移除元素)两种基本操作。本文介绍了如何用数组实现队列,包括定义数组长度、维护队头和队尾下标(front 和 tail),并通过取模运算解决下标越界问题。此外,还讨论了队列的空与满状态判断,以及并发和等待机制的实现。通过示例代码展示了队列的基本操作及优化方法,确保多线程环境下的正确性和高效性。
34 0
手撸MQ消息队列——循环数组
|
3月前
|
消息中间件 存储 缓存
一个用过消息队列的人,竟不知为何要用 MQ?
一个用过消息队列的人,竟不知为何要用 MQ?
135 1
|
4月前
|
消息中间件 开发工具 RocketMQ
消息队列 MQ使用问题之一直连接master失败,是什么原因
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 Prometheus 监控
消息队列 MQ使用问题之如何将旧集群的store目录迁移到新集群
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 安全 PHP
消息队列 MQ使用问题之如何获取PHP客户端代码
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。