RPC(四)

本文涉及的产品
云解析 DNS,旗舰版 1个月
云解析DNS,个人版 1个月
全局流量管理 GTM,标准版 1个月
简介: RPC(四)

每日分享

Adopt the pace of nature, her secret is patience.

请接受大自然的节奏,她的秘诀就是耐心。

小闫语录

耐心,静心,以自然的节奏对待生活,以自然的节奏处理事情。你会发现一切没有那么糟糕,事事如此平和顺畅。


RPC

历史文章导航:

RPC(一)

RPC(二)

RPC(三)

1.请求参数转换为消息数据实现

上篇文章我们对消息协议进行了设计,并将其原理解释清楚。今天呢,我们用代码将设计作以实现。

明确好大体思路后,我们开始敲代码了。

1.首先定义一个类:

class DivideProtocol(object):
    """
    divide过程消息协议转换工具
    """

2.然后写一个对请求参数转换为消息数据的方法:

代码结合上一篇文章的设计方案进行阅读,会容易的多。

def args_encode(self, num1, num2=1):
    """
    将原始的调用请求参数转换打包成二进制消息数据
    :param num1: int
    :param num2: int
    :return: bytes 二进制消息数据
    """
    name = 'divide'
    # 处理方法的名字 字符串
    # 处理字符串的长度
    buff = struct.pack('!I', 6)
    # 处理字符
    buff += name.encode()
    # 处理参数1
    # 处理序号
    buff2 = struct.pack('!B', 1)
    # 处理参数值
    buff2 += struct.pack('!i', num1)
    # 处理参数2
    if num2 != 1:
        # 处理序号
        buff2 += struct.pack('!B', 2)
        buff2 += struct.pack('!i', num2)
    # 处理消息长度,边界设定
    length = len(buff2)
    buff += struct.pack('!I', length)
    buff += buff2
    return buff

i:代表的是有符号整数类型用4个字节进行表示。因为参数是有正负的。

B:代表的是整数类型用1个字节进行表示。

2.请求消息数据转换为请求原始参数的实现

接下来我们编写一个方法,用以对请求消息数据的解码操作。

在进行解码操作之前,思考一个问题,就是方法接收的参数是什么?有人会说,这还不容易吗,直接将全部数据接收然后解析,传递一个data即可。可以是可以,但是不方便,因为我们传递的数据大小未知,全部解析完成之后,里面都进行了解码操作,消息数据原先的格式被打乱,无法确定每一个消息的边界,所以需要边解析边确定。直到读取完方法名之后,继续读取边界长度才变成已知。

既然需要边读取边解析,那么就需要一个读取数据的来源。因为我们的RPC建立在网络通讯基础之上,毕竟是远程调用,对吧?所以我们就可以从网络数据中读取数据。在TCP中,我们可以利用socket进行相关操作,定义connection,建立一个网络连接通道,边读取数据边进行解析。

还有一个问题,因为底层TCP的socket在封装完工具类之后,我们还未实现,但是又想测试此工具类,为了方便,需要有一个读取二进制的东西,在此我们引入BytesIO对象,来往外读取二进制数据。此处我们先书写参数解码操作的方法:

def _read_all(self, size):
    """
    帮助我们读取二进制数据
    :param size: 想要读取的二进制数据大小
    :return: 二进制数据 bytes
    """
    # self.conn
    pass
def args_decode(self, connection):
    """
    接收调用请求消息数据并进行解析
    :param connection: 连接对象 socket BytesIO
    :return: dict 包含了解析之后的参数
    """
    # 参数长度映射关系
    param_len_map = {
        1: 4,
        2: 4
    }
    # 参数格式映射关系
    param_fmt_map = {
        1: '!i',
        2: '!i'
    }
    # 参数名字映射关系
    param_name_map = {
        1: 'num1',
        2: 'num2'
    }
    # 保存用来返回参数的字典
    # args = {"num1": xxx, "num2": xxx}
    args = {}
    self.conn = connection
    # 处理方法的名字已经提前被处理
    # 后面我们会实现一个方法专门处理不同的调用请求的方法名解析
    # 处理消息边界
    # 读取二进制数据
    # socket.recv(4) => ?4 判断读取的数据是否为4个,直到4个字节我们才进行处理
    # BytesIO.read
    buff = self._read_all(4)
    # 将二进制数据转换为python的数据类型
    length = struct.unpack('!I', buff)[0]
    # 已经读取处理的字节数
    have = 0
    # 处理第一个参数
    # 1.处理参数序号
    buff = self._read_all(1)
    have += 1
    param_seq = struct.unpack('!B', buff)[0]
    # 2.处理参数值
    param_len = param_len_map[param_seq]
    buff = self._read_all(param_len)
    have += param_len
    param_fmt = param_fmt_map[param_seq]
    param = struct.unpack(param_fmt, buff)[0]
    param_name = param_name_map[param_seq]
    args[param_name] = param
    if have >= length:
        return args
    # 处理第二个参数
    # 1.处理参数序号
    buff = self._read_all(1)
    param_seq = struct.unpack('!B', buff)[0]
    # 2.处理参数值
    param_len = param_len_map[param_seq]
    buff = self._read_all(param_len)
    param_fmt = param_fmt_map[param_seq]
    param = struct.unpack(param_fmt, buff)[0]
    param_name = param_name_map[param_seq]
    args[param_name] = param
    return args

