服务器的异步通信——RabbitMQ1

简介: 服务器的异步通信——RabbitMQ

一、同步通信 VS 异步通信

同步通信:双方在同一个时钟信号的控制下,进行数据的接收和发送,来一个时钟,发送端发送,接收端接收,他们彼此之间的工作状态是一致的,例如直播、打电话。

优点:

  • 时效性强,能够立即得到结果

缺点:

  • 耦合性较高:每次加入新的需求,都需要修改原有代码
  • 性能下降:调用者需要等待服务提供者响应,若调用链过长则响应时间等于每次调用时间之和
  • 资源利用率低:调用链中的每个服务在等待响应的过程中,不能释放请求占用的资源,高并发的情况下会造成资源的极度浪费
  • 级联失败:如果服务提供者出现问题,所有的调用方也会跟着出问题

适用场景:业务要求时效性高

异步通信:异步通信在发送字符时,所发送的字符之间的时间间隔可以是任意的。例如微信聊天。

在异步调用过程常见的实现就是事件驱动模式,系统中发生的事件会触发相应的事件处理器或监听器

,从而实现特定的业务逻辑或功能。

例如在如下的支付场景中,当有请求发送给支付服务时,支付服务就会通知Broker,接着后续的订阅事件就会接收到请求,开始同时处理业务,但是支付服务不用等到后续订阅事件完成后再返回,而是将请求通知给Broker之后支付服务就会返回结果。


优点:

  • 服务解耦
  • 性能提升,吞吐量提高
  • 服务之间没有强依赖,不用担心级联失败问题(故障隔离)
  • 流量削峰

缺点:

  • 依赖于Broker的可靠性、安全性和吞吐能力
  • 结构复杂后,业务没有了明显的流水线,难以追踪管理

适用场景:对于并发和吞吐量的要求高,时效性的要求低

二、MQ——消息队列

MQ(消息队列):存放消息的队列,也是事件驱动架构的Broker。

常见的消息队列实现对比:

RabbitMQ

RabbitMQ是基于Erlang语言开发的消息通信中间件,RabbitMQ的性能以及可用性较好,国内应用较为广泛,所以对RabbitMQ进行重点学习。

RabbitMQ的官网地址:https://www.rabbitmq.com

RabbitMQ安装

可以根据自己的需求在RabbitMQ的官网进行查看:下载和安装 RabbitMQ — 兔子MQ

RabbitMQ的整体架构

首先,Publisher会把消息发送给exchange(交换机),exchange负责路由再把消息投递到queue(队列),queue负责暂存消息,Consumer会从队列中获取消息并处理消息。

RabbitMQ中的几个概念:

• channel :操作 MQ 的工具

• exchange :路由消息到队列中

• queue :缓存消息

• virtual host :虚拟主机,是对 queue 、 exchange 等资源的逻辑分组

常见消息模型

RabbitMQ的官方文档中给出了5个MQ的Demo实例,可以分为如下:

  • 基本消息队列(BasicQueue)
  • 工作消息队列(WorkQueue)

发布订阅(Publish、Subscribe),又根据交换机类型不同分为三种:

               Fanout Exchange:广播


               Direct Exchange:路由


               Topic Exchange:主题

基本消息队列(BasicQueue)

官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:

  • publisher:消息发布者,将消息发送到队列queue
  • queue:消息队列,负责接受并缓存消息
  • consumer:订阅队列,处理队列中的消息


在RabbitMQ中需要了解的端口:

在使用端口时,需要在云服务器上开放所用的端口

基本消息队列的消息发送流程:

  1. 建立Connection
  1. 创建Channel
  2. 利用Channel声明队列
  3. 利用Channel向队列中发送消息

代码实现:

public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("x.x.x.x");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("xx");
        factory.setPassword("xx");
        // 1.2.建立连接
        Connection connection = factory.newConnection();
 
        // 2.创建通道Channel
        Channel channel = connection.createChannel();
 
        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);
 
        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");
 
        // 5.关闭通道和连接
        channel.close();
        connection.close();
 
    }
}

运行结果:

基本消息队列的消息接收流程

  1. 建立Connection
  2. 创建Channel
  3. 利用Channel声明队列
  1. 定义Consumer的消费行为handleDelivery()
  2. 利用Channel将消费者与队列进行绑定

代码实现:

public class ConsumerTest {
 
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("x.x.x.x");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("xx");
        factory.setPassword("xx");
        // 1.2.建立连接
        Connection connection = factory.newConnection();
 
        // 2.创建通道Channel
        Channel channel = connection.createChannel();
 
        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);
 
        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}

运行结果:

上述实现方式相对比较复杂,就引入了SpringAMQP来实现。

