RabbitMQ 实战教程(五) 主题

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: RabbitMQ 实战教程(五) 主题

虽然使用direct类型的转发器,改善了我们的日志系统。但是仍然存在一些局限性:它不能够基于多重条件进行路由选择。我们有可能希望不仅根据日志的级别,而且想根据日志的来源进行订阅。为了在我们的系统中实现上述的需求,我们需要了解一个更复杂的转发器:topic类型的转发器。



主题转发(Topic exchange)


使用topic类型的转发器,不能随意的设置选择键(routing_key),必须是由点隔开的一系列的标识符组成。标识符可以是任何东西,但是一般都与消息的某些特性相关。你可以定义任何数量的标识符,上限为255个字节。


绑定键和选择键的形式一样。topic类型的转发器和direct类型的转发器很类似,一个附带特殊的选择键将会被转发到绑定键与之匹配的队列中。需要注意的是:关于绑定键有两种特殊的情况。

* 可以匹配一个标识符。
# 可以匹配0个或多个标识符。

topic类型的转发器非常强大,可以实现其他类型的转发器。当一个队列与绑定键#绑定,将会收到所有的消息,类似fanout类型的转发器。当绑定键中不包含任何#与*时,类似direct类型的转发器。


通过下图,我们大概可以了解到topic类型的转发器的处理流程。

1.png


案例实战


发送端,连接到RabbitMQ(此时服务需要启动),发送一条数据,然后退出。

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
private static final String[] LOG_LEVEL_ARR = {"dao.debug", "dao.info", "dao.error",
"service.debug", "service.info",
"service.error","controller.debug",
"controller.info", "controller.error"};
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
// 设置MabbitMQ, 主机ip或者主机名
factory.setHost("127.0.0.1");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 指定一个转发器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 发送消息
for (String severity : LOG_LEVEL_ARR) {
String message = "somnus-MSG log : [" +severity+ "]" + UUID.randomUUID().toString();
// 发布消息至转发器
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println(" [x] 发送消息是: '" + message + "'");
}
// 关闭连接
channel.close();
connection.close();
}
}

接受端,不断等待服务器推送消息,然后在控制台输出。

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
private static final String[] LOG_LEVEL_ARR = {"#", "dao.error", "*.error", "dao.*", "service.#", "*.controller.#"};
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
// 设置MabbitMQ, 主机ip或者主机名
factory.setHost("127.0.0.1");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 指定一个转发器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 设置日志级别
int rand = new Random().nextInt(5);
String severity = LOG_LEVEL_ARR[rand];
// 创建一个非持久的、唯一的、自动删除的队列
String queueName = channel.queueDeclare().getQueue();
// 绑定转发器和队列
channel.queueBind(queueName, EXCHANGE_NAME, severity);
// 打印
System.out.println(" [*] 等待消息进入. 请按 CTRL+C 结束");
System.out.println(" [*] LOG INFO : " + severity);
// 创建队列消费者
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] 接收消息是: '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}

我们多开几个ReceiveLogsTopic。然后,启动EmitLogTopic进行消息发送。打印结果如下

ReceiveLogsTopic1

[*] 等待消息进入. 请按 CTRL+C 结束
[*] LOG INFO : dao.error
[x] 接收消息是: 'somnus-MSG log : [dao.error]041cd8ba-df7d-4d20-a11f-ba21a0c2a02a'

ReceiveLogsTopic2

[*] 等待消息进入. 请按 CTRL+C 结束
[*] LOG INFO : *.error
[x] 接收消息是: 'Liang-MSG log : [dao.error]041cd8ba-df7d-4d20-a11f-ba21a0c2a02a'
[x] 接收消息是:'somnus-MSG log : [service.error]e3565f12-9782-4c22-a91c-f513f31b037d'
[x] 接收消息是:'somnus-MSG log : [controller.error]4436101a-3346-41f6-a9af-b8a4fbda451e'

ReceiveLogsTopic3

