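XML-RPC is a remote procedure call method that uses XML passed over HTTP as its transport. With it, a client can call methods with parameters on a remote server (the server is named by a URI) and get back structured data. Python ships with an xmlrpc implementation, and studying it is a quick way to understand how RPC works and how it is implemented. This article covers the following parts:
- xmlrpc demo
- xmlrpc-API
- xmlrpc-server implementation
- xmlrpc-client implementation
- xmlrpc serialization/deserialization
- Summary
xmlrpc demo
The xmlrpc modules can be run directly. Start the RPC server: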

    # python3 -m xmlrpc.server
    Serving XML-RPC on localhost port 8000
    It is advisable to run this example server within a secure, closed network.
    127.0.0.1 - - [05/May/2021 18:03:16] "POST /RPC2 HTTP/1.1" 200 -
    127.0.0.1 - - [05/May/2021 18:03:16] "POST /RPC2 HTTP/1.1" 200 -

Start the RPC client:

    # python3 -m xmlrpc.client
    20210505T18:03:16
    42
    512
    3

The server log shows two HTTP requests. Let's first look at the captured first HTTP request:

    METHOD: POST
    URL: http://localhost:8000/RPC2
    HEADERS
    accept-encoding: gzip
    content-length: 120
    content-type: text/xml
    host: localhost:8000
    user-agent: Python-xmlrpc/3.8

The request body is:

    <?xml version='1.0'?>
    <methodCall>
      <methodName>currentTime.getCurrentTime</methodName>
      <params>
      </params>
    </methodCall>

The HTTP response:

    STATUS: 200 OK
    HEADERS
    content-length: 163
    content-type: text/xml
    date: Wed, 05 May 2021 09:40:57 GMT
    server: BaseHTTP/0.6 Python/3.8.5

The response body is:

    <?xml version='1.0'?>
    <methodResponse>
      <params>
        <param>
          <value><dateTime.iso8601>20210505T18:03:16</dateTime.iso8601></value>
        </param>
      </params>
    </methodResponse>

I won't paste the HTTP headers of the second request. Here is the request XML:

    <?xml version='1.0'?>
    <methodCall>
      <methodName>system.multicall</methodName>
      <params>
        <param>
          <value><array><data>
            <value><struct>
              <member>
                <name>methodName</name>
                <value><string>getData</string></value>
              </member>
              <member>
                <name>params</name>
                <value><array><data>
                </data></array></value>
              </member>
            </struct></value>
            <value><struct>
              <member>
                <name>methodName</name>
                <value><string>pow</string></value>
              </member>
              <member>
                <name>params</name>
                <value><array><data>
                  <value><int>2</int></value>
                  <value><int>9</int></value>
                </data></array></value>
              </member>
            </struct></value>
            <value><struct>
              <member>
                <name>methodName</name>
                <value><string>add</string></value>
              </member>
              <member>
                <name>params</name>
                <value><array><data>
                  <value><int>1</int></value>
                  <value><int>2</int></value>
                </data></array></value>
              </member>
            </struct></value>
          </data></array></value>
        </param>
      </params>
    </methodCall>

And here is the response XML:

    <?xml version='1.0'?>
    <methodResponse>
      <params>
        <param>
          <value><array><data>
            <value><array><data>
              <value><string>42</string></value>
            </data></array></value>
            <value><array><data>
              <value><int>512</int></value>
            </data></array></value>
            <value><array><data>
              <value><int>3</int></value>
            </data></array></value>
          </data></array></value>
        </param>
      </params>
    </methodResponse>

Note: to show the xmlrpc data completely I pasted the full XML, which makes this part rather long.
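The demo shows the following characteristics of xmlrpc:
- Data is transferred over HTTP, using the POST method, the URL path /RPC2, and the content-type text/xml.
- Requests and responses are encoded as XML. A request uses the methodCall tag and a response uses the methodResponse tag.
- The XML is nested layer upon layer with a lot of redundancy, and it looks very verbose (probably one of the reasons xmlrpc never became popular).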
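The methodCall and methodResponse envelopes can be reproduced without any network traffic. The following is a minimal sketch using xmlrpc.client.dumps and xmlrpc.client.loads from the standard library; the method name add and its arguments are chosen only for illustration.

```python
from xmlrpc.client import dumps, loads

# Encode a call to a remote method named "add" with parameters (1, 2).
request_xml = dumps((1, 2), methodname="add")
print(request_xml)  # an XML document rooted at <methodCall>

# Decoding returns the parameter tuple and the method name.
params, method = loads(request_xml)
print(method, params)

# A response wraps exactly one return value, hence the one-element tuple.
response_xml = dumps((3,), methodresponse=True)
result, response_method = loads(response_xml)
print(result, response_method)  # a response carries no method name
```

Printing request_xml makes the nesting discussed above visible for even the smallest call.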
xmlrpc-API
Next, let's look at how the xmlrpc API is used. The server code:

    import datetime
    import sys
    from xmlrpc.server import SimpleXMLRPCServer

    class ExampleService:
        def getData(self):
            return '42'

        class currentTime:
            @staticmethod
            def getCurrentTime():
                return datetime.datetime.now()

    with SimpleXMLRPCServer(("localhost", 8000)) as server:
        server.register_function(pow)
        server.register_function(lambda x, y: x + y, 'add')
        server.register_instance(ExampleService(), allow_dotted_names=True)
        server.register_multicall_functions()
        print('Serving XML-RPC on localhost port 8000')
        print('It is advisable to run this example server within a secure, closed network.')
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            print("\nKeyboard interrupt received, exiting.")
            sys.exit(0)

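The server does the following:
- Creates a SimpleXMLRPCServer instance, server, on port 8000
- Registers pow and a lambda function named add with server
- Registers a service instance with server (perhaps "app" would be a better name); the instance provides two functions: getData and currentTime.getCurrentTime
- Registers the system.multicall implementation with server, which lets several RPC requests be merged into a single HTTP request
- Starts server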
The client uses it like this:

    from xmlrpc.client import Error, MultiCall, ServerProxy

    server = ServerProxy("http://localhost:8000")
    print(server.currentTime.getCurrentTime())

    multi = MultiCall(server)
    multi.getData()
    multi.pow(2, 9)
    multi.add(1, 2)
    try:
        for response in multi():
            print(response)
    except Error as v:
        print("ERROR", v)

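- Creates a server proxy (think of it as the rpc-client)
- Calls the server-side implementation currentTime.getCurrentTime
- Queues calls to the three RPC interfaces getData, pow and add through MultiCall
- Sends the multicall request and prints each call's result in a loop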
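To try the server and client steps above in a single process, one option is to run the server in a background thread. This is only a sketch under assumptions not in the original: it binds to 127.0.0.1 on port 0 (letting the OS pick a free port), disables request logging, and wraps everything in a helper named run_demo.

```python
import datetime
import threading
from xmlrpc.client import MultiCall, ServerProxy
from xmlrpc.server import SimpleXMLRPCServer


class ExampleService:
    def getData(self):
        return "42"

    class currentTime:
        @staticmethod
        def getCurrentTime():
            return datetime.datetime.now()


def run_demo():
    # Port 0 and logRequests=False are choices made for this sketch only.
    with SimpleXMLRPCServer(("127.0.0.1", 0), logRequests=False) as server:
        server.register_function(pow)
        server.register_function(lambda x, y: x + y, "add")
        server.register_instance(ExampleService(), allow_dotted_names=True)
        server.register_multicall_functions()
        worker = threading.Thread(target=server.serve_forever, daemon=True)
        worker.start()
        try:
            port = server.server_address[1]
            with ServerProxy("http://127.0.0.1:%d" % port) as proxy:
                # First HTTP request: a single call.
                now = proxy.currentTime.getCurrentTime()
                # Second HTTP request: three calls batched by MultiCall.
                multi = MultiCall(proxy)
                multi.getData()
                multi.pow(2, 9)
                multi.add(1, 2)
                return now, list(multi())
        finally:
            server.shutdown()
            worker.join()


now, results = run_demo()
print(now, results)
```

The returned timestamp is an xmlrpc.client.DateTime object rather than a datetime, because the client here does not enable use_builtin_types.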
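Comparing an xmlrpc service with a plain HTTP service, the differences are clear:
- RPC service interfaces are ordinary functions, such as pow, getData and getCurrentTime; they are isolated from the HTTP request and response
- The client needs its own implementation (the proxy) and does not send HTTP requests directly
By now you should also have an intuitive feel for RPC (remote procedure call): put simply, it is calling a function remotely. "Remote" covers crossing machines, and crossing processes, as we do here, counts as well. How the remote function call is actually carried out is the job of each RPC framework; today we look at how xmlrpc does it.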
xmlrpc-server implementation
Server-side HTTP protocol implementation
In xmlrpc the HTTP protocol is implemented by SimpleXMLRPCServer and SimpleXMLRPCRequestHandler:

    class SimpleXMLRPCServer(socketserver.TCPServer,
                             SimpleXMLRPCDispatcher):
        allow_reuse_address = True
        _send_traceback_header = False

        def __init__(self, addr, requestHandler=SimpleXMLRPCRequestHandler,
                     logRequests=True, allow_none=False, encoding=None,
                     bind_and_activate=True, use_builtin_types=False):
            self.logRequests = logRequests
            SimpleXMLRPCDispatcher.__init__(self, allow_none, encoding, use_builtin_types)
            socketserver.TCPServer.__init__(self, addr, requestHandler, bind_and_activate)

TCPServer, one parent class of SimpleXMLRPCServer, was covered in an earlier post and provides the TCP service. SimpleXMLRPCRequestHandler implements the HTTP part. XML-RPC requests must be sent with POST, and this handler accepts the paths / and /RPC2 (see rpc_paths below), so the key part is the do_POST method:

    class SimpleXMLRPCRequestHandler(BaseHTTPRequestHandler):
        # rpc-url
        rpc_paths = ('/', '/RPC2')

        def do_POST(self):
            ...
            # read the request body in chunks of at most 10 MB
            max_chunk_size = 10*1024*1024
            size_remaining = int(self.headers["content-length"])
            L = []
            while size_remaining:
                chunk_size = min(size_remaining, max_chunk_size)
                chunk = self.rfile.read(chunk_size)
                if not chunk:
                    break
                L.append(chunk)
                size_remaining -= len(L[-1])
            data = b''.join(L)
            ...
            response = self.server._marshaled_dispatch(
                data, getattr(self, '_dispatch', None), self.path
            )
            ...
            self.send_response(200)
            self.send_header("Content-type", "text/xml")
            self.send_header("Content-length", str(len(response)))
            self.end_headers()
            self.wfile.write(response)

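The do_POST method has three stages:
- Read the request data from the HTTP request; its length is given by content-length
- Call the RPC interface through the server's _marshaled_dispatch method
- Wrap the interface's return value in an HTTP response and send it back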
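Because the HTTP layer is this thin, the three stages of do_POST can be exercised from the other side with nothing more than http.client. The sketch below posts a hand-built body to /RPC2 and decodes the reply; the helper name call_over_raw_http, the 127.0.0.1 address and port 0 are assumptions made for the example.

```python
import http.client
import threading
from xmlrpc.client import dumps, loads
from xmlrpc.server import SimpleXMLRPCServer


def call_over_raw_http():
    with SimpleXMLRPCServer(("127.0.0.1", 0), logRequests=False) as server:
        server.register_function(pow)
        worker = threading.Thread(target=server.serve_forever, daemon=True)
        worker.start()
        try:
            body = dumps((2, 9), methodname="pow").encode("utf-8")
            conn = http.client.HTTPConnection("127.0.0.1", server.server_address[1])
            # http.client derives Content-Length from the bytes body,
            # which is what do_POST uses to know how much to read.
            conn.request("POST", "/RPC2", body=body,
                         headers={"Content-Type": "text/xml"})
            resp = conn.getresponse()
            status = resp.status
            content_type = resp.getheader("Content-type")
            payload = resp.read()
            conn.close()
            # The payload is a methodResponse document with one value.
            params, _ = loads(payload)
            return status, content_type, params[0]
        finally:
            server.shutdown()
            worker.join()


status, content_type, value = call_over_raw_http()
print(status, content_type, value)
```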
Server-side RPC protocol implementation
SimpleXMLRPCDispatcher, the other parent class of SimpleXMLRPCServer, provides the RPC protocol implementation:

    class SimpleXMLRPCDispatcher:
        def __init__(self, allow_none=False, encoding=None,
                     use_builtin_types=False):
            # dictionary of interface functions
            self.funcs = {}
            # service instance (app)
            self.instance = None
            self.allow_none = allow_none
            self.encoding = encoding or 'utf-8'
            self.use_builtin_types = use_builtin_types

        def register_instance(self, instance, allow_dotted_names=False):
            # register the service instance object
            self.instance = instance
            self.allow_dotted_names = allow_dotted_names

        def register_function(self, function=None, name=None):
            # register an interface function
            if name is None:
                name = function.__name__
            self.funcs[name] = function
            return function

        def register_multicall_functions(self):
            """Registers the XML-RPC multicall method in the system namespace."""
            # combined call
            self.funcs.update({'system.multicall' : self.system_multicall})

Registering an instance and functions is straightforward. We can skip multicall for now, whose implementation is slightly more involved, and first look at how the registered interfaces are called from _marshaled_dispatch:

    def _marshaled_dispatch(self, data, dispatch_method = None, path = None):
        try:
            # parse the rpc interface name and parameters
            params, method = loads(data, use_builtin_types=self.use_builtin_types)
            # generate response
            response = self._dispatch(method, params)
            # wrap response in a singleton tuple
            response = (response,)
            # build the xml response
            response = dumps(response, methodresponse=1,
                             allow_none=self.allow_none, encoding=self.encoding)
        except Fault as fault:
            ...
        except:
            ...
        return response.encode(self.encoding, 'xmlcharrefreplace')

    def _dispatch(self, method, params):
        try:
            # call the matching registered function
            # look up the interface
            func = self.funcs[method]
        except KeyError:
            pass
        else:
            if func is not None:
                # run the interface
                return func(*params)
            ...
        if self.instance is not None:
            if hasattr(self.instance, '_dispatch'):
                # call the `_dispatch` method on the instance
                return self.instance._dispatch(method, params)
            # call the instance's method directly
            try:
                func = resolve_dotted_attribute(
                    self.instance, method, self.allow_dotted_names
                )
            except AttributeError:
                pass
            else:
                if func is not None:
                    return func(*params)
        ...

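The code is fairly long, but it mainly does two things:
- Parses params and method from the request
- Looks up method in funcs or on instance, calls it, and returns the result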
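The two steps can be traced without any HTTP server by driving SimpleXMLRPCDispatcher directly. Note that _marshaled_dispatch is an internal, underscore-prefixed method; calling it here is only a way to follow the flow, not a recommended API. The method name missing is a hypothetical unregistered name.

```python
from xmlrpc.client import Fault, dumps, loads
from xmlrpc.server import SimpleXMLRPCDispatcher

dispatcher = SimpleXMLRPCDispatcher()
dispatcher.register_function(pow)

# Step 1 and 2 in one go: XML in, XML (as bytes) out.
response = dispatcher._marshaled_dispatch(dumps((2, 9), methodname="pow"))
params, _ = loads(response)
print(params[0])

# An unknown method does not raise on the server side; the error is
# serialized as a fault, which loads() turns back into a Fault exception.
fault_response = dispatcher._marshaled_dispatch(dumps((), methodname="missing"))
try:
    loads(fault_response)
    fault = None
except Fault as exc:
    fault = exc
print(fault)
```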
Server-side multi-call implementation
With single-call understood, the multi-call implementation is easier to follow. Registering the interface:

    def register_multicall_functions(self):
        self.funcs.update({'system.multicall' : self.system_multicall})

The funcs dictionary gains an entry named system.multicall whose handler is system_multicall:

    def system_multicall(self, call_list):
        """system.multicall([{'methodName': 'add', 'params': [2, 2]}, ...]) => \
    [[4], ...]

        Allows the caller to package multiple XML-RPC calls into a single
        request.

        See http://www.xmlrpc.com/discuss/msgReader$1208
        """
        results = []
        # run the calls one after another
        for call in call_list:
            method_name = call['methodName']
            params = call['params']
            ...
            results.append([self._dispatch(method_name, params)])
            ...
        return results

As its docstring says, system_multicall accepts several calls from one request and executes them one by one. An abridged example of the data in a system.multicall call:

    <methodName>system.multicall</methodName>
    <params>
      ...
      <member>
        <name>methodName</name>
        <value><string>getData</string></value>
      </member>
      ...
    </params>

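Since system_multicall is an ordinary method, its input and output shapes can be inspected by calling it on a dispatcher directly. In this sketch the third entry uses a hypothetical unregistered name, missing, to show that a failing call becomes a fault dictionary instead of aborting the batch.

```python
from xmlrpc.server import SimpleXMLRPCDispatcher

dispatcher = SimpleXMLRPCDispatcher()
dispatcher.register_function(pow)
dispatcher.register_function(lambda x, y: x + y, "add")

calls = [
    {"methodName": "pow", "params": [2, 9]},
    {"methodName": "add", "params": [1, 2]},
    {"methodName": "missing", "params": []},  # not registered on purpose
]
multicall_results = dispatcher.system_multicall(calls)

for call, outcome in zip(calls, multicall_results):
    # Successful calls come back as one-element lists, failures as dicts.
    print(call["methodName"], outcome)
```

The one-element list around each successful value is what lets the client tell a result apart from a fault.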
xmlrpc-client implementation
Client-side HTTP protocol implementation
The client also has to implement the HTTP protocol, mainly in ServerProxy and Transport (SafeTransport implements HTTPS). ServerProxy wraps a Transport object:

    class ServerProxy:
        def __init__(self, uri, transport=None, encoding=None, verbose=False,
                     allow_none=False, use_datetime=False, use_builtin_types=False,
                     *, headers=(), context=None):
            # get the url
            type, uri = urllib.parse._splittype(uri)
            self.__host, self.__handler = urllib.parse._splithost(uri)
            ...
            handler = Transport
            extra_kwargs = {}
            transport = handler(use_datetime=use_datetime,
                                use_builtin_types=use_builtin_types,
                                headers=headers,
                                **extra_kwargs)
            self.__transport = transport
            ...

        def __request(self, methodname, params):
            # call a method on the remote server
            # turn the interface call into xml request data
            request = dumps(params, methodname, encoding=self.__encoding,
                            allow_none=self.__allow_none).encode(self.__encoding, 'xmlcharrefreplace')
            response = self.__transport.request(
                self.__host,
                self.__handler,
                request,
                verbose=self.__verbose
            )
            return response

Transport implements the HTTP details:

    class Transport:
        """Handles an HTTP transaction to an XML-RPC server."""

        def __init__(self, use_datetime=False, use_builtin_types=False,
                     *, headers=()):
            self._use_datetime = use_datetime
            self._use_builtin_types = use_builtin_types
            self._connection = (None, None)
            self._headers = list(headers)
            self._extra_headers = []

        def request(self, host, handler, request_body, verbose=False):
            http_conn = self.send_request(host, handler, request_body, verbose)
            resp = http_conn.getresponse()
            if resp.status == 200:
                self.verbose = verbose
                return self.parse_response(resp)

        def send_request(self, host, handler, request_body, debug):
            connection = self.make_connection(host)
            headers = self._headers + self._extra_headers
            ...
            connection.putrequest("POST", handler)
            headers.append(("Content-Type", "text/xml"))
            headers.append(("User-Agent", self.user_agent))
            self.send_headers(connection, headers)
            self.send_content(connection, request_body)
            return connection

        def make_connection(self, host):
            # reuse the cached connection for the same host
            if self._connection and host == self._connection[0]:
                return self._connection[1]
            # create a HTTP connection object from a host descriptor
            chost, self._extra_headers, x509 = self.get_host_info(host)
            self._connection = host, http.client.HTTPConnection(chost)
            return self._connection[1]

        def parse_response(self, response):
            stream = response
            p, u = self.getparser()
            while 1:
                data = stream.read(1024)
                if not data:
                    break
                if self.verbose:
                    print("body:", repr(data))
                p.feed(data)
            if stream is not response:
                stream.close()
            p.close()
            return u.close()

- Creates the HTTP connection with http.client
- Sends the HTTP request with send_request
- Parses the HTTP response with parse_response
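Because ServerProxy accepts a transport argument, the HTTP layer can be customized by subclassing Transport. The sketch below changes only the user_agent class attribute that send_request reads; DemoTransport, RecordingHandler and the agent string are hypothetical names introduced for this example, and the recording handler exists only so the effect can be observed.

```python
import threading
from xmlrpc.client import ServerProxy, Transport
from xmlrpc.server import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer

seen_user_agents = []


class RecordingHandler(SimpleXMLRPCRequestHandler):
    """Notes the User-Agent header, then handles the request as usual."""

    def do_POST(self):
        seen_user_agents.append(self.headers.get("User-Agent"))
        super().do_POST()


class DemoTransport(Transport):
    # send_request() appends ("User-Agent", self.user_agent) to the headers.
    user_agent = "demo-xmlrpc-client/0.1"


def call_with_custom_transport():
    with SimpleXMLRPCServer(("127.0.0.1", 0), requestHandler=RecordingHandler,
                            logRequests=False) as server:
        server.register_function(pow)
        worker = threading.Thread(target=server.serve_forever, daemon=True)
        worker.start()
        try:
            url = "http://127.0.0.1:%d" % server.server_address[1]
            with ServerProxy(url, transport=DemoTransport()) as proxy:
                return proxy.pow(2, 9)
        finally:
            server.shutdown()
            worker.join()


answer = call_with_custom_transport()
print(answer, seen_user_agents)
```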
Client-side RPC protocol implementation
On top of the HTTP layer, requests are wrapped with _Method:

    class ServerProxy:
        def __getattr__(self, name):
            # magic method dispatcher
            return _Method(self.__request, name)

    class _Method:
        # some magic to bind an XML-RPC method to an RPC server.
        # supports "nested" methods (e.g. examples.getStateName)
        def __init__(self, send, name):
            self.__send = send
            self.__name = name

        def __getattr__(self, name):
            return _Method(self.__send, "%s.%s" % (self.__name, name))

        def __call__(self, *args):
            return self.__send(self.__name, args)

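A request can be sent with server.currentTime.getCurrentTime(), which is a chained call. server.currentTime invokes ServerProxy.__getattr__ and yields a _Method object. Accessing getCurrentTime on that object runs _Method.__getattr__ and yields another _Method object, now named currentTime.getCurrentTime. Finally, the trailing () runs that object's __call__ method, which sends the request out through ServerProxy's __request method.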
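The chaining can be seen in isolation with a stripped-down imitation that records calls instead of sending them. MethodSketch and ProxySketch are hypothetical stand-ins written for this illustration; they mirror the shape of _Method and ServerProxy but are not the library's classes.

```python
class MethodSketch:
    """Simplified stand-in for xmlrpc.client._Method."""

    def __init__(self, send, name):
        self._send = send
        self._name = name

    def __getattr__(self, name):
        # Each attribute access extends the dotted method name.
        return MethodSketch(self._send, "%s.%s" % (self._name, name))

    def __call__(self, *args):
        # The call hands the full name and arguments to the sender.
        return self._send(self._name, args)


class ProxySketch:
    """Simplified stand-in for ServerProxy that only records requests."""

    def __init__(self):
        self.sent = []

    def _request(self, methodname, params):
        self.sent.append((methodname, params))
        return "sent %s%r" % (methodname, params)

    def __getattr__(self, name):
        # Only reached for names that are not real attributes.
        return MethodSketch(self._request, name)


proxy = ProxySketch()
proxy.currentTime.getCurrentTime()
proxy.pow(2, 9)
print(proxy.sent)
```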
Client-side multi-call implementation
With the client's single-call implementation understood, let's move on to multi-call, which mainly involves the following three classes:

    class _MultiCallMethod:
        def __init__(self, call_list, name):
            self.__call_list = call_list
            self.__name = name

        def __getattr__(self, name):
            return _MultiCallMethod(self.__call_list, "%s.%s" % (self.__name, name))

        def __call__(self, *args):
            # queue one call
            self.__call_list.append((self.__name, args))

    class MultiCallIterator:
        def __init__(self, results):
            self.results = results

        def __getitem__(self, i):
            item = self.results[i]
            if type(item) == type({}):
                raise Fault(item['faultCode'], item['faultString'])
            elif type(item) == type([]):
                return item[0]
            else:
                raise ValueError("unexpected type in multicall result")

    class MultiCall:
        def __init__(self, server):
            self.__server = server
            self.__call_list = []

        def __getattr__(self, name):
            return _MultiCallMethod(self.__call_list, name)

        def __call__(self):
            marshalled_list = []
            for name, args in self.__call_list:
                marshalled_list.append({'methodName' : name, 'params' : args})
            # finally run system.multicall
            return MultiCallIterator(self.__server.system.multicall(marshalled_list))

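There are quite a few lines, but like _Method they all rely on Python's magic methods: __call__, __getattr__ and __getitem__. Compare with the usage example to get a feel for it:

    multi = MultiCall(server)
    multi.getData()
    multi.pow(2, 9)
    multi.add(1, 2)
    for response in multi():
        print(response)
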
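To watch what MultiCall builds and how MultiCallIterator unpacks results, the real classes can be pointed at a fake server object. FakeServer and FakeSystem below are hypothetical test doubles, and the canned reply (including the fault entry) is made up for this sketch.

```python
from xmlrpc.client import Fault, MultiCall


class FakeSystem:
    def multicall(self, call_list):
        # Remember what MultiCall sent, then return a canned reply in the
        # shape system_multicall produces: lists for values, dict for a fault.
        self.received = call_list
        return [["42"], [512], {"faultCode": 1, "faultString": "boom"}]


class FakeServer:
    def __init__(self):
        self.system = FakeSystem()


fake = FakeServer()
multi = MultiCall(fake)
multi.getData()      # nothing is sent yet, the call is only queued
multi.pow(2, 9)
multi.missing()

values, fault = [], None
try:
    # Iteration goes through MultiCallIterator.__getitem__ one index at a time.
    for value in multi():
        values.append(value)
except Fault as exc:
    fault = exc

print(fake.system.received)
print(values, fault)
```

Iteration stops at the first fault entry, which is why the usage example in the article wraps the loop in try/except.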
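xmlrpc serialization/deserialization
An RPC service crosses the network, so data passed between server and client has to be serialized and deserialized. This is mainly done by the two classes Marshaller and Unmarshaller:

    # client.py
    class Marshaller:
        ...

    class Unmarshaller:
        ...

xmlrpc supports the following 9 data types: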
- array
- base64
- boolean
- date/time
- double
- integer
- string
- struct
- nil
Some of these types, such as double and nil, do not exist in Python under those names; they map to float and None. Their encoding and decoding look like this:

    class Marshaller:
        def dump_double(self, value, write):
            write("<value><double>")
            write(repr(value))
            write("</double></value>\n")
        dispatch[float] = dump_double

        def dump_nil(self, value, write):
            if not self.allow_none:
                raise TypeError("cannot marshal None unless allow_none is enabled")
            write("<value><nil/></value>")
        dispatch[type(None)] = dump_nil

    class Unmarshaller:
        def end_double(self, data):
            self.append(float(data))  # float
            self._value = 0
        dispatch["double"] = end_double
        dispatch["float"] = end_double

        def end_nil(self, data):
            self.append(None)  # None
            self._value = 0
        dispatch["nil"] = end_nil

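The double and nil handling can be checked through the public dumps and loads functions, which drive Marshaller and Unmarshaller internally. A small round-trip sketch, with values chosen only for illustration:

```python
from xmlrpc.client import dumps, loads

# float is written as <double>, None as <nil/> (only with allow_none=True).
encoded = dumps((1.5, None), allow_none=True)
print(encoded)

# Decoding maps them back to float and None.
decoded, _ = loads(encoded)
print(decoded)

# Without allow_none, marshalling None is refused with a TypeError.
try:
    dumps((None,))
    none_rejected = False
except TypeError:
    none_rejected = True
print(none_rejected)
```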
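Summary
Leaving TCP aside, xmlrpc is essentially a two-layer model: HTTP at the bottom and the xmlrpc protocol on top. HTTP handles the network transfer; the xmlrpc protocol serializes an RPC request into XML and, on the receiving side, deserializes it back into a call to execute.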