RabbitMQ消息模型之发布订阅Publish-Subscribe

简介: RabbitMQ消息模型之发布订阅Publish-Subscribe

发布订阅模型 Publish/Subscribe

发布订阅模型也称为广播模型,交换机类型需要指定为Fanout,正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的所有队列中。每个消费者都监听自己的队列,所以同一个消息,会被所有的消费者共同消费。Fanout 这种交换类型并不能给我们带来很大的灵活性,它只能进行无意识的广播。

在广播模式下,消息发送流程是这样的:

  • 可以有多个消费者。
  • 每个消费者有自己的Queue。
  • 每个队列都要绑定到Exchange。
  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
  • 交换机把消息发送给绑定过的所有队列。
  • 队列的消费者都能拿到消息。实现一条消息被多个消费者消费。

创建生产者

public class MyProducer {
    @Test
    public void test() throws Exception {
        // Fanout模式不需要指定队列
        String queue = "";
        // 交换机
        String exchange = "logs";
        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);
        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare(exchange, "fanout");
        for (int i = 0; i < 3; i++) {
            // 发布消息
            channel.basicPublish(exchange, queue, null, ("DEBUG LOG -> " + i).getBytes());
            channel.basicPublish(exchange, queue, null, ("INFO LOG -> " + i).getBytes());
            channel.basicPublish(exchange, queue, null, ("WARN LOG -> " + i).getBytes());
            channel.basicPublish(exchange, queue, null, ("ERROR LOG -> " + i).getBytes());
        }
    }
}

创建消费者1

public class MyConsumer1 {
    public static void main(String[] args) throws Exception {
        // 指定交换机
        String exchange = "logs";
        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);
        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //绑定交换机
        channel.exchangeDeclare("logs", "fanout");
        //创建临时队列
        String queue = channel.queueDeclare().getQueue();
        //将临时队列绑定exchange
        channel.queueBind(queue, "logs", "");
        //处理消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1: " + new String(body));
                // TODO 业务处理
            }
        });
    }
}

创建消费者2

public class MyConsumer2 {
    public static void main(String[] args) throws Exception {
        // 指定交换机
        String exchange = "logs";
        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);
        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //绑定交换机
        channel.exchangeDeclare("logs", "fanout");
        //创建临时队列
        String queue = channel.queueDeclare().getQueue();
        //将临时队列绑定exchange
        channel.queueBind(queue, "logs", "");
        //处理消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2: " + new String(body));
            }
        });
    }
}

两个消费者同时都收到了消息。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
18天前
|
消息中间件
RabbitMQ消息模型之Routing-Direct
RabbitMQ消息模型之Routing-Direct
20 1
|
4月前
|
消息中间件 JSON 缓存
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(SpringBoot整合)
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(SpringBoot整合)
|
4月前
|
消息中间件 JSON 缓存
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(基于SpringBoot)
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(基于SpringBoot)
|
5月前
|
消息中间件 Java Kafka
RabbitMQ安装和5种不同的消息模型(BasicQueue,WorkQueue,Fanout Exchange,Direct Exchange,Topic Exchange)与SpringAMQP
RabbitMQ安装和5种不同的消息模型(BasicQueue,WorkQueue,Fanout Exchange,Direct Exchange,Topic Exchange)与SpringAMQP
|
10月前
|
消息中间件 算法 Java
RabbitMQ灵活运用,怎么理解五种消息模型
RabbitMQ灵活运用,怎么理解五种消息模型
83 0
|
消息中间件 存储 负载均衡
理解RabbitMQ中的AMQP-0-9-1模型
之前有个打算在学习RabbitMQ之前,把AMQP详细阅读一次,挑出里面的重点内容。后来找了下RabbitMQ的官方文档,发现了有一篇文档专门介绍了RabbitMQ中实现的AMQP模型部分,于是直接基于此文档和个人理解写下这篇文章。
156 0
理解RabbitMQ中的AMQP-0-9-1模型
|
消息中间件 Java Spring
【RabbitMQ】Basic Queue 简单队列模型与WorkQueue
【RabbitMQ】Basic Queue 简单队列模型与WorkQueue
76 0
【RabbitMQ】Basic Queue 简单队列模型与WorkQueue
|
消息中间件 Java Kafka
RabbitMQ安装以及消息模型使用攻略
🍅程序员小王的博客:程序员小王的博客 🍅 欢迎点赞 👍 收藏 ⭐留言 📝 🍅 如有编辑错误联系作者,如果有比较好的文章欢迎分享给我,我会取其精华去其糟粕 🍅java自学的学习路线:java自学的学习路线
202 0
RabbitMQ安装以及消息模型使用攻略
|
消息中间件 Java 中间件
【Alibaba中间件技术系列】「RocketMQ技术专题」带你一起去探索RocketMQ服务架构的线程模型分析
【Alibaba中间件技术系列】「RocketMQ技术专题」带你一起去探索RocketMQ服务架构的线程模型分析
233 1
【Alibaba中间件技术系列】「RocketMQ技术专题」带你一起去探索RocketMQ服务架构的线程模型分析
|
存储 消息中间件 负载均衡
终于弄明白 RocketMQ 存储模型
RocketMQ 优异的性能表现,必然绕不开其优秀的存储模型 。 这篇文章,笔者按照自己的理解 , 尝试分析 RocketMQ 的存储模型,希望对大家有所启发。