前言
WorkQueues模型、Fanout交换机、Direct交换机、Topic交换机、基于SpringBoot注解声明队列和交换机、消息转换器。
一、WorkQueues模型
- Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
- 当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。
在控制台创建一个work.queue的队列:
消息发送
循环发送,模拟大量消息堆积现象。
/** * workQueue * 向队列中不停发送消息,模拟消息堆积。 */ @Test public void testWorkQueue() throws InterruptedException { // 队列名称 String queueName = "simple.queue"; // 消息 String message = "hello, message_"; for (int i = 0; i < 50; i++) { // 发送消息,每20毫秒发送一次,相当于每秒发送50条消息 rabbitTemplate.convertAndSend(queueName, message + i); Thread.sleep(20); } }
消息接收
模拟多个消费者绑定同一个队列,我们添加2个方法,并且设置不同睡眠时间模拟不同性能读取
@RabbitListener(queues = "work.queue") public void listenWorkQueue1(String msg) throws InterruptedException { System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now()); Thread.sleep(20); } @RabbitListener(queues = "work.queue") public void listenWorkQueue2(String msg) throws InterruptedException { System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now()); Thread.sleep(200); }
- 消费者1很快完成了自己的25条消息
- 消费者2却在缓慢的处理自己的25条消息。
也就是说
消息是平均分配
给每个消费者,并没有考虑到消费者的处理能力
。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。
能者多劳
在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:
spring: rabbitmq: listener: simple: prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
再次测试,发现结果如下:
可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了6条消息。而最终总的执行耗时也在1秒左右,大大提升。
正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。
Work模型的使用:
- 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
- 通过设置prefetch来控制消费者预取的消息数量
二、交换机类型
1.Fanout交换机
发送流程:
- 可以有多个队列
- 每个队列都要绑定到Exchange(交换机)
- 生产者发送的消息,只能发送到交换机
- 交换机把消息发送给绑定过的所有队列
- 订阅队列的消费者都能拿到消息
在控制台创建fanout.queue1、fanoutqueue2队列和dragon.fanout交换机,并将队列绑定到交换机:
消息发送
@Test public void testFanoutExchange() { // 交换机名称 String exchangeName = "dragon.fanout"; // 消息 String message = "hello, everyone!"; rabbitTemplate.convertAndSend(exchangeName, "", message); }
消息接收
@RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg) { System.out.println("消费者1接收到Fanout消息:【" + msg + "】"); } @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg) { System.out.println("消费者2接收到Fanout消息:【" + msg + "】"); }
交换机的作用是什么?
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息,路由失败,消息丢失
- FanoutExchange的会将消息路由到每个绑定的队列
2.Direct交换机
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
- 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
在控制台声明两个队列direct.queue1和direct.queue2,然后声明一个direct类型的交换机,绑定队列和交换机(使用red和blue作为key,绑定direct.queue1到dragon.direct;使用red和yellow作为key,绑定direct.queue2到dragon.direct):
消息接收
@RabbitListener(queues = "direct.queue1") public void listenDirectQueue1(String msg) { System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】"); } @RabbitListener(queues = "direct.queue2") public void listenDirectQueue2(String msg) { System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】"); }
消息发送
@Test public void testSendDirectExchange() { // 交换机名称 String exchangeName = "dragon.direct"; // 消息 String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "red", message); }
绑定红色的都会接收到信息:
描述下Direct交换机与Fanout交换机的差异?
- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
3.Topic交换机
- Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。
只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!- BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert
- 通配符规则:
- #:匹配一个或多个词
- *:匹配不多不少恰好1个词
- 举例:
dragon.#:能够匹配dragon.stu.insert 或者 dragon.stu
dragon.*:只能匹配dragon.stu
假如此时publisher发送的消息使用的RoutingKey共有四种:
- china.news 代表有中国的新闻消息;
- china.weather 代表中国的天气消息;
- japan.news 则代表日本新闻
- japan.weather 代表日本的天气消息;
解释:
- topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:
- china.news
- china.weather
- topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:
- china.news
- japan.news
消息发送
/** * topicExchange */ @Test public void testSendTopicExchange() { // 交换机名称 String exchangeName = "dragon.topic"; // 消息 String message = "喜报!孙悟空大战哥斯拉,胜!"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "china.news", message); }
消息接收
@RabbitListener(queues = "topic.queue1") public void listenTopicQueue1(String msg){ System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】"); } @RabbitListener(queues = "topic.queue2") public void listenTopicQueue2(String msg){ System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】"); }
描述下Direct交换机与Topic交换机的差异?
- Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割
- Topic交换机与队列绑定时的bindingKey可以指定通配符
- #:代表0个或多个词
- *:代表1个词
三、编程式声明队列和交换机
SpringAMQP提供了一个Queue类,用来创建队列:
fanout示例
@Configuration public class FanoutConfiguration { /** * 声明交换机 * @return Fanout类型交换机 */ @Bean public FanoutExchange fanoutExchange(){ // 还可以是 return ExchangeBuilder.fanoutExchange("dragon.fanout").build(); return new FanoutExchange("dragon.fanout"); } /** * 第1个队列 */ @Bean public Queue fanoutQueue1(){ // 还可以是 return QueueBuilder.durable("fanout.queue1").build(); return new Queue("fanout.queue1"); } /** * 绑定队列和交换机 */ @Bean public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } /** * 第2个队列 */ @Bean public Queue fanoutQueue2(){ return new Queue("fanout.queue2"); } /** * 绑定队列和交换机 */ @Bean public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } //绑定队列和交换机的另一方法 // @Bean // public Binding bindingQueue2(){ // return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange()); // } }
direct示例
@Configuration public class DirectConfiguration { /** * 声明交换机 * @return Direct类型交换机 */ @Bean public DirectExchange directExchange(){ return ExchangeBuilder.directExchange("dragon.direct").build(); } /** * 第1个队列 */ @Bean public Queue directQueue1(){ return new Queue("direct.queue1"); } /** * 绑定队列和交换机 */ @Bean public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){ return BindingBuilder.bind(directQueue1).to(directExchange).with("red"); } /** * 绑定队列和交换机 */ @Bean public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){ return BindingBuilder.bind(directQueue1).to(directExchange).with("blue"); } /** * 第2个队列 */ @Bean public Queue directQueue2(){ return new Queue("direct.queue2"); } /** * 绑定队列和交换机 */ @Bean public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){ return BindingBuilder.bind(directQueue2).to(directExchange).with("red"); } /** * 绑定队列和交换机 */ @Bean public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){ return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow"); } }
基于注解
上面的代码还是很多的,基于注解的方式也能够代替上面的繁杂配置,下面演示direct和topic交换机,队列的代码
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "dragon.direct", type = ExchangeTypes.DIRECT), key = {"red", "blue"} )) public void listenDirectQueue1(String msg){ System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "dragon.direct", type = ExchangeTypes.DIRECT), key = {"red", "yellow"} )) public void listenDirectQueue2(String msg){ System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】"); }
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC), key = "china.#" )) public void listenTopicQueue1(String msg){ System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC), key = "#.news" )) public void listenTopicQueue2(String msg){ System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】"); }
四、消息转换器
随便创建一个队列,然后发送Map对象,你会发现消息格式很不友好
@Test public void testSendMap() throws InterruptedException { // 准备消息 Map<String,Object> msg = new HashMap<>(); msg.put("name", "柳岩"); msg.put("age", 21); // 发送消息 rabbitTemplate.convertAndSend("object.queue", msg); }
控制台查看:
- Spring的消息发送代码接收的消息体是一个Object,而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
- 只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
- 数据体积过大
有安全漏洞
可读性差
- 根据上面测试,显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。
j解决:
引入依赖:
<dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.9.10</version> </dependency>
注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。其转换器配置。
添加配置:
@Bean public MessageConverter messageConverter(){ // 1.定义消息转换器 Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息 jackson2JsonMessageConverter.setCreateMessageIds(true); return jackson2JsonMessageConverter; }
总结
以上就是所有讲解。