RabbitMQ工作模式3 Pub/Sub订阅模式

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: RabbitMQ工作模式3 Pub/Sub订阅模式

模式说明

从订阅模式之后,一条消息可以被多个消费者同时消费 ,上述的两种工作模式只能一个消息被一个消费者进行消费,不能实现共用

生产者把消息发给交换机,交换机再把消息路由分发给不同的队列,消费者监听队列去获取消息,一个消息可以被多个消费者同时消费

交换机(Exchange)

一方面,接收生产者发送的消息,另一方面,知道如何处理消息,例如递交给某个特别队列,递交给全部队列,或是将消息丢弃,到底如何操作,取决于Exchange的类型

Exchange有常见的3种类型

1 fanout:广播,将消息交给所有绑定到交换机的队列

2 direct:定向,将消息交给指定routing key(路由键)的队列

3 topic:通配符,将消息交给符合routing pattern(路由模式)的队列

它只负责转发消息,不负责存储消息的能力,因此如果没有任何队列与交换机绑定,或者没有符合路由规则的队列,那么消息会丢失

image.png首先队列是空的

image.png


代码需求:不同的消费者接收到相同的队列信息,执行不同的操作,打印到控制台,保存到数据库

生产者

package com.wyh.producer;
/**
 * @program: SpringBoot-RabbitMQ
 * @description: RabbitMQ生产者 Pub/Sub订阅模式
 * @author: 魏一鹤
 * @createDate: 2022-03-23 22:40
 **/
import com.rabbitmq.client.*;
import javax.naming.Name;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 *  producer主要用来发送消息
 *
 *
**/
public class PubSub_producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立工厂  一般连接都是通过连接工厂进行连接 所以要创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
//2 设置参数 比如说虚拟机 用户名 ip 密码 端口等 当然不设置也有默认参数
        factory.setHost("127.0.0.1");//设置主机ip  172.16.98.133是远程的服务 就是rabbitMQ服务页面的ip 如果不设置默认值为localhost (127.0.0.1)
        factory.setPort(5672);//设置端口 默认值也是5672
        factory.setVirtualHost("/itcast_wyh");//设置虚拟机 默认值/ 杠
        factory.setUsername("weiyihe");//设置用户名 默认值 guest 游客
        factory.setPassword("weiyihe");//设置密码 默认值 guest 游客
        //3 创建连接Connection
        Connection connection = factory.newConnection();
//4 创建channel
        Channel channel = connection.createChannel();
//5 创建交换机 Exchange  exchangeDeclare有很多参数 下面一一说明
        //参数1 String exchange,  交换机名称
        //参数2 BuiltinExchangeType type,      交换机类型 是一个枚举类型 共有四个值 下面一一介绍
        //枚举1 DIRECT("direct"), 定向
        //枚举2 FANOUT("fanout"), 扇形(广播) 发送消息到每一个与之绑定队列
        //枚举3 TOPIC("topic"),  通配符
        //枚举4 HEADERS("headers"); 参数匹配
        //参数3 boolean durable, 释放持久化
        //参数4 boolean autoDelete,   自动删除
        //参数5 boolean internal,     内部使用 一般false
        //参数6 Map<String, Object> arguments  参数
        //交换机名称
        String exChangeName="test_exchange_wyh";
        channel.exchangeDeclare(exChangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
//6 创建队列
        //队列名称
        String queueName1="test_queueName1";
        String queueName2="test_queueName2";
        channel.queueDeclare(queueName1,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
//7 绑定队列和交换机的关系 让它们两个组合起来
        //queueBind有三个参数
        //参数1 String queue 队列名称
        //参数2 String exchange   交换机名称
        //参数3 String routingKey  路由键 绑定规则
        //如果交换机的类型为fanout(广播) 那么routingKey设置为空字符串""
        channel.queueBind(queueName1,exChangeName,"");
        channel.queueBind(queueName2,exChangeName,"");
//8 发送消息
        //9 释放资源
    }
}


执行代码运行,发现多了两个队列,就是我们程序中写的队列名称

image.png


并且多了个交换机,且它的类型是fanout

image.png


点击交换机,查看绑定关系


image.png


继续编写发送消息的代码

package com.wyh.producer;
/**
 * @program: SpringBoot-RabbitMQ
 * @description: RabbitMQ生产者 Pub/Sub订阅模式
 * @author: 魏一鹤
 * @createDate: 2022-03-23 22:40
 **/
import com.rabbitmq.client.*;
import javax.naming.Name;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 *  producer主要用来发送消息
 *
 *
