RabbitMQ 实战教程(五) 主题

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: RabbitMQ 实战教程(五) 主题

虽然使用direct类型的转发器,改善了我们的日志系统。但是仍然存在一些局限性:它不能够基于多重条件进行路由选择。我们有可能希望不仅根据日志的级别,而且想根据日志的来源进行订阅。为了在我们的系统中实现上述的需求,我们需要了解一个更复杂的转发器:topic类型的转发器。



主题转发(Topic exchange)


使用topic类型的转发器,不能随意的设置选择键(routing_key),必须是由点隔开的一系列的标识符组成。标识符可以是任何东西,但是一般都与消息的某些特性相关。你可以定义任何数量的标识符,上限为255个字节。


绑定键和选择键的形式一样。topic类型的转发器和direct类型的转发器很类似,一个附带特殊的选择键将会被转发到绑定键与之匹配的队列中。需要注意的是:关于绑定键有两种特殊的情况。

* 可以匹配一个标识符。
# 可以匹配0个或多个标识符。

topic类型的转发器非常强大,可以实现其他类型的转发器。当一个队列与绑定键#绑定,将会收到所有的消息,类似fanout类型的转发器。当绑定键中不包含任何#与*时,类似direct类型的转发器。


通过下图,我们大概可以了解到topic类型的转发器的处理流程。

1.png


案例实战


发送端,连接到RabbitMQ(此时服务需要启动),发送一条数据,然后退出。

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
private static final String[] LOG_LEVEL_ARR = {"dao.debug", "dao.info", "dao.error",
"service.debug", "service.info",
"service.error","controller.debug",
"controller.info", "controller.error"};
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
// 设置MabbitMQ, 主机ip或者主机名
factory.setHost("127.0.0.1");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 指定一个转发器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 发送消息
for (String severity : LOG_LEVEL_ARR) {
String message = "somnus-MSG log : [" +severity+ "]" + UUID.randomUUID().toString();
// 发布消息至转发器
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println(" [x] 发送消息是: '" + message + "'");
}
// 关闭连接
channel.close();
connection.close();
}
}

接受端,不断等待服务器推送消息,然后在控制台输出。

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
private static final String[] LOG_LEVEL_ARR = {"#", "dao.error", "*.error", "dao.*", "service.#", "*.controller.#"};
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
// 设置MabbitMQ, 主机ip或者主机名
factory.setHost("127.0.0.1");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 指定一个转发器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 设置日志级别
int rand = new Random().nextInt(5);
String severity = LOG_LEVEL_ARR[rand];
// 创建一个非持久的、唯一的、自动删除的队列
String queueName = channel.queueDeclare().getQueue();
// 绑定转发器和队列
channel.queueBind(queueName, EXCHANGE_NAME, severity);
// 打印
System.out.println(" [*] 等待消息进入. 请按 CTRL+C 结束");
System.out.println(" [*] LOG INFO : " + severity);
// 创建队列消费者
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] 接收消息是: '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}

我们多开几个ReceiveLogsTopic。然后,启动EmitLogTopic进行消息发送。打印结果如下

ReceiveLogsTopic1

[*] 等待消息进入. 请按 CTRL+C 结束
[*] LOG INFO : dao.error
[x] 接收消息是: 'somnus-MSG log : [dao.error]041cd8ba-df7d-4d20-a11f-ba21a0c2a02a'

ReceiveLogsTopic2

[*] 等待消息进入. 请按 CTRL+C 结束
[*] LOG INFO : *.error
[x] 接收消息是: 'Liang-MSG log : [dao.error]041cd8ba-df7d-4d20-a11f-ba21a0c2a02a'
[x] 接收消息是:'somnus-MSG log : [service.error]e3565f12-9782-4c22-a91c-f513f31b037d'
[x] 接收消息是:'somnus-MSG log : [controller.error]4436101a-3346-41f6-a9af-b8a4fbda451e'

ReceiveLogsTopic3

[*] 等待消息进入. 请按 CTRL+C 结束
[*] LOG INFO : #
[x] 接收消息是: 'somnus-MSG log : [dao.debug]4eb08245-2c05-490b-a5a5-2742cb70d831'
[x] 接收消息是: 'somnus-MSG log : [dao.info]e9d4073b-1e61-4c6f-b531-ac42eaa346af'
[x] 接收消息是: 'somnus-MSG log : [dao.error]041cd8ba-df7d-4d20-a11f-ba21a0c2a02a'
[x] 接收消息是: 'somnus-MSG log : [service.debug]0ec84cbf-47ab-4813-a5db-e57d5e78830e'
[x] 接收消息是: 'somnus-MSG log : [service.info]2e12e1b7-7a09-4eb7-8ad1-8e53f533121c'
[x] 接收消息是: 'somnus-MSG log : [service.error]e3565f12-9782-4c22-a91c-f513f31b037d'
[x] 接收消息是: 'somnus-MSG log : [controller.debug]94e5be72-15f6-496d-84f3-2a107bafc92b'
[x] 接收消息是: 'somnus-MSG log : [controller.info]62bbe378-617d-4214-beb4-98cc53e73272'
[x] 接收消息是: 'somnus-MSG log : [controller.error]4436101a-3346-41f6-a9af-b8a4fbda451e'

ReceiveLogsTopic4

