3.3 发布订阅模式
在开发过程中,有一些消息需要不同消费者进行不同的处理,如电商网站的同一条促销信息需要短信发送、邮件发送、站内信发送等。此时可以使用发布订阅模式(Publish/Subscribe)
特点
- 生产者将消息发送给交换机,交换机将消息转发到绑定此交换机的每个队列中。
- 工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换机能将消息发送给多个队列。发布订阅模式使用fanout交换机。
1、编写生产者
package com.zj.mq.publish; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /*生产者*/ public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.66.100"); connectionFactory.setPort(5672); connectionFactory.setUsername("MQzhang"); connectionFactory.setPassword("MQzhang"); connectionFactory.setVirtualHost("/"); //2.创建连接 Connection connection = connectionFactory.newConnection(); //3.建立信道 Channel channel = connection.createChannel(); //4.创建交换机fanout /* * 参数一:交换机名称 * 参数二:交换机类型 * 参数三: 交换机是否持久化(关闭控制台是否还存在)*/ channel.exchangeDeclare("exchangeFanout", BuiltinExchangeType.FANOUT,false); //5.创建三个队列(分别模拟邮件发送、短信发送、站内信发送) channel.queueDeclare("mailQueue", false,false,false,null); channel.queueDeclare("messageQueue", false,false,false,null); channel.queueDeclare("stationQueue", false,false,false,null); //6.交换机绑定队列 /* * 参数一:队列名称 * 参数二:交换机名称 * 参数三:路由关键字,发布订阅模式不存在路由关键字*/ channel.queueBind("mailQueue","exchangeFanout",""); channel.queueBind("messageQueue","exchangeFanout",""); channel.queueBind("stationQueue","exchangeFanout",""); //7.往交换机发送消息 for (int i = 0; i < 10; i++) { channel.basicPublish("exchangeFanout","",null,("你好,MQ"+i).getBytes()); } //8.关闭资源 channel.close(); connection.close(); } }
2、站内信消费者(其他同理)
package com.zj.mq.publish; import com.rabbitmq.client.*; import com.sun.deploy.ui.AboutDialog; import java.io.IOException; import java.util.concurrent.TimeoutException; /*站内信消费者*/ public class ConsumerStation { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory connectionaFactory = new ConnectionFactory(); connectionaFactory.setHost("192.168.66.100"); connectionaFactory.setPort(5672); connectionaFactory.setUsername("MQzhang"); connectionaFactory.setPassword("MQzhang"); connectionaFactory.setVirtualHost("/"); //2.创建连接 Connection connection = connectionaFactory.newConnection(); //3.创建信道 Channel channel = connection.createChannel(); //4.监听队列(一直在连接不会关闭连接) /* * 参数一:监听的队列名 * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。 * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。 * */ channel.basicConsume("stationQueue",true,new DefaultConsumer(channel){ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("发送站内信:"+message); } }); } }
发送站内信:你好,MQ0 发送站内信:你好,MQ1 发送站内信:你好,MQ2 发送站内信:你好,MQ3 发送站内信:你好,MQ4 发送站内信:你好,MQ5 发送站内信:你好,MQ6 发送站内信:你好,MQ7 发送站内信:你好,MQ8 发送站内信:你好,MQ9
当然也能创建多个消费者来监听同一个队列来提高消费速度。
3.4 路由模式
使用发布订阅模式时,所有消息都会发送到绑定的队列中(发送到绑定到交换机上的每个队列,队列再发送给消费者),但很多时候,不是所有消息都无差别的发布到所有队列中。比如电商网站的促销活动,双十一大促可能会发布到所有队列;而一些小的促销活动为了节约成本,只发布到站内信队列。此时需要使用路由模式(Routing)完成这一需求。
特点
- 每个队列绑定路由关键字RoutingKey
- 生产者将带有RoutingKey的消息发送给交换机,交换机根据RoutingKey转发到指定队列。路由模式使用direct交换机。
编写生产者
package com.zj.mq.route; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /*生产者*/ public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.66.100"); connectionFactory.setPort(5672); connectionFactory.setUsername("MQzhang"); connectionFactory.setPassword("MQzhang"); connectionFactory.setVirtualHost("/"); //2.创建连接 Connection connection = connectionFactory.newConnection(); //3.建立信道 Channel channel = connection.createChannel(); //4.创建交换机fanout /* * 参数一:交换机名称 * 参数二:交换机类型 * 参数三: 交换机是否持久化(关闭控制台是否还存在)*/ channel.exchangeDeclare("exchangeRoute", BuiltinExchangeType.DIRECT,false); //5.创建三个队列(分别模拟邮件发送、短信发送、站内信发送) /* 参数1:队列名 * 参数2:是否持久化,true表示MQ重启后队列还在。 * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问 * 参数4:是否自动删除,true表示不再使用队列时自动删除队列 * 参数5:其他额外参数 * */ channel.queueDeclare("mailQueue", false,false,false,null); channel.queueDeclare("messageQueue", false,false,false,null); channel.queueDeclare("stationQueue", false,false,false,null); //6.交换机绑定队列 /* * 参数一:队列名称 * 参数二:交换机名称 * 参数三:路由关键字,一个队列可以有多个路由关键字 * */ channel.queueBind("mailQueue","exchangeRoute","import"); channel.queueBind("messageQueue","exchangeRoute","normal"); channel.queueBind("stationQueue","exchangeRoute","import"); //7.往交换机发送消息,路由关键字是import,表示交换机会将消息发送到带有import关键字的队列。 channel.basicPublish("exchangeRoute","import",null,("你好,import MQ").getBytes()); //路由关键字是normal表示交换机会将消息发送到带有normal关键字的队列 channel.basicPublish("exchangeRoute","normal",null,("你好,normal MQ").getBytes()); //8.关闭资源 channel.close(); connection.close(); } }
编写消费者
消费者还是和其他模式的消费者是一样的。这里以mailQuene举例子。
package com.zj.mq.route; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /*消费者*/ public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.66.100"); connectionFactory.setPort(5672); connectionFactory.setUsername("MQzhang"); connectionFactory.setPassword("MQzhang"); connectionFactory.setVirtualHost("/"); //2.创建连接 Connection connection = connectionFactory.newConnection(); //3.创建信道 Channel channel = connection.createChannel(); //4.监听队列(一直在连接不会关闭连接) /* * 参数一:监听的队列名 * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。 * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。 * */ channel.basicConsume("mailQueue",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("接受消息为:"+message); } }); } }
3.5 通配符模式
通配符模式(Topic)是在路由模式的基础上,给队列绑定带通配符的路由关键字,只要消息的RoutingKey能实现通配符匹配,就会将消息转发到该队列。通配符模式比路由模式更灵活,使用topic交换机。
通配符规则:
- 消息设置RoutingKey时,RoutingKey由多个单词构成,中间以
.
分割。 - 队列设置RoutingKey时,
#
可以匹配任意多个单词,*
可以匹配任意一个单词。
编写生产者
package com.zj.mq.topic; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /*生产者*/ public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.66.100"); connectionFactory.setPort(5672); connectionFactory.setUsername("MQzhang"); connectionFactory.setPassword("MQzhang"); connectionFactory.setVirtualHost("/"); //2.创建连接 Connection connection = connectionFactory.newConnection(); //3.建立信道 Channel channel = connection.createChannel(); //4.创建交换机fanout /* * 参数一:交换机名称 * 参数二:交换机类型 * 参数三: 交换机是否持久化(关闭控制台是否还存在)*/ channel.exchangeDeclare("exchangeTopic", BuiltinExchangeType.TOPIC,false); //5.创建三个队列(分别模拟邮件发送、短信发送、站内信发送) channel.queueDeclare("mailQueue", false,false,false,null); channel.queueDeclare("messageQueue", false,false,false,null); channel.queueDeclare("stationQueue", false,false,false,null); //6.交换机绑定队列 /* * 参数一:队列名称 * 参数二:交换机名称 * 参数三:路由关键字,【#.mail.#】 表示:mail前后可以匹配多个单词*/ channel.queueBind("mailQueue","exchangeTopic","#.mail.#"); channel.queueBind("messageQueue","exchangeTopic","#.message.#"); channel.queueBind("stationQueue","exchangeTopic","#.station.#"); //7.往交换机发送消息到三个队列 channel.basicPublish("exchangeTopic","mail.message.station",null,("你好,MQ").getBytes()); //8.关闭资源 channel.close(); connection.close(); } }
编写消费者
也是和其他模式的消费者是一样的只需要监听消费者。
package com.zj.mq.topic; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /*消费者*/ public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.66.100"); connectionFactory.setPort(5672); connectionFactory.setUsername("MQzhang"); connectionFactory.setPassword("MQzhang"); connectionFactory.setVirtualHost("/"); //2.创建连接 Connection connection = connectionFactory.newConnection(); //3.创建信道 Channel channel = connection.createChannel(); //4.监听队列(一直在连接不会关闭连接) /* * 参数一:监听的队列名 * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。 * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。 * */ channel.basicConsume("mailQueue",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("接受消息为:"+message); } }); } }
四、SpringBoot整合RabbitMQ
之前我们使用原生JAVA操作RabbitMQ较为繁琐,接下来我们使用SpringBoot整合RabbitMQ,简化代码编写。
1.创建SpringBoot项目,引入RabbitMQ起步依赖(springboot版本是2.7.0)
<!-- RabbitMQ起步依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.编写配置文件
spring: rabbitmq: host: 192.168.66.100 port: 5672 username: MQzhang password: MQzhang virtual-host: / #日志格式 logging: pattern: console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'
3.创建队列和交换机
SpringBoot整合RabbitMQ时,需要在配置类创建队列和交换机,写法如下:
package com.zj.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { private final String EXCHANGE_NAME = "boot_topic_exchange"; private final String QUEUE_NAME = "boot_queue"; // 创建交换机 @Bean(EXCHANGE_NAME) public Exchange getExchange() { return ExchangeBuilder .topicExchange(EXCHANGE_NAME) // 交换机类型和名称 .durable(true) // 是否持久化 .build(); } // 创建队列 @Bean(QUEUE_NAME) public Queue getMessageQueue() { return new Queue(QUEUE_NAME); // 队列名 } // 交换机绑定队列 @Bean public Binding bindMessageQueue(@Qualifier(EXCHANGE_NAME) Exchange exchange, @Qualifier(QUEUE_NAME) Queue queue) { return BindingBuilder .bind(queue) .to(exchange) .with("#.message.#") .noargs(); } }