RabbitMQ 实战教程(五) 主题

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: RabbitMQ 实战教程(五) 主题

虽然使用direct类型的转发器,改善了我们的日志系统。但是仍然存在一些局限性:它不能够基于多重条件进行路由选择。我们有可能希望不仅根据日志的级别,而且想根据日志的来源进行订阅。为了在我们的系统中实现上述的需求,我们需要了解一个更复杂的转发器:topic类型的转发器。



主题转发(Topic exchange)


使用topic类型的转发器,不能随意的设置选择键(routing_key),必须是由点隔开的一系列的标识符组成。标识符可以是任何东西,但是一般都与消息的某些特性相关。你可以定义任何数量的标识符,上限为255个字节。


绑定键和选择键的形式一样。topic类型的转发器和direct类型的转发器很类似,一个附带特殊的选择键将会被转发到绑定键与之匹配的队列中。需要注意的是:关于绑定键有两种特殊的情况。

* 可以匹配一个标识符。
# 可以匹配0个或多个标识符。

topic类型的转发器非常强大,可以实现其他类型的转发器。当一个队列与绑定键#绑定,将会收到所有的消息,类似fanout类型的转发器。当绑定键中不包含任何#与*时,类似direct类型的转发器。


通过下图,我们大概可以了解到topic类型的转发器的处理流程。

1.png


案例实战


发送端,连接到RabbitMQ(此时服务需要启动),发送一条数据,然后退出。

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
private static final String[] LOG_LEVEL_ARR = {"dao.debug", "dao.info", "dao.error",
"service.debug", "service.info",
"service.error","controller.debug",
"controller.info", "controller.error"};
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
// 设置MabbitMQ, 主机ip或者主机名
factory.setHost("127.0.0.1");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 指定一个转发器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 发送消息
for (String severity : LOG_LEVEL_ARR) {
String message = "somnus-MSG log : [" +severity+ "]" + UUID.randomUUID().toString();
// 发布消息至转发器
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println(" [x] 发送消息是: '" + message + "'");
}
// 关闭连接
channel.close();
connection.close();
}
}

接受端,不断等待服务器推送消息,然后在控制台输出。

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
private static final String[] LOG_LEVEL_ARR = {"#", "dao.error", "*.error", "dao.*", "service.#", "*.controller.#"};
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
// 设置MabbitMQ, 主机ip或者主机名
factory.setHost("127.0.0.1");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 指定一个转发器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 设置日志级别
int rand = new Random().nextInt(5);
String severity = LOG_LEVEL_ARR[rand];
// 创建一个非持久的、唯一的、自动删除的队列
String queueName = channel.queueDeclare().getQueue();
// 绑定转发器和队列
channel.queueBind(queueName, EXCHANGE_NAME, severity);
// 打印
System.out.println(" [*] 等待消息进入. 请按 CTRL+C 结束");
System.out.println(" [*] LOG INFO : " + severity);
// 创建队列消费者
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] 接收消息是: '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}

我们多开几个ReceiveLogsTopic。然后,启动EmitLogTopic进行消息发送。打印结果如下

ReceiveLogsTopic1

[*] 等待消息进入. 请按 CTRL+C 结束
[*] LOG INFO : dao.error
[x] 接收消息是: 'somnus-MSG log : [dao.error]041cd8ba-df7d-4d20-a11f-ba21a0c2a02a'

ReceiveLogsTopic2

[*] 等待消息进入. 请按 CTRL+C 结束
[*] LOG INFO : *.error
[x] 接收消息是: 'Liang-MSG log : [dao.error]041cd8ba-df7d-4d20-a11f-ba21a0c2a02a'
[x] 接收消息是:'somnus-MSG log : [service.error]e3565f12-9782-4c22-a91c-f513f31b037d'
[x] 接收消息是:'somnus-MSG log : [controller.error]4436101a-3346-41f6-a9af-b8a4fbda451e'

ReceiveLogsTopic3

[*] 等待消息进入. 请按 CTRL+C 结束
[*] LOG INFO : #
[x] 接收消息是: 'somnus-MSG log : [dao.debug]4eb08245-2c05-490b-a5a5-2742cb70d831'
[x] 接收消息是: 'somnus-MSG log : [dao.info]e9d4073b-1e61-4c6f-b531-ac42eaa346af'
[x] 接收消息是: 'somnus-MSG log : [dao.error]041cd8ba-df7d-4d20-a11f-ba21a0c2a02a'
[x] 接收消息是: 'somnus-MSG log : [service.debug]0ec84cbf-47ab-4813-a5db-e57d5e78830e'
[x] 接收消息是: 'somnus-MSG log : [service.info]2e12e1b7-7a09-4eb7-8ad1-8e53f533121c'
[x] 接收消息是: 'somnus-MSG log : [service.error]e3565f12-9782-4c22-a91c-f513f31b037d'
[x] 接收消息是: 'somnus-MSG log : [controller.debug]94e5be72-15f6-496d-84f3-2a107bafc92b'
[x] 接收消息是: 'somnus-MSG log : [controller.info]62bbe378-617d-4214-beb4-98cc53e73272'
[x] 接收消息是: 'somnus-MSG log : [controller.error]4436101a-3346-41f6-a9af-b8a4fbda451e'

