RPC(六)

本文涉及的产品
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
简介: RPC(六)

每日分享

Nothing is so fatiguing as the eternal hanging on of an uncompleted task.

没有哪一件事情比藏在心里的一件无法完成的事情更劳累的。

小闫语录

心头积攒的琐事是阻碍你前进的罪魁祸首,尽管有些事、有些人难以释怀更难放下,但是你的心又有谁懂?放过自己,成全自己,做更好的自己。


RPC

GitHub地址:

  1. https://github.com/EthanYan6/rpc_divide.git

为了方便大家查看,我将RPC相关的代码放在了GitHub上面,大家可以clone到本地进行查看。

历史文章导航:

RPC(一)

RPC(二)

RPC(三)

RPC(四)

RPC(五)

咱们前面已经将RPC消息数据构造好了,接下来呢,就可以通过网络在调用双方进行传递了。传递的方式常用的有两种,一种是TCP(传输控制协议),一种是HTTP。下面我们会逐一进行讲解。

1.RPC传输协议说明

1.1TCP

由于它的可靠性,TCP成为了最常用的方式。而且我们可以直接借助socket工具进行TCP开发。下面我们回顾一些TCP编程的相关知识:

通讯过程:客户端和服务端建立连接,期间涉及到三次握手和四次挥手。

TCP服务端编写

sock = socket.socket() # 创建一个套接字
sock.bind() # 绑定端口
sock.listen() # 监听连接
sock.accept() # 接受新连接
sock.close() # 关闭服务器套接字

TCP客户端编写:

sock = socket.socket() # 创建一个套接字
sock.connect() # 连接远程服务器
sock.recv() # 接收数据
sock.send() # 数据尽可能的发送
sock.sendall() # 数据完全发送
sock.close() # 关闭连接

1.2HTTP

大家应该有点疑惑,TCP处于传输层还好理解,可以用来实现RPC传输。HTTP可是在应用层的协议啊,用它来传输是不是有点大材小用,杀鸡用宰牛刀啊?暂且不考虑这个问题,我们只考虑能否实现的问题,HTTP基于TCP实现,而且HTTP协议已经实现了TCP的收发,它还是一个公共的标准,各种语言都提供了HTTP实现的工具,我们直接通过一些库使用就好了,最大的优点便在于此--方便。

具体怎么操作呢?我们可以将构造好的RPC消息数据嵌入到HTTP报文中的body部分,而对HTTP的path路径等都无需关心。如下:

  1. HTTP/1.0 POST /
  2. Content-Type: binary
  3. Content-Length:5096

  4. # 此处放置RPC消息数据

HTTP的通讯效率不如TCP高,所以并不常用。

2.客户端传输工具实现

下面我们实现一个RPC的传输协议,先实现客户端传输工具:

class Channel(object):
    """
    用于客户端建立网络连接
    """
    def __init__(self, host, port):
        """
        :param host: 服务器地址
        :param port: 服务器端口号
        """
        self.host = host
        self.port = port
    def get_connection(self):
        """
        获取连接对象
        :return: 与服务器通讯的socket
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((self.host, self.port))
        return sock

3.服务端传输工具实现

class Server(object):
    """
    RPC服务器
    """
    def __init__(self, host, port, handlers):
        # 创建socket的工具对象
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # 设置socket,重用地址
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # 绑定地址
        self.host = host
        self.port = port
        sock.bind((self.host, self.port))
        self.sock = sock
        self.handlers = handlers
    def serve(self):
        """
        开启服务器运行,提供RPC服务
        :return:
        """
        # 1.开启服务器的监听,等待客户端的连接请求
        self.sock.listen(128)
        # 2.接受客户端的连接请求
        while True:
            client_sock, client_addr = self.sock.accept()
            print('与客户端%s建立了链接' % str(client_addr))
            # 3.交给ServerStub,完成客户端的具体的RPC调用请求
            stub = ServerStub(client_sock, self.handlers)
            try:
                while True:
                    stub.process()
            except EOFError:
                # 表示客户端关闭了连接:
                print('客户端关闭了连接')
                client_sock.close()

4.ClientStub实现

class ClientStub(object):
    """
    用来帮助客户端完成远程过程调用 RPC调用
    stub = ClientStub()
    stub.divide(200, 100)
    框架是通用的,如果想实现加法,stub.add()类似
    """
    def __init__(self, channel):
        self.channel = channel
        self.conn = self.channel.get_connection()
    # 提供一个方法供客户端进行调用
    def divide(self, num1, num2=1):
        # 将调用的参数打包成消息协议的数据
        proto = DivideProtocol()
        args = proto.args_encode(num1, num2)
        # 将消息数据通过网络发送给服务器
        self.conn.sendall(args)
        # 接收服务器返回的返回值消息数据,并进行解析
        result = proto.result_decode(self.conn)
        # 将结果值(正常float 或 异常InvalidOperation)返回给客户端
        if isinstance(result, float):
            # 正常
            return result
        else:
            # 异常
            raise result
    def add(self):
        pass

5.ServerStub实现

class ServerStub(object):
    """ 
    帮助服务端完成远端过程调用 
    """
    def __init__(self, connection, handlers):
        """
        :param connection: 与客户端的连接
        :param handlers: 真正本地被调用的方法(函数  过程)
        class Handlers:
            @staticmethod
            def divide(num1, num2=1):
                pass
            @staticmethod
            def add():
                pass
        """
        self.conn = connection
        self.method_proto = MethodProtocol(self.conn)
        self.process_map = {
            'divide': self._process_divide
        }
        self.handlers = handlers
    def process(self):
        """
        当服务端接受了一个客户端的连接,建立好连接后,完成远端调用处理
        :return:
        """
        # 1.接收消息数据,并解析方法的名字
        name = self.method_proto.get_method_name()
        # 2.根据解析获得的方法(过程)名,调用响应的过程协议,接收并解析消息数据
        # self.process_map[name]()
        _process = self.process_map[name]
        _process()
    def _process_divide(self):
        """
        处理除法过程调用
        :return:
        """
        # 1.创建用于除法过程调用参数协议数据解析的工具
        proto = DivideProtocol()
        # 2.解析调用参数消息数据
        args = proto.args_decode(self.conn)
        # args = {"num1": xxx, "num2": xxx}
        # 3.进行除法的本地过程调用
        # 将本地调用过程的返回值(包括可能的异常)打包成消息协议数据,通过网络返回给客户端
        try:
            val = self.handlers.divide(**args)
        except InvalidOpreation as e:
            ret_message = proto.result_encode(e)
        else:
            ret_message = proto.result_encode(val)
        self.conn.sendall(ret_message)
    # def _process_add(self):
    #     pass

6.服务器与客户端调用案例实现

我们模拟一下服务器与本地调用。

6.1server

创建一个 server.py文件:

from services import InvalidOpreation
from services import Server
class Handlers:
    @staticmethod
    def divide(num1, num2=1):
        """
        除法
        :param num1: int
        :param num2: int
        :return: float
        """
        # 增加判断操作,抛出自定义异常
        if num2 == 0:
            raise InvalidOpreation()
        val = num1 / num2
        return val
if __name__ == '__main__':
    # 开启服务器
    _server = Server('127.0.0.1', 8000, Handlers)
    _server.serve()

6.2client

创建一个 client.py文件:

from services import ClientStub
from services import Channel
from services import InvalidOpreation
# 创建与服务器的连接
channel = Channel('127.0.0.1', '8000')
# 创建用于RPC调用的工具
stub = ClientStub(channel)
# 进行调用
try:
    val = stub.divide(200,0)
except InvalidOpreation as e:
    print(e.message)
else:
    print(val)

7.多线程RPC服务器实现

上面就将RPC完整实现了,但是案例有个小缺陷,就是每次服务器只能处理一个客户端,一个处理完成之后才开始处理下一个客户端。为了提升服务器的性能呢,我们可以将其做成多线程版本的RPC服务器。

class ThreadServer(object):
    """
    多线程RPC服务器
    """
    def __init__(self, host, port, handlers):
        # 创建socket的工具对象
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # 设置socket,重用地址
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # 绑定地址
        self.host = host
        self.port = port
        sock.bind((self.host, self.port))
        self.sock = sock
        self.handlers = handlers
    def serve(self):
        """
        开启服务器运行,提供RPC服务
        :return:
        """
        # 1.开启服务器的监听,等待客户端的连接请求
        self.sock.listen(128)
        print('服务器开始监听')
        # 2.接受客户端的连接请求
        while True:
            client_sock, client_addr = self.sock.accept()
            print('与客户端%s建立了链接' % str(client_addr))
            # 创建子线程处理这个客户端
            t = threading.Thread(target=self.handle, args=(client_sock,))
            # 开启子线程执行
            t.start()
    def handle(self, client_sock):
        """
        子线程调用的方法,用来处理一个客户端的请求
        :return: 
        """
        # 交给ServerStub,完成客户端的具体的RPC调用请求
        stub = ServerStub(client_sock, self.handlers)
        try:
            while True:
                stub.process()
        except EOFError:
            # 表示客户端关闭了连接:
            print('客户端关闭了连接')
            client_sock.close()

到此为止,RPC的简单实现就完成了。下一次文章将开始讲解分布式RPC的相关内容,尽情期待......

相关文章
|
网络协议 网络架构
01RPC - RPC介绍
01RPC - RPC介绍
64 0
|
网络协议
|
3月前
|
负载均衡 Java API
什么是RPC
【9月更文挑战第8天】什么是RPC
142 3
|
7月前
关于RPC
关于RPC
|
Dubbo Java 应用服务中间件
为什么大厂用的都是RPC服务
在很久以前,笔者刚毕业开始工作那会儿,对于企业开发的模式一直以为HTTP接口开发,也就是我们常说的RESTful风格的服务接口。的确,对于在接口不多、系统与系统交互较少的情况下,解决信息孤岛初期常使用的一种通信手段;优点就是简单、直接、开发方便。
218 1
|
JSON 移动开发 网络协议
|
负载均衡
为什么使用RPC
Remote Process Call 远程过程调用。 你对rpc的理解是什么?客户端调用服务端时,就像调用本地函数一样,直接使用并得到结果。
112 0