**/
public class PubSub_producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立工厂  一般连接都是通过连接工厂进行连接 所以要创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
//2 设置参数 比如说虚拟机 用户名 ip 密码 端口等 当然不设置也有默认参数
        factory.setHost("127.0.0.1");//设置主机ip  172.16.98.133是远程的服务 就是rabbitMQ服务页面的ip 如果不设置默认值为localhost (127.0.0.1)
        factory.setPort(5672);//设置端口 默认值也是5672
        factory.setVirtualHost("/itcast_wyh");//设置虚拟机 默认值/ 杠
        factory.setUsername("weiyihe");//设置用户名 默认值 guest 游客
        factory.setPassword("weiyihe");//设置密码 默认值 guest 游客
        //3 创建连接Connection
        Connection connection = factory.newConnection();
//4 创建channel
        Channel channel = connection.createChannel();
//5 创建交换机 Exchange  exchangeDeclare有很多参数 下面一一说明
        //参数1 String exchange,  交换机名称
        //参数2 BuiltinExchangeType type,      交换机类型 是一个枚举类型 共有四个值 下面一一介绍
        //枚举1 DIRECT("direct"), 定向
        //枚举2 FANOUT("fanout"), 扇形(广播) 发送消息到每一个与之绑定队列
        //枚举3 TOPIC("topic"),  通配符
        //枚举4 HEADERS("headers"); 参数匹配
        //参数3 boolean durable, 释放持久化
        //参数4 boolean autoDelete,   自动删除
        //参数5 boolean internal,     内部使用 一般false
        //参数6 Map<String, Object> arguments  参数
        //交换机名称
        String exChangeName="test_exchange_wyh";
        channel.exchangeDeclare(exChangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
//6 创建队列
        //队列名称
        String queueName1="test_queueName1";
        String queueName2="test_queueName2";
        channel.queueDeclare(queueName1,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
//7 绑定队列和交换机的关系 让它们两个组合起来
        //queueBind有三个参数
        //参数1 String queue 队列名称
        //参数2 String exchange   交换机名称
        //参数3 String routingKey  路由键 绑定规则
        //如果交换机的类型为fanout(广播) 那么routingKey设置为空字符串""
        channel.queueBind(queueName1,exChangeName,"");
        channel.queueBind(queueName2,exChangeName,"");
//8 发送消息
        //定义消息信息
        String body="我是订阅模式的消息";
        channel.basicPublish(exChangeName,"",null,body.getBytes());
//9 释放资源
        channel.close();
        connection.close();
    }
}

运行代码,发现队列1和队列2分别受到了消息

image.png

消费者1

package com.wyh.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @program: SpringBoot-RabbitMQ
 * @description: RabbitMQ消费者Consumer 工作队列模式
 * @author: 魏一鹤
 * @createDate: 2022-03-24 22:08
 **/
/**
 *  consumer主要用来消费消息
 *
 *
 **/
public class PubSub_consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立工厂  一般连接都是通过连接工厂进行连接 所以要创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
//2 设置参数 比如说虚拟机 用户名 ip 密码 端口等 当然不设置也有默认参数
        factory.setHost("127.0.0.1");//设置主机ip  172.16.98.133是远程的服务 就是rabbitMQ服务页面的ip 如果不设置默认值为localhost (127.0.0.1)
        factory.setPort(5672);//设置端口 默认值也是5672
        factory.setVirtualHost("/itcast_wyh");//设置虚拟机 默认值/ 杠
        factory.setUsername("weiyihe");//设置用户名 默认值 guest 游客
        factory.setPassword("weiyihe");//设置密码 默认值 guest 游客
        //3 创建连接Connection
        Connection connection = factory.newConnection();
//4 创建channel
        Channel channel = connection.createChannel();
//队列名称
        String queueName1="test_queueName1";
        String queueName2="test_queueName2";
//接收消息  它的参数比较多 下面一一说明
        //  String queue, 队列名称
        //  boolean autoAck, 是否自动确认  消费者收到消息会自动告诉MQ它收到了消息
        //  Consumer callback  回调对象 可以监听一些方法
        //consumer本质是一个接口 需要创建它的实现类
        Consumer consumer=new DefaultConsumer(channel){
//匿名内部类 重写它的方法
            //回调方法 当收到消息后,会自动执行该方法 它有一些参数
            //String consumerTag 标识
            //Envelope envelope 可以获取一些信息 比如交换机 路由key
            //AMQP.BasicProperties properties 配置信息
            // byte[] body 数据
            //
            @Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//System.out.println("consumerTag = " + consumerTag);
                //System.out.println("exchange = " + envelope.getExchange());
                //System.out.println("routingKey = " + envelope.getRoutingKey());
                //System.out.println("properties = " + properties);
                System.out.println("订阅模式消费者1接收的消息 = " + new String(body));
                System.out.println("将日志打印在控制台");
            }
