公众号merlinsea
Direct Exchange 直连交换机
1、将⼀个队列绑定到交换机上,要求该消息与⼀个特定的路由键完全匹配
2、例⼦:如果⼀个队列绑定到该交换机上要求路由键“aabb”,则只有被标记为“aabb”的消息才被转发,不会转发aabb.cc,也不会 转发gg.aabb,只会转发aabb 3、会处理路由键,必须【完全匹配】才进行转发
Direct Exchange交换机介绍
生产者代码
生产者在发送消息的时候要指定这个消息的key
public class Send { private final static String EXCHANGE_NAME = "exchange_direct"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("39.107.221.166"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); //JDK7语法,自动关闭,创建连接 try (Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel()) { //绑定交换机,直连交换机 channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT); String error = "lianglin error日志消息"; String info = "lianglin info日志消息"; String debug = "lianglin debug日志消息"; channel.basicPublish(EXCHANGE_NAME,"errorRoutingKey",null,error.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME,"infoRoutingKey",null,info.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME,"debugRoutingKey",null,debug.getBytes(StandardCharsets.UTF_8)); System.out.println("direct消息发送成功"); } } }
消费者代码
消费者端会创建队列,创建的队列和exchange绑定的时候需要指定binding key
public class Recv1 { private final static String EXCHANGE_NAME = "exchange_direct"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("39.107.221.166"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //绑定交换机,fanout扇形,即广播 channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT); //获取队列 String queueName = channel.queueDeclare().getQueue(); //绑定交换机和队列, direct交换机需要指定routingkey //队列在接收端创建 channel.queueBind(queueName,EXCHANGE_NAME,"errorRoutingKey"); channel.queueBind(queueName,EXCHANGE_NAME,"infoRoutingKey"); channel.queueBind(queueName,EXCHANGE_NAME,"debugRoutingKey"); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body="+new String(body,"utf-8")); //手工确认消息消费,不是多条确认 channel.basicAck(envelope.getDeliveryTag(),false); } }; //消费,关闭消息消息自动确认,重要 channel.basicConsume(queueName,false,consumer); } }
说明:
在实际开发的过程中,是不会像上面这样编写的,通常创建交换机,队列等操作要交由管理员来执行,消费者和生产者只负责消费消息和生产消息。