3.1. 工作队列模型(WorkQueues)
3.1.1 介绍
在入门程序中实现的是一种最简单的工作队列模型,消费者直接绑定到队列上,如下图:
这种方式实现最基本的异步通信,一个生产者,一个队列,一个消费者,生产者将消息发到队列,消费者从队列接收消息。
由于绑定队列的消费者只有一个所以处理消息的能力就比较弱,下边情况将不适合这种方式:
- 如果有大量的任务就需要多个消费者去共同处理。
- 当生产者发送消息的速度远远大于消费者处理任务的速度此时由于消费者只有一个将造成消息堆积。
为了解决上边的问题可以使用到下边的方式,让多个消费者绑定到一个队列,共同消费队列中的消息。
3.1.2 测试
3.1.2.1 创建队列
接下来我们测试工作队列模型。
首先,我们在控制台创建一个新的队列,命名为work.queue:
3.1.2.2 发送消息程序
这次我们循环发送,模拟大量消息堆积现象。
在publisher服务中的SpringAmqpTest类中添加一个测试方法:
/** * workQueue * 向队列中不停发送消息,模拟消息堆积。 */ @Test public void testWorkQueue() throws InterruptedException { // 队列名称 String queueName = "work.queue"; // 消息 String message = "hello, message_"; for (int i = 0; i < 50; i++) { // 发送消息,每20毫秒发送一次,相当于每秒发送50条消息 rabbitTemplate.convertAndSend(queueName, message + i); Thread.sleep(20); } }
3.1.2.3 接收消息程序
要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方法:
@RabbitListener(queues = "work.queue") public void listenWorkQueue1(String msg) throws InterruptedException { System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now()); Thread.sleep(20); } @RabbitListener(queues = "work.queue") public void listenWorkQueue2(String msg) throws InterruptedException { System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now()); Thread.sleep(200); }
注意到这两消费者,都设置了Thead.sleep,模拟任务耗时:
- 消费者1 sleep了20毫秒,相当于每秒钟处理50个消息
- 消费者2 sleep了200毫秒,相当于每秒处理5个消息
3.1.2.4 测试
测试流程:
- 启动ConsumerApplication。
观察mq控制台点击“work.queue”发现消费者程序和mq建立了两个通道(连接),在监听队列。
- 执行publisher服务中刚刚编写的发送方法testWorkQueue。
- 观察消费端控制台中消费消息的情况
最终结果如下:
消费者1接收到消息:【hello, message_0】21:06:00.869555300 消费者2........接收到消息:【hello, message_1】21:06:00.884518 消费者1接收到消息:【hello, message_2】21:06:00.907454400 消费者1接收到消息:【hello, message_4】21:06:00.953332100 消费者1接收到消息:【hello, message_6】21:06:00.997867300 消费者1接收到消息:【hello, message_8】21:06:01.042178700 消费者2........接收到消息:【hello, message_3】21:06:01.086478800 消费者1接收到消息:【hello, message_10】21:06:01.087476600 消费者1接收到消息:【hello, message_12】21:06:01.132578300 消费者1接收到消息:【hello, message_14】21:06:01.175851200 消费者1接收到消息:【hello, message_16】21:06:01.218533400 消费者1接收到消息:【hello, message_18】21:06:01.261322900 消费者2........接收到消息:【hello, message_5】21:06:01.287003700 消费者1接收到消息:【hello, message_20】21:06:01.304412400 消费者1接收到消息:【hello, message_22】21:06:01.349950100 消费者1接收到消息:【hello, message_24】21:06:01.394533900 消费者1接收到消息:【hello, message_26】21:06:01.439876500 消费者1接收到消息:【hello, message_28】21:06:01.482937800 消费者2........接收到消息:【hello, message_7】21:06:01.488977100 消费者1接收到消息:【hello, message_30】21:06:01.526409300 消费者1接收到消息:【hello, message_32】21:06:01.572148 消费者1接收到消息:【hello, message_34】21:06:01.618264800 消费者1接收到消息:【hello, message_36】21:06:01.660780600 消费者2........接收到消息:【hello, message_9】21:06:01.689189300 消费者1接收到消息:【hello, message_38】21:06:01.705261 消费者1接收到消息:【hello, message_40】21:06:01.746927300 消费者1接收到消息:【hello, message_42】21:06:01.789835 消费者1接收到消息:【hello, message_44】21:06:01.834393100 消费者1接收到消息:【hello, message_46】21:06:01.875312100 消费者2........接收到消息:【hello, message_11】21:06:01.889969500 消费者1接收到消息:【hello, message_48】21:06:01.920702500 消费者2........接收到消息:【hello, message_13】21:06:02.090725900 消费者2........接收到消息:【hello, message_15】21:06:02.293060600 消费者2........接收到消息:【hello, message_17】21:06:02.493748 消费者2........接收到消息:【hello, message_19】21:06:02.696635100 消费者2........接收到消息:【hello, message_21】21:06:02.896809700 消费者2........接收到消息:【hello, message_23】21:06:03.099533400 消费者2........接收到消息:【hello, message_25】21:06:03.301446400 消费者2........接收到消息:【hello, message_27】21:06:03.504999100 消费者2........接收到消息:【hello, message_29】21:06:03.705702500 消费者2........接收到消息:【hello, message_31】21:06:03.906601200 消费者2........接收到消息:【hello, message_33】21:06:04.108118500 消费者2........接收到消息:【hello, message_35】21:06:04.308945400 消费者2........接收到消息:【hello, message_37】21:06:04.511547700 消费者2........接收到消息:【hello, message_39】21:06:04.714038400 消费者2........接收到消息:【hello, message_41】21:06:04.916192700 消费者2........接收到消息:【hello, message_43】21:06:05.116286400 消费者2........接收到消息:【hello, message_45】21:06:05.318055100 消费者2........接收到消息:【hello, message_47】21:06:05.520656400 消费者2........接收到消息:【hello, message_49】21:06:05.723106700
可以看到消费者1和消费者2竟然每人消费了25条消息:
- 消费者1很快完成了自己的25条消息
- 消费者2却在缓慢的处理自己的25条消息。
也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。
3.1.2.5 能者多劳
在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:
spring: rabbitmq: listener: simple: prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
再次测试,发现结果如下:
消费者1接收到消息:【hello, message_0】21:12:51.659664200 消费者2........接收到消息:【hello, message_1】21:12:51.680610 消费者1接收到消息:【hello, message_2】21:12:51.703625 消费者1接收到消息:【hello, message_3】21:12:51.724330100 消费者1接收到消息:【hello, message_4】21:12:51.746651100 消费者1接收到消息:【hello, message_5】21:12:51.768401400 消费者1接收到消息:【hello, message_6】21:12:51.790511400 消费者1接收到消息:【hello, message_7】21:12:51.812559800 消费者1接收到消息:【hello, message_8】21:12:51.834500600 消费者1接收到消息:【hello, message_9】21:12:51.857438800 消费者1接收到消息:【hello, message_10】21:12:51.880379600 消费者2........接收到消息:【hello, message_11】21:12:51.899327100 消费者1接收到消息:【hello, message_12】21:12:51.922828400 消费者1接收到消息:【hello, message_13】21:12:51.945617400 消费者1接收到消息:【hello, message_14】21:12:51.968942500 消费者1接收到消息:【hello, message_15】21:12:51.992215400 消费者1接收到消息:【hello, message_16】21:12:52.013325600 消费者1接收到消息:【hello, message_17】21:12:52.035687100 消费者1接收到消息:【hello, message_18】21:12:52.058188 消费者1接收到消息:【hello, message_19】21:12:52.081208400 消费者2........接收到消息:【hello, message_20】21:12:52.103406200 消费者1接收到消息:【hello, message_21】21:12:52.123827300 消费者1接收到消息:【hello, message_22】21:12:52.146165100 消费者1接收到消息:【hello, message_23】21:12:52.168828300 消费者1接收到消息:【hello, message_24】21:12:52.191769500 消费者1接收到消息:【hello, message_25】21:12:52.214839100 消费者1接收到消息:【hello, message_26】21:12:52.238998700 消费者1接收到消息:【hello, message_27】21:12:52.259772600 消费者1接收到消息:【hello, message_28】21:12:52.284131800 消费者2........接收到消息:【hello, message_29】21:12:52.306190600 消费者1接收到消息:【hello, message_30】21:12:52.325315800 消费者1接收到消息:【hello, message_31】21:12:52.347012500 消费者1接收到消息:【hello, message_32】21:12:52.368508600 消费者1接收到消息:【hello, message_33】21:12:52.391785100 消费者1接收到消息:【hello, message_34】21:12:52.416383800 消费者1接收到消息:【hello, message_35】21:12:52.439019 消费者1接收到消息:【hello, message_36】21:12:52.461733900 消费者1接收到消息:【hello, message_37】21:12:52.485990 消费者1接收到消息:【hello, message_38】21:12:52.509219900 消费者2........接收到消息:【hello, message_39】21:12:52.523683400 消费者1接收到消息:【hello, message_40】21:12:52.547412100 消费者1接收到消息:【hello, message_41】21:12:52.571191800 消费者1接收到消息:【hello, message_42】21:12:52.593024600 消费者1接收到消息:【hello, message_43】21:12:52.616731800 消费者1接收到消息:【hello, message_44】21:12:52.640317 消费者1接收到消息:【hello, message_45】21:12:52.663111100 消费者1接收到消息:【hello, message_46】21:12:52.686727 消费者1接收到消息:【hello, message_47】21:12:52.709266500 消费者2........接收到消息:【hello, message_48】21:12:52.725884900 消费者1接收到消息:【hello, message_49】21:12:52.746299900
可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了6条消息。而最终总的执行耗时也在1秒左右,大大提升。
正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。
3.1.3 小结
Work模型的使用:
- 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
- 通过设置prefetch来控制消费者预取的消息数量
3.2. 发布订阅模型(Publish/Subscribe)
3.2.1 介绍
工作队列模型一次只能将消息发给一个队列,绑定队列的多个消息者只能有一个消费者处理消息。如果一条消息要发给多个应用程序使用工作队列模型将无法实现。举例,下图中支付成功后支付服务将消息发给交易服务和通知服务,使用工作队列模型将无法实现。
使用发布订阅模型可以实现上图的需求,发布订阅模型可以实现一条消息发给多个队列,每个队列绑定到同一个交换机,最终实现了向多个消费者发送一条消息,这种模式称为“发布/订阅”模型。
发布订阅模型里,生产者只能将消息发送到交换机,由交换机将消息推送到队列,交换机可以将消息推送给绑定它的所有队列,也可以有针对性的将消息推送给某几个队列,这就相当于有一批消费者订阅了消息,交换机根据各自的订阅去推送消息,组成部分如下:
- Publisher:生产者,不再发送消息到队列中,而是发给交换机
- Exchange:交换机,一方面,接收生产者发送的消息。另一方面,将消息推送给队列,是将消息推送给某个特别队列、递交给所有队列、或是将消息丢弃,到底如何操作,取决于交换机的类型。
- Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
- Consumer:消费者,与以前一样,订阅队列,没有变化
注意:Exchange(交换机)只负责转发消息,不具备存储消息的能力[可以暂存,但不支持持久化],因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
3.2.2 交换机类型
交换机是如何实现将消息推送给所有队列,还是有针对性的将消息推送给某几个队列呢?
实现不同的需求要选用不同类型的交换机,可用的交换机类型:direct, topic, headers 和fanout.
- Fanout:广播类型,将消息交给所有绑定到交换机的队列。
- Direct:直接类型,基于RoutingKey(路由key)发送给订阅了消息的队列,交换机根据routingkey去判断消息应该转发到哪个队列
- Topic:通配符类型(主题类型),与Direct类似,只不过RoutingKey可以使用通配符
- Headers:头匹配类型,基于MQ的消息头匹配,用的较少。
课堂中,我们讲解前面的三种交换机模式。
3.3. Fanout交换机
3.3.1 介绍
Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。在广播模式下,消息发送流程是这样的:
- 1) 可以有多个队列
- 2) 每个队列都要绑定到Exchange(交换机)
- 3) 生产者发送的消息,只能发送到交换机
- 4) 交换机把消息发送给绑定过的所有队列
- 5) 订阅队列的消费者接收消息,每个队列订阅的消费者只有一个能拿到消息。
3.3.2 测试
3.3.2.1 创建队列
我们的计划是这样的:
- 创建一个名为
hmall.fanout的交换机,类型是Fanout - 创建两个队列
fanout.queue1和fanout.queue2,绑定到交换机hmall.fanout
在控制台创建队列fanout.queue1:
在创建一个队列fanout.queue2:
3.3.2.2 创建交换机
然后再创建一个交换机hmall.fanout:
3.3.2.3 绑定队列到交换机
然后绑定两个队列到交换机:
3.3.2.4 发送消息
下边实现消息发送:
在publisher服务的SpringAmqpTest类中添加测试方法:
@Test public void testFanoutExchange() { // 交换机名称 String exchangeName = "hmall.fanout"; // 消息 String message = "hello, everyone!"; rabbitTemplate.convertAndSend(exchangeName, "", message); }
发送成功查看mq控制台消息转发到了绑定此交换机的两个队列
3.3.2.5 接收消息
下边实现消息接收:
在consumer服务的SpringRabbitListener中添加两个方法,作为消费者:
@RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg) { System.out.println("消费者1接收到Fanout消息:【" + msg + "】"); } @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg) { System.out.println("消费者2接收到Fanout消息:【" + msg + "】"); }
将consumer服务启动起来后可以通过rabbitmq控制台查看消费者监听情况
进入队列界面,点击fanout.queue1:
进入队列界面,查看consumers,下图表示fanout.queue1队列有一个监听者。
同样的方法可以查看fanout.queue2队列的监听情况。
fanout.queue1和fanout.queue2每个队列都有一个监听者。
下边执行发送消息程序,观察控制台,下图说明每个消费者成功收到消息。
消费者1接收到Fanout消息:【hello, everyone!】 消费者2接收到Fanout消息:【hello, everyone!】
3.3.2.6 启动多个消费者实例
下边我们把consumer服务启动两个实例
此时再观察fanout.queue1和fanout.queue2的监听者,发现每个队列有两个监听者
此时的结果相当于下图:
此时执行发送消息程序后四个消息者都可以收到消息吗?
通过测试我们发现:
在每个队列的消费者中,发送一条消息只会有一个消费者接收到消息。
每个队列默认采用轮询的方式向消费者推送消息。
3.3.3 小结
交换机的作用是什么?
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息,路由失败,消息丢失
- FanoutExchange的会将消息路由到每个绑定的队列