/**   打印的信息
             订阅模式消费者1接收的消息 = 日志信息:张三调用了findAll方法,日志级别为:info
             将日志打印在控制台
            **/
        };
        channel.basicConsume(queueName2,true,consumer);
//消费者本质是一个监听 所以不要去关闭资源
    }
}

消费者2

package com.wyh.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @program: SpringBoot-RabbitMQ
 * @description: RabbitMQ消费者Consumer 工作队列模式
 * @author: 魏一鹤
 * @createDate: 2022-03-24 22:08
 **/
/**
 *  consumer主要用来消费消息
 *
 *
 **/
public class PubSub_consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立工厂  一般连接都是通过连接工厂进行连接 所以要创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
//2 设置参数 比如说虚拟机 用户名 ip 密码 端口等 当然不设置也有默认参数
        factory.setHost("127.0.0.1");//设置主机ip  172.16.98.133是远程的服务 就是rabbitMQ服务页面的ip 如果不设置默认值为localhost (127.0.0.1)
        factory.setPort(5672);//设置端口 默认值也是5672
        factory.setVirtualHost("/itcast_wyh");//设置虚拟机 默认值/ 杠
        factory.setUsername("weiyihe");//设置用户名 默认值 guest 游客
        factory.setPassword("weiyihe");//设置密码 默认值 guest 游客
        //3 创建连接Connection
        Connection connection = factory.newConnection();
//4 创建channel
        Channel channel = connection.createChannel();
//队列名称
        String queueName1="test_queueName1";
        String queueName2="test_queueName2";
//接收消息  它的参数比较多 下面一一说明
        //  String queue, 队列名称
        //  boolean autoAck, 是否自动确认  消费者收到消息会自动告诉MQ它收到了消息
        //  Consumer callback  回调对象 可以监听一些方法
        //consumer本质是一个接口 需要创建它的实现类
        Consumer consumer=new DefaultConsumer(channel){
//匿名内部类 重写它的方法
            //回调方法 当收到消息后,会自动执行该方法 它有一些参数
            //String consumerTag 标识
            //Envelope envelope 可以获取一些信息 比如交换机 路由key
            //AMQP.BasicProperties properties 配置信息
            // byte[] body 数据
            //
            @Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//System.out.println("consumerTag = " + consumerTag);
                //System.out.println("exchange = " + envelope.getExchange());
                //System.out.println("routingKey = " + envelope.getRoutingKey());
                //System.out.println("properties = " + properties);
                System.out.println("订阅模式消费者2接收的消息 = " + new String(body));
                System.out.println("将日志存到数据库");
            }
/**
             打印的信息
             订阅模式消费者2接收的消息 = 日志信息:张三调用了findAll方法,日志级别为:info
             将日志存到数据库
            **/
        };
        channel.basicConsume(queueName1,true,consumer);
//消费者本质是一个监听 所以不要去关闭资源
    }
}

可以发现,消费者分别接收相同的消息,执行不同的操作

订阅模式小结

 生产者需要声明交换机,且声明类型,不同的类型影响着不同的结果,需要绑定 交换机和队列的关系,然后由多个消费者分别接收,实现消息复用

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
RocketMQ Controller 模式 始终更新成本机ip
ontrollerAddr=192.168.24.241:8878 但是日志输出Update controller leader address to 127.0.0.1:8878。导致访问失败
57 3
通过轻量消息队列(原MNS)主题HTTP订阅+ARMS实现自定义数据多渠道告警
轻量消息队列(原MNS)以其简单队列模型、轻量化协议及按量后付费模式,成为阿里云产品间消息传输首选。本文通过创建主题、订阅、配置告警集成等步骤,展示了该产品在实际应用中的部分功能,确保消息的可靠传输。
66 2
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
100 2
RabbitMQ广播模式
RabbitMQ广播模式
95 1
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
消息队列 MQ使用问题之如何在内外网环境下使用单组节点单副本模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
MetaQ/RocketMQ 原理问题之在广播模式下,RebalanceService工作的问题如何解决
MetaQ/RocketMQ 原理问题之在广播模式下,RebalanceService工作的问题如何解决