[*] 等待消息进入. 请按 CTRL+C 结束
[*] LOG INFO : #
[x] 接收消息是: 'somnus-MSG log : [dao.debug]4eb08245-2c05-490b-a5a5-2742cb70d831'
[x] 接收消息是: 'somnus-MSG log : [dao.info]e9d4073b-1e61-4c6f-b531-ac42eaa346af'
[x] 接收消息是: 'somnus-MSG log : [dao.error]041cd8ba-df7d-4d20-a11f-ba21a0c2a02a'
[x] 接收消息是: 'somnus-MSG log : [service.debug]0ec84cbf-47ab-4813-a5db-e57d5e78830e'
[x] 接收消息是: 'somnus-MSG log : [service.info]2e12e1b7-7a09-4eb7-8ad1-8e53f533121c'
[x] 接收消息是: 'somnus-MSG log : [service.error]e3565f12-9782-4c22-a91c-f513f31b037d'
[x] 接收消息是: 'somnus-MSG log : [controller.debug]94e5be72-15f6-496d-84f3-2a107bafc92b'
[x] 接收消息是: 'somnus-MSG log : [controller.info]62bbe378-617d-4214-beb4-98cc53e73272'
[x] 接收消息是: 'somnus-MSG log : [controller.error]4436101a-3346-41f6-a9af-b8a4fbda451e'

ReceiveLogsTopic4

[*] 等待消息进入. 请按 CTRL+C 结束
[*] LOG INFO : service.#
[x] 接收消息是: 'somnus-MSG log : [service.debug]0ec84cbf-47ab-4813-a5db-e57d5e78830e'
[x] 接收消息是: 'somnus-MSG log : [service.info]2e12e1b7-7a09-4eb7-8ad1-8e53f533121c'
[x] 接收消息是: 'somnus-MSG log : [service.error]e3565f12-9782-4c22-a91c-f513f31b037d'

我们发现,ReceiveLogsTopic1、ReceiveLogsTopic2、ReceiveLogsTopic3、ReceiveLogsTopic4同时收到了属于自己匹配的消息。尤其是ReceiveLogsTopic1类似于direct类型的转发器,ReceiveLogsTopic3通过“#”匹配到所有消息。


相关连接:

