RabbitMQ中的消息发布-订阅模式是什么?如何实现?

简介: RabbitMQ中的消息发布-订阅模式是什么?如何实现?

RabbitMQ中的消息发布-订阅模式是什么?如何实现?

RabbitMQ中的消息发布-订阅模式是一种常见的消息传递模式,用于将消息广播给多个消费者。在这种模式下,一个生产者将消息发送到一个交换机(Exchange),而交换机将消息广播给所有与之绑定的队列(Queue)。每个队列都有一个消费者来接收消息并进行处理。

下面是一个使用Java代码实现RabbitMQ消息发布-订阅模式的示例:

首先,我们需要创建一个连接工厂,并设置RabbitMQ服务器的主机地址。然后,使用连接工厂创建一个连接,并使用连接创建一个通道。接着,我们声明一个交换机,并将交换机的类型设置为fanout,表示消息将被广播到所有与之绑定的队列。

// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 创建连接
Connection connection = factory.newConnection();
// 创建通道
Channel channel = connection.createChannel();
// 声明交换机
channel.exchangeDeclare("my_exchange", "fanout");

在声明交换机时,我们需要指定交换机的名称和类型。这里我们将交换机的名称设置为my_exchange,类型设置为fanout。

然后,我们可以通过调用basicPublish方法来发送消息到交换机。在发送消息时,我们需要将交换机的名称设置为目标交换机的名称,并将routingKey参数设置为空字符串。

String message = "Hello, RabbitMQ!";
channel.basicPublish("my_exchange", "", null, message.getBytes());

在上述代码中,我们将交换机的名称设置为my_exchange,表示消息将被广播到与之绑定的所有队列。

接下来,我们可以创建多个队列,并将这些队列绑定到交换机上。每个队列都有一个消费者来接收消息并进行处理。

// 创建队列并绑定到交换机
String queueName1 = channel.queueDeclare().getQueue();
channel.queueBind(queueName1, "my_exchange", "");
String queueName2 = channel.queueDeclare().getQueue();
channel.queueBind(queueName2, "my_exchange", "");

在上述代码中,我们使用queueDeclare方法创建一个匿名队列,并获取队列的名称。然后,我们使用queueBind方法将队列绑定到交换机上,将交换机的名称设置为my_exchange,routingKey参数设置为空字符串。

最后,我们可以通过消费者来接收消息。在消费者中,我们需要使用basicConsume方法来指定要消费的队列和消息处理逻辑。

channel.basicConsume(queueName1, true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("Received message from queue 1: " + message);
    }
});
channel.basicConsume(queueName2, true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("Received message from queue 2: " + message);
    }
});

在上述代码中,我们使用basicConsume方法来指定要消费的队列,将队列的名称设置为queueName1和queueName2,并传入一个自定义的DefaultConsumer对象。在handleDelivery方法中,我们可以处理接收到的消息。

通过以上步骤,我们就可以实现RabbitMQ中的消息发布-订阅模式。生产者将消息发送到交换机,交换机将消息广播给所有与之绑定的队列,每个队列都有一个消费者来接收并处理消息。

需要注意的是,消息发布-订阅模式中的消息是广播给所有队列的,因此每个队列都会接收到相同的消息。如果需要实现消息的点对点传递,可以使用RabbitMQ的消息路由模式。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 存储 负载均衡
Rabbitmq direct模式保证一个队列只对应一个消费者
Rabbitmq direct模式保证一个队列只对应一个消费者
112 0
|
5月前
|
消息中间件 存储 网络协议
我们一起来学RabbitMQ 二:RabbiMQ 的 6 种模式的基本应用
我们一起来学RabbitMQ 二:RabbiMQ 的 6 种模式的基本应用
|
4月前
|
Java Maven
SpringBoot集成RabbitMQ-三种模式的实现
SpringBoot集成RabbitMQ-三种模式的实现
93 0
|
3月前
|
物联网 Go 网络性能优化
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式【1月更文挑战第21天】【1月更文挑战第104篇】
109 1
|
3月前
|
消息中间件 Java
Java操作RabbitMQ单一生产-消费者模式
Java操作RabbitMQ单一生产-消费者模式
32 0
|
3月前
|
消息中间件 存储 数据安全/隐私保护
深入学习RabbitMQ五种模式(一)
深入学习RabbitMQ五种模式(一)
40 0
|
1月前
|
消息中间件 Java RocketMQ
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
12 1
|
1月前
|
消息中间件 存储 NoSQL
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
27 1
|
4月前
|
消息中间件 中间件 Kafka
RocketMQ源码(二)消息消费的模式到底是Push还是Pull?
RocketMQ源码(二)消息消费的模式到底是Push还是Pull?
82 1
|
5月前
|
传感器 负载均衡 物联网
MQTT v5共享订阅是怎么回事?如何使用共享订阅提高消息订阅的灵活性和可伸缩性?
MQTT v5共享订阅是怎么回事?如何使用共享订阅提高消息订阅的灵活性和可伸缩性?
124 1