RabbitMQ主题模式(通配符模式)
前言
通过本篇博客能够简单使用RabbitMQ的主题模式。
本篇博客主要是博主通过官网总结出的RabbitMQ主题模式。其中如果有误欢迎大家及时指正。
什么是Topic模式
Topic模式与Direct模式相比,他们都可以根据Routing key把消息路由到对应的队列上,但是Topic模式相较于Direct来说,它可以基于多个标准进行路由。也就是在队列绑定Routing key的时候使用通配符。这使我们相较于Direct模式灵活性更大。
使用Topic模式的要点
routing key必须是由"."进行分隔的单词列表,最大限制为255字节
通配符规则
- "*"可以代替一个单词。
- "#"可以代替零个或多个单词。
示例
创建了三个绑定:Q1绑定了绑定键“.orange”。和Q2的".*.rabbit"和“lazy.#”。
1.一个消息的路由键为"quick.orange.rabbit" 时,它将会被送到队列Q1和Q2。
2.一个消息的路由键为"quick.orange.fox"时,它将会背诵到队列Q1
3.一个消息的路由键为"lazy.brown.fox"时,它将被送到队列Q2
4.一个消息的路由键为"quick.brown.fox",没有匹配任何队列,消息将会丢失。
5.一个消息的路由键为"lazy.orange.new.rabbit",它将被送到队列Q2
6.一个消息的路由键为"orang"或者"quick.orange.new.rabbit"没有匹配到任何队列消息将丢失。
代码示例
Pom文件引入RabbtiMQ依赖
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.10.0</version> </dependency>
RabbitMQ工具类
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author : [WangWei] * @version : [v1.0] * @className : RabbitMQUtils * @description : [rabbitmq工具类] * @createTime : [2023/1/17 8:49] * @updateUser : [WangWei] * @updateTime : [2023/1/17 8:49] * @updateRemark : [描述说明本次修改内容] */ public class RabbitMQUtils { /* * @version V1.0 * Title: getConnection * @author Wangwei * @description 创建rabbitmq连接 * @createTime 2023/1/17 8:52 * @param [] * @return com.rabbitmq.client.Connection */ public static Connection getConnection() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("ip"); factory.setPort(5672); factory.setVirtualHost("虚拟主机"); factory.setUsername("用户名"); factory.setPassword("密码"); //创建连接 Connection connection=factory.newConnection(); return connection; } /* * @version V1.0 * Title: getChannel * @author Wangwei * @description 创建信道 * @createTime 2023/1/17 8:55 * @param [] * @return com.rabbitmq.client.Channel */ public static Channel getChannel() throws IOException, TimeoutException { Connection connection=getConnection(); Channel channel=connection.createChannel(); return channel; } }
生产者
import com.rabbitmq.client.Channel; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; /** * @author : [WangWei] * @version : [v1.0] * @className : Producer * @description : [生产者] * @createTime : [2023/2/1 9:38] * @updateUser : [WangWei] * @updateTime : [2023/2/1 9:38] * @updateRemark : [描述说明本次修改内容] */ public class Producer { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws IOException, TimeoutException { //建立连接 RabbitMQUtils.getConnection(); //声明通道 Channel channel = RabbitMQUtils.getChannel(); //创建topic类型交换机并命名为logs channel.exchangeDeclare(EXCHANGE_NAME,"topic"); //声明routingKey String severityInfo="info.log.test"; String severityError="error.test"; String severityError2="log.error.test"; //循环发送2条消息 for (int i = 0; i <2 ; i++) { String msg="info.log.test:"+i; /*推送消息 *交换机命名,不填写使用默认的交换机 * routingKey -路由键- * props:消息的其他属性-路由头等正文 * msg消息正文 */ channel.basicPublish(EXCHANGE_NAME,severityInfo,null,msg.getBytes(StandardCharsets.UTF_8)); System.out.println(msg); } //循环发送2条消息 for (int i = 0; i <2 ; i++) { String msg="主题模式error.test:"+i; /*推送消息 *交换机命名,不填写使用默认的交换机 * routingKey -路由键- * props:消息的其他属性-路由头等正文 * msg消息正文 */ channel.basicPublish(EXCHANGE_NAME,severityError,null,msg.getBytes(StandardCharsets.UTF_8)); System.out.println(msg); } //循环发送2条消息 for (int i = 0; i <2 ; i++) { String msg="log.error.test:"+i; /*推送消息 *交换机命名,不填写使用默认的交换机 * routingKey -路由键- * props:消息的其他属性-路由头等正文 * msg消息正文 */ channel.basicPublish(EXCHANGE_NAME,severityError2,null,msg.getBytes(StandardCharsets.UTF_8)); System.out.println(msg); } } }
消费者1
import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author : [WangWei] * @version : [v1.0] * @className : ConsumerOne * @description : [消费者1] * @createTime : [2023/2/1 9:39] * @updateUser : [WangWei] * @updateTime : [2023/2/1 9:39] * @updateRemark : [描述说明本次修改内容] */ public class ConsumerOne { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws IOException, TimeoutException { RabbitMQUtils.getConnection(); Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"topic"); String queueName = channel.queueDeclare().getQueue(); //声明routingKey (error) String severityError="error.*"; //交换机与队列进行绑定-如果没有队列与交换机进行绑定,那么消费者接受不到生产者的消息,消息会丢失 //queueName绑定了direct_logs交换机并且绑定了routingKey channel.queueBind(queueName, EXCHANGE_NAME,severityError ); //因为Rabbitmq服务器将异步地向我们推送消息,所以我们以对象的形式提供了一个回调,该回调将缓冲消息,直到我们准备好使用它们。 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
消费者2
import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author : [WangWei] * @version : [v1.0] * @className : ConsumerTwo * @description : [消费者2] * @createTime : [2023/2/1 9:38] * @updateUser : [WangWei] * @updateTime : [2023/2/1 9:38] * @updateRemark : [描述说明本次修改内容] */ public class ConsumerTwo { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws IOException, TimeoutException { RabbitMQUtils.getConnection(); Channel channel = RabbitMQUtils.getChannel(); //创建fanout类型交换机并命名为logs channel.exchangeDeclare(EXCHANGE_NAME,"topic"); //创建了一个非持久的、排他的、自动删除的队列,并生成了一个名称 String queueName = channel.queueDeclare().getQueue(); //声明routingKey (info,error,warning) String severityInfo="info.#"; String severityError="*.error.*"; //交换机与队列进行绑定-如果没有队列与交换机进行绑定,那么消费者接受不到生产者的消息,消息会丢失 //queueName绑定了direct_logs交换机并且绑定了3个routingKey channel.queueBind(queueName, EXCHANGE_NAME,severityInfo ); channel.queueBind(queueName, EXCHANGE_NAME,severityError ); //因为Rabbitmq服务器将异步地向我们推送消息,所以我们以对象的形式提供了一个回调,该回调将缓冲消息,直到我们准备好使用它们。 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
效果
总结
通过使用通配符实现灵活性的应用有很多,例如nginx的请求转发,gateway为请求过滤等等都是使用了统配符的技术。通过这种联想来对知识进行结构化,找相同和不同,思考能力和学习力也会有很大的提高。