RabbitMQ实例教程:路由选择

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介:

  在前面的例子中,我们构建了一个简单的日志系统来日志消息通过广播传送到多个接受者。本文将介绍如何订阅消息的子集。比如,我们能够将关键的错误信息写到日志文件中,同时也能够在控制台打印所有的日志消息。


  消息绑定(Bindings)


  在前面的例子中,我们使用下面的代码方式再次绑定。


1
channel.queueBind(queueName, EXCHANGE_NAME,  "" );


  绑定表述的是交换机和队列之间的关系,可以理解为队列对交换机中的消息感兴趣。


  绑定的参数是 routingKey,为避免混淆,我们将basic_publish参数称为binding key,下面是使用关键字绑定binding的例子。


1
channel.queueBind(queueName, EXCHANGE_NAME,  "black" );

  

  它表示绑定关键字依赖于交换机类型,对于我们之前定义的 fanout 交换机,它会忽略绑定的关键字。即为所有订阅者发送消息。


  Direct 交换机(Direct exchange)


  在前面的例子中,我们将所有消息广播给了所有消费者,现在我们想在原有的基础上做些改进对日志消息进行过滤。比如我们只把严重错误的日志写入日志文件,这样能节省磁盘空间。


  fanout交换机的灵活性比较差,它只是盲目的广播所有消息,现在使用direct交换机代替它。direct交换机的算法非常简单-当绑定关键字与消息的路由关键字匹配时就会将消息传递给队列。


wKiom1Yb_BaBH97lAABhpyYpPKI061.jpg


  如上图所示,X交换机绑定了两个队列Q1和Q2。Q1队列绑定了orange关键字,Q2队列绑定green和black关键字。路由中带有orange路由关键字的消息会转发给Q1,带有black或green关键字的消息会转发给Q2,其它的消息就会废弃。


  多重绑定(Multiple bindings)


wKioL1Yb_DSCBsmYAABZ02FvZAM164.jpg


  一个路由使用同一个绑定代码绑定到多个队列是合法的。上图中,X路由器使用black代码绑定到Q1和Q2。这种情况下,direct交换机会像fanout交换机一样,把消息广播给所有队列。


  提交日志


  我们继续使用日志系统作为我们的模型。我们使用direct交换机来代替fanout交换机,将日志等级severity作为路由代码,这样我们就能根据日志等级决定接收哪些消息。


  (1)创建direct交换机


1
channel.exchangeDeclare(EXCHANGE_NAME,  "direct" );


  (2)准备发送消息


1
channel.basicPublish(EXCHANGE_NAME, severity,  null , message.getBytes());

    

  (3)将日志等级划分为:info,warning和error,根据日志等级创建不同的绑定。


  消息订阅


  接收消息与之前中的例子一样,不同之处在于会对每个 Severity 创建一个新的绑定。


1
2
3
4
String queueName = channel.queueDeclare().getQueue();
for (String severity : argv){    
   channel.queueBind(queueName, EXCHANGE_NAME, severity);
}


  大总结

wKiom1Yb_LPjrxnCAAB_Hhio3IQ776.jpg


  EmitLogDirect.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package  com.favccxx.favrabbit;
 
import  com.rabbitmq.client.Channel;
import  com.rabbitmq.client.Connection;
import  com.rabbitmq.client.ConnectionFactory;
 
public  class  EmitLogDirect {
 
     private  static  final  String EXCHANGE_NAME =  "direct_logs" ;
 
     public  static  void  main(String[] argv)  throws  Exception {
         ConnectionFactory factory =  new  ConnectionFactory();
         factory.setHost( "localhost" );
         Connection connection = factory.newConnection();
         Channel channel = connection.createChannel();
 
         channel.exchangeDeclare(EXCHANGE_NAME,  "direct" );
 
         String[] serveritys = { "debug" "info" "warning" "error" };
         String[] messages = { "This is a DEBUG message!" "This is a INFO message!" "This is a WARNING message!" "This is a ERROR message!" };
         
         for ( int  i= 0 ; i<serveritys.length; i++){
             channel.basicPublish(EXCHANGE_NAME, serveritys[i],  null , messages[i].getBytes( "UTF-8" ));
             System.out.println( " [x] Sent '"  + serveritys[i] +  "':'"  + messages[i] +  "'" );
         }
         
         
         channel.close();
         connection.close();
     }
 
}


  控制台输出内容如下


1
2
3
4
  [x] Sent  'debug' : 'This is a DEBUG message!'
  [x] Sent  'info' : 'This is a INFO message!'
  [x] Sent  'warning' : 'This is a WARNING message!'
  [x] Sent  'error' : 'This is a ERROR message!'


  为了简化代码实现,我们使用了一个for循环来分别接收各种不同严重程度的日志信息。但在一个真实的应用环境中,我们可能需要3~4个接收器。


  ReceiveLogsDirect.java


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package  com.favccxx.favrabbit;
 
import  java.io.IOException;
 
import  com.rabbitmq.client.AMQP;
import  com.rabbitmq.client.Channel;
import  com.rabbitmq.client.Connection;
import  com.rabbitmq.client.ConnectionFactory;
import  com.rabbitmq.client.Consumer;
import  com.rabbitmq.client.DefaultConsumer;
import  com.rabbitmq.client.Envelope;
 
public  class  ReceiveLogsDirect {
 
     private  static  final  String EXCHANGE_NAME =  "direct_logs" ;
 
