导入
一个技术的衍生必然是为了解决现实出现的问题,在讲这个问题之前我们先了解一下传统开发中关于服务调用出现的问题(痛点)有哪些?
我们为什么要使用MQ?
①、同步——超时
在多服务体系架构中,必然存在着多个服务之间的调用关系,当用户提交了订单,订单服务会调用支付服务执行用户的金钱操作,执行完毕之后紧接着调用商品服务对商家的商品信息(库存、成交量、收入等)进行更新,执行完毕之后又调用物流服务(对接发货公司、收发地带你)对用户买的商品进行物流实时同步。每一次的服务调用都要等待另一个服务的执行完毕,整个流程下来很耗时。对于系统来说要求实时性要强,可立即得到结果,而同步调用还存在着其他问题:
资源浪费:调用链中每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源
级联失败:如果服务提供者出现问题,所有调用方都会跟着出问题,如同多米诺骨牌一样,迅速导致真个微服务群鼓掌
②、服务耦合高
订单服务需要分别调用支付服务、商品服务、物流服务,调用者需要等待服务提供者响应,但是如果作为上游服务的物流服务突然宕机,这样会导致订单服务也会出现问题,用户下单失败;并且如果每次加入新需求,此时如果还需要调用通信服务实时给用户同步下单情况这样一个需求,我们就要修改原来的代码,耦合度很高
③、流量高峰
一般在秒杀或团抢活动中使用广泛。比方说订单系统经过测试组接口测试发现最多只能承受100万次请求,而在面对618、双一这种购物狂潮的高峰期,如果一下来了500万的请求系统时是无法处理的,可能会导致数据库无法承受这么大的压力,响应变慢或者直接挂掉
有没有什么办法能够帮助解决上面出现的问题呢?面对上述的种种问题,伟大的人类就提出了一个奇思妙想——把数据暂存。可不可把所有要传输的消息放在一个容器中,当大量请求来的时候先把一部分的消息(逻辑)暂存在容器中,然后慢慢去处理? 于是出现了消息队列的概念
MQ简介
MQ是什么?
Message Queue,消息队列,是基础数据结构中“先进先出”的一种数据结构。把要传输的消息(数据)放在队列中,用队列机制来实现消息传递——生产者生产消息把消息放入队列,然后消费者去处理。消费者可以到指定队列拉取消息,或者订阅响应的队列,由MQ服务端给其推送消息。
为什么要使用队列这种数据结构呢?
我们知道队列具有先进先出的特点。而消息队列就是将消息放到队列里,用队列做存储消息的介质,看作是一个容器。那这里的消息我们可以指代文本字符串,也可以是更复杂的嵌入对象。消息的发送放称为生产者,消息的接收方称为消费者
MQ要解决什么问题?
结合前面讲到应用场景出现的问题,我们来针对性讨论一下方案:
①、同步请求—>异步请求
通过引入MQ之后,我为了提升系统响应性能,我们可以它改造为异步,那异步请求有什么好处?它是如何解决同步出现的问题?
异步调用的本质是一种事件驱动模式:
- 耦合度低(不需要调用对方,只需要发布事件,事件去响应即可)
- 吞吐量提升(不需要等待对方,执行时间更短,吞吐量更大)
- 故障隔离(如果出现故障更容易排查)
- 流量削峰(broker做缓存)
②、高耦合—>低耦合
如果使用消息队列,当订单服务执行完成之后,发送一条消息到队列中,其余三个服务读取到这条消息,那么它立刻开始进行业务的执行。使用了消息队列后,消息的发送方和接收方并不需要知道彼此,这样相互之间也就是没有直接关系,即解耦。
③、削峰
这种峰值流量场景一般是集中于一小段时间内,为了防止系统在这个峰值时间内被流量冲垮,可以采用消息队列来削弱峰值流量,此时的消息队列就可以理解为是一个缓冲区,比方说系统只能处理100万请求,但此时同时有500万请求来临,我们就可以把把多余的400万请求先放到队列中,等系统根据自己处理请求的能力去消息队列去消费。
一般用来解决应用解耦,异步消息,流量削峰等问题,目的是为了实现高性能、高可用、可伸缩和最终一致性架构
MQ的产品有哪些?
RabbitMQ、ActiveMQ、RocketMQ、ZeroMQ、Kafka、IBM WebSphere 等
通过我提出来的应用场景想必大家对于为什么要使用MQ有了初步的了解,接下来我们重点讨论一下RabbitMQ
RabbitMQ
RabbitMQ是什么?
RabbitMQ是一个由Erlang语言开发的AMQP的开源中间件
补充:AMQP是什么?
AMQP我们可以看作是一种协议活规范,而RabbitMQ是基于这个协议下的实现框架。类似于:JDBC和mysql
RabbitMQ的工作原理是什么?
架构图
组件 |
描述 |
生产者(Producer) |
发送消息的应用程序,将消息发送到Broker |
消费者(Consumer) |
接收消息的应用程序,从RabbitMQ Broker获取消息进行处理 |
Broker |
RabbitMQ消息代理服务器,负责接收和分发消息。 |
交换机(Exchange) |
接收生产者发送的消息,并根据路由键routingKey规则将消息路由到跟交换机绑定的一个或多个队列。 交换机主要用来将生产者生产出来的消息,传送到对应的队列中,即交换机是一个消息传送的媒介(具有存储-转发功能),如果路由不到,或许会返回给 Producer(生产者) ,或许会被直接丢弃掉 |
绑定(Binding) |
定义交换机和队列之间的关系,指定消息的路由规则。 |
路由键(Routing Key) |
生产者在发送消息时,将消息附带的路由键发送给交换机,交换机根据路由键将消息路由到相应的队列。 生产者将消息发给交换器的时候,一般会指定一个 RoutingKey(路由键),用来指定这个消息的路由规则,而这个 RoutingKey 需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效 |
队列(Queue) |
存储消息的容器,消费者从队列中获取消息进行处理。 它是消息的容器,也是消息的终点 |
整个消息传输流程为:
1. 生产者将消息发送到交换机;
2. 交换机根据路由键将消息路由到相应的队列;
3. 消费者从队列中获取消息进行处理。
RabbitMQ有哪些工作模式?
官方网站:RabbitMQ Tutorials — RabbitMQ
①、Simple(简单工作模式)
特点:一个队列只有一个消费者
消息分发的方式。不同工作模式指的是消息路由的策略和方式不同
内部使用的默认交换机
生产者将消息发送到队列,消费者从队列取出消息
②、Work Queues(工作模式)
特点:多个消费者监听同一个队列
在一个队列中如果有多个消费者,消费者之间对于同一个消息的关系是竞争关系,同一条消息只能由一个消费者消费。但是分担压力,比方说有10条消息,C1处理13579消息,C2处理246810消息。顺序取消息,C1取一条,C2取一条,消费规则是轮询
应用场景:
③、Pub/Sub(订阅模式)
特点:多个消息队列,每个消息队列有一个消费者监听
X:交换机,
生产者生产了消息发给交换机,交换机路由分发给不同的消费者,消费者监听获取消息。一个消息可以被多个消费者同时接收消费
每个消费者都监听自己的队列
使用场景:
A服务可以通过异常处理,如果A服务发送后出现问题可以回滚,什么都有可能出现问题,而我们的目的是保持一致性
④、Routing(路由模式)
特点:一个交换机绑定多个消息队列,每个消息队列都有自己唯一的key,每个消息队列有一个消费者监听
Topics(通配符/主题模式)
*:一个单词
#:零个或多个单词
实战演练
依赖
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency> </dependencies>
生产者
package com.itheima.producer; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer_HelloWorld { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设置参数 factory.setHost("114.115.170.214"); factory.setPort(5672); factory.setVirtualHost("/itcast"); factory.setUsername("admin"); factory.setPassword("admin"); //3.创建连接 Connection Connection connection = factory.newConnection(); //4.创建Channel Channel channel = connection.createChannel(); //5.创建队列Queue channel.queueDeclare("denglimei",true,false,false,null); //要发送的消息 String body ="hello rabbitmq~~~"; //6.发送消息 channel.basicPublish("","denglimei",null,body.getBytes()); //7.释放资源 channel.close(); connection.close(); } }
消费者
package com.itheima.consumer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer_HelloWorld { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设置参数 factory.setHost("114.115.170.214"); factory.setPort(5672); factory.setVirtualHost("/itcast"); factory.setUsername("admin"); factory.setPassword("admin"); //3.创建连接 Connection Connection connection = factory.newConnection(); //4.创建Channel Channel channel = connection.createChannel(); //5.创建队列Queue channel.queueDeclare("hello_world",true,false,false,null); //匿名内部类 Consumer consumer = new DefaultConsumer(channel){ /* * 回调方法,当收到消息后,会自动执行该方法 * 1.consumerTag:标识 * 2.envelope:获取一些信息,交换机,路由key * 3.properties:配置信息 * 4.body:数据 * */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long start = System.currentTimeMillis(); for (int i = 0; i < 10000; i++) { System.out.println("第"+i+"条"); } long end = System.currentTimeMillis(); System.out.println(start-end); } }; //接收消息 channel.basicConsume("hello_world",true,consumer); //注意:消费者作为监听者,不要去关闭资源,否则如何监听资源? } }
总结
本次先对MQ和衍生出来的RabbitMQ先做了基本介绍,后续会针对RabbitMQ如何在项目中如何进行削峰、如何解决死信队列等内容做具体介绍,敬请期待~