引言
代码已上传至Github,有兴趣的同学可以下载看看:https://github.com/ylw-github/RabbitMQ-Demo
前面博客讲解了RabbitMQ的五种队列形式《消息中间件系列教程(06) -RabbitMQ -五种队列形式》,主要讲解一下五种队列的代码实现。
主要分为:
- 点对点队列模式(简单)
- 工作队列模式(公平性)
- 发布订阅模式
- 路由模式Routing
- 通配符模式Topics
本文主要讲解发布订阅模式。
发布订阅模式
功能实现:一个生产者发送消息,多个消费者获取消息(同样的消息),包括一个生产者,一个交换机,多个队列,多个消费者。
思路解读:
- 一个生产者,多个消费者
- 每一个消费者都有自己的一个队列
- 生产者没有直接发消息到队列中,而是发送到交换机
- 每个消费者的队列都绑定到交换机上
- 消息通过交换机到达每个消费者的队列
该模式就是Fanout Exchange(扇型交换机)将消息路由给绑定到它身上的所有队列。注意:交换机没有存储消息功能,如果消息发送到没有绑定消费队列的交换机,消息则丢失。
1.用户发邮件案例讲解
1.新建Maven项目RabbitMQ-Demo
2.添加Maven依赖:
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency> </dependencies>
3.连接工具类
package com.ylw.rabbitmq; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RabbitMQConnecUtils { public static Connection newConnection() throws IOException, TimeoutException { // 1.定义连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2.设置服务器地址 factory.setHost("127.0.0.1"); // 3.设置协议端口号 factory.setPort(5672); // 4.设置vhost factory.setVirtualHost("OrderHost"); // 5.设置用户名称 factory.setUsername("OrderAdmin"); // 6.设置用户密码 factory.setPassword("123456"); // 7.创建新的连接 Connection newConnection = factory.newConnection(); return newConnection; } }
1.1 生产者
public class Producer { private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException { // 1.创建新的连接 Connection connection = RabbitMQConnecUtils.newConnection(); // 2.创建通道 Channel channel = connection.createChannel(); // 3.绑定的交换机 参数1交互机名称 参数2 exchange类型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String msg = "fanout_exchange_msg"; // 4.发送消息 channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); // System.out.println("生产者发送msg:" + msg); // // 5.关闭通道、连接 // channel.close(); // connection.close(); // 注意:如果消费没有绑定交换机和队列,则消息会丢失 } }
1.2 消费者
邮件消费者:
public class ConsumerEmailFanout { private static final String QUEUE_NAME = "consumerFanout_email"; private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException { // 1.创建新的连接 Connection connection = RabbitMQConnecUtils.newConnection(); // 2.创建通道 Channel channel = connection.createChannel(); // 3.消费者关联队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("邮件消费者获取生产者消息:" + msg); } }; // 5.消费者监听队列消息 channel.basicConsume(QUEUE_NAME, true, consumer); } }
短信消费者:
public class ConsumerSMSFanout { private static final String QUEUE_NAME = "ConsumerFanout_sms"; private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException { // 1.创建新的连接 Connection connection = RabbitMQConnecUtils.newConnection(); // 2.创建通道 Channel channel = connection.createChannel(); // 3.消费者关联队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("短信消费者获取生产者消息:" + msg); } }; // 5.消费者监听队列消息 channel.basicConsume(QUEUE_NAME, true, consumer); } }
3. 测试
启动生产者,并关闭,让其在RabbitMQ里面注册交换机,在控制台可以看出注册成功(如果不启动,可以手动注册,如下图Add a new exchange):
启动邮件消费者和短信消费者,在控制台可以看出有两个队列:
再启动生产者,可以看到消费者消费信息: