聊聊Python用rpc实现分布式系统调用的那些事

简介:

通俗的讲rpc是什么?


rpc 一般俗称,远程过程调用,把本地的函数,放到远端去调用。

通常我们调用一个方法,譬如: sumadd(10, 20),sumadd方法的具体实现要么是用户自己定义,要么存在于该语言的库函数中,也就说在sumadd方法的代码实现在本地,它是一个本地调用!

“远程调用”意思就是:被调用方法的具体实现不在程序运行本地,而是在别的某个地方(分布到各个服务器),但是用起来像是在本地。


更多的关于RPC的内容可以到我的个人博客,blog.xiaorui.cc 看看。 


rpc远程调用原理 :


比如 A调用B提供的remoteAdd方法:


首先A与B之间建立一个TCP连接;


然后A把需要调用的方法名(这里是remoteAdd)以及方法参数(10, 20)序列化成字节流发送出去;


B接受A发送过来的字节流,然后反序列化得到目标方法名,方法参数,接着执行相应的方法调用(可能是localAdd)并把结果30返回;


A接受远程调用结果,然后do()


RPC框架也就是把上线说的具体的细节封装起来,给用户好用的API使用(提示:有些远程调用选择比较底层的socket协议,有些远程调用选择比较上层的HTTP协议);


