消息队列中的fanout exchange

简介: 消息队列中的fanout exchange

公众号merlinsea


Fanout Exchange广播交换机


1、只需要简单的将队列绑定到交换机上,⼀个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像⼦⽹⼴播,每台⼦ ⽹内的主机都获得了⼀份复制的消息。

2、Fanout交换机转发消息是最快的,⽤于发布订阅,⼴播形式,中⽂是扇形 。

3、不处理路由键,即所有发往这种交换机的消息都会转发到与这个广播交换机绑定的所有队列上。


Fanout Exchange交换机的介绍

640.jpg

生产者代码

生产者端端代码负责创建广播交换机,发送消息的时候不需要指定key


public class Recv1 {
    private final static String EXCHANGE_NAME = "exchange_fanout";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("39.107.221.166");
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //绑定交换机,fanout扇形,即广播
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT);
        //获取队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和队列, fanout交换机不用routingkey
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body="+new String(body,"utf-8"));
                //手工确认消息消费,不是多条确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //消费,关闭消息消息自动确认,重要
        channel.basicConsume(queueName,false,consumer);
    }
}


消费者代码

消费者端负责创建队列,并将队列绑定到交换机上,不需要指定binding key


public class Recv1 {
    private final static String EXCHANGE_NAME = "exchange_fanout";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("39.107.221.166");
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //绑定交换机,fanout扇形,即广播
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT);
        //获取队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和队列, fanout交换机不用routingkey
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body="+new String(body,"utf-8"));
                //手工确认消息消费,不是多条确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //消费,关闭消息消息自动确认,重要
        channel.basicConsume(queueName,false,consumer);
    }
}


相关文章
|
10月前
|
消息中间件
消息队列中的topic exchange
消息队列中的topic exchange
|
10月前
|
消息中间件
消息队列中的direct exchange
消息队列中的direct exchange
|
2月前
|
消息中间件 存储 监控
RabbitMQ:分布式系统中的高效消息队列
RabbitMQ:分布式系统中的高效消息队列
|
7月前
|
消息中间件 存储 网络协议
企业实战(11)消息队列之Docker安装部署RabbitMQ实战
企业实战(11)消息队列之Docker安装部署RabbitMQ实战
131 0
|
8月前
|
消息中间件 存储 监控
【图解RabbitMQ-3】消息队列RabbitMQ介绍及核心流程
【图解RabbitMQ-3】消息队列RabbitMQ介绍及核心流程
229 0
|
2月前
|
消息中间件 Java
springboot整合消息队列——RabbitMQ
springboot整合消息队列——RabbitMQ
76 0
|
4月前
|
消息中间件 JSON Java
RabbitMQ消息队列
RabbitMQ消息队列
46 0
|
4月前
|
消息中间件
RabbitMQ 实现消息队列延迟
RabbitMQ 实现消息队列延迟
124 0
|
28天前
|
消息中间件 存储 负载均衡
消息队列学习之RabbitMQ
【4月更文挑战第3天】消息队列学习之RabbitMQ,一种基于erlang语言开发的流行的开源消息中间件。
18 0