一、同步通信 VS 异步通信
同步通信:双方在同一个时钟信号的控制下,进行数据的接收和发送,来一个时钟,发送端发送,接收端接收,他们彼此之间的工作状态是一致的,例如直播、打电话。
优点:
- 时效性强,能够立即得到结果
缺点:
- 耦合性较高:每次加入新的需求,都需要修改原有代码
- 性能下降:调用者需要等待服务提供者响应,若调用链过长则响应时间等于每次调用时间之和
- 资源利用率低:调用链中的每个服务在等待响应的过程中,不能释放请求占用的资源,高并发的情况下会造成资源的极度浪费
- 级联失败:如果服务提供者出现问题,所有的调用方也会跟着出问题
适用场景:业务要求时效性高
异步通信:异步通信在发送字符时,所发送的字符之间的时间间隔可以是任意的。例如微信聊天。
在异步调用过程常见的实现就是事件驱动模式,系统中发生的事件会触发相应的事件处理器或监听器
,从而实现特定的业务逻辑或功能。
例如在如下的支付场景中,当有请求发送给支付服务时,支付服务就会通知Broker,接着后续的订阅事件就会接收到请求,开始同时处理业务,但是支付服务不用等到后续订阅事件完成后再返回,而是将请求通知给Broker之后支付服务就会返回结果。
优点:
- 服务解耦
- 性能提升,吞吐量提高
- 服务之间没有强依赖,不用担心级联失败问题(故障隔离)
- 流量削峰
缺点:
- 依赖于Broker的可靠性、安全性和吞吐能力
- 结构复杂后,业务没有了明显的流水线,难以追踪管理
适用场景:对于并发和吞吐量的要求高,时效性的要求低
二、MQ——消息队列
MQ(消息队列):存放消息的队列,也是事件驱动架构的Broker。
常见的消息队列实现对比:
RabbitMQ
RabbitMQ是基于Erlang语言开发的消息通信中间件,RabbitMQ的性能以及可用性较好,国内应用较为广泛,所以对RabbitMQ进行重点学习。
RabbitMQ的官网地址:https://www.rabbitmq.com
RabbitMQ安装
可以根据自己的需求在RabbitMQ的官网进行查看:下载和安装 RabbitMQ — 兔子MQ
RabbitMQ的整体架构
首先,Publisher会把消息发送给exchange(交换机),exchange负责路由再把消息投递到queue(队列),queue负责暂存消息,Consumer会从队列中获取消息并处理消息。
RabbitMQ中的几个概念:
• channel :操作 MQ 的工具
• exchange :路由消息到队列中
• queue :缓存消息
• virtual host :虚拟主机,是对 queue 、 exchange 等资源的逻辑分组
常见消息模型
RabbitMQ的官方文档中给出了5个MQ的Demo实例,可以分为如下:
- 基本消息队列(BasicQueue)
- 工作消息队列(WorkQueue)
发布订阅(Publish、Subscribe),又根据交换机类型不同分为三种:
Fanout Exchange:广播
Direct Exchange:路由
Topic Exchange:主题
基本消息队列(BasicQueue)
官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:
- publisher:消息发布者,将消息发送到队列queue
- queue:消息队列,负责接受并缓存消息
- consumer:订阅队列,处理队列中的消息
在RabbitMQ中需要了解的端口:
在使用端口时,需要在云服务器上开放所用的端口
基本消息队列的消息发送流程:
- 建立Connection
- 创建Channel
- 利用Channel声明队列
- 利用Channel向队列中发送消息
代码实现:
public class PublisherTest { @Test public void testSendMessage() throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost("x.x.x.x"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("xx"); factory.setPassword("xx"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.发送消息 String message = "hello, rabbitmq!"; channel.basicPublish("", queueName, null, message.getBytes()); System.out.println("发送消息成功:【" + message + "】"); // 5.关闭通道和连接 channel.close(); connection.close(); } }
运行结果:
基本消息队列的消息接收流程:
- 建立Connection
- 创建Channel
- 利用Channel声明队列
- 定义Consumer的消费行为handleDelivery()
- 利用Channel将消费者与队列进行绑定
代码实现:
public class ConsumerTest { public static void main(String[] args) throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost("x.x.x.x"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("xx"); factory.setPassword("xx"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.订阅消息 channel.basicConsume(queueName, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 5.处理消息 String message = new String(body); System.out.println("接收到消息:【" + message + "】"); } }); System.out.println("等待接收消息。。。。"); } }
运行结果:
上述实现方式相对比较复杂,就引入了SpringAMQP来实现。
AMQP:是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。
SpringAMQP:SpringAMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
那么利用SpringAMQP来实现基本消息队列的流程如下:
- 在父工程中引入spring-amqp的依赖
- 在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
- 在consumer服务中编写消费逻辑,绑定simple.queue这个队列
具体实现:
1、在父工程中引入spring-amqp的依赖:
<!--AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2、在publisher中编写测试方法,向simple.queue发送消息:
在publisher服务的配置文件中添加mq的连接信息:
spring: rabbitmq: host: # rabbitMQ的ip地址 port: 5672 # 端口 username: # 用户名 password: # 密码 virtual-host: # 虚拟主机
在publisher服务中新建一个测试类,编写测试方法:
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendMessage2SimpleQueue() { String queueName = "simple.queue"; String message = "hello, spring amqp!"; rabbitTemplate.convertAndSend(queueName, message); } }
在RabbitMQ中的simple队列中查询信息:
3、在consumer服务中编写消费逻辑,监听simple.queue
在consumer服务的配置文件中添加mq连接信息:
spring: rabbitmq: host: # rabbitMQ的ip地址 port: 5672 # 端口 username: # 用户名 password: # 密码 virtual-host: # 虚拟主机
在consumer服务中新建一个类,编写具体的消费逻辑:
@Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenSimpleQueue(String msg) throws InterruptedException { System.out.println("消费者接收到消息:【" + msg + "】" + LocalTime.now()); Thread.sleep(20); } }
运行启动类:
服务器的异步通信——RabbitMQ2:https://developer.aliyun.com/article/1521836