3.2. 发布订阅模型(Publish/Subscribe)

简介: 发布订阅模型通过交换机实现消息一对多分发,生产者将消息发给交换机,由其广播至多个绑定队列,每个队列的消费者均可接收消息。Fanout交换机为广播模式,支持消息同时推送至所有绑定队列,适用于通知、日志等场景。交换机不存储消息,若无队列绑定则消息丢失。

3.2.1 介绍
工作队列模型一次只能将消息发给一个队列,绑定队列的多个消息者只能有一个消费者处理消息。如果一条消息要发给多个应用程序使用工作队列模型将无法实现。举例,下图中支付成功后支付服务将消息发给交易服务和通知服务,使用工作队列模型将无法实现。

使用发布订阅模型可以实现上图的需求,发布订阅模型可以实现一条消息发给多个队列,每个队列绑定到同一个交换机,最终实现了向多个消费者发送一条消息,这种模式称为“发布/订阅”模型。

发布订阅模型里,生产者只能将消息发送到交换机,由交换机将消息推送到队列,交换机可以将消息推送给绑定它的所有队列,也可以有针对性的将消息推送给某几个队列,这就相当于有一批消费者订阅了消息,交换机根据各自的订阅去推送消息,组成部分如下:
● Publisher:生产者,不再发送消息到队列中,而是发给交换机
● Exchange:交换机,一方面,接收生产者发送的消息。另一方面,将消息推送给队列,是将消息推送给某个特别队列、递交给所有队列、或是将消息丢弃,到底如何操作,取决于交换机的类型。
● Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
● Consumer:消费者,与以前一样,订阅队列,没有变化
注意:Exchange(交换机)只负责转发消息,不具备存储消息的能力[可以暂存,但不支持持久化],因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
3.2.2 交换机类型
交换机是如何实现将消息推送给所有队列,还是有针对性的将消息推送给某几个队列呢?
实现不同的需求要选用不同类型的交换机,可用的交换机类型:direct, topic, headers 和fanout.
● Fanout:广播类型,将消息交给所有绑定到交换机的队列。
● Direct:直接类型,基于RoutingKey(路由key)发送给订阅了消息的队列,交换机根据routingkey去判断消息应该转发到哪个队列
● Topic:通配符类型(主题类型),与Direct类似,只不过RoutingKey可以使用通配符
● Headers:头匹配类型,基于MQ的消息头匹配,用的较少。
课堂中,我们讲解前面的三种交换机模式。
3.3. fanout交换机
3.3.1 介绍
Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。在广播模式下,消息发送流程是这样的:

● 1) 可以有多个队列
● 2) 每个队列都要绑定到Exchange(交换机)
● 3) 生产者发送的消息,只能发送到交换机
● 4) 交换机把消息发送给绑定过的所有队列
● 5) 订阅队列的消费者接收消息,每个队列订阅的消费者只有一个能拿到消息。
3.3.2 测试
3.3.2.1 创建队列
我们的计划是这样的:

● 创建一个名为 hmall.fanout的交换机,类型是Fanout
● 创建两个队列fanout.queue1和fanout.queue2,绑定到交换机hmall.fanout
在控制台创建队列fanout.queue1:

在创建一个队列fanout.queue2:

3.3.2.2 创建交换机
然后再创建一个交换机hmall.fanout:

3.3.2.3 绑定队列到交换机
然后绑定两个队列到交换机:

