RabbitMQ 一二事(3) - 订阅模式(微信公众号模式)的应用

简介: 之前讲的消费者互相可以把队列中的消息全部读取,但是不是读完整的所有信息 那么采用订阅模式就行,这就是微信公众号的模式, 比如10个人订阅了我的公众号"BeJavaGod",当我发送一条消息的时候, 那么这10个人都能收到我的消息并且查看,比如本条消息,对吧? 生产者制造消息发送给交换机X...

之前讲的消费者互相可以把队列中的消息全部读取,但是不是读完整的所有信息

那么采用订阅模式就行,这就是微信公众号的模式,

比如10个人订阅了我的公众号"BeJavaGod",当我发送一条消息的时候,

那么这10个人都能收到我的消息并且查看,比如本条消息,对吧?

生产者制造消息发送给交换机X,而不是发送给队列,队列和交换机绑定,消费者从各自的队列中获得消息

这样则实现一个生产者发送的所有消息都能被所有的消费者同时接收到

需要注意的地方是,在生产者创建消息发送到交换机时,此时没有队列,那么消息则丢失,消费者的队列绑定后再次发送则消息传达,原理是消息必须存放在队列中

生产者:

 1 public class Send {
 2 
 3     private final static String EXCHANGE_NAME = "test_exchange_fanout";
 4 
 5     public static void main(String[] argv) throws Exception {
 6         // 获取到连接以及mq通道
 7         Connection connection = ConnectionUtil.getConnection();
 8         Channel channel = connection.createChannel();
 9 
10         // 声明exchange
11         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
12 
13         // 消息内容
14         String message = "id=1001";
15         channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
16         System.out.println(" [x] Sent '" + message + "'");
17 
18         channel.close();
19         connection.close();
20     }
21 }

消费者1

 1 public class Recv {
 2 
 3     private final static String QUEUE_NAME = "test_queue_fanout_1";
 4 
 5     private final static String EXCHANGE_NAME = "test_exchange_fanout";
 6 
 7     public static void main(String[] argv) throws Exception {
 8 
 9         // 获取到连接以及mq通道
10         Connection connection = ConnectionUtil.getConnection();
11         Channel channel = connection.createChannel();
12 
13         // 声明队列
14         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
15 
16         // 绑定队列到交换机
17         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
18 
19         // 同一时刻服务器只会发一条消息给消费者
20         channel.basicQos(1);
21 
22         // 定义队列的消费者
23         QueueingConsumer consumer = new QueueingConsumer(channel);
24         // 监听队列,手动返回完成
25         channel.basicConsume(QUEUE_NAME, false, consumer);
26 
27         // 获取消息
28         while (true) {
29             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
30             String message = new String(delivery.getBody());
31             System.out.println(" [x] Received '" + message + "'");
32             Thread.sleep(10);
33 
34             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
35         }
36     }
37 }

消费者2

 1 public class Recv2 {
 2 
 3     private final static String QUEUE_NAME = "test_queue_fanout_2";
 4 
 5     private final static String EXCHANGE_NAME = "test_exchange_fanout";
 6 
 7     public static void main(String[] argv) throws Exception {
 8 
 9         // 获取到连接以及mq通道
10         Connection connection = ConnectionUtil.getConnection();
11         Channel channel = connection.createChannel();
12 
13         // 声明队列
14         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
15 
16         // 绑定队列到交换机
17         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
18 
19         // 同一时刻服务器只会发一条消息给消费者
20         channel.basicQos(1);
21 
22         // 定义队列的消费者
23         QueueingConsumer consumer = new QueueingConsumer(channel);
24         // 监听队列,手动返回完成
25         channel.basicConsume(QUEUE_NAME, false, consumer);
26 
27         // 获取消息
28         while (true) {
29             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
30             String message = new String(delivery.getBody());
31             System.out.println(" [x] Received '" + message + "'");
32             Thread.sleep(10);
33 
34             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
35         }
36     }
37 }

ok,这样就是最简单的订阅demo

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
人工智能 关系型数据库 OLAP
305 0
|
5月前
|
消息中间件 存储 Kafka
一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。
4119 9
 一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
|
8月前
|
消息中间件 存储 前端开发
MQ有什么应用场景
MQ有什么应用场景
|
8月前
百炼-我的智能体应用在微信公众号渠道无法得到输出
微信公众号,总是出现思考中,请回复“继续”,而在我的应用观测中已经体现了输出
329 0
|
8月前
|
小程序
【04】微信支付商户申请下户到配置完整流程-微信开放平台移动APP应用通过-微信商户继续申请-微信开户函-视频声明-以及对公打款验证-申请+配置完整流程-优雅草卓伊凡
【04】微信支付商户申请下户到配置完整流程-微信开放平台移动APP应用通过-微信商户继续申请-微信开户函-视频声明-以及对公打款验证-申请+配置完整流程-优雅草卓伊凡
564 1
【04】微信支付商户申请下户到配置完整流程-微信开放平台移动APP应用通过-微信商户继续申请-微信开户函-视频声明-以及对公打款验证-申请+配置完整流程-优雅草卓伊凡
|
8月前
|
消息中间件 人工智能 自然语言处理
基于 RocketMQ 事件驱动架构的 AI 应用实践
基于 RocketMQ 事件驱动架构的 AI 应用实践
284 2
|
9月前
|
消息中间件 监控 数据挖掘
【有奖实践】轻量消息队列(原 MNS)订阅 OSS 事件实时处理文件变动
当你需要对对象存储 OSS(Object Storage Service)中的文件变动进行实时处理、同步、监听、业务触发、日志记录等操作时,你可以通过设置 OSS 的事件通知规则,自定义关注的文件,并将 OSS 事件推送到轻量消息队列(原 MNS)的队列或主题中,开发者的服务即可及时收到相关通知,并通过消费消息进行后续的业务处理。
208 95
|
9月前
|
消息中间件 对象存储
轻量消息队列(原 MNS)订阅 OSS 事件实践
使用轻量消息队列订阅OSS事件,实时处理文件变动,赢取ins风U型枕(限量500个)。访问活动页面,完成实操并上传截图即可参与领奖。活动时间:即日起至2025年2月28日16:00。奖品数量有限,先到先得,快来报名吧!
158 2
|
10月前
|
JSON 小程序 UED
微信小程序 app.json 配置文件解析与应用
本文介绍了微信小程序中 `app.json` 配置文件的详细
1534 12

热门文章

最新文章