RabbitMQ实例教程:发布/订阅者消息队列

简介:

消息交换机(Exchange)


  RabbitMQ消息模型的核心理念是生产者永远不会直接发送任何消息给队列,一般的情况生产者甚至不知道消息应该发送到哪些队列。


wKiom1YZeRCgrXh9AAA8_sKBnCU136.jpg

  相反的,生产者只能发送消息给交换机(Exchange)。交换机的作用非常简单,一边接收从生产者发来的消息,另一边把消息推送到队列中。交换机必须清楚的知道消息如何处理它收到的每一条消息。是否应该追加到一个指定的队列?是否应该追加到多个队列?或者是否应该丢弃?这些规则通过交换机的类型进行定义。


  交换机的类型有:direct,topic,headers 和 fanout。我们以fanout为例创建一个“logs”类型的交换机。


1
channel.exchangeDeclare( "logs" "fanout" );


  fanout交换机非常简单,它会广播它收到的所有队列的所有消息。


  交换机命名


  在前面的例子中,我们不了解交换机的任何概念,也能发送消息,这是因为我们使用了默认的交换机(""),但以后可以使用我们自定义的交换机了。


1
2
channel.basicPublish( "" "hello" , null, message.getBytes());  // 空字符串交换机
channel.basicPublish(  "logs" "" , null, message.getBytes());  //logs 交换机


  临时队列(Temporary Queues)


  在前面的例子中,我们为队列都指定了具体的名字(如hello和task_queue),给队列命名是非常重要的事情,因为生产者和消费者是队列名称来传递消息的。


  但是对于日志来说的消息队列,我们会监听所有的日志消息,而不是其中的一些子集。而且我们只关注当前发生的消息而不是历史消息,要解决这些问题需要这么做:


  首先,当我们连接Rabbit服务器时,我们需要一个新的空队列。我们可以自己随机生成一个队列名字或者让服务器随机生成一个队列名字。


  其次,当消息消费者失去连接时,队列应该自动删除。


  在Java中,我们使用不带参数的queueDeclare()方法创建一个非持久化的,唯一的,用后自动删除的队列。


1
String queueName = channel.queueDeclare().getQueue();


  queueName可能是像 amq.gen-JzTY20BRgKO-HjmUJj0wLg 这样的随机队列名。


  消息绑定(Bindings)


  前面我们创建了一个fanout类型的交换机和队列。现在需要告诉交换机发送消息到队列。交换机和队列之间的关系就是消息绑定(binding)。


wKioL1YZemfT9Am4AAA7uKydZt4912.jpg

  使用下面的代码logs交换机会将消息传递给队列。


1
channel.queueBind(queueName,  "logs" "" );


  将交换机和消息绑定放在一起


wKiom1YZetLSZ2z_AABbZcUcUF0159.jpg


  现在我们有一个提交日志的的消息生产者,它与我们之前的消息发送者并没有太大的区别,唯一不同的地方是我们将消息发送到 logs 交换机,而不是没有名字的交换机。当发送消息时,我们需要提供一个路由,尽管它在 fanout 交换机中并没有什么作用。下面是提交日志的Java代码。


  EmitLog.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package  com.favccxx.favrabbit;
