轻松搞定RabbitMQ(四)——发布/订阅

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介:        翻译地址:http://www.rabbitmq.com/tutorials/tutorial-three-java.html        在前面的教程中,我们创建了一个工作队列,都是假设一个任务只交给一个消费者。

       翻译地址:http://www.rabbitmq.com/tutorials/tutorial-three-java.html

       在前面的教程中,我们创建了一个工作队列,都是假设一个任务只交给一个消费者。这次我们做一些完全不同的事儿——将消息发送给多个消费者。这种模式叫做“发布/订阅”。

       为了说明这个模式,我们将构建一个简单日志系统。它包含2段程序:第一个将发出日志消息,第二个接受并打印消息。

       如果在日志系统中每一个接受者(订阅者)都会的得到消息的拷贝。那样的话,我们可以运行一个接受者(订阅者)程序,直接把日志记录到硬盘。同时运行另一个接受(订阅者)程序,打印日志到屏幕上。

       说白了,发表日志消息将被广播给所有的接收者。


Exchanges(转发器)

       前面的博文汇总,我们都是基于一个队列发送和接受消息。现在介绍一下完整的消息传递模式。

       RabbitMQ消息模式的核心理念是:生产者没有直接发送任何消费到队列。实际上,生产者都不知道这个消费是发送给哪个队列的。

       相反,生产者只能发送消息给转发器,转发器是非常简单的。一方面它接受生产者的消息,另一方面向队列推送消息。转发器必须清楚的知道如何处理接收到的消息。附加一个特定的队列吗?附加多个队列?或者是否丢弃?这些规则通过转发器的类型进行定义。

       

       类型有:Direct、Topic、Headers和Fanout。我们关注最后一个。现在让我们创建一个该类型的转发器,定义如下:

channel.exchangeDeclare("logs", "fanout");
       fanout转发器非常简单,从名字就可以看出,它是广播接受到的消息给所有的队列。而这正好符合日志系统的需求。

Nameless exchange(匿名转发

       之前我们对转换器一无所知,却可以将消息发送到队列,那是可能是我们用了默认的转发器,转发器名为空字符串""。之前我们发布消息的代码是:

channel.basicPublish("", "hello", null, message.getBytes());
       第一个参数就是转发器的名字,空字符串表示模式或者匿名的转发器。消息通过队列的routingKey路由到指定的队列中去,如果存在的话。

       现在我们可以指定转发器的名字了:

channel.basicPublish( "logs", "", null, message.getBytes());


Temporary queues(临时队列)

       你可能还记得之前我们用队列时,会指定一个名字。队列有名字对我们来说是非常重要的——我们需要为消费者指定同一个队列。

       但这并不是我们的日志系统所关心的。我们要监听所有日志消息,而不仅仅是一类日志。我们只对对当前流动的消息感兴趣。解决这些问题,我盟需要完成两件事。

       首先,每当我盟连接到RabbitMQ时,需要一个新的空队列。为此我们需要创建一个随机名字的空队列,或者更好的,让服务器选好年则一个随机名字的空队列给我们。

       其次,一旦消费者断开连接,队列将自动删除。

我们提供一个无参的queueDeclare()方法,创建一个非持久化、独立的、自动删除的队列,且名字是随机生成的。

String queueName = channel.queueDeclare().getQueue();
queueName是一个随机队列名。看起来会像amq.gen-JzTY20BRgKO-HjmUJj0wLg。


Bindings(绑定)

       

       我们已经创建了一个广播的转发器和一个随机队列。现在需要告诉转发器转发消息到队列。这个关联转发器和队列的我们叫它Binding。

channel.queueBind(queueName, "logs", "");
这样,日志转发器将附加到日志队列上去。


完整的例子:

发送端代码(生产者)EmitLog.java

public class EmitLog {
	private final static String EXCHANGE_NAME = "logs";

	public static void main(String[] args) throws IOException {
		/**
		 * 创建连接连接到MabbitMQ
		 */
		ConnectionFactory factory = new ConnectionFactory();
		// 设置MabbitMQ所在主机ip或者主机名
		factory.setHost("127.0.0.1");
		// 创建一个连接
		Connection connection = factory.newConnection();
		// 创建一个频道
		Channel channel = connection.createChannel();
		// 指定转发——广播
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

		for(int i=0;i<3;i++){
			// 发送的消息
			String message = "Hello World!";
			channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
			System.out.println(" [x] Sent '" + message + "'");
		}

		// 关闭频道和连接
		channel.close();
		connection.close();
	}
}

消费者1 ReceiveLogs2Console.java

public class ReceiveLogs2Console {
	private static final String EXCHANGE_NAME = "logs";

	public static void main(String[] argv) throws IOException, InterruptedException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		// 打开连接和创建频道,与发送端一样
		Connection connection = factory.newConnection();
		final Channel channel = connection.createChannel();

		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		// 声明一个随机队列
		String queueName = channel.queueDeclare().getQueue();
	    channel.queueBind(queueName, EXCHANGE_NAME, "");
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		
		// 创建队列消费者
		final Consumer consumer = new DefaultConsumer(channel) {
			  @Override
			  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
			    String message = new String(body, "UTF-8");
			    System.out.println(" [x] Received '" + message + "'");
			  }
			};
			channel.basicConsume(queueName, true, consumer);
	}
}