一般rpc配合http协议的多点,也就是走http的多。 当然还是看应用,我曾经一共的rpc框架是基于zeromq的zerorpc。速度是挺快,server和client都有python的gevent支持,速度没道理慢。(有兴趣的,可以看看有关zerorpc的文章 http://rfyiamcool.blog.51cto.com/1030776/1254000 最少要比python本身的xml-rpc要快。 rpc over http(基于http的rpc)有两种协议,一种是xml-rpc ,还有一个是 json-rpc。


XML-RPC:XML Remote Procedure Call,即XML远程方法调用,利用http+xml封装进行RPC调用。基于http协议传输、XML作为信息编码格式。一个xml-rpc消息就是一个请求体为xml的http-post请求,服务端执行后也以xml格式编码返回。这个标准面前已经演变为下面的SOAP协议。可以理解SOAP是XML-RPC的高级版本。


JSON-RPC:JSON Remote Procedure Call,即JSON远程方法调用 。类似于XML-RPC,不同之处是使用JSON作为信息交换格式


下面是一个例子,很简单。我们是用python的rpc库SimpleXMLRPCServer 做的测试,创建rpc server,然后注册一些函数,供应别的客户端去调用。


原文:http://rfyiamcool.blog.51cto.com/1030776/1439824


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from SimpleXMLRPCServer  import  SimpleXMLRPCServer
原文:xiaorui.cc
def add(x,y):
     return  x+y
 
def subtract(x, y):
     return  x-y
 
def multiply(x, y):
     return  x*y
 
def divide(x, y):
     return  x /y
 
# A simple server with simple arithmetic functions
server = SimpleXMLRPCServer(( "localhost" , 8000))
print  "Listening on port 8000..."
server.register_multicall_functions()
server.register_function(add,  'add' )
server.register_function(subtract,  'subtract' )
server.register_function(multiply,  'multiply' )
server.register_function(divide,  'divide' )
server.serve_forever()


1
2
3
4
5
6
7
8
9
10
11
import  xmlrpclib
 
proxy = xmlrpclib.ServerProxy( "http://localhost:8000/" )
multicall = xmlrpclib.MultiCall(proxy)
multicall.add(7,3)
multicall.subtract(7,3)
multicall.multiply(7,3)
multicall.divide(7,3)
result = multicall()
 
print  "7+3=%d, 7-3=%d, 7*3=%d, 7/3=%d"  % tuple(result)



rpc本来是单任务的,如果任务相对频繁,可以设置成多线程的默认,你不用在调用threading模块什么的,直接引用 。


1
class  AsyncXMLRPCServer(SocketServer.ThreadingMixIn,SimpleXMLRPCServer):  pass

然后rpc初始化的方法换成。


1
server  =  AsyncXMLRPCServer(('',  1111 ), SimpleXMLRPCRequestHandler)


这里再说下,和xmlrpc相似的jsonrpc,貌似现在用xmlrpc的,要比jsonrpc的多点。  有时候到国外的it论坛看帖子,xmlrpc用的交多点。其实现在较大的公司,一般干脆直接自己实现了rpc框架,像淘宝Dubbo(朋友有搞过,搞了半天,没有对接成接口,说是有难度,不明觉厉!),百度的xxx(忘名字了)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import  jsonrpc
server  =  jsonrpc.Server(jsonrpc.JsonRpc20(), jsonrpc.TransportTcpIp(addr = ( "127.0.0.1" 31415 ), logfunc = jsonrpc.log_file( "myrpc.log" )))
#原文:xiaorui.cc
# 注册一个函数方法
def  echo(s):
     return  s
 
def  search(number = None , last_name = None , first_name = None ):
     sql_where  =  []
     sql_vars   =  []
     if  number  is  not  None :
         sql_where.append( "number=%s" )
         sql_vars.append(number)
     if  last_name  is  not  None :
         sql_where.append( "last_name=%s" )
         sql_vars.append(last_name)
     if  first_name  is  not  None :
         sql_where.append( "first_name=%s" )
         sql_vars.append(first_name)
     sql_query  =  "SELECT id, last_name, first_name, number FROM mytable"
     if  sql_where:
         sql_query  + =  " WHERE"  +  " AND " .join(sql_where)
     cursor  =  ...
     cursor.execute(sql_query,  * sql_vars)
     return  cursor.fetchall()
 
server.register_function( echo )
server.register_function( search )
 
# start server
server.serve()


原文:http://rfyiamcool.blog.51cto.com/1030776/1439824


1
2
3
4
5
6
7
8
# 创建jsonrpc客户端
import  jsonrpc
server  =  jsonrpc.ServerProxy(jsonrpc.JsonRpc20(), jsonrpc.TransportTcpIp(addr = ( "127.0.0.1" 31415 )))
 
#调用远端的一个函数
result  =  server.echo( "hello world" )
 
found  =  server.search(last_name = 'Python' )


我做过一些个压力的测试,XMLRPCSERVER的开了async之后,每个连接特意堵塞5秒,他的并发在40个左右 。也就是每秒成功40个左右,剩下的还是在堵塞等待中。 其实他的瓶颈不是在于rpc的本身,是承载rpc的那个basehttpserver,太弱爆了。

wKiom1PIg-qhOXPZAAH7nZDKUJM508.jpg


接收请求,调用方法 !

wKioL1PIilzjVjTxAAX07GYu-No166.jpg


 现在开源社区这么发达,有不少人都根据rpc的协议,重写了承载rpc的web服务。  比如用flask,tornado,配合uwsgi,你猜咋招了。。。。如果不堵塞连接,那还可以,如果堵塞连接,uwsgi的废材特色就显出来了,以前有文章说过,uwsgi是prework,他会预先启动进程,官方都推荐要根据你的cpu核数或者超线程来开启进程,如果开的太多,你会发现,uwsgi他是驾驭不了那么多进程的。还是看我大tornado,用了@gen.engine之后。轻易飙到500的并发连接。 

(以上是我的吃饱又蛋疼测试,没听过谁会重复调用那么多的堵塞方法,自评 sx行为) 

不多说了,看flask实现xmlrpc服务端的代码,看了下flask xmlrpc的源码,实现的不难。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from  flask  import  Flask
from  flaskext.xmlrpc  import  XMLRPCHandler, Fault
 
app  =  Flask(__name__)
 
handler  =  XMLRPCHandler( 'api' )
handler.connect(app,  '/api' )
 
@handler .register
def  woca(name = "world" ):
     if  not  name:
         raise  Fault( "fuck...fuck" "fuck shencan!" )
     return  "Hello, %s!"  %  name
原文:xiaorui.cc
app.run()

 

对于每个连接的超时,有多种的方法,如果你用的是flask,tornado做web server,那就写个装饰器single起来,只是性能不好。 或者是前面挂一个nginx,然后做个client_header_timeout,client_body_timeout,proxy_connect_timeout(你懂的。),如果用的python自带的xml-rpc的话,需要引入socket。

1
2
import  socket
socket.setdefaulttimeout()


再说下rpc安全的问题。

至于安全方面,有兴趣就开个ssl,或者是在程序里面判断下client ip,反正配置都是统一下发的,你重载daemon的时候,也就知道该判断什么ip了。


原文:http://rfyiamcool.blog.51cto.com/1030776/1439824


我个人对于rpc的应用,更加的倾向于基本资源的获取和调用,毕竟单纯的用socket或者是mq,你在程序里面还要做一个解析过来的数据,然后根据过来的数据在做调用。 (alert: 我想触发 add() ,如果是rpc的话,我不用管,只是传过去就行了,到那时mq和socket就需要eval调用函数了),一些复杂的应用还是喜欢用面向资源的rest,也推荐大家用这个,靠谱的。  






 本文转自 rfyiamcool 51CTO博客,原文链接:http://blog.51cto.com/rfyiamcool/1439824,如需转载请自行联系原作者


相关文章
|
7月前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
756 1
|
12天前
|
机器学习/深度学习 数据可视化 TensorFlow
使用Python实现深度学习模型的分布式训练
使用Python实现深度学习模型的分布式训练
133 73
|
7天前
|
人工智能 分布式计算 数据处理
云产品评测:MaxFrame — 分布式Python计算服务的最佳实践与体验
阿里云推出的MaxFrame是一款高性能分布式计算平台,专为大规模数据处理和AI应用设计。它提供了强大的Python编程接口,支持分布式Pandas操作,显著提升数据处理速度(3-5倍)。MaxFrame在大语言模型数据处理中表现出色,具备高效内存管理和任务调度能力。然而,在开通流程、API文档及功能集成度方面仍有改进空间。总体而言,MaxFrame在易用性和计算效率上具有明显优势,但在开放性和社区支持方面有待加强。
34 9
|
9天前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
38 2
|
10天前
|
人工智能 分布式计算 数据处理
云产品评测:分布式Python计算服务MaxFrame
云产品评测:分布式Python计算服务MaxFrame
42 3
|
2月前
|
人工智能 文字识别 Java
SpringCloud+Python 混合微服务,如何打造AI分布式业务应用的技术底层?
尼恩,一位拥有20年架构经验的老架构师,通过其深厚的架构功力,成功指导了一位9年经验的网易工程师转型为大模型架构师,薪资逆涨50%,年薪近80W。尼恩的指导不仅帮助这位工程师在一年内成为大模型架构师,还让他管理起了10人团队,产品成功应用于多家大中型企业。尼恩因此决定编写《LLM大模型学习圣经》系列,帮助更多人掌握大模型架构,实现职业跃迁。该系列包括《从0到1吃透Transformer技术底座》、《从0到1精通RAG架构》等,旨在系统化、体系化地讲解大模型技术,助力读者实现“offer直提”。此外,尼恩还分享了多个技术圣经,如《NIO圣经》、《Docker圣经》等,帮助读者深入理解核心技术。
SpringCloud+Python 混合微服务,如何打造AI分布式业务应用的技术底层?
|
2月前
|
存储 Python
Python:利用XML-RPC实现简单的远端过程调用
Python:利用XML-RPC实现简单的远端过程调用
17 1
|
4月前
|
消息中间件 JSON 自然语言处理
Python多进程日志以及分布式日志的实现方式
python日志模块logging支持多线程,但是在多进程下写入日志文件容易出现下面的问题: PermissionError: [WinError 32] 另一个程序正在使用此文件,进程无法访问。 也就是日志文件被占用的情况,原因是多个进程的文件handler对日志文件进行操作产生的。
|
5月前
|
NoSQL Redis 数据库
|
5月前
|
分布式计算 并行计算 算法
探索排序的宇宙奥秘:Python中归并排序的并行处理与分布式应用!
【7月更文挑战第11天】归并排序是一种分治算法,适用于并行和分布式处理。在Python中,利用`concurrent.futures`可实现并行归并排序,但因GIL限制,可能需借助`multiprocessing`或GPU库。分布式归并排序则通过分布式框架如Apache Spark处理大规模数据,每个节点独立排序后进行网络合并。并行与分布式技术提升了处理大数据的速度和效率。**
72 9