3.read_all方法实现

由于 self.conn可能有不同的类型(可能是socket类型,可能是bytes类型),因此我们需要根据不同的类型按照不同的方法进行读取操作。

from io import BytesIO
def _read_all(self, size):
    """
    帮助我们读取二进制数据
    :param size: 想要读取的二进制数据大小
    :return: 二进制数据 bytes
    """
    # self.conn
    # 读取二进制数据
    # socket.recv(4) => ?4 判断读取的数据是否为4个,直到4个字节我们才进行处理
    # BytesIO.read
    if isinstance(self.conn, BytesIO):
        # 只涉及到本地操作,未涉及网络,不需要特殊处理,因为测试代码需要,此处才进行引用
        buff = self.conn.read(size)
          return buff
    else:
        # socket类型数据如何处理
        # 因涉及到网络,获取到的数据未必是所需大小,所以需要判断
        have = 0
        buff = b''
        while have < size:
            chunk = self.conn.recv(size - have)
            buff += chunk
            l = len(chunk)
            have += l
            if l == 0:
                # 表示客户端socket关闭了
                raise EOFError()
        return buff

4.方法名协议实现

还记得在实现请求消息转换为原始数据的方法中我们留的一个坑吗?现在将其填上。因为解析方法名的方法是通用的,根据解析出对应的方法名再执行对应的调用过程,即对应的协议。所以此方法是独立于 DivideProtocol之外的一个类。我们来定义一下:

class MethodProtocol(object):
    """
    解读方法名字
    """
    def __init__(self, connection):
        self.conn = connection
    def _read_all(self, size):
        """此处方法同3.read_all方法中实现的代码,因此不再重复书写""" 
        # 当然,如果你愿意,直接用类的继承也未尝不可
        ......
    def get_method_name(self):
        """
        提供方法名
        :return: str 方法名
        """
        # 1.读取字符串长度
        buff = self._read_all(4)
        length = struct.unpack('!I', buff)[0]
        # 2.读取字符串
        buff = self._read_all(length)
        name = buff.decode()
        return name
相关文章
|
10月前
|
网络协议 网络架构
01RPC - RPC介绍
01RPC - RPC介绍
43 0
|
11月前
|
网络协议
|
3月前
关于RPC
关于RPC
|
12月前
|
Dubbo Java 应用服务中间件
为什么大厂用的都是RPC服务
在很久以前,笔者刚毕业开始工作那会儿,对于企业开发的模式一直以为HTTP接口开发,也就是我们常说的RESTful风格的服务接口。的确,对于在接口不多、系统与系统交互较少的情况下,解决信息孤岛初期常使用的一种通信手段;优点就是简单、直接、开发方便。
165 1
|
11月前
|
JSON 移动开发 网络协议
|
11月前
|
Go
|
11月前
|
Python
|
XML JSON 前端开发
什么是RPC
1.为什么需要RPC 为什么需要 RPC 呢?通俗的讲就是无法在一个进程内,甚至一个计算机内通过本地调用的方式完成的需求, 比如不同的系统间的通讯,甚至不同的组织间的通讯。 由于计算能力需要横向扩展,需要在多台机器组成的集群上部署应用,
442 0
|
负载均衡
为什么使用RPC
Remote Process Call 远程过程调用。 你对rpc的理解是什么?客户端调用服务端时,就像调用本地函数一样,直接使用并得到结果。
96 0