RPC 服务器之【多进程描述符传递】高阶模型

简介: 今天老师要给大家介绍一个比较特别的 RPC 服务器模型,这个模型不同于 Nginx、不同于 Redis、不同于 Apache、不同于 Tornado、不同于 Netty,它的原型是 Node Cluster 的多进程并发模型。

今天老师要给大家介绍一个比较特别的 RPC 服务器模型,这个模型不同于 Nginx、不同于 Redis、不同于 Apache、不同于 Tornado、不同于 Netty,它的原型是 Node Cluster 的多进程并发模型。

Nginx 并发模型

我们知道 Nginx 的并发模型是一个多进程并发模型,它的 Master 进程在绑定监听地址端口后 fork 出了多个 Slave 进程共同竞争处理这个服务端套接字接收到的很多客户端连接。

 

这多个 Slave 进程会共享同一个处于操作系统内核态的套接字队列,操作系统的网络模块在处理完三次握手后就会将套接字塞进这个队列。这是一个生产者消费者模型,生产者是操作系统的网络模块,消费者是多个 Slave 进程,队列中的对象是客户端套接字。

这种模型在负载均衡上有一个缺点,那就是套接字分配不均匀,形成了类似于贫富分化的局面,也就是「闲者愈闲,忙者愈忙」的状态。这是因为当多个进程竞争同一个套接字队列时,操作系统采用了 LIFO 的策略,最后一个来 accept 的进程最优先拿到 套接字。越是繁忙的进程越是有更多的机会调用 accept,它能拿到的套接字也就越多。

 

Node Cluster 并发模型

Node Cluster 为了解决负载均衡问题,它采用了不同的策略。它也是多进程并发模型,Master 进程会 fork 出多个子进程来处理客户端套接字。但是不存在竞争问题,因为负责 accept 套接字的只能是 Master 进程,Slave 进程只负责处理客户端套接字请求。那就存在一个问题,Master 进程拿到的客户端套接字如何传递给 Slave 进程。

 

这时,神奇的 sendmsg 登场了。它是操作系统提供的系统调用,可以在不同的进程之间传递文件描述符。sendmsg 会搭乘一个特殊的「管道」将 Master 进程的套接字描述符传递到 Slave 进程,Slave 进程通过 recvmsg 系统调用从这个「管道」中将描述符取出来。这个「管道」比较特殊,它是 Unix 域套接字。普通的套接字可以跨机器传输消息,Unix 域套接字只能在同一个机器的不同进程之间传递消息。同管道一样,Unix 域套接字也分为有名套接字和无名套接字,有名套接字会在文件系统指定一个路径名,无关进程之间都可以通过这个路径来访问 Unix 域套接字。而无名套接字一般用于父子进程之间,父进程会通过 socketpair 调用来创建套接字,然后 fork 出来子进程,这样子进程也会同时持有这个套接字的引用。后续父子进程就可以通过这个套接字互相通信。

 

注意这里的传递描述符,本质上不是传递,而是复制。父进程的描述符并不会在 sendmsg 自动关闭自动消失,子进程收到的描述符和父进程的描述符也不是同一个整数值。但是父子进程的描述符都会指向同一个内核套接字对象。

有了描述符的传递能力,父进程就可以将 accept 到的客户端套接字轮流传递给多个 Slave 进程,负载均衡的目标就可以顺利实现了。

接下来我们就是用 Python 代码来撸一遍 Node Cluster 的并发模型。因为 sendmsg 和 recvmsg 方法到了 Python3.5 才内置进来,所以下面的代码需要使用 Python3.5+才可以运行。

我们看 sendmsg 方法的定义

socket.sendmsg(buffers[, ancdata[, flags[, address]]])

 

我们只需要关心第二个参数 ancdata,描述符是通过ancdata 参数传递的,它的意思是 「辅助数据」,而 buffers 表示需要传递的消息内容,因为消息内容这里没有意义,所以这个字段可以任意填写,但是必须要有内容,如果没有内容,sendmsg 方法就是一个空调用。

import socket, struct

def send_fds(sock, fd):
    return sock.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack("i", fd))])

