RabbitMQ实例教程:发布/订阅者消息队列

简介:

消息交换机(Exchange)


  RabbitMQ消息模型的核心理念是生产者永远不会直接发送任何消息给队列,一般的情况生产者甚至不知道消息应该发送到哪些队列。


wKiom1YZeRCgrXh9AAA8_sKBnCU136.jpg

  相反的,生产者只能发送消息给交换机(Exchange)。交换机的作用非常简单,一边接收从生产者发来的消息,另一边把消息推送到队列中。交换机必须清楚的知道消息如何处理它收到的每一条消息。是否应该追加到一个指定的队列?是否应该追加到多个队列?或者是否应该丢弃?这些规则通过交换机的类型进行定义。


  交换机的类型有:direct,topic,headers 和 fanout。我们以fanout为例创建一个“logs”类型的交换机。


1
channel.exchangeDeclare( "logs" "fanout" );


  fanout交换机非常简单,它会广播它收到的所有队列的所有消息。


  交换机命名


  在前面的例子中,我们不了解交换机的任何概念,也能发送消息,这是因为我们使用了默认的交换机(""),但以后可以使用我们自定义的交换机了。


1
2
channel.basicPublish( "" "hello" , null, message.getBytes());  // 空字符串交换机
channel.basicPublish(  "logs" "" , null, message.getBytes());  //logs 交换机


  临时队列(Temporary Queues)


  在前面的例子中,我们为队列都指定了具体的名字(如hello和task_queue),给队列命名是非常重要的事情,因为生产者和消费者是队列名称来传递消息的。


  但是对于日志来说的消息队列,我们会监听所有的日志消息,而不是其中的一些子集。而且我们只关注当前发生的消息而不是历史消息,要解决这些问题需要这么做:


  首先,当我们连接Rabbit服务器时,我们需要一个新的空队列。我们可以自己随机生成一个队列名字或者让服务器随机生成一个队列名字。


  其次,当消息消费者失去连接时,队列应该自动删除。


  在Java中,我们使用不带参数的queueDeclare()方法创建一个非持久化的,唯一的,用后自动删除的队列。


1
String queueName = channel.queueDeclare().getQueue();


  queueName可能是像 amq.gen-JzTY20BRgKO-HjmUJj0wLg 这样的随机队列名。


  消息绑定(Bindings)


  前面我们创建了一个fanout类型的交换机和队列。现在需要告诉交换机发送消息到队列。交换机和队列之间的关系就是消息绑定(binding)。


wKioL1YZemfT9Am4AAA7uKydZt4912.jpg

  使用下面的代码logs交换机会将消息传递给队列。


1
channel.queueBind(queueName,  "logs" "" );


  将交换机和消息绑定放在一起


wKiom1YZetLSZ2z_AABbZcUcUF0159.jpg


  现在我们有一个提交日志的的消息生产者,它与我们之前的消息发送者并没有太大的区别,唯一不同的地方是我们将消息发送到 logs 交换机,而不是没有名字的交换机。当发送消息时,我们需要提供一个路由,尽管它在 fanout 交换机中并没有什么作用。下面是提交日志的Java代码。


  EmitLog.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package  com.favccxx.favrabbit;
import  com.rabbitmq.client.Channel;
import  com.rabbitmq.client.Connection;
import  com.rabbitmq.client.ConnectionFactory;
public  class  EmitLog {
  private  static  final  String EXCHANGE_NAME =  "logs" ;
  public  static  void  main(String[] argv)  throws  Exception {
   ConnectionFactory factory =  new  ConnectionFactory();
   factory.setHost( "localhost" );
   Connection connection = factory.newConnection();
   Channel channel = connection.createChannel();
   channel.exchangeDeclare(EXCHANGE_NAME,  "fanout" );
   String[] sendMsgs = { "I" "saw" "a" "dog" };
   String message = getMessage(sendMsgs);
   channel.basicPublish(EXCHANGE_NAME,  "" null , message.getBytes( "UTF-8" ));
   System.out.println( " [x] Sent '"  + message +  "'" );
   channel.close();
   connection.close();
  }
  private  static  String getMessage(String[] strings) {
   if  (strings.length <  1 )
    return  "info: Hello World!" ;
   return  joinStrings(strings,  " " );
  }
  private  static  String joinStrings(String[] strings, String delimiter) {
   int  length = strings.length;
   if  (length ==  0 )
    return  "" ;
   StringBuilder words =  new  StringBuilder(strings[ 0 ]);
   for  ( int  i =  1 ; i < length; i++) {
    words.append(delimiter).append(strings[i]);
   }
   return  words.toString();
  }
}



  正如上面所示,与消息服务器建立连接后,声明了一个交换机,这是因为系统不允许发布到空交换机。 如果没有队列绑定到交换机的话,消息就会丢失,但我们不用担心。如果没有消费者监听消息的话,我们就丢弃该消息。


  接收消息代码ReceiveLogs.java


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package  com.favccxx.favrabbit;
import  java.io.IOException;
import  com.rabbitmq.client.AMQP;
import  com.rabbitmq.client.Channel;
import  com.rabbitmq.client.Connection;
import  com.rabbitmq.client.ConnectionFactory;
import  com.rabbitmq.client.Consumer;
import  com.rabbitmq.client.DefaultConsumer;
import  com.rabbitmq.client.Envelope;
public  class  ReceiveLogs {
  private  static  final  String EXCHANGE_NAME =  "logs" ;
  public  static  void  main(String[] argv)  throws  Exception {
   ConnectionFactory factory =  new  ConnectionFactory();
   factory.setHost( "localhost" );
   Connection connection = factory.newConnection();
   Channel channel = connection.createChannel();
   channel.exchangeDeclare(EXCHANGE_NAME,  "fanout" );
   String queueName = channel.queueDeclare().getQueue();
   channel.queueBind(queueName, EXCHANGE_NAME,  "" );
   System.out.println( " [*] Waiting for messages. To exit press CTRL+C" );
   Consumer consumer =  new  DefaultConsumer(channel) {
    @Override
    public  void  handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
      byte [] body)  throws  IOException {
     String message =  new  String(body,  "UTF-8" );
     System.out.println( " [x] Received '"  + message +  "'" );
    }
   };
   channel.basicConsume(queueName,  true , consumer);
  }
}



  测试数据


  运行几个日志消息接收者实例,使用日志消息发送者发送消息,发现每个日志消息接收者都接收到同样的数据,说明发布订阅成功。


1
  [x] Received  'I saw a dog'




本文转自 genuinecx 51CTO博客,原文链接:http://blog.51cto.com/favccxx/1701738,如需转载请自行联系原作者
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5天前
|
消息中间件 测试技术
通过轻量消息队列(原MNS)主题HTTP订阅+ARMS实现自定义数据多渠道告警
轻量消息队列(原MNS)以其简单队列模型、轻量化协议及按量后付费模式,成为阿里云产品间消息传输首选。本文通过创建主题、订阅、配置告警集成等步骤,展示了该产品在实际应用中的部分功能,确保消息的可靠传输。
21 2
|
26天前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
66 4
|
20天前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
24天前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
1月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
67 6
|
1月前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
1月前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
64 4
|
1月前
|
消息中间件 存储 JSON
rabbitmq基础教程(ui,java,springamqp)
本文提供了RabbitMQ的基础教程,包括如何使用UI创建队列和交换机、Java代码操作RabbitMQ、Spring AMQP进行消息发送和接收,以及如何使用不同的交换机类型(fanout、direct、topic)进行消息路由。
25 0
rabbitmq基础教程(ui,java,springamqp)
|
2月前
|
消息中间件 运维 监控
云消息队列RabbitMQ实践解决方案评测报告
本报告旨在对《云消息队列RabbitMQ实践》解决方案进行综合评测。通过对该方案的原理理解、部署体验、设计验证以及实际应用价值等方面进行全面分析,为用户提供详尽的反馈与建议。
80 16
|
2月前
|
消息中间件 弹性计算 运维
阿里云云消息队列RabbitMQ实践解决方案评测报告
阿里云云消息队列RabbitMQ实践解决方案评测报告
73 9

相关产品

  • 云消息队列 MQ