AMQP-RabbitMQ/3/发布订阅模式

本文涉及的产品
函数计算FC,每月15万CU 3个月
简介: 3. 发布订阅模式 Publish/Subscribe - 全集监听fanout一次向多个消费者发送消息图示个人理解生产者定义Exchange,同时将Exchange的类型定义为fanout,并向该Exchange发送消息。

3. 发布订阅模式 Publish/Subscribe - 全集监听fanout

一次向多个消费者发送消息

  • 图示
    发布订阅模式

个人理解

  • 生产者定义Exchange,同时将Exchange的类型定义为fanout,并向该Exchange发送消息。
  • 消费者定义队列Queue,并将队列与该交换机进行绑定。之后交换机付负责将消息全量推送给每一个与之绑定的Queue

**RabbitMQ中消息传递模型的核心思想是生产者永远不会将任何消息直接发送到队列。实际上,生产者通常甚至不知道消息是否会被传递到任何队列。
相反,生产者只能向Exchange发送消息。Exchange所做的工作非常简单。一方面,它接收来自生产者的消息,另一方面将它们推送到队列。Exchange必须确切知道如何处理它收到的消息。它应该附加到特定队列吗?它应该附加到多个队列吗?或者它应该被丢弃。其规则由交换类型定义 。**
有几种交换类型可供选择:direct ,topic ,headers andfanout

  • fanout: 将它接收到的消息广播到所有绑定到它的消息队列上。(忽略路由键routingKey)
  • 生产者 - 发布者
package com.futao.springmvcdemo.mq.rabbit.ps;

import com.futao.springmvcdemo.mq.rabbit.ExchangeTypeEnum;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqConnectionTools;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.Cleanup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
 * 发布订阅-发布者
 *
 * @author futao
 * Created on 2019-04-22.
 */
@Slf4j
public class Publisher {
    @SneakyThrows
    public static void main(String[] args) {
        @Cleanup
        Connection connection = RabbitMqConnectionTools.getConnection();
        @Cleanup
        Channel channel = connection.createChannel();
        //定义交换器类型
        channel.exchangeDeclare(ExchangeTypeEnum.FANOUT.getExchangeName(), BuiltinExchangeType.FANOUT);
        String msg = "Hello RabbitMq!";
        for (int i = 0; i < 20; i++) {
            channel.basicPublish(ExchangeTypeEnum.FANOUT.getExchangeName(), "", null, (msg + i).getBytes());
            log.info("Send msg:[{}] success", (msg + i));
        }
    }
}
  • 消费者1 - 订阅者1
package com.futao.springmvcdemo.mq.rabbit.ps;

import com.futao.springmvcdemo.mq.rabbit.ExchangeTypeEnum;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqConnectionTools;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqQueueEnum;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
 * 发布订阅-订阅者
 *
 * @author futao
 * Created on 2019-04-22.
 */
@Slf4j
public class SubscriberOne {
    @SneakyThrows
    public static void main(String[] args) {
        Channel channel = RabbitMqConnectionTools.getChannel();
        //开启持久化队列
        boolean durable = true;
        channel.queueDeclare(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_ONE.getQueueName(), durable, false, false, null);
        //定义交换器类型
        channel.exchangeDeclare(ExchangeTypeEnum.FANOUT.getExchangeName(), BuiltinExchangeType.FANOUT);
        //将消息队列与Exchange交换器绑定
        channel.queueBind(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_ONE.getQueueName(), ExchangeTypeEnum.FANOUT.getExchangeName(), "");
        //告诉rabbitmq一次只发送一条消息,并且在前一个消息未被处理或者消费之前,不继续发送下一个消息
        channel.basicQos(1);
        log.info("Waiting for message...");
        DeliverCallback deliverCallback = ((consumerTag, message) -> {
            log.info("收到消息:[{}],tag:[{}]", new String(message.getBody()), consumerTag);
            //acknowledgment应答
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            try {
                Thread.sleep(1000);
            } catch (Exception e) {

            }
        });
        //关闭自动应答
        boolean autoAck = false;
        channel.basicConsume(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_ONE.getQueueName(), autoAck, deliverCallback, consumerTag -> {
        });
    }
}
  • 消费者2 - 订阅者2
package com.futao.springmvcdemo.mq.rabbit.ps;

import com.futao.springmvcdemo.mq.rabbit.ExchangeTypeEnum;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqConnectionTools;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqQueueEnum;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
 * 发布订阅-订阅者
 *
 * @author futao
 * Created on 2019-04-22.
 */
