每日分享
Adopt the pace of nature, her secret is patience.
请接受大自然的节奏,她的秘诀就是耐心。
小闫语录:
耐心,静心,以自然的节奏对待生活,以自然的节奏处理事情。你会发现一切没有那么糟糕,事事如此平和顺畅。
RPC
历史文章导航:
1.请求参数转换为消息数据实现
上篇文章我们对消息协议进行了设计,并将其原理解释清楚。今天呢,我们用代码将设计作以实现。
明确好大体思路后,我们开始敲代码了。
1.首先定义一个类:
class DivideProtocol(object): """ divide过程消息协议转换工具 """
2.然后写一个对请求参数转换为消息数据的方法:
代码结合上一篇文章的设计方案进行阅读,会容易的多。
def args_encode(self, num1, num2=1): """ 将原始的调用请求参数转换打包成二进制消息数据 :param num1: int :param num2: int :return: bytes 二进制消息数据 """ name = 'divide' # 处理方法的名字 字符串 # 处理字符串的长度 buff = struct.pack('!I', 6) # 处理字符 buff += name.encode() # 处理参数1 # 处理序号 buff2 = struct.pack('!B', 1) # 处理参数值 buff2 += struct.pack('!i', num1) # 处理参数2 if num2 != 1: # 处理序号 buff2 += struct.pack('!B', 2) buff2 += struct.pack('!i', num2) # 处理消息长度,边界设定 length = len(buff2) buff += struct.pack('!I', length) buff += buff2 return buff
i:代表的是有符号整数类型用4个字节进行表示。因为参数是有正负的。
B:代表的是整数类型用1个字节进行表示。
2.请求消息数据转换为请求原始参数的实现
接下来我们编写一个方法,用以对请求消息数据的解码操作。
在进行解码操作之前,思考一个问题,就是方法接收的参数是什么?有人会说,这还不容易吗,直接将全部数据接收然后解析,传递一个data即可。可以是可以,但是不方便,因为我们传递的数据大小未知,全部解析完成之后,里面都进行了解码操作,消息数据原先的格式被打乱,无法确定每一个消息的边界,所以需要边解析边确定。直到读取完方法名之后,继续读取边界长度才变成已知。
既然需要边读取边解析,那么就需要一个读取数据的来源。因为我们的RPC建立在网络通讯基础之上,毕竟是远程调用,对吧?所以我们就可以从网络数据中读取数据。在TCP中,我们可以利用socket进行相关操作,定义connection,建立一个网络连接通道,边读取数据边进行解析。
还有一个问题,因为底层TCP的socket在封装完工具类之后,我们还未实现,但是又想测试此工具类,为了方便,需要有一个读取二进制的东西,在此我们引入BytesIO对象,来往外读取二进制数据。此处我们先书写参数解码操作的方法:
def _read_all(self, size): """ 帮助我们读取二进制数据 :param size: 想要读取的二进制数据大小 :return: 二进制数据 bytes """ # self.conn pass def args_decode(self, connection): """ 接收调用请求消息数据并进行解析 :param connection: 连接对象 socket BytesIO :return: dict 包含了解析之后的参数 """ # 参数长度映射关系 param_len_map = { 1: 4, 2: 4 } # 参数格式映射关系 param_fmt_map = { 1: '!i', 2: '!i' } # 参数名字映射关系 param_name_map = { 1: 'num1', 2: 'num2' } # 保存用来返回参数的字典 # args = {"num1": xxx, "num2": xxx} args = {} self.conn = connection # 处理方法的名字已经提前被处理 # 后面我们会实现一个方法专门处理不同的调用请求的方法名解析 # 处理消息边界 # 读取二进制数据 # socket.recv(4) => ?4 判断读取的数据是否为4个,直到4个字节我们才进行处理 # BytesIO.read buff = self._read_all(4) # 将二进制数据转换为python的数据类型 length = struct.unpack('!I', buff)[0] # 已经读取处理的字节数 have = 0 # 处理第一个参数 # 1.处理参数序号 buff = self._read_all(1) have += 1 param_seq = struct.unpack('!B', buff)[0] # 2.处理参数值 param_len = param_len_map[param_seq] buff = self._read_all(param_len) have += param_len param_fmt = param_fmt_map[param_seq] param = struct.unpack(param_fmt, buff)[0] param_name = param_name_map[param_seq] args[param_name] = param if have >= length: return args # 处理第二个参数 # 1.处理参数序号 buff = self._read_all(1) param_seq = struct.unpack('!B', buff)[0] # 2.处理参数值 param_len = param_len_map[param_seq] buff = self._read_all(param_len) param_fmt = param_fmt_map[param_seq] param = struct.unpack(param_fmt, buff)[0] param_name = param_name_map[param_seq] args[param_name] = param return args
3.read_all方法实现
由于 self.conn
可能有不同的类型(可能是socket类型,可能是bytes类型),因此我们需要根据不同的类型按照不同的方法进行读取操作。
from io import BytesIO def _read_all(self, size): """ 帮助我们读取二进制数据 :param size: 想要读取的二进制数据大小 :return: 二进制数据 bytes """ # self.conn # 读取二进制数据 # socket.recv(4) => ?4 判断读取的数据是否为4个,直到4个字节我们才进行处理 # BytesIO.read if isinstance(self.conn, BytesIO): # 只涉及到本地操作,未涉及网络,不需要特殊处理,因为测试代码需要,此处才进行引用 buff = self.conn.read(size) return buff else: # socket类型数据如何处理 # 因涉及到网络,获取到的数据未必是所需大小,所以需要判断 have = 0 buff = b'' while have < size: chunk = self.conn.recv(size - have) buff += chunk l = len(chunk) have += l if l == 0: # 表示客户端socket关闭了 raise EOFError() return buff
4.方法名协议实现
还记得在实现请求消息转换为原始数据的方法中我们留的一个坑吗?现在将其填上。因为解析方法名的方法是通用的,根据解析出对应的方法名再执行对应的调用过程,即对应的协议。所以此方法是独立于 DivideProtocol
之外的一个类。我们来定义一下:
class MethodProtocol(object): """ 解读方法名字 """ def __init__(self, connection): self.conn = connection def _read_all(self, size): """此处方法同3.read_all方法中实现的代码,因此不再重复书写""" # 当然,如果你愿意,直接用类的继承也未尝不可 ...... def get_method_name(self): """ 提供方法名 :return: str 方法名 """ # 1.读取字符串长度 buff = self._read_all(4) length = struct.unpack('!I', buff)[0] # 2.读取字符串 buff = self._read_all(length) name = buff.decode() return name