公众号merlinsea
Fanout Exchange广播交换机
1、只需要简单的将队列绑定到交换机上,⼀个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像⼦⽹⼴播,每台⼦ ⽹内的主机都获得了⼀份复制的消息。
2、Fanout交换机转发消息是最快的,⽤于发布订阅,⼴播形式,中⽂是扇形 。
3、不处理路由键,即所有发往这种交换机的消息都会转发到与这个广播交换机绑定的所有队列上。
Fanout Exchange交换机的介绍
生产者代码
生产者端端代码负责创建广播交换机,发送消息的时候不需要指定key
public class Recv1 { private final static String EXCHANGE_NAME = "exchange_fanout"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("39.107.221.166"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //绑定交换机,fanout扇形,即广播 channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT); //获取队列 String queueName = channel.queueDeclare().getQueue(); //绑定交换机和队列, fanout交换机不用routingkey channel.queueBind(queueName,EXCHANGE_NAME,""); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body="+new String(body,"utf-8")); //手工确认消息消费,不是多条确认 channel.basicAck(envelope.getDeliveryTag(),false); } }; //消费,关闭消息消息自动确认,重要 channel.basicConsume(queueName,false,consumer); } }
消费者代码
消费者端负责创建队列,并将队列绑定到交换机上,不需要指定binding key
public class Recv1 { private final static String EXCHANGE_NAME = "exchange_fanout"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("39.107.221.166"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //绑定交换机,fanout扇形,即广播 channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT); //获取队列 String queueName = channel.queueDeclare().getQueue(); //绑定交换机和队列, fanout交换机不用routingkey channel.queueBind(queueName,EXCHANGE_NAME,""); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body="+new String(body,"utf-8")); //手工确认消息消费,不是多条确认 channel.basicAck(envelope.getDeliveryTag(),false); } }; //消费,关闭消息消息自动确认,重要 channel.basicConsume(queueName,false,consumer); } }