3.3.2.4 发送消息
下边实现消息发送:
在publisher服务的SpringAmqpTest类中添加测试方法:
@Test
public void testFanoutExchange() {
// 交换机名称
String exchangeName = "hmall.fanout";
// 消息
String message = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
发送成功查看mq控制台消息转发到了绑定此交换机的两个队列

3.3.2.5 接收消息
下边实现消息接收:
在consumer服务的SpringRabbitListener中添加两个方法,作为消费者:
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
将consumer服务启动起来后可以通过rabbitmq控制台查看消费者监听情况
进入队列界面,点击fanout.queue1:

进入队列界面,查看consumers,下图表示fanout.queue1队列有一个监听者。

同样的方法可以查看fanout.queue2队列的监听情况。
fanout.queue1和fanout.queue2每个队列都有一个监听者。
下边执行发送消息程序,观察控制台,下图说明每个消费者成功收到消息。
消费者1接收到Fanout消息:【hello, everyone!】
消费者2接收到Fanout消息:【hello, everyone!】
3.3.2.6 启动多个消费者实例
下边我们把consumer服务启动两个实例

此时再观察fanout.queue1和fanout.queue2的监听者,发现每个队列有两个监听者

此时的结果相当于下图:

此时执行发送消息程序后四个消息者都可以收到消息吗?
通过测试我们发现:
在每个队列的消费者中,发送一条消息只会有一个消费者接收到消息。
每个队列默认采用轮询的方式向消费者推送消息。
3.3.3 小结
交换机的作用是什么?
● 接收publisher发送的消息
● 将消息按照规则路由到与之绑定的队列
● 不能缓存消息,路由失败,消息丢失
● FanoutExchange的会将消息路由到每个绑定的队列

相关文章
|
存储 Kubernetes 监控
K8S核心组件介绍
K8S核心组件介绍
|
存储 监控 安全
ONVIF协议介绍
ONVIF协议介绍
7575 0
|
Unix Linux
完全指南:mv命令用法、示例和注意事项 | Linux文件移动与重命名
完全指南:mv命令用法、示例和注意事项 | Linux文件移动与重命名
3921 0
|
8月前
|
缓存 JSON JavaScript
浅拷贝与深拷贝区别之技术方案及应用实例解析
浅拷贝与深拷贝是编程中对象复制的两种核心方式。本文先阐述两者的概念,再通过代码示例(如JavaScript的`Object.assign()`和Python的切片操作实现浅拷贝,或`JSON.parse()`与`copy.deepcopy()`实现深拷贝)展示区别,并总结常见场景应用,如游戏状态保存和数据快照管理。掌握它们的选择与实现,可有效提升代码性能与可靠性。附面试资料链接供学习:[点此获取](https://pan.quark.cn/s/4459235fee85)。
547 1
|
7月前
|
Java Spring 容器
DI依赖注入的几种手段
本内容介绍了依赖注入的四种方式:构造器注入、接口注入、Setter注入和注解注入,并重点比较了Spring中的@Autowired与Java标准注解@Resource的区别,包括来源和依赖查找策略。
380 0
|
25天前
|
监控 数据可视化 安全
版本管理与产品迭代:规划、执行、工具与复盘全流程
本文系统阐述如何将产品版本管理从“发布流程”升级为“战略执行工具”,提出战略型、平台型、功能型、维护型四大版本分层体系,结合目标对齐、迭代拆解、风险管控与复盘优化四步法,助力团队实现从被动响应到主动规划的跃迁,提升产品竞争力与研发效能。
|
8月前
|
消息中间件 Dubbo Java
Netty源码—10.Netty工具之时间轮
本文主要介绍了什么是时间轮、HashedWheelTimer是什么、HashedWheelTimer的使用、HashedWheelTimer的运行流程、HashedWheelTimer的核心字段、HashedWheelTimer的构造方法、HashedWheelTimer添加任务和执行任务、HashedWheelTimer的完整源码、HashedWheelTimer的总结和HashedWheelTimer的应用。
|
12月前
|
关系型数据库 MySQL 数据库
图解MySQL【日志】——两阶段提交
两阶段提交是为了解决Redo Log和Binlog日志在事务提交时可能出现的半成功状态,确保两者的一致性。它分为准备阶段和提交阶段,通过协调者和参与者协作完成。准备阶段中,协调者向所有参与者发送准备请求,参与者执行事务并回复是否同意提交;提交阶段中,若所有参与者同意,则协调者发送提交请求,否则发送回滚请求。MySQL通过这种方式保证了分布式事务的一致性,并引入组提交机制减少磁盘I/O次数,提升性能。
1033 5
图解MySQL【日志】——两阶段提交
|
JavaScript 前端开发 数据处理
【Vue面试题二十八】、vue要做权限管理该怎么做?如果控制到按钮级别的权限怎么做?
这篇文章讨论了Vue中实现权限管理的策略,包括接口权限、路由权限、菜单权限和按钮权限的控制方法,并提供了不同的实现方案及代码示例,以确保用户只能访问被授权的资源。
【Vue面试题二十八】、vue要做权限管理该怎么做?如果控制到按钮级别的权限怎么做?
|
消息中间件 运维 UED
消息队列运维实战:攻克消息丢失、重复与积压难题
消息队列(MQ)作为分布式系统中的核心组件,承担着解耦、异步处理和流量削峰等功能。然而,在实际应用中,消息丢失、重复和积压等问题时有发生,严重影响系统的稳定性和数据的一致性。本文将深入探讨这些问题的成因及其解决方案,帮助您在运维过程中有效应对这些挑战。
390 1