@Slf4j
public class SubscriberTwo {
    @SneakyThrows
    public static void main(String[] args) {
        Channel channel = RabbitMqConnectionTools.getChannel();
        //开启持久化队列
        boolean durable = true;
        channel.queueDeclare(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_TWO.getQueueName(), durable, false, false, null);
        //定义交换器类型为fanout
        channel.exchangeDeclare(ExchangeTypeEnum.FANOUT.getExchangeName(), BuiltinExchangeType.FANOUT);
        //将消息队列与Exchange交换器进行绑定
        channel.queueBind(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_TWO.getQueueName(), ExchangeTypeEnum.FANOUT.getExchangeName(), "");
        //告诉rabbitmq一次只发送一条消息,并且在前一个消息未被处理或者消费之前,不继续发送下一个消息
        channel.basicQos(1);
        log.info("Waiting for message...");
        DeliverCallback deliverCallback = ((consumerTag, message) -> {
            log.info("收到消息:[{}],tag:[{}]", new String(message.getBody()), consumerTag);
            //acknowledgment应答
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            try {
                Thread.sleep(2000);
            } catch (Exception e) {

            }
        });
        //关闭自动应答
        boolean autoAck = false;
        channel.basicConsume(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_TWO.getQueueName(), autoAck, deliverCallback, consumerTag -> {
        });
    }
}
  • 注意

    • 没有在发布者定义队列,而是定义了交换器Exchange。发布者将消息发送到Exchange,而不是Queue
    • 在订阅者端,每个订阅者定义了自己的消息队列,并且将自己的消息队列与Exchange进行绑定。则在每次发布者向相应的Exchange发送消息的时候,Exchange会将消息发送至订阅了该Exchange的队列。(即:每个订阅者收到的消息都是一样的)
  • 测试结果
>>> 订阅者1
[main] INFO mq.rabbit.ps.SubscriberOne - Waiting for message...
[pool-1-thread-4] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!0],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-5] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!1],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-6] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!2],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-7] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!3],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-8] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!4],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-9] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!5],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-10] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!17],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-22] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!18],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-23] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!19],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]


>>> 订阅者2
[main] INFO mq.rabbit.ps.SubscriberTwo - Waiting for message...
[pool-1-thread-4] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!0],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-5] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!1],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-6] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!2],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-7] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!3],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-8] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!4],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-9] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!5],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-10] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!17],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-22] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!18],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-23] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!19],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
Oracle Java 应用服务中间件
Linux下安装jdk和Tomcat
Linux下安装jdk和Tomcat
292 1
|
SQL 数据可视化 关系型数据库
5个实用的SQLite数据库可视化工具(GUI)
5个实用的SQLite数据库可视化工具(GUI)
2849 3
|
XML Android开发 数据格式
Android CheckedTextView 使用+实例
CheckedTextView是什么 主要XML属性 在点击事件里判断状态设置状态 第一次点击无效 实例 1.主界面CheckedTextViewActivity.java 2.主布局activity_textview_ctv.xml 3.复选框Adapter 4.复选框adapter对应布局 5.单选框adapter 6.单选框adapter对应布局 7.逻辑处理从adapter放在主界面处
830 0
Android CheckedTextView 使用+实例
|
9月前
|
存储 人工智能 安全
基于区块链的数字身份认证:重塑身份安全的新范式
基于区块链的数字身份认证:重塑身份安全的新范式
989 16
|
安全 Java Apache
十个方法破解Java生成随机密码的小窍门
十个方法破解Java生成随机密码的小窍门
|
机器学习/深度学习 算法 数据可视化
Fisher模型在统计学和机器学习领域通常指的是Fisher线性判别分析(Fisher's Linear Discriminant Analysis,简称LDA)
Fisher模型在统计学和机器学习领域通常指的是Fisher线性判别分析(Fisher's Linear Discriminant Analysis,简称LDA)
|
存储 JSON 前端开发
Javaweb之SpringBootWeb案例之阿里云OSS服务集成的详细解析
Javaweb之SpringBootWeb案例之阿里云OSS服务集成的详细解析
450 0
|
SQL DataWorks API
DataWorks产品使用合集之如何解决查询列数太多不展示结果
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
计算机视觉
医学影像处理系统源码(PACS)
医学影像处理系统源码(PACS)
269 0
leetcode代码记录(最长连续递增序列
leetcode代码记录(最长连续递增序列
107 2