真的好用吗?鲜有人提的 RabbitMQ-RPC模式

简介: 真的好用吗?鲜有人提的 RabbitMQ-RPC模式

前言

我们之前介绍了RabbitMQ的五种模型(详见上方系列文章《RabbitMQ灵活运用,怎么理解五种消息模型》),即简单、轮询、主题、发布/订阅、路由、主题五种模式。除此之外,rabbitMQ还提供了一种 RPC 模式,这种模式是怎么回事?真的好用吗?一起来了解下


一、RPC 及 RabbitMQ-RPC模型

1. RPC概念

RPC是指远程过程调用(Remote Procedure Call),是一种计算机通信协议,用于将一个计算机程序的执行过程转移至另一台计算机上,但对用户而言,它就像是在本地运行一样,RPC通常用于分布式系统或组件化架构中,可以隐藏分布式系统背后的复杂性

cdc3e7238d9d44d7bcd9db59006c6f80.png

2. RabbitMQ-RPC模型

我们照例先看官方流程图

9609780d67bc467aa918cf77569490f9.png

RPC 工作模式如下:


  1. 当客户端启动时,它会创建一个匿名独占回调队列。
  2. 对于 RPC 请求,客户端发送具有两个属性的消息:reply_to,设置为回调队列,correlation_id,设置为每个请求的唯一值,请求将发送到rpc_queue队列。
  3. RPC 工作线程(又名:服务器)正在等待该队列上的请求。当请求出现时,它会执行作业,并使用 reply_to 字段中的队列将包含结果的消息发送回客户端。
  4. 客户端等待回调队列中的数据。出现消息时,它会检查 correlation_id 属性。如果它与请求中的值匹配,则它将响应返回到应用程序。

可以看出,所谓RPC模式其实是两台机器互为消息生产者和消费者,同时利用两条队列,一个负责传递参数,一个负责传递结果


二、RPC模式的Demo

在进一步探讨前,我们先实际运行一段demo


1. 调用方代码

在这个示例中,我们创建了一个RPCClient类,并在构造函数中建立了RabbitMQ连接并创建了一个通道。我们还声明了一个请求队列名称requestQueueName和一个回复队列名称replyQueueName。


在call方法中,我们将请求消息发布到请求队列,并使用replyTo属性将回复队列名称包括在消息中。接着我们定义一个response阻塞队列,它将用于存储来自回复队列的响应。最后,我们使用BlockingQueue.take()方法等待响应内容并返回结果。


调用方代码如下(示例):