RabbitMq3.6官方文档

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
打赏
0
0
0
0
5
分享
相关文章
RocketMQ实战—10.营销系统代码优化
本文主要介绍了如何对营销系统的四大促销场景的代码进行优化,包括:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。
RocketMQ实战—8.营销系统业务和方案介绍
本文详细介绍了电商营销系统的业务流程、技术架构及挑战解决方案。涵盖核心交易与支付后履约流程,优惠券和促销活动的发券、领券、用券、销券机制,以及会员与推送的数据库设计。技术架构基于Nacos服务注册中心、Dubbo RPC框架、RocketMQ消息中间件和XXLJob分布式调度工具,实现系统间高效通信与任务管理。针对千万级用户量下的推送和发券场景,提出异步化、分片处理与惰性发券等优化方案,解决高并发压力。同时,通过RocketMQ实现系统解耦,提升扩展性,并利用XXLJob完成爆款商品推荐的分布式调度推送。整体设计确保系统在大规模用户场景下的性能与稳定性。
RocketMQ实战—8.营销系统业务和方案介绍
RocketMQ实战—9.营销系统代码初版
本文主要介绍了实现营销系统四大促销场景的代码初版:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。
RocketMQ实战—9.营销系统代码初版
RocketMQ实战—3.基于RocketMQ升级订单系统架构
本文主要介绍了基于MQ实现订单系统核心流程的异步化改造、基于MQ实现订单系统和第三方系统的解耦、基于MQ实现将订单数据同步给大数据团队、秒杀系统的技术难点以及秒杀商详页的架构设计和基于MQ实现秒杀系统的异步化架构。
RocketMQ实战—3.基于RocketMQ升级订单系统架构
RocketMQ实战—6.生产优化及运维方案
本文围绕RocketMQ集群的使用与优化,详细探讨了六个关键问题。首先,介绍了如何通过ACL配置实现RocketMQ集群的权限控制,防止不同团队间误用Topic。其次,讲解了消息轨迹功能的开启与追踪流程,帮助定位和排查问题。接着,分析了百万消息积压的处理方法,包括直接丢弃、扩容消费者或通过新Topic间接扩容等策略。此外,提出了针对RocketMQ集群崩溃的金融级高可用方案,确保消息不丢失。同时,讨论了为RocketMQ增加限流功能的重要性及实现方式,以提升系统稳定性。最后,分享了从Kafka迁移到RocketMQ的双写双读方案,确保数据一致性与平稳过渡。
RocketMQ实战—7.生产集群部署和生产参数
本文详细介绍了RocketMQ生产集群的部署与调优过程,包括集群规划、环境搭建、参数配置和优化策略。
RocketMQ实战—7.生产集群部署和生产参数
RocketMQ实战—2.RocketMQ集群生产部署
本文主要介绍了大纲什么是消息中间件、消息中间件的技术选型、RocketMQ的架构原理和使用方式、消息中间件路由中心的架构原理、Broker的主从架构原理、高可用的消息中间件生产部署架构、部署一个小规模的RocketMQ集群进行压测、如何对RocketMQ集群进行可视化的监控和管理、进行OS内核参数和JVM参数的调整、如何对小规模RocketMQ集群进行压测、消息中间件集群生产部署规划梳理。
RocketMQ实战—2.RocketMQ集群生产部署
RocketMQ实战—5.消息重复+乱序+延迟的处理
本文围绕RocketMQ的使用与优化展开,分析了优惠券重复发放的原因及解决方案。首先,通过案例说明了优惠券系统因消息重复、数据库宕机或消费失败等原因导致重复发券的问题,并提出引入幂等性机制(如业务判断法、Redis状态判断法)来保证数据唯一性。其次,探讨了死信队列在处理消费失败时的作用,以及如何通过重试和死信队列解决消息处理异常。接着,分析了订单库同步中消息乱序的原因,提出了基于顺序消息机制的代码实现方案,确保消息按序处理。此外,介绍了利用Tag和属性过滤数据提升效率的方法,以及延迟消息机制优化定时退款扫描的功能。最后,总结了RocketMQ生产实践中的经验.
RocketMQ实战—5.消息重复+乱序+延迟的处理
RocketMQ实战—4.消息零丢失的方案
本文分析了用户支付完成后未收到红包的问题,深入探讨了RocketMQ事务消息机制的实现原理及其在确保消息零丢失中的作用。首先,通过全链路分析发现消息可能在推送、存储或消费环节丢失。接着,介绍了RocketMQ事务消息机制如何通过half消息、本地事务执行及回调确认来保证消息发送成功,并详细解析了其底层原理,如half消息对消费者不可见、rollback与commit操作等。同时,对比了同步重试方案,指出其在复杂场景下的局限性。
RocketMQ实战—4.消息零丢失的方案
RocketMQ实战—1.订单系统面临的技术挑战
本文详细分析了一个订单系统的设计与技术挑战。首先,介绍了订单系统的整体架构、业务流程及负载情况,包括电商购物流程、核心和非核心业务流程,以及真实生产中的负载压力。接着,探讨了系统面临的主要技术问题:支付后发券、发红包等操作导致性能下降;退款流程复杂且易失败;与第三方系统耦合带来的不稳定;大数据团队直接查询数据库影响性能;秒杀活动时数据库压力剧增等。最后,通过放大100倍压力的方法,梳理了高并发下的技术挑战,如核心链路优化、后台线程补偿机制、第三方系统解耦、数据获取方式改进等,为订单系统的优化提供了全面的参考。
RocketMQ实战—1.订单系统面临的技术挑战
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等