RabbitMQ消息队列(七):适用于云计算集群的远程调用(RPC)

简介:
        在云计算环境中,很多时候需要用它其他机器的计算资源,我们有可能会在接收到Message进行处理时,会把一部分计算任务分配到其他节点来完成。那么,RabbitMQ如何使用RPC呢?在本篇文章中,我们将会通过其它节点求来斐波纳契完成示例。

1. 客户端接口 Client interface

        为了展示一个RPC服务是如何使用的,我们将创建一段很简单的客户端class。 它将会向外提供名字为call的函数,这个call会发送RPC请求并且阻塞知道收到RPC运算的结果。代码如下:

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print "fib(4) is %r" % (result,)

2. 回调函数队列 Callback queue

        总体来说,在RabbitMQ进行RPC远程调用是比较容易的。client发送请求的Message然后server返回响应结果。为了收到响应client在publish message时需要提供一个”callback“(回调)的queue地址。code如下:

result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)

# ... and some code to read a response message from the callback_queue ...

2.1 Message properties

AMQP 预定义了14个属性。它们中的绝大多很少会用到。以下几个是平时用的比较多的:

  • delivery_mode: 持久化一个Message(通过设定值为2)。其他任意值都是非持久化。请移步RabbitMQ消息队列(三):任务分发机制
  • content_type: 描述mime-type 的encoding。比如设置为JSON编码:设置该property为application/json
  • reply_to: 一般用来指明用于回调的queue(Commonly used to name a callback queue)。
  • correlation_id: 在请求中关联处理RPC响应(correlate RPC responses with requests)。


3. 相关id Correlation id

       在上个小节里,实现方法是对每个RPC请求都会创建一个callback queue。这是不高效的。幸运的是,在这里有一个解决方法:为每个client创建唯一的callback queue。

       这又有其他问题了:收到响应后它无法确定是否是它的,因为所有的响应都写到同一个queue了。上一小节的correlation_id在这种情况下就派上用场了:对于每个request,都设置唯一的一个值,在收到响应后,通过这个值就可以判断是否是自己的响应。如果不是自己的响应,就不去处理。

 

4. 总结

     工作流程:

  • 当客户端启动时,它创建了匿名的exclusive callback queue.
  • 客户端的RPC请求时将同时设置两个properties: reply_to设置为callback queue;correlation_id设置为每个request一个独一无二的值.
  • 请求将被发送到an rpc_queue queue.
  • RPC端或者说server一直在等待那个queue的请求。当请求到达时,它将通过在reply_to指定的queue回复一个message给client。
  • client一直等待callback queue的数据。当message到达时,它将检查correlation_id的值,如果值和它request发送时的一致那么就将返回响应。


5. 最终实现

The code for rpc_server.py:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
    n = int(body)

    print " [.] fib(%s)"  % (n,)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                     props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print " [x] Awaiting RPC requests"
channel.start_consuming()

The server code is rather straightforward:
  • (4) As usual we start by establishing the connection and declaring the queue.
  • (11) We declare our fibonacci function. It assumes only valid positive integer input. (Don't expect this one to work for big numbers, it's probably the slowest recursive implementation possible).
  • (19) We declare a callback for basic_consume, the core of the RPC server. It's executed when the request is received. It does the work and sends the response back.
  • (32) We might want to run more than one server process. In order to spread the load equally over multiple servers we need to set theprefetch_count setting.

The code for rpc_client.py:

#!/usr/bin/env python
import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print " [x] Requesting fib(30)"
response = fibonacci_rpc.call(30)
print " [.] Got %r" % (response,)

The client code is slightly more involved:
  • (7) We establish a connection, channel and declare an exclusive 'callback' queue for replies.
  • (16) We subscribe to the 'callback' queue, so that we can receive RPC responses.
  • (18) The 'on_response' callback executed on every response is doing a very simple job, for every response message it checks if thecorrelation_id is the one we're looking for. If so, it saves the response inself.response and breaks the consuming loop.
  • (23) Next, we define our main call method - it does the actual RPC request.
  • (24) In this method, first we generate a unique correlation_id number and save it - the 'on_response' callback function will use this value to catch the appropriate response.
  • (25) Next, we publish the request message, with two properties: reply_to and correlation_id.
  • (32) At this point we can sit back and wait until the proper response arrives.
  • (33) And finally we return the response back to the user.