# ancdata 参数是一个三元组的列表,三元组的第一个参数表示网络协议栈级别 level,第二个参数表示辅助数据的类型 type,第三个参数才是携带的数据,level=SOL_SOCKET 表示传递的数据处于 TCP 协议层级,type=SCM_RIGHTS 就表示携带的数据是文件描述符。我们传递的描述符 fd 是一个整数,需要使用 struct 包将它序列化成二进制。

 

再看 recvmsg 方法的定义

msg, ancdata, flags, addr = socket.recvmsg(bufsize[, ancbufsize[, flags]])

 

同样,我们只需要关心返回的 ancdata 数据,它里面包含了我们需要的文件描述符。但是需要提供消息体的长度和辅助数据的长度参数。辅助数据的长度比较特殊,需要使用 CMSG_LEN 方法来计算,因为辅助数据里面还有我们看不到的额外的头部信息。

bufsize = 1  # 消息内容的长度
ancbufsize = socket.CMSG_LEN(struct.calcsize('i'))  # 辅助数据的长度
msg, ancdata, flags, addr = socket.recvmsg(bufsize, ancbufsize) # 收取消息
level, type, fd_bytes = ancdata[0] # 取第一个元祖,注意发送消息时我们传递的是一个三元组的列表
fd = struct.unpack('i', fd_bytes) # 反序列化

代码实现

下面我来献上完整的服务器代码,为了简单起见,我们在 Slave 进程中处理 RPC 请求使用同步模型。

# coding: utf
# sendmsg recvmsg python3.5+才可以支持

import os
import json
import struct
import socket


def handle_conn(conn, addr, handlers):
    print(addr, "comes")
    while True:
        # 简单起见,这里就没有使用循环读取了
        length_prefix = conn.recv(4)
        if not length_prefix:
            print(addr, "bye")
            conn.close()
            break  # 关闭连接,继续处理下一个连接
        length, = struct.unpack("I", length_prefix)
        body = conn.recv(length)
        request = json.loads(body)
        in_ = request['in']
        params = request['params']
        print(in_, params)
        handler = handlers[in_]
        handler(conn, params)


def loop_slave(pr, handlers):
    while True:
        bufsize = 1
        ancsize = socket.CMSG_LEN(struct.calcsize('i'))
        msg, ancdata, flags, addr = pr.recvmsg(bufsize, ancsize)
        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
        fd = struct.unpack('i', cmsg_data)[0]
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd)
        handle_conn(sock, sock.getpeername(), handlers)


def ping(conn, params):
    send_result(conn, "pong", params)


def send_result(conn, out, result):
    response = json.dumps({"out": out, "result": result}).encode('utf-8')
    length_prefix = struct.pack("I", len(response))
    conn.sendall(length_prefix)
    conn.sendall(response)


def loop_master(serv_sock, pws):
    idx = 0
    while True:
        sock, addr = serv_sock.accept()
        pw = pws[idx % len(pws)]
        # 消息数据,whatever
        msg = [b'x']
        # 辅助数据,携带描述符
        ancdata = [(
            socket.SOL_SOCKET,
            socket.SCM_RIGHTS,
            struct.pack('i', sock.fileno()))]
        pw.sendmsg(msg, ancdata)
        sock.close()  # 关闭引用
        idx += 1


def prefork(serv_sock, n):
    pws = []
    for i in range(n):
        # 开辟父子进程通信「管道」
        pr, pw = socket.socketpair()
        pid = os.fork()
        if pid < 0:  # fork error
            return pws
        if pid > 0:
            # 父进程
            pr.close()  # 父进程不用读
            pws.append(pw)
            continue
        if pid == 0:
            # 子进程
            serv_sock.close()  # 关闭引用
            pw.close()  # 子进程不用写
            return pr
    return pws


if __name__ == '__main__':
    serv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serv_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    serv_sock.bind(("localhost", 8080))
    serv_sock.listen(1)
    pws_or_pr = prefork(serv_sock, 10)
    if hasattr(pws_or_pr, '__len__'):
        if pws_or_pr:
            loop_master(serv_sock, pws_or_pr)
        else:
            # fork 全部失败,没有子进程,Game Over
            serv_sock.close()
    else:
        handlers = {
            "ping": ping
        }
        loop_slave(pws_or_pr, handlers)

 

