每日分享
Nothing is so fatiguing as the eternal hanging on of an uncompleted task.
没有哪一件事情比藏在心里的一件无法完成的事情更劳累的。
小闫语录:
心头积攒的琐事是阻碍你前进的罪魁祸首,尽管有些事、有些人难以释怀更难放下,但是你的心又有谁懂?放过自己,成全自己,做更好的自己。
RPC
GitHub地址:
https://github.com/EthanYan6/rpc_divide.git
为了方便大家查看,我将RPC相关的代码放在了GitHub上面,大家可以clone到本地进行查看。
历史文章导航:
咱们前面已经将RPC消息数据构造好了,接下来呢,就可以通过网络在调用双方进行传递了。传递的方式常用的有两种,一种是TCP(传输控制协议),一种是HTTP。下面我们会逐一进行讲解。
1.RPC传输协议说明
1.1TCP
由于它的可靠性,TCP成为了最常用的方式。而且我们可以直接借助socket工具进行TCP开发。下面我们回顾一些TCP编程的相关知识:
通讯过程:客户端和服务端建立连接,期间涉及到三次握手和四次挥手。
TCP服务端编写:
sock = socket.socket() # 创建一个套接字 sock.bind() # 绑定端口 sock.listen() # 监听连接 sock.accept() # 接受新连接 sock.close() # 关闭服务器套接字
TCP客户端编写:
sock = socket.socket() # 创建一个套接字 sock.connect() # 连接远程服务器 sock.recv() # 接收数据 sock.send() # 数据尽可能的发送 sock.sendall() # 数据完全发送 sock.close() # 关闭连接
1.2HTTP
大家应该有点疑惑,TCP处于传输层还好理解,可以用来实现RPC传输。HTTP可是在应用层的协议啊,用它来传输是不是有点大材小用,杀鸡用宰牛刀啊?暂且不考虑这个问题,我们只考虑能否实现的问题,HTTP基于TCP实现,而且HTTP协议已经实现了TCP的收发,它还是一个公共的标准,各种语言都提供了HTTP实现的工具,我们直接通过一些库使用就好了,最大的优点便在于此--方便。
具体怎么操作呢?我们可以将构造好的RPC消息数据嵌入到HTTP报文中的body部分,而对HTTP的path路径等都无需关心。如下:
HTTP/1.0 POST /
Content-Type: binary
Content-Length:5096
# 此处放置RPC消息数据
HTTP的通讯效率不如TCP高,所以并不常用。
2.客户端传输工具实现
下面我们实现一个RPC的传输协议,先实现客户端传输工具:
class Channel(object): """ 用于客户端建立网络连接 """ def __init__(self, host, port): """ :param host: 服务器地址 :param port: 服务器端口号 """ self.host = host self.port = port def get_connection(self): """ 获取连接对象 :return: 与服务器通讯的socket """ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((self.host, self.port)) return sock
3.服务端传输工具实现
class Server(object): """ RPC服务器 """ def __init__(self, host, port, handlers): # 创建socket的工具对象 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 设置socket,重用地址 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 绑定地址 self.host = host self.port = port sock.bind((self.host, self.port)) self.sock = sock self.handlers = handlers def serve(self): """ 开启服务器运行,提供RPC服务 :return: """ # 1.开启服务器的监听,等待客户端的连接请求 self.sock.listen(128) # 2.接受客户端的连接请求 while True: client_sock, client_addr = self.sock.accept() print('与客户端%s建立了链接' % str(client_addr)) # 3.交给ServerStub,完成客户端的具体的RPC调用请求 stub = ServerStub(client_sock, self.handlers) try: while True: stub.process() except EOFError: # 表示客户端关闭了连接: print('客户端关闭了连接') client_sock.close()
4.ClientStub实现
class ClientStub(object): """ 用来帮助客户端完成远程过程调用 RPC调用 stub = ClientStub() stub.divide(200, 100) 框架是通用的,如果想实现加法,stub.add()类似 """ def __init__(self, channel): self.channel = channel self.conn = self.channel.get_connection() # 提供一个方法供客户端进行调用 def divide(self, num1, num2=1): # 将调用的参数打包成消息协议的数据 proto = DivideProtocol() args = proto.args_encode(num1, num2) # 将消息数据通过网络发送给服务器 self.conn.sendall(args) # 接收服务器返回的返回值消息数据,并进行解析 result = proto.result_decode(self.conn) # 将结果值(正常float 或 异常InvalidOperation)返回给客户端 if isinstance(result, float): # 正常 return result else: # 异常 raise result def add(self): pass
5.ServerStub实现
class ServerStub(object): """ 帮助服务端完成远端过程调用 """ def __init__(self, connection, handlers): """ :param connection: 与客户端的连接 :param handlers: 真正本地被调用的方法(函数 过程) class Handlers: @staticmethod def divide(num1, num2=1): pass @staticmethod def add(): pass """ self.conn = connection self.method_proto = MethodProtocol(self.conn) self.process_map = { 'divide': self._process_divide } self.handlers = handlers def process(self): """ 当服务端接受了一个客户端的连接,建立好连接后,完成远端调用处理 :return: """ # 1.接收消息数据,并解析方法的名字 name = self.method_proto.get_method_name() # 2.根据解析获得的方法(过程)名,调用响应的过程协议,接收并解析消息数据 # self.process_map[name]() _process = self.process_map[name] _process() def _process_divide(self): """ 处理除法过程调用 :return: """ # 1.创建用于除法过程调用参数协议数据解析的工具 proto = DivideProtocol() # 2.解析调用参数消息数据 args = proto.args_decode(self.conn) # args = {"num1": xxx, "num2": xxx} # 3.进行除法的本地过程调用 # 将本地调用过程的返回值(包括可能的异常)打包成消息协议数据,通过网络返回给客户端 try: val = self.handlers.divide(**args) except InvalidOpreation as e: ret_message = proto.result_encode(e) else: ret_message = proto.result_encode(val) self.conn.sendall(ret_message) # def _process_add(self): # pass
6.服务器与客户端调用案例实现
我们模拟一下服务器与本地调用。
6.1server
创建一个 server.py
文件:
from services import InvalidOpreation from services import Server class Handlers: @staticmethod def divide(num1, num2=1): """ 除法 :param num1: int :param num2: int :return: float """ # 增加判断操作,抛出自定义异常 if num2 == 0: raise InvalidOpreation() val = num1 / num2 return val if __name__ == '__main__': # 开启服务器 _server = Server('127.0.0.1', 8000, Handlers) _server.serve()
6.2client
创建一个 client.py
文件:
from services import ClientStub from services import Channel from services import InvalidOpreation # 创建与服务器的连接 channel = Channel('127.0.0.1', '8000') # 创建用于RPC调用的工具 stub = ClientStub(channel) # 进行调用 try: val = stub.divide(200,0) except InvalidOpreation as e: print(e.message) else: print(val)
7.多线程RPC服务器实现
上面就将RPC完整实现了,但是案例有个小缺陷,就是每次服务器只能处理一个客户端,一个处理完成之后才开始处理下一个客户端。为了提升服务器的性能呢,我们可以将其做成多线程版本的RPC服务器。
class ThreadServer(object): """ 多线程RPC服务器 """ def __init__(self, host, port, handlers): # 创建socket的工具对象 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 设置socket,重用地址 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 绑定地址 self.host = host self.port = port sock.bind((self.host, self.port)) self.sock = sock self.handlers = handlers def serve(self): """ 开启服务器运行,提供RPC服务 :return: """ # 1.开启服务器的监听,等待客户端的连接请求 self.sock.listen(128) print('服务器开始监听') # 2.接受客户端的连接请求 while True: client_sock, client_addr = self.sock.accept() print('与客户端%s建立了链接' % str(client_addr)) # 创建子线程处理这个客户端 t = threading.Thread(target=self.handle, args=(client_sock,)) # 开启子线程执行 t.start() def handle(self, client_sock): """ 子线程调用的方法,用来处理一个客户端的请求 :return: """ # 交给ServerStub,完成客户端的具体的RPC调用请求 stub = ServerStub(client_sock, self.handlers) try: while True: stub.process() except EOFError: # 表示客户端关闭了连接: print('客户端关闭了连接') client_sock.close()
到此为止,RPC的简单实现就完成了。下一次文章将开始讲解分布式RPC的相关内容,尽情期待......