AMQP:是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。

SpringAMQP:SpringAMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

SpringAMQP的官方地址

那么利用SpringAMQP来实现基本消息队列的流程如下:

  1. 在父工程中引入spring-amqp的依赖
  2. 在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
  1. 在consumer服务中编写消费逻辑,绑定simple.queue这个队列

具体实现:

1、在父工程中引入spring-amqp的依赖:

        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2、在publisher中编写测试方法,向simple.queue发送消息:

在publisher服务的配置文件中添加mq的连接信息:

spring:
  rabbitmq:
    host:  # rabbitMQ的ip地址
    port: 5672 # 端口
    username: # 用户名
    password: # 密码
    virtual-host: # 虚拟主机

在publisher服务中新建一个测试类,编写测试方法:

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    @Test
    public void testSendMessage2SimpleQueue() {
        String queueName = "simple.queue";
        String message = "hello, spring amqp!";
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

在RabbitMQ中的simple队列中查询信息:

3、在consumer服务中编写消费逻辑,监听simple.queue

在consumer服务的配置文件中添加mq连接信息:

spring:
  rabbitmq:
    host:  # rabbitMQ的ip地址
    port: 5672 # 端口
    username: # 用户名
    password: # 密码
    virtual-host: # 虚拟主机

在consumer服务中新建一个类,编写具体的消费逻辑:

@Component
public class SpringRabbitListener { 
   @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) throws InterruptedException {
        System.out.println("消费者接收到消息:【" + msg + "】" + LocalTime.now());
        Thread.sleep(20);
    }
}

运行启动类:

服务器的异步通信——RabbitMQ2:https://developer.aliyun.com/article/1521836

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
13天前
|
消息中间件 测试技术 RocketMQ
消息队列 MQ产品使用合集之在异步发送消息函数sendMessage()中出现了错误,错误代码为-3,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
22天前
|
Java
Java Socket编程与多线程:提升客户端-服务器通信的并发性能
【6月更文挑战第21天】Java网络编程中,Socket结合多线程提升并发性能,服务器对每个客户端连接启动新线程处理,如示例所示,实现每个客户端的独立操作。多线程利用多核处理器能力,避免串行等待,提升响应速度。防止死锁需减少共享资源,统一锁定顺序,使用超时和重试策略。使用synchronized、ReentrantLock等维持数据一致性。多线程带来性能提升的同时,也伴随复杂性和挑战。
|
22天前
|
安全 Java 网络安全
Java Socket编程教程:构建安全可靠的客户端-服务器通信
【6月更文挑战第21天】构建安全的Java Socket通信涉及SSL/TLS加密、异常处理和重连策略。示例中,`SecureServer`使用SSLServerSocketFactory创建加密连接,而`ReliableClient`展示异常捕获与自动重连。理解安全意识,如防数据截获和中间人攻击,是首要步骤。通过良好的编程实践,确保网络应用在复杂环境中稳定且安全。
|
9天前
|
网络协议 网络架构
【网络编程入门】TCP与UDP通信实战:从零构建服务器与客户端对话(附简易源码,新手友好!)
在了解他们之前我们首先要知道网络模型,它分为两种,一种是OSI,一种是TCP/IP,当然他们的模型图是不同的,如下
|
15小时前
|
安全 网络协议 网络安全
SSL(Secure Sockets Layer)是一种安全协议,用于在客户端和服务器之间建立加密的通信通道。
SSL(Secure Sockets Layer)是一种安全协议,用于在客户端和服务器之间建立加密的通信通道。
4 0
|
3天前
|
Java 数据格式
Java面试题:简述Java Socket编程的基本流程,包括客户端和服务器的创建与通信。
Java面试题:简述Java Socket编程的基本流程,包括客户端和服务器的创建与通信。
11 0
|
13天前
|
消息中间件 Serverless 网络性能优化
消息队列 MQ产品使用合集之客户端和服务器之间的保活心跳检测间隔是怎么设置的
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
22天前
|
网络协议 Java Linux
探索Java Socket编程:实现跨平台客户端-服务器通信的奥秘
【6月更文挑战第21天】Java Socket编程示例展示了如何构建跨平台聊天应用。服务器端使用`ServerSocket`监听客户端连接,每个连接启动新线程处理。客户端连接服务器,发送并接收消息。Java的跨平台能力确保代码在不同操作系统上无需修改即可运行,简化开发与维护。
|
13天前
|
消息中间件 监控 Oracle
消息队列 MQ产品使用合集之启动Namesrv节点时,遇到报错,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
13天前
|
消息中间件 Java RocketMQ
消息队列 MQ产品使用合集之当SpringBoot应用因网络不通而启动失败时,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。