ReceiveLogsTopic4

[*] 等待消息进入. 请按 CTRL+C 结束
[*] LOG INFO : service.#
[x] 接收消息是: 'somnus-MSG log : [service.debug]0ec84cbf-47ab-4813-a5db-e57d5e78830e'
[x] 接收消息是: 'somnus-MSG log : [service.info]2e12e1b7-7a09-4eb7-8ad1-8e53f533121c'
[x] 接收消息是: 'somnus-MSG log : [service.error]e3565f12-9782-4c22-a91c-f513f31b037d'

我们发现,ReceiveLogsTopic1、ReceiveLogsTopic2、ReceiveLogsTopic3、ReceiveLogsTopic4同时收到了属于自己匹配的消息。尤其是ReceiveLogsTopic1类似于direct类型的转发器,ReceiveLogsTopic3通过“#”匹配到所有消息。


相关连接:

RabbitMq3.6官方文档

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 存储 JSON
rabbitmq基础教程(ui,java,springamqp)
本文提供了RabbitMQ的基础教程,包括如何使用UI创建队列和交换机、Java代码操作RabbitMQ、Spring AMQP进行消息发送和接收,以及如何使用不同的交换机类型(fanout、direct、topic)进行消息路由。
43 0
rabbitmq基础教程(ui,java,springamqp)
|
3月前
|
消息中间件 数据采集 中间件
RabbitMQ的使用—实战
RabbitMQ的使用—实战
124 0
|
6月前
|
消息中间件 新零售 弹性计算
云消息队列 RabbitMQ 版入门训练营,解锁对比开源优势与零基础实战
欢迎加入「云消息队列 RabbitMQ 版入门训练营」。
183 20
|
4月前
|
消息中间件 缓存 Java
RocketMQ的JAVA落地实战
RocketMQ作为一款高性能、高可靠、高实时、分布式特点的消息中间件,其核心作用主要体现在异步处理、削峰填谷以及系统解耦三个方面。
213 0
|
5月前
|
网络协议 物联网 测试技术
App Inventor 2 MQTT拓展入门(保姆级教程)
本文演示的是App和一个测试客户端进行消息交互的案例,实际应用中,我们的测试客户端可以看着是任意的、支持MQTT协议的硬件,通过订阅及发布消息,联网硬件与我们的App进行双向数据通信,以实现万物互联的智能控制效果。
275 2
|
5月前
|
消息中间件 监控 Ubuntu
RabbitMQ安装配置,超详细版教程
以上步骤为您提供了在Linux环境下安装RabbitMQ的详细过程。安装Erlang作为基础,然后通过添加官方源并安装RabbitMQ本身,最后对服务进行配置并启用Web管理界面。这些步骤操作简单直观,只需要跟随上述指南,即可在短时间内将RabbitMQ服务器运行起来,并进行进一步的配置和管理。不要忘记硬件和网络资源对性能的影响,确保RabbitMQ能够满足您的应用需求。
335 0
|
6月前
|
消息中间件 搜索推荐 RocketMQ
消息队列 MQ使用问题之如何将一个主题的多个分区分布到不同的Broker上
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
8月前
|
消息中间件 存储 Apache
RocketMQ实战教程之常见概念和模型
Apache RocketMQ 实战教程介绍了其核心概念和模型。消息是基本的数据传输单元,主题是消息的分类容器,支持字节、数字和短划线命名,最长64个字符。消息类型包括普通、顺序、事务和定时/延时消息。消息队列是实际存储和传输消息的容器,是主题的分区。消费者分组是一组行为一致的消费者的逻辑集合,也有命名限制。此外,文档还提到了一些使用约束和建议,如主题和消费者组名的命名规则,消息大小限制,请求超时时间等。RocketMQ 提供了多种消息模型,包括发布/订阅模型,有助于理解和优化消息处理。
|
7月前
|
消息中间件 Java RocketMQ
教程:Spring Boot整合RocketMQ的配置与优化
教程:Spring Boot整合RocketMQ的配置与优化
|
7月前
|
消息中间件 Java Spring
最新spingboot整合rabbitmq详细教程
最新spingboot整合rabbitmq详细教程