[*] 等待消息进入. 请按 CTRL+C 结束
[*] LOG INFO : service.#
[x] 接收消息是: 'somnus-MSG log : [service.debug]0ec84cbf-47ab-4813-a5db-e57d5e78830e'
[x] 接收消息是: 'somnus-MSG log : [service.info]2e12e1b7-7a09-4eb7-8ad1-8e53f533121c'
[x] 接收消息是: 'somnus-MSG log : [service.error]e3565f12-9782-4c22-a91c-f513f31b037d'

我们发现,ReceiveLogsTopic1、ReceiveLogsTopic2、ReceiveLogsTopic3、ReceiveLogsTopic4同时收到了属于自己匹配的消息。尤其是ReceiveLogsTopic1类似于direct类型的转发器,ReceiveLogsTopic3通过“#”匹配到所有消息。


相关连接:

RabbitMq3.6官方文档

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
19天前
|
消息中间件 Ubuntu Java
SpringBoot整合MQTT实战:基于EMQX实现双向设备通信
本教程指导在Ubuntu上部署EMQX 5.9.0并集成Spring Boot实现MQTT双向通信,涵盖服务器搭建、客户端配置及生产实践,助您快速构建企业级物联网消息系统。
192 1
|
6月前
|
消息中间件 Java 数据库
RocketMQ实战—9.营销系统代码初版
本文主要介绍了实现营销系统四大促销场景的代码初版:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。
RocketMQ实战—9.营销系统代码初版
|
6月前
|
消息中间件 搜索推荐 调度
RocketMQ实战—8.营销系统业务和方案介绍
本文详细介绍了电商营销系统的业务流程、技术架构及挑战解决方案。涵盖核心交易与支付后履约流程,优惠券和促销活动的发券、领券、用券、销券机制,以及会员与推送的数据库设计。技术架构基于Nacos服务注册中心、Dubbo RPC框架、RocketMQ消息中间件和XXLJob分布式调度工具,实现系统间高效通信与任务管理。针对千万级用户量下的推送和发券场景,提出异步化、分片处理与惰性发券等优化方案,解决高并发压力。同时,通过RocketMQ实现系统解耦,提升扩展性,并利用XXLJob完成爆款商品推荐的分布式调度推送。整体设计确保系统在大规模用户场景下的性能与稳定性。
RocketMQ实战—8.营销系统业务和方案介绍
|
6月前
|
消息中间件 存储 NoSQL
RocketMQ实战—6.生产优化及运维方案
本文围绕RocketMQ集群的使用与优化,详细探讨了六个关键问题。首先,介绍了如何通过ACL配置实现RocketMQ集群的权限控制,防止不同团队间误用Topic。其次,讲解了消息轨迹功能的开启与追踪流程,帮助定位和排查问题。接着,分析了百万消息积压的处理方法,包括直接丢弃、扩容消费者或通过新Topic间接扩容等策略。此外,提出了针对RocketMQ集群崩溃的金融级高可用方案,确保消息不丢失。同时,讨论了为RocketMQ增加限流功能的重要性及实现方式,以提升系统稳定性。最后,分享了从Kafka迁移到RocketMQ的双写双读方案,确保数据一致性与平稳过渡。
|
4月前
|
消息中间件 存储 Kafka
一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。
2371 9
 一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
|
4月前
|
消息中间件 监控 Docker
Docker环境下快速部署RabbitMQ教程。
就这样,你成功地用魔法召唤出了RabbitMQ,还把它和你的应用程序连接了起来。现在,消息会像小溪流水一样,在你的系统中自由流淌。别忘了,兔子们不喜欢孤独,他们需要你细心的关怀,不时地监控它们,确保他们的世界运转得井井有条。
245 18
|
6月前
|
消息中间件 NoSQL 大数据
RocketMQ实战—5.消息重复+乱序+延迟的处理
本文围绕RocketMQ的使用与优化展开,分析了优惠券重复发放的原因及解决方案。首先,通过案例说明了优惠券系统因消息重复、数据库宕机或消费失败等原因导致重复发券的问题,并提出引入幂等性机制(如业务判断法、Redis状态判断法)来保证数据唯一性。其次,探讨了死信队列在处理消费失败时的作用,以及如何通过重试和死信队列解决消息处理异常。接着,分析了订单库同步中消息乱序的原因,提出了基于顺序消息机制的代码实现方案,确保消息按序处理。此外,介绍了利用Tag和属性过滤数据提升效率的方法,以及延迟消息机制优化定时退款扫描的功能。最后,总结了RocketMQ生产实践中的经验.
RocketMQ实战—5.消息重复+乱序+延迟的处理
|
6月前
|
消息中间件 Java 测试技术
RocketMQ实战—7.生产集群部署和生产参数
本文详细介绍了RocketMQ生产集群的部署与调优过程,包括集群规划、环境搭建、参数配置和优化策略。
RocketMQ实战—7.生产集群部署和生产参数
|
6月前
|
消息中间件 NoSQL Java
RocketMQ实战—10.营销系统代码优化
本文主要介绍了如何对营销系统的四大促销场景的代码进行优化,包括:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。
|
6月前
|
消息中间件 存储 Kafka
RocketMQ实战—4.消息零丢失的方案
本文分析了用户支付完成后未收到红包的问题,深入探讨了RocketMQ事务消息机制的实现原理及其在确保消息零丢失中的作用。首先,通过全链路分析发现消息可能在推送、存储或消费环节丢失。接着,介绍了RocketMQ事务消息机制如何通过half消息、本地事务执行及回调确认来保证消息发送成功,并详细解析了其底层原理,如half消息对消费者不可见、rollback与commit操作等。同时,对比了同步重试方案,指出其在复杂场景下的局限性。
RocketMQ实战—4.消息零丢失的方案