消费者2 ReceiveLogs2File.java

public class ReceiveLogs2File {
	private static final String EXCHANGE_NAME = "logs";

	public static void main(String[] argv) throws IOException, InterruptedException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		// 打开连接和创建频道,与发送端一样
		Connection connection = factory.newConnection();
		final Channel channel = connection.createChannel();

		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		// 声明一个随机队列
		String queueName = channel.queueDeclare().getQueue();
	    channel.queueBind(queueName, EXCHANGE_NAME, "");
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		
		// 创建队列消费者
		final Consumer consumer = new DefaultConsumer(channel) {
			  @Override
			  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
			    String message = new String(body, "UTF-8");
			    print2File(message);
//			    System.out.println(" [x] Received '" + message + "'");
			  }
			};
			channel.basicConsume(queueName, true, consumer);
	}
	
	private static void print2File(String msg) {
		try {
			String dir = ReceiveLogs2File.class.getClassLoader().getResource("").getPath();
			String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
			File file = new File(dir, logFileName + ".log");
			FileOutputStream fos = new FileOutputStream(file, true);
			fos.write(((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + "\r\n").getBytes());
			fos.flush();
			fos.close();
		} catch (FileNotFoundException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}  
}
       可以看到我们1个生产者用于发送log消息,2个消费者,一个用于显示,一个用于记录文件。

       生产者声明了一个广播模式的转换器,订阅这个转换器的消费者都可以收到每一条消息。可以看到在生产者中,没有声明队列。这也验证了之前说的。生产者其实只关心exchange,至于exchange会把消息转发给哪些队列,并不是生产者关心的。

       2个消费者,一个打印日志,一个写入文件,除了这2个地方不一样,其他地方一模一样。也是声明一下广播模式的转换器,而队列则是随机生成的,消费者实例启动后,会创建一个随机实例,这个在管理页面可以看到(如图)。而实例关闭后,随机队列也会自动删除。最后将队列与转发器绑定。


       注:运行的时候要先运行2个消费者实例,然后在运行生产者实例。否则获取不到实例。

       看看最终的结果吧:





相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 Java 物联网
一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布
之前介绍了RabbitMQ以及如何在SpringBoot项目中整合使用RabbitMQ,看过的朋友都说写的比较详细,希望再总结一下目前比较流行的MQTT。所以接下来,就来介绍什么MQTT?它在IoT中有着怎样的作用?如何在项目中使用MQTT?
14779 5
一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布
|
监控 物联网 API
【.NET+MQTT】.NET6 环境下实现MQTT通信,以及服务端、客户端的双边消息订阅与发布的代码演示
MQTT广泛应用于工业物联网、智能家居、各类智能制造或各类自动化场景等。MQTT是一个基于客户端-服务器的消息发布/订阅传输协议,在很多受限的环境下,比如说机器与机器通信、机器与物联网通信等。好了,科普的废话不多说,下面直接通过.NET环境来实现一套MQTT通信demo,实现服务端与客户端的双边消息发布与订阅的功能和演示。
1341 0
【.NET+MQTT】.NET6 环境下实现MQTT通信,以及服务端、客户端的双边消息订阅与发布的代码演示
|
2月前
|
消息中间件 测试技术
通过轻量消息队列(原MNS)主题HTTP订阅+ARMS实现自定义数据多渠道告警
轻量消息队列(原MNS)以其简单队列模型、轻量化协议及按量后付费模式,成为阿里云产品间消息传输首选。本文通过创建主题、订阅、配置告警集成等步骤,展示了该产品在实际应用中的部分功能,确保消息的可靠传输。
64 2
|
消息中间件 存储 负载均衡
两个实验让我彻底弄懂了「订阅关系一致」
这篇文章,笔者想聊聊 RocketMQ 最佳实践之一:**保证订阅关系一致**。 订阅关系一致指的是同一个消费者 Group ID 下所有 Consumer 实例所订阅的 Topic 、Tag 必须完全一致。 如果订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。
两个实验让我彻底弄懂了「订阅关系一致」
|
传感器 负载均衡 物联网
MQTT v5共享订阅是怎么回事?如何使用共享订阅提高消息订阅的灵活性和可伸缩性?
MQTT v5共享订阅是怎么回事?如何使用共享订阅提高消息订阅的灵活性和可伸缩性?
587 1
|
消息中间件 安全 Go
动态订阅时 rocketmq-client-go 代码有map并发bug
动态订阅时 rocketmq-client-go 代码有map并发bug
73 2
|
8月前
|
消息中间件 Java RocketMQ
MQ产品使用合集之在同一个 Java 进程内建立三个消费对象并设置三个消费者组订阅同一主题和标签的情况下,是否会发生其中一个消费者组无法接收到消息的现象
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
154 1
|
8月前
|
传感器 监控 网络协议
MQTT 发布、订阅模式介绍
【2月更文挑战第17天】
512 6
MQTT 发布、订阅模式介绍
|
8月前
|
存储 负载均衡 安全
MQTT常见问题之MQTT使用共享订阅失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
8月前
|
消息中间件 Java
RabbitMQ中的消息发布-订阅模式是什么?如何实现?
RabbitMQ中的消息发布-订阅模式是什么?如何实现?
220 0