     public  static  void  main(String[] argv)  throws  Exception {
         ConnectionFactory factory =  new  ConnectionFactory();
         factory.setHost( "localhost" );
         Connection connection = factory.newConnection();
         Channel channel = connection.createChannel();
 
         channel.exchangeDeclare(EXCHANGE_NAME,  "direct" );
         String queueName = channel.queueDeclare().getQueue();
 
         String[] serveritys = {  "debug" "info" "warning" "error"  };
         for  (String severity : serveritys) {
             channel.queueBind(queueName, EXCHANGE_NAME, severity);
             Consumer consumer =  new  DefaultConsumer(channel) {
                 @Override
                 public  void  handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                         byte [] body)  throws  IOException {
                     String message =  new  String(body,  "UTF-8" );
                     System.out.println( " [x] Received '"  + envelope.getRoutingKey() +  "':'"  + message +  "'" );
                 }
             };
             channel.basicConsume(queueName,  true , consumer);
         }
 
     }
}


  控制台输出内容如下


1
2
3
4
  [x] Received  'debug' : 'This is a DEBUG message!'
  [x] Received  'info' : 'This is a INFO message!'
  [x] Received  'warning' : 'This is a WARNING message!'
  [x] Received  'error' : 'This is a ERROR message!'





本文转自 genuinecx 51CTO博客,原文链接:http://blog.51cto.com/favccxx/1702327,如需转载请自行联系原作者
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
消息中间件 监控 Docker
Docker环境下快速部署RabbitMQ教程。
就这样,你成功地用魔法召唤出了RabbitMQ,还把它和你的应用程序连接了起来。现在,消息会像小溪流水一样,在你的系统中自由流淌。别忘了,兔子们不喜欢孤独,他们需要你细心的关怀,不时地监控它们,确保他们的世界运转得井井有条。
337 18
|
消息中间件 Java RocketMQ
RocketMQ实战教程之RocketMQ安装
这是一篇关于RocketMQ安装的实战教程,主要介绍了在CentOS系统上使用传统安装和Docker两种方式安装RocketMQ。首先,系统需要是64位,并且已经安装了JDK 1.8。传统安装包括下载安装包,解压并启动NameServer和Broker。Docker安装则涉及安装docker和docker-compose,然后通过docker-compose.yaml文件配置并启动服务。教程还提供了启动命令和解决问题的提示。
|
消息中间件 Kafka
面试题Kafka问题之RabbitMQ的路由配置工作如何解决
面试题Kafka问题之RabbitMQ的路由配置工作如何解决
277 57
|
消息中间件 存储 JSON
rabbitmq基础教程(ui,java,springamqp)
本文提供了RabbitMQ的基础教程,包括如何使用UI创建队列和交换机、Java代码操作RabbitMQ、Spring AMQP进行消息发送和接收,以及如何使用不同的交换机类型(fanout、direct、topic)进行消息路由。
174 0
rabbitmq基础教程(ui,java,springamqp)
|
网络协议 物联网 测试技术
App Inventor 2 MQTT拓展入门(保姆级教程)
本文演示的是App和一个测试客户端进行消息交互的案例,实际应用中,我们的测试客户端可以看着是任意的、支持MQTT协议的硬件,通过订阅及发布消息,联网硬件与我们的App进行双向数据通信,以实现万物互联的智能控制效果。
929 2
|
消息中间件 监控 Ubuntu
RabbitMQ安装配置,超详细版教程
以上步骤为您提供了在Linux环境下安装RabbitMQ的详细过程。安装Erlang作为基础,然后通过添加官方源并安装RabbitMQ本身,最后对服务进行配置并启用Web管理界面。这些步骤操作简单直观,只需要跟随上述指南,即可在短时间内将RabbitMQ服务器运行起来,并进行进一步的配置和管理。不要忘记硬件和网络资源对性能的影响,确保RabbitMQ能够满足您的应用需求。
1152 0
|
消息中间件 存储 Apache
RocketMQ实战教程之常见概念和模型
Apache RocketMQ 实战教程介绍了其核心概念和模型。消息是基本的数据传输单元,主题是消息的分类容器,支持字节、数字和短划线命名,最长64个字符。消息类型包括普通、顺序、事务和定时/延时消息。消息队列是实际存储和传输消息的容器,是主题的分区。消费者分组是一组行为一致的消费者的逻辑集合,也有命名限制。此外,文档还提到了一些使用约束和建议,如主题和消费者组名的命名规则,消息大小限制,请求超时时间等。RocketMQ 提供了多种消息模型,包括发布/订阅模型,有助于理解和优化消息处理。
|
消息中间件 存储 Java
RocketMQ实战教程之NameServer与BrokerServer
这是一个关于RocketMQ实战教程的概要,主要讨论NameServer和BrokerServer的角色。NameServer负责管理所有BrokerServer,而BrokerServer存储和传输消息。生产者和消费者通过NameServer找到合适的Broker进行交互,不需要直接知道Broker的具体信息。工作流程包括生产者向NameServer查询后发送消息到Broker,以及消费者同样通过NameServer获取消息进行消费。这种设计类似于服务注册中心的概念,便于系统扩展和集群管理。
|
消息中间件 Java RocketMQ
教程:Spring Boot整合RocketMQ的配置与优化
教程:Spring Boot整合RocketMQ的配置与优化
|
消息中间件 Java Spring
最新spingboot整合rabbitmq详细教程
最新spingboot整合rabbitmq详细教程