父进程使用 fork 调用创建了多个子进程,然后又使用 socketpair 调用为每一个子进程都创建一个无名套接字用来传递描述符。父进程使用 roundrobin 策略平均分配接收到的客户端套接字。子进程接收到的是一个描述符整数,需要将描述符包装成套接字对象后方可读写。打印对比发送和接收到的描述符,你会发现它们俩的值并不相同,这是因为 sendmsg 将描述符发送到内核后,内核给描述符指向的内核套接字又重新分配了一个新的描述符对象。

欢迎工作一到五年的Java工程师朋友们加入Java架构开发:744677563

本群提供免费的学习指导 架构资料 以及免费的解答

不懂得问题都可以在本群提出来 之后还会有职业生涯规划以及面试指导

相关文章
|
2月前
|
数据可视化 Linux 网络安全
如何使用服务器训练模型
本文介绍了如何使用服务器训练模型,包括获取服务器、访问服务器、上传文件、配置环境、训练模型和下载模型等步骤。适合没有GPU或不熟悉Linux服务器的用户。通过MobaXterm工具连接服务器,使用Conda管理环境,确保训练过程顺利进行。
116 0
如何使用服务器训练模型
|
7月前
|
存储 弹性计算 安全
ECS的安全责任共担模型
云服务器ECS的云上安全性是阿里云和客户的共同责任。本文介绍云服务器ECS(Elastic Compute Service)与客户在安全性方面各自应该承担的责任。
|
2月前
|
存储 PyTorch API
NVIDIA Triton系列09-为服务器添加模型
本文介绍了如何为NVIDIA Triton模型仓库添加新模型。通过示例模型`inception_graphdef`的配置文件`config.pbtxt`,详细解释了模型名称、平台/后端名称、模型执行策略、最大批量值、输入输出节点及版本策略等配置项。内容涵盖了模型的基本要素和配置细节,帮助读者更好地理解和使用Triton服务器。
39 0
|
2月前
|
机器学习/深度学习 人工智能 并行计算
StableDiffusion-01本地服务器部署服务 10分钟上手 底显存 中等显存机器 加载模型测试效果 附带安装指令 多显卡 2070Super 8GB*2
StableDiffusion-01本地服务器部署服务 10分钟上手 底显存 中等显存机器 加载模型测试效果 附带安装指令 多显卡 2070Super 8GB*2
44 0
|
4月前
|
运维 算法 调度
深入理解操作系统:进程调度与优先级自动化运维:使用Ansible实现服务器集群管理
【8月更文挑战第27天】在操作系统的众多奥秘中,进程调度无疑是一个既简单又复杂的主题。它就像是交响乐团中的指挥,协调着每一个音符,确保乐曲和谐而有序地进行。本文将带领读者走进进程调度的世界,探索其背后的原理和实现,同时通过代码示例揭示其精妙之处。让我们一起揭开进程调度的神秘面纱,理解它在操作系统中的重要性。
|
3月前
|
网络协议 数据处理 C语言
利用C语言基于poll实现TCP回声服务器的多路复用模型
此代码仅为示例,展示了如何基于 `poll`实现多路复用的TCP回声服务器的基本框架。在实际应用中,你可能需要对其进行扩展或修改,以满足具体的需求。
95 0
|
4月前
|
开发工具 git iOS开发
服务器配置Huggingface并git clone模型和文件
该博客提供了在服务器上配置Huggingface、安装必要的工具(如git-lfs和huggingface_hub库)、登录Huggingface以及使用git clone命令克隆模型和文件的详细步骤。
396 1
|
6月前
|
机器学习/深度学习 人工智能 网络安全
人工智能平台PAI产品使用合集之在本地可以成功进入模型流,但在服务器上无法进入,是什么原因
阿里云人工智能平台PAI是一个功能强大、易于使用的AI开发平台,旨在降低AI开发门槛,加速创新,助力企业和开发者高效构建、部署和管理人工智能应用。其中包含了一系列相互协同的产品与服务,共同构成一个完整的人工智能开发与应用生态系统。以下是对PAI产品使用合集的概述,涵盖数据处理、模型开发、训练加速、模型部署及管理等多个环节。
|
6月前
|
异构计算 弹性计算 并行计算
|
6月前
|
数据可视化 API 开发工具
详细解读cesi+supervisor可视化集中管理服务器节点进程
详细解读cesi+supervisor可视化集中管理服务器节点进程
99 0