[原创]AMQP-RabbitMQ/3/发布订阅模式

本文涉及的产品
函数计算FC,每月15万CU 3个月
简介: [原创]AMQP-RabbitMQ/3/发布订阅模式

3. 发布订阅模式 Publish/Subscribe - 全集监听fanout


一次向多个消费者发送消息


  • 图示

image.png


# 个人理解


  • 生产者定义Exchange,同时将Exchange的类型定义为fanout,并向该Exchange发送消息。
  • 消费者定义队列Queue,并将队列与该交换机进行绑定。之后交换机付负责将消息全量推送给每一个与之绑定的Queue


RabbitMQ中消息传递模型的核心思想是生产者永远不会将任何消息直接发送到队列。实际上,生产者通常甚至不知道消息是否会被传递到任何队列。


相反,生产者只能向Exchange发送消息。Exchange所做的工作非常简单。一方面,它接收来自生产者的消息,另一方面将它们推送到队列。Exchange必须确切知道如何处理它收到的消息。它应该附加到特定队列吗?它应该附加到多个队列吗?或者它应该被丢弃。其规则由交换类型定义 。


有几种交换类型可供选择:direct,topic,headers andfanout

  • fanout: 将它接收到的消息广播到所有绑定到它的消息队列上。(忽略路由键routingKey)
  • 生产者 - 发布者

package com.futao.springmvcdemo.mq.rabbit.ps;
import com.futao.springmvcdemo.mq.rabbit.ExchangeTypeEnum;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqConnectionTools;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.Cleanup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
/**
 * 发布订阅-发布者
 *
 * @author futao
 * Created on 2019-04-22.
 */
@Slf4j
public class Publisher {
    @SneakyThrows
    public static void main(String[] args) {
        @Cleanup
        Connection connection = RabbitMqConnectionTools.getConnection();
        @Cleanup
        Channel channel = connection.createChannel();
        //定义交换器类型
        channel.exchangeDeclare(ExchangeTypeEnum.FANOUT.getExchangeName(), BuiltinExchangeType.FANOUT);
        String msg = "Hello RabbitMq!";
        for (int i = 0; i < 20; i++) {
            channel.basicPublish(ExchangeTypeEnum.FANOUT.getExchangeName(), "", null, (msg + i).getBytes());
            log.info("Send msg:[{}] success", (msg + i));
        }
    }
}
  • 消费者1  -  订阅者1

package com.futao.springmvcdemo.mq.rabbit.ps;
import com.futao.springmvcdemo.mq.rabbit.ExchangeTypeEnum;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqConnectionTools;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqQueueEnum;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
/**
 * 发布订阅-订阅者
 *
 * @author futao
 * Created on 2019-04-22.
 */
@Slf4j
public class SubscriberOne {
    @SneakyThrows
    public static void main(String[] args) {
        Channel channel = RabbitMqConnectionTools.getChannel();
        //开启持久化队列
        boolean durable = true;
        channel.queueDeclare(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_ONE.getQueueName(), durable, false, false, null);
        //定义交换器类型
        channel.exchangeDeclare(ExchangeTypeEnum.FANOUT.getExchangeName(), BuiltinExchangeType.FANOUT);
        //将消息队列与Exchange交换器绑定
        channel.queueBind(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_ONE.getQueueName(), ExchangeTypeEnum.FANOUT.getExchangeName(), "");
        //告诉rabbitmq一次只发送一条消息,并且在前一个消息未被处理或者消费之前,不继续发送下一个消息
        channel.basicQos(1);
        log.info("Waiting for message...");
        DeliverCallback deliverCallback = ((consumerTag, message) -> {
            log.info("收到消息:[{}],tag:[{}]", new String(message.getBody()), consumerTag);
            //acknowledgment应答
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
            }
        });
        //关闭自动应答
        boolean autoAck = false;
        channel.basicConsume(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_ONE.getQueueName(), autoAck, deliverCallback, consumerTag -> {
        });
    }
}
  • 消费者2  -  订阅者2

package com.futao.springmvcdemo.mq.rabbit.ps;
import com.futao.springmvcdemo.mq.rabbit.ExchangeTypeEnum;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqConnectionTools;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqQueueEnum;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
/**
 * 发布订阅-订阅者
 *
 * @author futao
 * Created on 2019-04-22.
 */