import  com.rabbitmq.client.Channel;
import  com.rabbitmq.client.Connection;
import  com.rabbitmq.client.ConnectionFactory;
public  class  EmitLog {
  private  static  final  String EXCHANGE_NAME =  "logs" ;
  public  static  void  main(String[] argv)  throws  Exception {
   ConnectionFactory factory =  new  ConnectionFactory();
   factory.setHost( "localhost" );
   Connection connection = factory.newConnection();
   Channel channel = connection.createChannel();
   channel.exchangeDeclare(EXCHANGE_NAME,  "fanout" );
   String[] sendMsgs = { "I" "saw" "a" "dog" };
   String message = getMessage(sendMsgs);
   channel.basicPublish(EXCHANGE_NAME,  "" null , message.getBytes( "UTF-8" ));
   System.out.println( " [x] Sent '"  + message +  "'" );
   channel.close();
   connection.close();
  }
  private  static  String getMessage(String[] strings) {
   if  (strings.length <  1 )
    return  "info: Hello World!" ;
   return  joinStrings(strings,  " " );
  }
  private  static  String joinStrings(String[] strings, String delimiter) {
   int  length = strings.length;
   if  (length ==  0 )
    return  "" ;
   StringBuilder words =  new  StringBuilder(strings[ 0 ]);
   for  ( int  i =  1 ; i < length; i++) {
    words.append(delimiter).append(strings[i]);
   }
   return  words.toString();
  }
}



  正如上面所示,与消息服务器建立连接后,声明了一个交换机,这是因为系统不允许发布到空交换机。 如果没有队列绑定到交换机的话,消息就会丢失,但我们不用担心。如果没有消费者监听消息的话,我们就丢弃该消息。


  接收消息代码ReceiveLogs.java


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package  com.favccxx.favrabbit;
import  java.io.IOException;
import  com.rabbitmq.client.AMQP;
import  com.rabbitmq.client.Channel;
import  com.rabbitmq.client.Connection;
import  com.rabbitmq.client.ConnectionFactory;
import  com.rabbitmq.client.Consumer;
import  com.rabbitmq.client.DefaultConsumer;
import  com.rabbitmq.client.Envelope;
public  class  ReceiveLogs {
  private  static  final  String EXCHANGE_NAME =  "logs" ;
  public  static  void  main(String[] argv)  throws  Exception {
   ConnectionFactory factory =  new  ConnectionFactory();
   factory.setHost( "localhost" );
   Connection connection = factory.newConnection();
   Channel channel = connection.createChannel();
   channel.exchangeDeclare(EXCHANGE_NAME,  "fanout" );
   String queueName = channel.queueDeclare().getQueue();
   channel.queueBind(queueName, EXCHANGE_NAME,  "" );
   System.out.println( " [*] Waiting for messages. To exit press CTRL+C" );
   Consumer consumer =  new  DefaultConsumer(channel) {
    @Override
    public  void  handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
      byte [] body)  throws  IOException {
     String message =  new  String(body,  "UTF-8" );
     System.out.println( " [x] Received '"  + message +  "'" );
    }
   };
   channel.basicConsume(queueName,  true , consumer);
  }
}



  测试数据


  运行几个日志消息接收者实例,使用日志消息发送者发送消息,发现每个日志消息接收者都接收到同样的数据,说明发布订阅成功。


1
  [x] Received  'I saw a dog'




本文转自 genuinecx 51CTO博客,原文链接:http://blog.51cto.com/favccxx/1701738,如需转载请自行联系原作者
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4月前
|
消息中间件 Java 测试技术
消息队列 MQ使用问题之数据流出规则是否支持平台的云RabbitMQ
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5天前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
15天前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
52 4
|
13天前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
19天前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
59 4
|
2月前
|
消息中间件 运维 监控
云消息队列RabbitMQ实践解决方案评测报告
本报告旨在对《云消息队列RabbitMQ实践》解决方案进行综合评测。通过对该方案的原理理解、部署体验、设计验证以及实际应用价值等方面进行全面分析,为用户提供详尽的反馈与建议。
75 16
|
2月前
|
消息中间件 弹性计算 运维
阿里云云消息队列RabbitMQ实践解决方案评测报告
阿里云云消息队列RabbitMQ实践解决方案评测报告
70 9
|
2月前
|
消息中间件 监控 数据处理
解决方案 | 云消息队列RabbitMQ实践
解决方案 | 云消息队列RabbitMQ实践
45 1
|
2月前
|
消息中间件 弹性计算 运维
云消息队列RabbitMQ实践
本评测报告详细分析了阿里云云消息队列 RabbitMQ 版的实践原理、部署体验及核心优势。报告认为其在解决消息积压、脑裂难题及弹性伸缩方面表现优秀,但建议进一步细化架构优化策略和技术细节描述。部署文档详尽,对初学者友好,但仍需加强网络配置和版本兼容性说明。实际部署展示了其高可用性和成本优化能力,适用于高并发消息处理和分布式系统数据同步。为进一步提升方案,建议增加安全性配置指导、性能调优建议及监控告警系统设置。
|
3月前
|
消息中间件 存储 Java
【干货】看看我司消息队列用啥,全网最接地气pulsar教程(含业务解耦demo源码)
本文介绍了Apache Pulsar消息队列系统的核心特性及其与其它消息队列的区别,通过Docker安装Pulsar及Pulsar Manager,并结合电商业务场景,对比了串行执行与使用Pulsar实现异步解耦的优势,最后通过Java代码示例展示了如何利用Pulsar解决实际业务问题。
137 2
【干货】看看我司消息队列用啥,全网最接地气pulsar教程(含业务解耦demo源码)

相关产品

  • 云消息队列 MQ