开始rpc_server.py:

$ python rpc_server.py
 [x] Awaiting RPC requests
通过client来请求fibonacci数:

$ python rpc_client.py
 [x] Requesting fib(30)
      现在这个设计并不是唯一的,但是这个实现有以下优势:
  • 如何RPC server太慢,你可以扩展它:启动另外一个RPC server。
  • 在client端, 无所进行加锁能同步操作,他所作的就是发送请求等待响应。

      我们的code还是挺简单的,并没有尝试去解决更复杂和重要的问题,比如:

  • 如果没有server在运行,client需要怎么做?
  • RPC应该设置超时机制吗?
  • 如果server运行出错并且抛出了异常,需要将这个问题转发到client吗?
  • 需要边界检查吗?

尊重原创,转载请注明出处 anzhsoft: http://blog.csdn.net/anzhsoft/article/details/19633107

参考资料:

1. http://www.rabbitmq.com/tutorials/tutorial-six-python.html

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
6月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
432 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
12月前
|
边缘计算 负载均衡 NoSQL
FreeMQTT Plus: 一个新型 MQTT Broker 集群的实现
FreeMQTT Plus 是一款基于 MQTT 协议的高性能消息中间件,采用分布式架构解决单点瓶颈问题。其核心由 Nginx 负载均衡器、黑(A)节点(MQTT Broker)、白(B)节点(消息路由)和日志(L)节点组成。通过无主从设计,支持高可用性、负载均衡与灵活扩展。针对会话同步、消息路由等挑战,FreeMQTT Plus 利用 MQTT5 特性定义元命令,实现节点间高效通信,无需依赖第三方组件。适用于物联网海量设备接入与高并发场景,为未来边缘计算和多级集群部署提供坚实基础。
1780 74
|
消息中间件 监控 RocketMQ
Docker部署RocketMQ5.2.0集群
本文详细介绍了如何使用Docker和Docker Compose部署RocketMQ 5.2.0集群。通过创建配置文件、启动集群和验证容器状态,您可以快速搭建起一个RocketMQ集群环境。希望本文能够帮助您更好地理解和应用RocketMQ,提高消息中间件的部署和管理效率。
1702 91
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
503 3
|
负载均衡 网络协议 小程序
SpringCloud远程调用为啥要采用HTTP,而不是RPC?
【8月更文挑战第28天】在微服务架构日益盛行的今天,SpringCloud凭借其强大的生态系统和灵活的集成能力,成为了众多企业构建微服务系统的首选框架。在微服务之间的远程调用中,一个常见的问题是选择HTTP还是RPC(远程过程调用)作为通信协议。本文将深入探讨SpringCloud为何更倾向于采用HTTP而非RPC进行远程调用。
886 5
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
474 93
|
消息中间件 存储 运维
2024最全RabbitMQ集群方案汇总
本文梳理了RabbitMQ集群的几种方案,主要包括普通集群、镜像集群(高可用)、Quorum队列(仲裁队列)、Streams集群模式(高可用+负载均衡)和插件方式。重点介绍了每种方案的特点、优缺点及适用场景。搭建步骤包括安装Erlang和RabbitMQ、配置集群节点、修改hosts文件、配置Erlang Cookie、启动独立节点并创建集群,以及配置镜像队列以提高可用性和容错性。推荐使用Quorum队列与Streams模式,其中Quorum队列适合高可用集群,Streams模式则同时支持高可用和负载均衡。此外,还有Shovel和Federation插件可用于特定场景下的集群搭建。
3212 2
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
245 1
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
消息中间件 运维 监控
云消息队列RabbitMQ实践解决方案评测报告
本报告旨在对《云消息队列RabbitMQ实践》解决方案进行综合评测。通过对该方案的原理理解、部署体验、设计验证以及实际应用价值等方面进行全面分析,为用户提供详尽的反馈与建议。
336 16

相关产品

  • 云消息队列 MQ