@Slf4j
public class SubscriberTwo {
    @SneakyThrows
    public static void main(String[] args) {
        Channel channel = RabbitMqConnectionTools.getChannel();
        //开启持久化队列
        boolean durable = true;
        channel.queueDeclare(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_TWO.getQueueName(), durable, false, false, null);
        //定义交换器类型为fanout
        channel.exchangeDeclare(ExchangeTypeEnum.FANOUT.getExchangeName(), BuiltinExchangeType.FANOUT);
        //将消息队列与Exchange交换器进行绑定
        channel.queueBind(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_TWO.getQueueName(), ExchangeTypeEnum.FANOUT.getExchangeName(), "");
        //告诉rabbitmq一次只发送一条消息,并且在前一个消息未被处理或者消费之前,不继续发送下一个消息
        channel.basicQos(1);
        log.info("Waiting for message...");
        DeliverCallback deliverCallback = ((consumerTag, message) -> {
            log.info("收到消息:[{}],tag:[{}]", new String(message.getBody()), consumerTag);
            //acknowledgment应答
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            try {
                Thread.sleep(2000);
            } catch (Exception e) {
            }
        });
        //关闭自动应答
        boolean autoAck = false;
        channel.basicConsume(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_TWO.getQueueName(), autoAck, deliverCallback, consumerTag -> {
        });
    }
}
  • 注意


  • 没有在发布者定义队列,而是定义了交换器Exchange。发布者将消息发送到Exchange,而不是Queue
  • 在订阅者端,每个订阅者定义了自己的消息队列,并且将自己的消息队列与Exchange进行绑定。则在每次发布者向相应的Exchange发送消息的时候,Exchange会将消息发送至订阅了该Exchange的队列。(即:每个订阅者收到的消息都是一样的)
  • 测试结果

>>> 订阅者1
[main] INFO mq.rabbit.ps.SubscriberOne - Waiting for message...
[pool-1-thread-4] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!0],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-5] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!1],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-6] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!2],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-7] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!3],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-8] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!4],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-9] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!5],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-10] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!17],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-22] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!18],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-23] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!19],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
>>> 订阅者2
[main] INFO mq.rabbit.ps.SubscriberTwo - Waiting for message...
[pool-1-thread-4] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!0],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-5] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!1],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-6] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!2],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-7] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!3],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-8] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!4],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-9] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!5],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-10] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!17],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-22] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!18],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-23] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!19],tag:[amq.ct


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
消息中间件 存储 数据安全/隐私保护
深入学习RabbitMQ五种模式(一)
深入学习RabbitMQ五种模式(一)
77 0
|
消息中间件
消息中间件系列教程(16) -RabbitMQ-应答模式
消息中间件系列教程(16) -RabbitMQ-应答模式
68 0
|
消息中间件 Java Maven
消息中间件系列教程(12) -RabbitMQ-消息确认机制
消息中间件系列教程(12) -RabbitMQ-消息确认机制
83 0
|
消息中间件 存储 Java
【RabbitMQ四】——RabbitMQ发布订阅模式(Publish/Subscribe)
【RabbitMQ四】——RabbitMQ发布订阅模式(Publish/Subscribe)
350 1
|
7月前
|
消息中间件 存储
深入学习RabbitMQ五种模式(二)
深入学习RabbitMQ五种模式(二)
57 0
|
消息中间件 应用服务中间件 nginx
【RabbitMQ六】——RabbitMQ主题模式(Topic)
【RabbitMQ六】——RabbitMQ主题模式(Topic)
432 1
|
消息中间件 安全 中间件
消息中间件学习笔记--RabbitMQ(二、模式)
消息中间件学习笔记--RabbitMQ(二、模式)
111 9
消息中间件学习笔记--RabbitMQ(二、模式)
|
XML 消息中间件 数据格式
[原创]AMQP-RabbitMQ/6/RPC模式/关注消息处理结果
[原创]AMQP-RabbitMQ/6/RPC模式/关注消息处理结果
[原创]AMQP-RabbitMQ/6/RPC模式/关注消息处理结果