前言
我们构建了一个简单的日志记录系统。我们能够向许多接收者广播日志消息。在本节我们将向其中添加一些特别的功能-比方说我们只让某个消费者订阅发布的部分消息。例如我们只把严重错误消息定向存储到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
绑定是交换机和队列之间的桥梁关系。也可以这么理解:
队列只对它绑定的交换机的消息感兴趣。绑定用参数:routingKey来表示也可称该参数为binding key,创建绑定我们用代码:channel.queueBind(queueName, EXCHANGE_NAME, "routingKey");绑定之后的意义由其交换类型决定。
Direct exchange介绍
我们希望将日志消息写入磁盘的程序仅接收严重错误(errros),而不存储哪些警告(warning)或信息(info)日志消息避免浪费磁盘空间。Fanout这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的广播,在这里我们将使用direct这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的routingKey队列中去。
在上面这张图中,我们可以看到X绑定了两个队列,绑定类型是direct。队列Q1绑定键为orange,队列Q2绑定键有两个:一个绑定键为black,另一个绑定键为green.
在这种绑定情况下,生产者发布消息到exchange上,绑定键为orange的消息会被发布到队列
Q1。绑定键为blackgreen和的消息会被发布到队列Q2,其他消息类型的消息将被丢弃。
多重绑定
当然如果exchange的绑定类型是direct,但是它绑定的多个队列的key如果都相同,在这种情况下虽然绑定类型是direct但是它表现的就和fanout有点类似了,就跟广播差不多,如下图所示。
实战代码
工具类:
public class untils { public static Channel getChannel() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.231.132"); factory.setUsername("admin"); factory.setPassword("123"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
消费者1:
public class ReceiveLogsDirect01 { private static final String EXCHANG_NAME="direct_logs"; public static void main(String[] args) throws Exception{ Channel channel = untils.getChannel(); /** * 声明交换机 */ channel.exchangeDeclare(EXCHANG_NAME, BuiltinExchangeType.DIRECT); String queueName="disk"; channel.queueDeclare(queueName,false,false,false,null); channel.queueBind(queueName,EXCHANG_NAME,"error"); System.out.println("正在准备接收消息...."); DeliverCallback deliverCallback=(Consumer,delivert)-> { String s = new String(delivert.getBody()); System.out.println("错误消息--->:"+s); }; channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{} ); } }
消费者2:
public class ReceiveLogsDirect02 { private static final String EXCHANG_NAME="direct_logs"; public static void main(String[] args) throws Exception{ Channel channel = untils.getChannel(); /** * 声明交换机 */ channel.exchangeDeclare(EXCHANG_NAME, BuiltinExchangeType.DIRECT); String queueName="console"; channel.queueDeclare(queueName,false,false,false,null); channel.queueBind(queueName,EXCHANG_NAME,"info"); channel.queueBind(queueName,EXCHANG_NAME,"warning"); System.out.println("正在准备接收消息...."); DeliverCallback deliverCallback=(Consumer, delivert)-> { String s = new String(delivert.getBody()); System.out.println("info warning消息--->:"+s); }; channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{} ); } }
生产者:
public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args)throws Exception { Channel channel = untils.getChannel(); //创建多个bindingKeyMap Map<String,String> bindingKeyMap=new HashMap<>(); bindingKeyMap.put("info","普通info消息"); bindingKeyMap.put("warning","警告warning消息"); bindingKeyMap.put("error","错误error消息"); //debug没有的消费者接收这个消息,所以就失去 bindingKeyMap.put("debug","调试debug消息"); for (Map.Entry<String,String> bindingKeyEntry:bindingKeyMap.entrySet()) { String bindingKey=bindingKeyEntry.getKey(); String message=bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME,bindingKey,null,message.getBytes(StandardCharsets.UTF_8)); System.out.println("生产者发出消息"+message); } } }
结果: