RabbitMQ实例教程:发布/订阅者消息队列

简介:

消息交换机(Exchange)


  RabbitMQ消息模型的核心理念是生产者永远不会直接发送任何消息给队列,一般的情况生产者甚至不知道消息应该发送到哪些队列。


wKiom1YZeRCgrXh9AAA8_sKBnCU136.jpg

  相反的,生产者只能发送消息给交换机(Exchange)。交换机的作用非常简单,一边接收从生产者发来的消息,另一边把消息推送到队列中。交换机必须清楚的知道消息如何处理它收到的每一条消息。是否应该追加到一个指定的队列?是否应该追加到多个队列?或者是否应该丢弃?这些规则通过交换机的类型进行定义。


  交换机的类型有:direct,topic,headers 和 fanout。我们以fanout为例创建一个“logs”类型的交换机。


1
channel.exchangeDeclare( "logs" "fanout" );


  fanout交换机非常简单,它会广播它收到的所有队列的所有消息。


  交换机命名


  在前面的例子中,我们不了解交换机的任何概念,也能发送消息,这是因为我们使用了默认的交换机(""),但以后可以使用我们自定义的交换机了。


1
2
channel.basicPublish( "" "hello" , null, message.getBytes());  // 空字符串交换机
channel.basicPublish(  "logs" "" , null, message.getBytes());  //logs 交换机


  临时队列(Temporary Queues)


  在前面的例子中,我们为队列都指定了具体的名字(如hello和task_queue),给队列命名是非常重要的事情,因为生产者和消费者是队列名称来传递消息的。


  但是对于日志来说的消息队列,我们会监听所有的日志消息,而不是其中的一些子集。而且我们只关注当前发生的消息而不是历史消息,要解决这些问题需要这么做:


  首先,当我们连接Rabbit服务器时,我们需要一个新的空队列。我们可以自己随机生成一个队列名字或者让服务器随机生成一个队列名字。


  其次,当消息消费者失去连接时,队列应该自动删除。


  在Java中,我们使用不带参数的queueDeclare()方法创建一个非持久化的,唯一的,用后自动删除的队列。


1
String queueName = channel.queueDeclare().getQueue();


  queueName可能是像 amq.gen-JzTY20BRgKO-HjmUJj0wLg 这样的随机队列名。


  消息绑定(Bindings)


  前面我们创建了一个fanout类型的交换机和队列。现在需要告诉交换机发送消息到队列。交换机和队列之间的关系就是消息绑定(binding)。


wKioL1YZemfT9Am4AAA7uKydZt4912.jpg

  使用下面的代码logs交换机会将消息传递给队列。


1
channel.queueBind(queueName,  "logs" "" );


  将交换机和消息绑定放在一起


wKiom1YZetLSZ2z_AABbZcUcUF0159.jpg


  现在我们有一个提交日志的的消息生产者,它与我们之前的消息发送者并没有太大的区别,唯一不同的地方是我们将消息发送到 logs 交换机,而不是没有名字的交换机。当发送消息时,我们需要提供一个路由,尽管它在 fanout 交换机中并没有什么作用。下面是提交日志的Java代码。


  EmitLog.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package  com.favccxx.favrabbit;
import  com.rabbitmq.client.Channel;
import  com.rabbitmq.client.Connection;
import  com.rabbitmq.client.ConnectionFactory;
public  class  EmitLog {
  private  static  final  String EXCHANGE_NAME =  "logs" ;
  public  static  void  main(String[] argv)  throws  Exception {
   ConnectionFactory factory =  new  ConnectionFactory();
   factory.setHost( "localhost" );
   Connection connection = factory.newConnection();
   Channel channel = connection.createChannel();
   channel.exchangeDeclare(EXCHANGE_NAME,  "fanout" );
   String[] sendMsgs = { "I" "saw" "a" "dog" };
   String message = getMessage(sendMsgs);
   channel.basicPublish(EXCHANGE_NAME,  "" null , message.getBytes( "UTF-8" ));
   System.out.println( " [x] Sent '"  + message +  "'" );
   channel.close();
   connection.close();
  }
  private  static  String getMessage(String[] strings) {
   if  (strings.length <  1 )
    return  "info: Hello World!" ;
   return  joinStrings(strings,  " " );
  }
  private  static  String joinStrings(String[] strings, String delimiter) {
   int  length = strings.length;
   if  (length ==  0 )
    return  "" ;
   StringBuilder words =  new  StringBuilder(strings[ 0 ]);
   for  ( int  i =  1 ; i < length; i++) {
    words.append(delimiter).append(strings[i]);
   }
   return  words.toString();
  }
}



  正如上面所示,与消息服务器建立连接后,声明了一个交换机,这是因为系统不允许发布到空交换机。 如果没有队列绑定到交换机的话,消息就会丢失,但我们不用担心。如果没有消费者监听消息的话,我们就丢弃该消息。


  接收消息代码ReceiveLogs.java


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package  com.favccxx.favrabbit;
import  java.io.IOException;
import  com.rabbitmq.client.AMQP;
import  com.rabbitmq.client.Channel;
import  com.rabbitmq.client.Connection;
import  com.rabbitmq.client.ConnectionFactory;
import  com.rabbitmq.client.Consumer;
import  com.rabbitmq.client.DefaultConsumer;
import  com.rabbitmq.client.Envelope;
public  class  ReceiveLogs {
  private  static  final  String EXCHANGE_NAME =  "logs" ;
  public  static  void  main(String[] argv)  throws  Exception {
   ConnectionFactory factory =  new  ConnectionFactory();
   factory.setHost( "localhost" );
   Connection connection = factory.newConnection();
   Channel channel = connection.createChannel();
   channel.exchangeDeclare(EXCHANGE_NAME,  "fanout" );
   String queueName = channel.queueDeclare().getQueue();
   channel.queueBind(queueName, EXCHANGE_NAME,  "" );
   System.out.println( " [*] Waiting for messages. To exit press CTRL+C" );
   Consumer consumer =  new  DefaultConsumer(channel) {
    @Override
    public  void  handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
      byte [] body)  throws  IOException {
     String message =  new  String(body,  "UTF-8" );
     System.out.println( " [x] Received '"  + message +  "'" );
    }
   };
   channel.basicConsume(queueName,  true , consumer);
  }
}



  测试数据


  运行几个日志消息接收者实例,使用日志消息发送者发送消息,发现每个日志消息接收者都接收到同样的数据,说明发布订阅成功。


1
  [x] Received  'I saw a dog'




本文转自 genuinecx 51CTO博客,原文链接:http://blog.51cto.com/favccxx/1701738,如需转载请自行联系原作者
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
1月前
|
消息中间件 存储 监控
RabbitMQ:分布式系统中的高效消息队列
RabbitMQ:分布式系统中的高效消息队列
|
4月前
|
消息中间件 NoSQL 数据库
一文讲透消息队列RocketMQ实现消费幂等
这篇文章,我们聊聊消息队列中非常重要的最佳实践之一:消费幂等。
一文讲透消息队列RocketMQ实现消费幂等
|
1月前
|
消息中间件 Java
springboot整合消息队列——RabbitMQ
springboot整合消息队列——RabbitMQ
75 0
|
3月前
|
消息中间件 JSON Java
RabbitMQ消息队列
RabbitMQ消息队列
46 0
|
3月前
|
消息中间件
RabbitMQ 实现消息队列延迟
RabbitMQ 实现消息队列延迟
121 0
|
3天前
|
消息中间件 存储 数据安全/隐私保护
RabbitMQ使用教程
RabbitMQ使用教程
10 2
|
20天前
|
消息中间件 存储 负载均衡
消息队列学习之RabbitMQ
【4月更文挑战第3天】消息队列学习之RabbitMQ,一种基于erlang语言开发的流行的开源消息中间件。
15 0
|
1月前
|
消息中间件 Linux 开发工具
Linux系统安装RabbitMQ详细教程
Linux系统安装RabbitMQ详细教程
24 0
|
1月前
|
消息中间件 存储 中间件
【SpringCloud Stream消息驱动、设计思想以及整合rabbitmq消息队列案例--学习笔记】
【SpringCloud Stream消息驱动、设计思想以及整合rabbitmq消息队列案例--学习笔记】
50 0
|
1月前
|
消息中间件 缓存 API

相关产品

  • 云消息队列 MQ