package com.example.seeu.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
public class RPCClient implements AutoCloseable {
    private final Connection connection;
    private final Channel channel;
    private final String requestQueueName = "rpc_queue";
    private String replyQueueName;
    final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
    public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
        channel = connection.createChannel();
        replyQueueName = channel.queueDeclare().getQueue();
        channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
                response.offer(new String(delivery.getBody(), "UTF-8"));
        }, consumerTag -> {
        });
    }
    public String call(String message) throws IOException, InterruptedException {
        final String corrId = UUID.randomUUID().toString();
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();
        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
        String result = response.take();
        return result;
    }
    public void close() throws IOException {
        connection.close();
    }
    public static void main(String[] argv) {
        try (RPCClient fibonacciRpc = new RPCClient()) {
            for (int i = 1; i <= 30; i++) {
                System.out.println(" [x] 请求 fib("+ i +")");
                String response = fibonacciRpc.call(String.valueOf(i));
                System.out.println(" [.] fib("+ i +")的结果为 '" + response + "'");
            }
        } catch (IOException | TimeoutException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

2. 服务方代码

在被调用方,声明了一个名为RPC_QUEUE_NAME的请求队列,并使用queueDeclare方法在RabbitMQ中声明它。我们还设置了basicQos,以确保每次只接收一个请求。


在main方法中,我们定义了一个DeliverCallback,它将处理每个传入的消息并生成响应。对于每个传入的消息,我们提取出correlationId和请求消息。我们执行一些计算,然后将结果作为字符串回复到名为replyTo的消息队列中。最后,我们使用basicAck确认我们已收到消息。


我们使用basicConsume在请求队列上注册一个消费者,并将DeliverCallback传递给它。我们还使用synchronized块和wait()方法来等待每个传入的消息处理完成并立即发送回复。


被调用方代码:

package com.example.seeu.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RPCServer {
    private static final String RPC_QUEUE_NAME = "rpc_queue";
    private static int fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n - 1) + fib(n - 2);
    }
    public static void main(String[] argv) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            channel.basicQos(1);
            System.out.println(" [x] Awaiting RPC requests");
            Object monitor = new Object();
            // 当成功接受到消息时,进行的处理逻辑
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                        .Builder()
                        // 返回参数中需要附带请求的id
                        .correlationId(delivery.getProperties().getCorrelationId())
                        .build();
                String response = "";
                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    int n = Integer.parseInt(message);
                    System.out.println(" [.] fib(" + message + ")");
                    response += fib(n);
                } catch (RuntimeException e) {
                    System.out.println(" [.] " + e.toString());
                } finally {
                    // delivery.getProperties().getReplyTo() 即为返回的队列,将结果发送给该该队列
                    channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
                    // 接受确认,通知RabbitMQ 服务器自己已经消费掉此条消息
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    synchronized (monitor) {
                        monitor.notify();
                    }
                }
            };
            // 推模式,监听获取消息
            channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> {
            }));
            // 主线程活跃
            while (true) {
                synchronized (monitor) {
                    try {
                        monitor.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

结果:

ba800a9a17c44c1abd6d1af9bc543fea.png


以上示例仅为最简单的 1 对 1 同步调用的示例,当调用方和被调用方是多对多的关系时,调用方还得考虑获取的结果乱序的问题,必须为每一个请求找到其对应的结果


三、RabbitMQ-RPC 的利与弊

尽管我们现在知道了 RabbitMQ 有RPC 的用法,但是其跟真正的 RPC 框架(如duubo) 等进行对比,又有什么优劣呢?


优势:


Dubbo 的跨语言支持相对较差,主要支持 Java 、Golang及部分其他语言;RabbitMQ 支持的语言更多

劣势:


RabbitMQ周边支持不完善,如序列化问题,同步异步选择,服务治理,都需要开发者自行设计与编码

调用方和被调用方多对多时,结果集可能乱序,需开发者为每个结果寻找其请求线程并返回值

高度依赖MQ集群的可用性和稳定性,一旦MQ故障,RPC即无法进行

总结:和真正的RPC框架比起来,优势不足,缺陷太多,主要是需要开发者进行大量代码编写才能实现较完善的RPC功能,有这时间,不如直接使用现成的RPC框架了,所以虽然RabbitMQ 官方提供了 RPC 模式,但使用者寥寥。除非是少量的异步调用,或跨语言问题无法解决,否则笔者亦不建议在生产上使用RabbitMQ 作为 RPC 框架


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
|
3月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
70 2
|
3月前
|
消息中间件
RabbitMQ广播模式
RabbitMQ广播模式
60 1
|
3月前
|
消息中间件 应用服务中间件 网络安全
rabbitMQ镜像模式搭建
rabbitMQ镜像模式搭建
|
4月前
|
消息中间件 传感器 负载均衡
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
|
4月前
|
消息中间件 存储 Kafka
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
|
3月前
|
消息中间件 Java Maven
RabbitMQ通配符模式
RabbitMQ通配符模式
60 0
|
4月前
|
消息中间件 Java Apache
消息队列 MQ使用问题之如何在内外网环境下使用单组节点单副本模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 负载均衡 RocketMQ
MetaQ/RocketMQ 原理问题之在广播模式下,RebalanceService工作的问题如何解决
MetaQ/RocketMQ 原理问题之在广播模式下,RebalanceService工作的问题如何解决
|
5月前
|
消息中间件
RabbitMQ配置单活模式队列
RabbitMQ配置单活模式队列
137 0