楔子
之前在介绍 TCP 协议的时候,提到了 Socket,它的中文翻译是套接字。我们说 Socket 是对 TCP/IP 协议的一个封装,可以让我们更方便地使用 TCP/IP 协议,而不用关注背后的原理。并且我们经常使用的 Web 框架,本质上也是一个 Socket。
那么本篇文章我们就以 Python 为例,好好地聊一聊 Socket,而且你也一定知道 IO 多路复用,比如 select, poll, epoll,但它们之间的区别和用法你是否了如指掌呢?下面就带着这些问题,开始本文的内容吧。
什么是 Socket
上面说了,Socket 是操作系统对 TCP/IP 网络协议栈的封装,并提供了一系列的接口,我们通过这些接口可以实现网络通信,而不用关注网络协议的具体细节。
按照现有的网络模型,Socket 并不属于其中的任何一层,但我们可以简单地将 Socket 理解为传输层之上的抽象层,负责连接应用层和传输层。Socket 提供了大量的 API,基于这些 API 我们可以非常方便地使用网络协议栈,在不同主机间进行网络通信。
Linux 一切皆文件,Socket 也不例外,它被称为套接字文件,在使用上和普通文件是类似的。
如何使用 Socket 编程
socket 是什么我们已经知道了,下面来看看如何使用 socket 进行编程。
- 服务端初始化 socket,此时会得到「主动套接字」;
- 服务端调用 bind,将套接字绑定在某个 IP 和端口上;
- 服务端调用 listen 进行监听,此时「主动套接字」会变成「监听套接字」;
- 服务端调用 accept,等待客户端连接,此时服务端会阻塞在这里(调用的是阻塞的 API);
- 客户端同样初始化 socket,得到主动套接字;
- 客户端调用主动套接字的 connect,向服务器端发起连接请求,如果连接成功,后续客户端就用这个主动套接字进行数据的传输;
- 当客户端来连接时,那么服务端的 accept 将不再阻塞,并返回「已连接套接字」,后续服务端便用这个已连接套接字和客户端进行数据传输;
- 如果客户端断开连接,那么服务端 read 读取数据的时候就会出现 EOF,知道客户端断开连接了。待数据处理完毕后,服务端也要调用 close 来关闭连接;
我们使用 Python 来演示一下这个过程,首先是服务端:
import socket # socket.socket() 会返回一个「主动套接字」 server = socket.socket( # 表示使用 IPv4,如果是 socket.AF_INET6 # 则表示使用 IPv6 socket.AF_INET, # 表示建立 TCP 连接,如果是 socket.SOCK_DGRAM # 则表示建立 UDP 连接 socket.SOCK_STREAM ) # 当然这两个参数也可以不传,因为默认就是它 # 设置套接字属性,这里让端口释放后立刻就能再次使用 server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) # 将「主动套接字」绑定在某个 IP 和端口上 server.bind(("localhost", 12345)) # 监听,此时「主动套接字」会变成「监听套接字」 # 里面的参数表示 backlog,代表的含义后面说 server.listen(5) # 调用 accept,等待客户端连接,此时会阻塞在这里 # 如果客户端连接到来,那么会返回「已连接套接字」,也就是这里的 conn # 至于 addr 则是一个元组,保存了客户端连接的信息(IP 和端口) conn, addr = server.accept() # 下面我们通过「已连接套接字」conn 和客户端进行消息的收发 # 收消息使用 recv、发消息使用 send,和 read、write 本质是一样的 while True: msg = conn.recv(1024) # 当客户端断开连接时,msg 会收到一个空字节串 if not msg: print("客户端已经断开连接") conn.close() break print("客户端发来消息:", msg.decode("utf-8")) # 然后我们加点内容之后,再给客户端发过去 conn.send("服务端收到, 你发的消息是: ".encode("utf-8") + msg)
接下来编写客户端:
import socket # 返回主动套接字 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 连接服务端 client.connect(("localhost", 12345)) while True: # 发送消息 data = input("请输入内容: ") if data.strip().lower() in ("q", "quit", "exit"): client.close() print("Bye~~~") break client.send(data.encode("utf-8")) print(client.recv(1024).decode("utf-8"))
启动服务端和客户端进行测试:
还是比较简单的,当然我们这里的服务端每次只能和一个客户端通信,如果想服务多个客户端的话,那么需要为已连接套接字单独开一个线程和客户端进行通信,然后主线程继续执行 accept 等待下一个客户端。
下面来编写一下多线程的版本,这里只需要编写服务端即可,客户端代码不变。
import socket import threading server = socket.socket() server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) server.bind(("localhost", 12345)) server.listen(5) def handle_message(conn, addr): while True: msg = conn.recv(1024) if not msg: print(f"客户端(ip: {addr[0]}, port: {addr[1]}) 已经断开连接") conn.close() break print(f"客户端(ip: {addr[0]}, port: {addr[1]}) 发来消息:", msg.decode("utf-8")) conn.send("服务端收到, 你发的消息是: ".encode("utf-8") + msg) while True: conn, addr = server.accept() threading.Thread( target=handle_message, args=(conn, addr) ).start()
代码很简单,就是把已连接套接字和客户端的通信逻辑写在了单独的函数中,每来一个客户端,服务端都会启动一个新的线程去执行该函数,然后继续监听,等待下一个客户端连接到来。
然后客户端代码不变,我们启动三个客户端去和服务端通信,看看结果如何。
结果一切正常,当然我们这里的代码比较简单,就是普通的消息收发。你也可以实现一个更复杂的功能,比如文件下载器,把服务端当成网盘,支持客户端上传和下载文件,并不难。
socketserver
另外 Python 标准库还提供了一个模块叫 socketserver,它是 socket 的更高级封装,可以简化服务端的代码逻辑。并且 socketserver 的内部会自动使用多线程,服务多个客户端。
import socketserver # 自定义一个类,必须继承 BaseRequestHandler class ServiceHandler(socketserver.BaseRequestHandler): """ 内部提供了三个重要属性 self.request: 已连接套接字 conn self.client_address: 客户端信息 addr self.server: 服务端实例(一会我们会创建它) 然后我们必须要实现 handle 方法,处理客户端连接时会自动调用 此外还有两个方法,分别是 setup 和 finish,实不实现均可 """ def setup(self) -> None: """在执行 handle 之前调用,用于提前做一些连接相关的设置""" def finish(self) -> None: """在执行 handle 之后调用,用于资源释放等等""" self.request.close() def handle(self) -> None: """ 处理客户端连接 这里的 self.request 就相当于之前的 conn """ client_ip, client_port = self.client_address while True: msg = self.request.recv(1024) if not msg: print(f"客户端(ip: {client_ip}, port: {client_port}) 已经断开连接") self.request.close() break print(f"客户端(ip: {client_ip}, port: {client_port}) 发来消息:", msg.decode("utf-8")) self.request.send("服务端收到, 你发的消息是: ".encode("utf-8") + msg) # 绑定 IP 和端口,以及用于处理的 Handler # 这里的 ThreadingTCPServer 实例就是 ServiceHandler 里面的 self.server server = socketserver.ThreadingTCPServer( ("localhost", 12346), ServiceHandler ) # 开启无限循环,监听连接 server.serve_forever() # 如果关闭监听,那么调用 server.shutdown()
可以测试一下,结果没有问题。并且当前支持多个客户端连接,每来一个客户端就会实例化一个 ServiceHandler,并开启多线程执行 handle 方法,与客户端通信。
以上我们就简单提了一下 socketserver,了解一下即可。
listen 方法的意义?
在创建完 socket 之后,我们调用了 listent 方法,该方法接收一个 backlog 参数。
server = socket.socket() ... server.listen(5)
那么该方法的意义是什么呢?我们调用时传的数字 5 又有什么作用呢?
根据上面的 socket 流程图,我们可以得知在三次握手的时候,Linux 内核会维护两个队列:
- 半连接队列,也称 SYN 队列;
- 全连接队列,也称 Accept 队列;
服务端收到客户端发起的 SYN 请求后,内核会把该连接存储到半连接队列,并向客户端响应 SYN+ACK。接着客户端会回复 ACK,服务端收到后,内核会从半连接队列里面将连接取出,然后添加到全连接队列,等待进程调用 accept 函数时把连接取出来。
所以整个过程如下:
- 1. 客户端发送 SYN 报文;
- 2. 服务端将连接插入到半连接队列;
- 3. 服务端向客户端返回 SYN + ACK;
- 4. 客户端收到之后再向服务端返回 ACK;
- 5. 服务端将连接从半连接队列中取出,移入全连接队列;
- 6. 进程调用 accept 函数,从全连接队列中取出已完成连接建立的 socket 连接;
因此半连接队列(SYN 队列)用来存储 SYN_RECV 状态、未完成建立的连接;全连接队列(Accept 队列)用来存储 ESTABLISH 状态、已完成建立的连接。
而我们也可以很容易得出结论,客户端返回成功是在第二次握手之后,服务端 accept 成功是在三次握手之后,因为调用 accept 就相当于从全连接队列中取出连接和客户端进行通信。
那么如何查看 SYN 队列和 Accept 队列的大小呢?
- net.ipv4.tcp_max_syn_backlog:查看半连接队列长度;
- net.core.somaxconn:查看全连接队列的长度;
Linux 一切皆文件,如果想要修改队列大小的话,直接修改相应的文件即可。当然准确来说:
- max(64, tcp_max_syn_backlog) 才是半连接队列的长度;
- min(backlog, somaxconn) 才是全连接队列的长度,这里的 backlog 就是我们编写 socket 代码时,在 listen 方法里面指定的值。我们之前指定了 5,那么全连接队列的长度就是 5;
但是在服务端并发处理大量请求时,如果 TCP Accpet 队列过小,或者应用程序调用 accept 方法不及时,就会造成 Accpet 队列已满。这时后续的连接就会被丢弃,从而导致服务端请求数量上不去。
以上就是 listen 方法存在的意义,它接收一个 backlog 参数。如果觉得服务端支持的并发量不够,那么可以增大 backlog 的值。
非阻塞 I/O
先回顾一下 socket 模型:
但是注意:我们说在 listen() 这一步,会将主动套接字转化为监听套接字,但此时的监听套接字的类型是阻塞的。阻塞类型的监听套接字在调用 accept() 方法时,如果没有客户端来连接的话,就会一直处于阻塞状态,那么此时主线程就没法干其它事情了。
所以要设置为非阻塞,而非阻塞的监听套接字在调用 accept() 时,如果没有客户端来连接,那么主线程不会傻傻地等待,而是会直接返回,然后去做其它的事情。
类似的,我们在创建已连接套接字的时候默认也是阻塞的,阻塞类型的已连接套接字在调用 send() 和 recv() 的时候也会处于阻塞状态。比如当客户端一直不发数据的时候,已连接套接字就会一直阻塞在 recv() 这一步。如果是非阻塞类型的已连接套接字,那么当调用 recv() 但却收不到数据时,也不用处于阻塞状态,同样可以直接返回去做其它事情。
import socket server = socket.socket() server.bind(("localhost", 12345)) # 调用 setblocking 方法,传入 False # 表示将监听套接字和已连接套接字的类型设置为非阻塞 server.setblocking(False) server.listen(5) while True: try: # 非阻塞的监听套接字调用 accept() 时 # 如果发现没有客户端连接,则会立刻抛出 BlockingIOError # 因此这里写了个死循环 conn, addr = server.accept() except BlockingIOError: pass else: break while True: try: # 同理,非阻塞的已连接套接字在调用 recv() 时 # 如果发现客户端没有发数据,那么同样会报错 msg = conn.recv(1024) except BlockingIOError: pass else: print(msg.decode("utf-8")) conn.send(b"data from server")
很明显,虽然上面的代码在运行的时候正常,但存在两个问题:
1)虽然 accept() 不阻塞了,在没有客户端连接时主线程可以去做其它事情,但如果后续有客户端连接,主线程要如何得知呢?因此必须要有一种机制,能够继续在监听套接字上等待后续连接请求,并在请求到来时通知主线程。
我们上面的做法是写了一个死循环,但很明显这是没有意义的,这种做法还不如使用阻塞的套接字。
2)send() / recv() 不阻塞了,相当于 I/O 读写流程不再是阻塞的,读写方法都会瞬间完成并返回,也就是说它会采用能读多少就读多少、能写多少就写多少的策略来执行 I/O 操作,这显然更符合我们对性能的追求。
但显然对于非阻塞套接字而言,会面临一个问题,那就是当我们执行读取操作时,有可能只读了一部分数据,剩余的数据客户端还没发过来,那么这些数据何时可读呢?同理写数据也是这种情况,当缓冲区满了,而我们的数据还没有写完,那么剩下的数据又何时可写呢?因此同样要有一种机制,能够在主线程做别的事情的时候继续监听已连接套接字,并且在有数据可读写的时候通知主线程。
这样才能保证主线程既不会像基本 IO 模型一样,一直在阻塞点等待,也不会无法处理实际到达的客户端连接请求和可读写的数据,而上面所提到的机制便是 I/O 多路复用。
I/O 多路复用
I/O 多路复用机制是指一个线程处理多个 IO 流,也就是我们经常听到的 select/poll/epoll,而 Linux 默认采用的是 epoll。
简单来说,在只运行单线程的情况下,该机制允许内核中同时存在多个监听套接字和已连接套接字(套接字必须是非阻塞的)。内核会一直监听这些套接字上的连接请求或数据请求,一旦有请求到达就会交给主线程处理,这样就实现了一个线程处理多个 IO 流的效果。
上图就是基于多路复用的 IO 模型,我们以 epoll 为例。图中的 FD 是套接字,可以是监听套接字、也可以是已连接套接字,程序会通过 epoll 机制来让内核帮忙监听这些套接字。而此时主线程不会阻塞在某一个特定的套接字上,也就是说不会阻塞在某一个特定的客户端请求处理上。因此基于 epoll,服务端可以同时和多个客户端建立连接并处理请求,从而提升并发性。
但为了在请求到达时能够通知主线程,epoll 提供了基于事件的回调机制,即针对不同事件的发生,调用相应的处理函数。
那回调机制是怎么工作的呢?以上图为例,首先 epoll 一旦监测到 FD 上有请求到达,就会触发相应的事件。这些事件会被放进一个队列中,主线程对该事件队列不断进行处理,这样一来就无需一直轮询是否有请求发生,从而避免资源的浪费。
而在对事件队列中的事件进行处理时,会调用相应的处理函数,这就实现了基于事件的回调。因为主线程一直在对事件队列进行处理,所以能及时响应客户端请求,提升服务的响应性能。
比如连接请求和数据读取请求分别对应 Accept 事件和 Recv 事件,主线程分别对这两个事件注册 accept 和 recv 回调函数。当 Linux 内核监听到有连接请求或数据读取请求时,就会触发 Accept 事件或 Recv 事件,然后通知主线程执行 accept 函数或 recv 函数。
不好理解的话,举个通俗易懂的例子。比如小明要去怡红院,去找小红、小花和小翠,于是他问老鸨,这些姑娘来了没有啊,老鸨说没有。过一会小明又来问,这些姑娘来了没有啊,老鸨说没有。然后小明又问,这个过程就是在不断地轮询。最后老鸨无奈了,问小明:你要找这些姑娘做什么,等她们来了我通知你。
在这个例子中,小明相当于主线程,小红、小花和小翠就相当于套接字,老鸨相当于 epoll,负责监听这些套接字,并且可以同时监听很多个。如果她们来怡红院了,就说明套接字有事件发生了,老鸨就会通知小明,谁谁谁已经来了,你赶快做你想做的事情吧(相当于执行事件处理函数)。
比如小红来了,送她一只口红;小花来了,送她一朵玫瑰;小翠来了,送她一条手链。针对不同的事件执行相应的处理函数,而整个过程小明不需要一直轮询,它完全可以去做别的事情,当套接字有事件发生时,epoll 会通知他。
所以通过将非阻塞 I/O 和 I/O 多路复用技术搭配使用,在非阻塞 I/O 事件发生时,调用对应事件的处理函数,这种方式极大地提高了程序的健壮性和稳定性,是 Linux 下高性能网络编程的首选。
然后我们就来看看如何在 Python 里面使用 IO 多路复用,而且 IO 多路复用有多种,最常见的就是 select、poll 和 epoll,而它们之间又有什么区别呢?下面来一点一点介绍。
多路复用之 select
Python 有一个 select 模块,它内部有一个 select 函数,对应 select IO 多路复用。进程指定内核监听哪些文件描述符,当没有文件描述符事件发生时,进程被阻塞;当一个或者多个文件描述符事件发生时,进程被唤醒。
然后我们来看一下 select 函数。
import select """ select 函数接收四个参数 rlist:一个列表,监听那些可读的 socket wlist:一个列表,监听那些可写的 socket xlist:一个列表,监听那些出错的 socket timeout:超时时间 """ select.select()
这里需要特别指出的是,Linux 一切接文件,套接字也不例外。每一个套接字(文件)都有一个文件描述符(非负整数),用来标识唯一的套接字。如果用 C 实现多路复用,那么会以文件描述符作为参数,有了文件描述符,函数就能找到对应的套接字,进而进行监听、读写等操作。
但在 Python 里面,则是直接使用套接字本身作为参数,而不是使用文件描述符。当然啦,Python 的 select 也是封装了底层的 select。
然后我们来看一下如何使用 select。
import socket import select from queue import Queue from typing import Dict server = socket.socket() server.bind(("localhost", 12345)) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) # 必须设置为非阻塞,IO 多路复用要搭配非阻塞套接字 server.setblocking(False) server.listen(5) # 以上我们就创建了监听套接字,它负责监听是否有客户端连接 # 所以当事件发生时,属于可读事件,代表客户端连接过来了 # 所以 server 应该放在 rlist 里面 rlist = [server] wlist = [] xlist = rlist # 因为可以监听多个套接字,所以 rlist、wlist、xlist 都是列表 # 但在初始状态下,select 只需要监听一个套接字(server)即可 message_queues: Dict[socket.socket, Queue] = {} while True: # 开启 select 监听,返回三个元素 # readable: rlist 中发生可读事件的 socket # writeable: wlist 中发生可写事件的 socket # exceptional:xlist 中发生异常的 socket readable, writeable, exceptional = select.select(rlist, wlist, xlist) # 遍历 readable for r in readable: # 如果 r is server,则代表监听套接字有事件发生 # 显然是客户端连接来了 if r is server: # 监听套接字是非阻塞的,那么已连接套接字默认也是非阻塞的 # 当然你也可以调用 conn.setblocking(False) 显式地设置一下 conn, addr = server.accept() print(f"和客户端 {addr[0]}:{addr[1]} 建立连接") # 将已连接套接字添加到 rlist 中,让它也被 select 监听 # 当客户端发消息时,它会进入活跃状态,有事件发生 # 然后被 select 监测到,放到 readable 中,这样遍历的时候就可以处理它了 rlist.append(conn) # 由于客户端连接之后要发消息,那么我们是不是要将消息保存起来呢? message_queues[conn] = Queue() # 负责保存后续客户端发的消息 else: # 如果 r is not server,则代表是已连接套接字有事件发生 # 说明是某个客户端发送消息了,我们要处理它 data = r.recv(1024) if data: # 这里的 r 就是活跃的已连接套接字,调用它的 getpeername 方法 # 也可以获取到客户端连接的 ip 和 端口 addr = r.getpeername() print(f"收到客户端 {addr[0]}:{addr[1]} 发来的消息:", f"`{data.decode('utf-8')}`") # message_queues 保存了所有的已连接套接字 # 每一个套接字都对应一个队列,找到该活跃套接字对应的队列 message_queues[r].put(data) # 将消息放进去 # 消息放进去了,服务端是不是也要回复呢?显然这属于可写事件 # 我们还要将 r 放到 wlist 中,这样 select 就会监测到 # 然后将它放到 writeable 中,我们遍历的时候就可以处理它了 if r not in wlist: wlist.append(r) else: # 走到这里说明 data 为假,说明客户端断开连接了,发来一个 b'' addr = r.getpeername() print(f"客户端 {addr[0]}:{addr[1]} 已断开连接") # 我们要将套接字从 rlist、wlist 当中移除 # 客户端都断开连接了,那么 select 也就不需要再监听了 rlist.remove(r) if r in wlist: wlist.remove(r) # 对了,还要将它从 message_queues 里面移除 message_queues.pop(r) r.close() # 关闭套接字连接 # 以上是遍历可读事件,可读事件可以发生在监听套接字上面(和客户端建立连接) # 也可以发生在已连接套接字上面(客户端发信息了) # 如果没有事件发生或者处理完毕,那么接下来就要遍历可写事件 # 而可写事件一定发生在已连接套接字上面(要回消息给客户端) for w in writeable: message_queue = message_queues[w] # 一开始队列里面肯定是有消息的,因为我们手动往里面放了一条 # 但如果队列为空,说明服务端已经回复过了,那么要将该套接字从 wlist 里面移除 # 让它变为非活跃状态,不再满足可写 # 等到下一次客户端发消息时,再将它添加到 wlist 中,让它变得可写 if message_queue.empty(): wlist.remove(w) continue # 获取消息 data = message_queue.get() # 发送给客户端 w.send("服务端收到,你发的消息是: ".encode("utf-8") + data) # 然后遍历 xlist,如果在跟某个客户端通信的过程中,出现了错误 # 那么将套接字从 rlist、wlist、xlist、message_queue 当中都删除 # 然后再关闭套接字连接 for x in exceptional: addr = x.getpeername() print(f"和客户端 {addr[0]}:{addr[1]} 通信出现错误") rlist.remove(x) if x in wlist: wlist.remove(x) message_queues.pop(x) x.close()
客户端代码还和之前一样,保持不变,然后来测试一下:
代码应该不难理解,但我们调用 select 背后都发生了什么呢?
1)上下文从用户态进入内核态,把要监听的文件描述符从用户空间拷贝到内核空间;
2)内核通过文件描述符找到所有的套接字,然后遍历,查看套接字是否有对应的事件发生;
3)如果没发生,那么让进程阻塞,当到达规定的超时时间(通过 select 函数的第四个参数指定,不设置则一直阻塞),再将进程唤醒。然后再次进行遍历,直到有事件发生(设备驱动产生中断)。这些都由内核帮我们完成;
4)当事件发生时,对套接字进行遍历,找到那些发生事件的套接字并返回,就是我们代码中的 readable、writeable、exptional;
5)程序对这些活跃的套接字进行处理;
上面的五个步骤就完成了一次 select 监听流程,但很明显我们要一直监听,所以写一个死循环。当 select 监听结束时,立刻开启下一轮 select 监听,因为服务是要不断运行的。
因此通过以上几个步骤不难看出,select 有两个致命的缺陷:
- 每一次监听都要将所有的文件描述符拷贝到内核态,如果描述符非常多的话,这种拷贝会很耗时;
- 当事件发生时,还要将所有的文件描述符都遍历一次,才能找到那些有事件发生的套接字。如果描述符非常多,遍历也需要时间;
然后 select 多路复用其实还有一个缺陷,就是它最多同时监听 1024 个文件描述符。
所以 select 多路复用有三个缺陷,因此在工作中我们很少用它。
多路复用之 poll
第二个要介绍的多路复用是 poll,相比 select,它的最大改进就是取消了最多同时监听 1024 个文件描述符这一限制,但其它的两个缺陷却没有得到改善。
多路复用,Windows 只支持 select,macOS 支持 select、poll,Linux 则是 select、poll 和 epoll 都支持。
下面先来简单看一下 poll,它在 select 模块里面是一个类。
import select from select import POLLIN, POLLOUT, POLLERR, POLLHUP # select.poll 是一个类 # 我们实例化一个 poll 对象 poll = select.poll() # 给指定的套接字绑定事件 # 第一个参数可以是描述符,也可以是套接字 # 第二个参数是要绑定的事件 # POLLIN:可读事件(rlist) # POLLOUT:可写事件(wlist) # POLLERR:出现异常(xlist) # POLLHUP:连接中断 poll.register(..., POLLIN | POLLERR) # 取消某个套接字的事件绑定 poll.unregister(...)
有了 select 的经验,poll 应该不难理解,我们将上面使用 select 的服务端代码,用 poll 重写一下。
import socket import select from select import POLLIN, POLLOUT, POLLERR, POLLHUP from typing import Dict from queue import Queue server = socket.socket() server.bind(("localhost", 12345)) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) server.setblocking(False) server.listen(5) # 通过文件描述符找到套接字 fd2sk = {server.fileno(): server} # 保存套接字接收到的客户端发来的消息 message_queues: Dict[int, Queue] = {} # 实例化一个 poll 对象 poll = select.poll() # 首先要对 server 进行注册,正如我们使用 select.select 时 # 要先将 server 放在 rlist 里面,然后事件为可读 # 不过由于可能发生错误,因此事件类型为 POLLIN | POLLERR poll.register(server, POLLIN | POLLERR) # 开启无限循环 while True: # poll 方法接收一个 timeout,不传则表示不设置超时 # 它和 select.select 的第四个参数的含义相同 # 当有事件发生时,会返回相应的文件描述符和事件 events = poll.poll() # 正式开启监听 # 我们进行遍历 for fd, event in events: # 说明是监听套接字活跃了 if fd == server.fileno(): conn, addr = fd2sk[fd].accept() print(f"和客户端 {addr[0]}:{addr[1]} 建立连接") # 对 conn 进行注册,下一轮循环的时候也会对它进行监听 # 这里可以传文件描述符、也可以传套接字 # 如果传套接字,那么会自动调用 fileno 获取描述符 poll.register(conn, POLLIN | POLLHUP | POLLERR) # 维护文件描述符到套接字的映射 fd2sk[conn.fileno()] = conn # 为每个文件描述符维护一个队列,用于保存客户端发来的消息 message_queues[conn.fileno()] = Queue() # 否则说明是已连接套接字有事件发生 # 那么事件是可读还是可写呢?显然要通过 event 判断 elif event & POLLIN: # 可读 data = fd2sk[fd].recv(1024) if data: # 有数据 addr = fd2sk[fd].getpeername() print(f"收到客户端 {addr[0]}:{addr[1]} 发来的消息:", f"`{data.decode('utf-8')}`") # 客户端发消息,服务端也要回消息 # 因此要给它注册一个可写事件 poll.register(fd, POLLOUT | POLLHUP | POLLERR) # 然后将消息保存起来 message_queues[fd].put(data) else: # 客户端断开连接 addr = fd2sk[fd].getpeername() print(f"客户端 {addr[0]}:{addr[1]} 已断开连接") # 取消监听,会将所有事件全部取消 poll.unregister(fd) # 关闭连接 fd2sk[fd].close() # 从字典中移除 fd2sk.pop(fd) elif event & POLLOUT: # 已连接套接字可写 message_queue = message_queues[fd] if message_queue.empty(): # 队列为空,说明消息已经发出去了 # 那么套接字就不再可写了,因此要取消监听 # 等到下一次客户端发消息时,再变得可写 poll.unregister(fd) # 但 unregister 会取消所有事件的监听 # 因此还要重新注册可读事件 # 不然后续客户端发消息时,就无法处理了 poll.register(fd, POLLIN | POLLHUP | POLLERR) else: data = message_queue.get() fd2sk[fd].send( "服务端收到,你发的消息是: ".encode("utf-8") + data ) elif event & POLLERR: # 发生错误 addr = fd2sk[fd].getpeername() print(f"和客户端 {addr[0]}:{addr[1]} 通信出现错误") poll.unregister(fd) fd2sk[fd].close() message_queues.pop(fd)
这段使用 poll 多路复用实现的服务端,和上面使用 select 多路复用实现的服务端,在效果上一模一样,可以测试一下。不过从使用上讲,poll 的话要方便一些。
然后是性能问题,poll 相比 select,只是改善了支持的最大描述符的数量,因此这两种多路复用基本都不用。
于是 Linux 内核在 2.4 的时候引入了 epoll,它是 IO 多路复用的终极形态,我们来看一下。
多路复用之 epoll
通过非阻塞 IO 和 IO 多路复用,单线程的服务端可以同时和多个客户端通信。我们给每个套接字绑定好相应的事件,然后让内核帮忙监听这些套接字,一旦有时间发生就及时通知。
目前使用的多路复用是 select 和 poll,但这两种多路复用存在性能问题。
1)每一次监听都要将文件描述符拷贝到内核空间,当描述符很多的时候,就会很耗时。那么有没有一种机制,只需要拷贝一次就好了呢?后续就交给内核来维护。虽然要交给内核,导致拷贝无法避免,但能不能不要每次都拷贝。
2)当有套接字活跃的时候,select 和 poll 会被唤醒,但它们醒来之后只知道有套接字活跃了,却不知道是哪些套接字活跃,只能挨个遍历所有的套接字。所以能不能有一种机制,负责告知活跃的套接字呢?
为了解决上面两个问题,Linux 引入了 epoll,这也是我们的重点。不过关于 epoll 的原理一会再说,先来看看如何在 Python 里面使用 epoll。
在使用上,epoll 和 poll 高度相似。
我们将之前的服务端代码,使用 epoll 重写一下。
import select import socket from queue import Queue from select import ( # epoll 和 poll 的用法相似 # 把事件换成 epoll 的事件即可 EPOLLIN, EPOLLOUT, EPOLLERR, EPOLLHUP ) from typing import Dict server = socket.socket() server.bind(("localhost", 12345)) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) server.setblocking(False) server.listen(5) fd2sk = {server.fileno(): server} message_queues: Dict[int, Queue] = {} # 实例化一个 epoll 对象 epoll = select.epoll() # 给 server 注册读事件 epoll.register(server, EPOLLIN | EPOLLERR) while True: # 仍然是调用 poll 方法,开始轮询 events = epoll.poll() for fd, event in events: if fd == server.fileno(): conn, addr = fd2sk[fd].accept() print(f"和客户端 {addr[0]}:{addr[1]} 建立连接") # 给已连接套接字注册读事件 # 第一个参数可以传文件描述符、也可以传套接字 epoll.register(conn, EPOLLIN | EPOLLHUP | EPOLLERR) fd2sk[conn.fileno()] = conn message_queues[conn.fileno()] = Queue() elif event & EPOLLIN: # 可读 data = fd2sk[fd].recv(1024) if data: # 有数据 addr = fd2sk[fd].getpeername() print(f"收到客户端 {addr[0]}:{addr[1]} 发来的消息:", f"`{data.decode('utf-8')}`") # 客户端发消息了,那么套接字要回复消息,因此满足可写 # 但是和 poll 不同,epoll 只能给一个套接字注册一次 # 而之前已经注册过一次了(已连接套接字创建时,注册了读事件) # 因此再注册就会报错,因为不能连续注册 # 所以我们需要将 fd 上的事件取消掉,然后重新注册 epoll.unregister(fd) # 重新注册,此时要同时注册读事件和写事件 epoll.register(fd, EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLERR) message_queues[fd].put(data) else: # 客户端断开连接了 addr = fd2sk[fd].getpeername() print(f"客户端 {addr[0]}:{addr[1]} 已断开连接") epoll.unregister(fd) fd2sk[fd].close() fd2sk.pop(fd) elif event & EPOLLOUT: # 可写 message_queue = message_queues[fd] if message_queue.empty(): # 队列是空的,说明消息已经发走了,那么应该取消写事件 # 做法也很简单:先将事件全部取消,然后重新注册读事件 # 但也可以通过 modify 方法,直接修改事件类型 epoll.modify(fd, EPOLLIN | EPOLLHUP | EPOLLERR) else: data = message_queue.get() fd2sk[fd].send( "服务端收到,你发的消息是: ".encode("utf-8") + data ) elif event & EPOLLERR: addr = fd2sk[fd].getpeername() print(f"和客户端 {addr[0]}:{addr[1]} 通信出现错误") epoll.unregister(fd) fd2sk[fd].close() message_queues.pop(fd)
代码和 poll 高度相似,但它的性能要比 select 和 poll 高很多。经过测试,epoll 在监听 10 个描述符的时候,耗时大概 0.4 秒,而 poll 耗时大概 0.6 秒,两者差别不是很大。但如果监听 10000 个描述符,那么 epoll 耗时大概 0.7 秒,而 poll 的耗时要 990 秒。
所以 epoll 的性能是碾压 select 和 poll 的,那么 epoll 的内部用了哪些黑魔法呢?我们来介绍一下。
首先必须要说明的是,Python select 模块里面的 epoll 实际上是封装了底层操作系统的 epoll,但是让使用变得更简单了,当然 select 和 poll 也是同理。所以接下来我们分析的是,操作系统提供的 epoll。
关于 epoll,操作系统提供了三个 API,分别是 epoll_create,epoll_ctl 和 epoll_wait。
epoll_create
调用 epoll_create 即可创建一个 epoll 实例,函数原型如下:
int epoll_create(int size);
该函数返回一个整型,也就是文件描述符,通过描述符可以找到相应的 epoll 实例。而 Python 在调用 select.epoll() 的时候,底层也会调用 epoll_create,只不过 Python 会封装成一个 epoll 对象再返回,我们直接操作内部的方法即可。
正如打开文件一样,底层在打开文件的时候也是返回一个描述符,而 Python 则是封装一个文件对象再返回。调用文件对象的方法,来操作文件,因为 Python 是面向对象的语言。
函数的 size 参数,在一开始的 epoll_create 实现中,是用来告知内核期望监控的文件描述符的数量,然后内核使用这部分的信息来初始化内核数据结构。但在新的实现中,这个参数不再被需要,因为内核可以动态分配需要的数据结构。我们在使用的时候,将 size 设置成一个大于 0 的整数就可以了。
epoll_ctl
在创建完 epoll 实例之后,可以通过调用 epoll_ctl 往这个 epoll 实例增加或删除监控的事件。函数 epll_ctl 函数如下:
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
第一个参数 epfd 是 epoll 实例的描述符,也就是 epoll_create 的返回值。
第二个参数表示是注册事件、还是取消注册的事件,它有三个选项可供选择:
- EPOLL_CTL_ADD: 给 epoll 实例注册事件;
- EPOLL_CTL_DEL:取消给 epoll 实例注册事件;
- EPOLL_CTL_MOD: 修改给 epoll 实例注册的事件;
第三个参数表示套接字对应的文件描述符。
第四个参数表示注册的事件类型,有以下几种:
- EPOLLIN:读事件;
- EPOLLOUT:写事件;
- EPOLLERR:出现错误;
- EPOLLHUP:连接关闭;
- EPOLLET:触发方式为边缘触发,默认为水平触发(一会解释);
所以该函数就等价于,Python 中 epoll 对象的 register 和 unregister 方法。
epoll_wait
该函数相当于 Python 里面 epoll 对象的 poll 方法,调用之后即可开启监听。
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
这种类型的 C 函数,一般返回的都是整型,用来表示成功还是失败。至于真正意义上的返回值,则在调用之前先声明好,然后将指针传进去,函数在内部修改指针指向的值。
所以这里的第二个参数,就相当于 Python 里面 poll 方法返回的 events,里面包含了套接字的描述符和发生的事件。
至于第一个参数,就是 epoll 实例的描述符。
第三个参数表示 epoll 可以处理的最大事件数量。
第四个参数则表示超时时间,不设置的话会一直等待,直到有事件发生。如果设置了,那么当抵达超时时间,无论有没有事件都会立即返回。然后进入下一轮循环,重新开启监听。
epoll 为什么高效?
说完了 epoll 的相关 API,那么我们来聊一聊它为什么高效?
首先是描述符的查找,我们在给某个描述符对应的套接字增加、删除和修改事件的时候,epoll 肯定要找到指定的描述符,那么 epoll 使用什么数据结构来管理这些描述符呢?
答案是红黑树,这是一个非常高效的数据结构,操作元素的时间复杂度为 O(logN),C++ 的 map 也是基于红黑树实现的。
然后再来看看 select 的三个缺点,epoll 是如何改善的,还记得 select 函数的三个缺点吗?
- 每次调用 select.select() 或者 poll.poll() 的时候,都要将描述符从用户空间拷贝到内核空间,当描述符非常多时开销会很大;
- 当有事件发生时,需要遍历所有的描述符,一个一个检测,才能知道是哪些套接字有事件发生。在描述符非常多时,开销同样很大;
- select 支持的文件描述符太少了,默认是 1024;
首先第三个缺点不用多说,epoll 采用红黑树管理描述符,所以它能支持的描述符的数量非常多,就看操作系统能打开多少文件了。
然后是第一个缺点,epoll 在给套接字注册事件、也就是调用 epoll_ctl 的时候就会将其描述符拷贝到内核空间中,而不是等到监听的时候再拷贝。这样的话,每个描述符只需要拷贝一次即可。
最后是第二个缺点,epoll 是通过回调解决的。在给套接字注册事件时,还会为它绑定一个回调函数。当有事件发生时,就会调用该回调函数,将对应的描述符放在一个专门的就绪队列(由双向链表实现)里面,然后唤醒 epoll。
所以就绪队列里面的描述符对应的套接字都是活跃的,不在就绪队列里面则不活跃,也就是没有事件发生,这样就不需要遍历了。内核只需要将就绪队列里的描述符返回即可,并且这个过程还使用了 mmap,省略了拷贝的开销。
Linux 一切皆文件,套接字也不例外,而文件支持很多操作,比如我们熟知的 read, write, fsync, close 等等。但除了这些还有一个 poll 操作,它负责自定制事件的监听逻辑,事件发生时,将描述符添加到就绪队列这一逻辑就由 poll 操作实现。
所以 epoll 管理的描述符对应的文件必须支持 poll 操作,如果文件没有实现,那么它就无法被 epoll 管理。支持 poll 操作的最常见的文件种类就是套接字,而像我们平常使用的文件系统则是不支持的。
因此基于 epoll,单线程也能实现高并发,Redis 和 Nginx 已经证明了这一点。
水平触发和边缘触发
epoll 的工作模式有两种,分别是 LT 和 ET。
- LT(level trigger):水平触发,当 epoll_wait 检测到描述符有事件发生,并将事件通知给应用程序时,应用程序可以不立即处理该事件。下次调用 epoll_wait 时,会再次响应并通知事件。
- ET(edge trigger):边缘触发,当 epoll_wait 检测到描述符有事件发生,并将事件通知给应用程序时,应用程序必须立即处理该事件。如果不处理,下次调用 epoll_wait 时,就不会再通知了。
LT 是默认模式,但 ET模式在很大程度上减少了事件被重复触发的次数,因此效率要比 LT 模式高。
selectors:select 模块的封装
这里再补充一个 selectors 模块,它是 select 模块的一个封装,里面提供了多种 IO 多路复用器。但不管哪一种,它们的操作都是一样的,这样就实现了统一。
我们还是将之前的 server 端,重写一下。
import socket from queue import Queue # selectors 里面提供了多种"多路复用器" # 除了 select、poll、epoll 之外 # 还有 kqueue,这个是针对 BSD 平台的 try: from selectors import ( SelectSelector, PollSelector, EpollSelector, KqueueSelector ) except ImportError: pass # 由于种类比较多,所以提供了 DefaultSelector # 会根据当前的系统种类,自动选择一个合适的多路复用器 from selectors import ( DefaultSelector, EVENT_READ, # 读事件 EVENT_WRITE, # 写事件 ) server = socket.socket() server.bind(("localhost", 12345)) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) server.setblocking(False) server.listen(5) message_queues = {} # 实例化一个多路复用器 sel = DefaultSelector() def accept(server: socket.socket): """和客户端建立连接""" conn, addr = server.accept() print(f"和客户端 {addr[0]}:{addr[1]} 建立连接") # 一旦建立连接,那么就要接收客户端消息 # 所以我们要绑定事件,register 方法接收三个参数 # 参数一:传一个套接字即可 # 参数二:事件类型,这里是读事件 # 参数三:事件发生时,执行的回调函数 sel.register(conn, EVENT_READ, recv) # 表示当 conn 可读时,就去执行 recv 函数 message_queues[conn] = Queue() def recv(conn: socket.socket): """接收客户端数据""" data = conn.recv(1024) addr = conn.getpeername() if data: # 有数据 print(f"收到客户端 {addr[0]}:{addr[1]} 发来的消息:", f"`{data.decode('utf-8')}`") # 收到数据了,那么要给客户端回复,所以要绑定可写事件 # 让事件可写,当事件发生时,执行 send 函数 sel.modify(conn, EVENT_READ | EVENT_WRITE, send) message_queues[conn].put(data) else: print(f"客户端 {addr[0]}:{addr[1]} 已断开连接") conn.close() # 取消监听 sel.unregister(conn) message_queues.pop(conn) def send(conn: socket.socket): """给客户端发送数据""" message_queue = message_queues[conn] if message_queue.empty(): # 队列为空说明已经发送过了,将事件改成可读 # 继续监听客户端发来的消息 sel.modify(conn, EVENT_READ, recv) else: data = message_queue.get() conn.send( "服务端收到,你发的消息是: ".encode("utf-8") + data ) # 给监听套接字注册可读事件 # 当有客户端连接,去执行 accept 函数 sel.register(server, EVENT_READ, accept) # 在 accept 函数里面创建已连接套接字 conn # 然后给 conn 绑定可读事件,当客户端发消息时,去执行 recv 函数 # 在 recv 函数里面给套接字绑定可写事件,然后去执行 send 函数 while True: # 内部会根据操作系统,选择一个合适的多路复用 events = sel.select() # key 是一个 namedtuple # 内部有如下字段:'fileobj', 'fd', 'events', 'data' # key.fd 就是套接字的文件描述符 # key.fileobj 则是套接字本身 # key.data 是给套接字绑定的回调 # key.event 则是事件 for key, mask in events: # 事件发生时,获取回调,然后调用 # 回调显然就是这里的 accept、recv、send callback = key.data callback(key.fileobj)
通过 selectors 模块我们将服务端重新实现了,效果和之前是一样的。整个过程就是给 socket 绑定一个事件 + 回调,当事件发生就会告知我们。但是调用回调不是内核自动完成的,而是由我们手动完成的。"非阻塞 + 回调 + 基于 IO 多路复用的事件循环",所有框架基本都是这个套路。
需要说明的是,这种写法的性能非常高,Redis 和 Nginx 都是基于这种方式实现了高并发,但它和我们传统的同步代码的写法大相径庭。如果是同步代码,那么会先建立连接、然后接收数据、再发送数据,这显然更符合我们人类的思维,逻辑自上而下,非常自然。
但是多路复用加回调的方式,就让人很不适应,我们在建立完连接之后,不能直接收数据,必须将接收数据的逻辑放在一个单独的函数(方法)中,然后再将这个函数以回调的方式注册进去。
同理,在接收完数据之后,也不能立刻发送。同样要将发送数据的逻辑放在一个单独的函数中,然后再以回调的方式注册进去。
所以好端端的自上而下的逻辑,因为回调而被分割的四分五裂,这种代码在编写和维护的时候是非常痛苦的。
比如回调可能会层层嵌套,容易陷入回调地狱,如果某一个回调执行出错了怎么办?代码的可读性差导致不好排查,即便排查到了也难处理。
另外,如果多个回调需要共享一个变量该怎么办?因为回调是通过事件循环调用的,在注册回调的时候很难把变量传过去。简单的做法是把该变量设置为全局变量,或者说多个回调都是某个类的成员函数,然后把共享的变量作为一个属性绑定在 self 上面。但当逻辑复杂时,就很容易导致全局变量满天飞的问题。
所以这种模式就使得开发人员在编写业务逻辑的同时,还要关注并发细节。
因此使用回调的方式编写异步化代码,虽然并发量能上去,但是对开发者很不友好;而使用同步的方式编写同步代码,虽然很容易理解,可并发量却又上不去。那么问题来了,有没有一种办法,能够让我们在享受异步化带来的高并发的同时,又能以同步的方式去编写代码呢?也就是我们能不能以同步的方式去编写异步化的代码呢?
答案是可以的,使用「协程」便可以办到。协程在这种模式的基础之上又批了一层外衣,兼顾了开发效率与运行效率。
关于协程,我们以后再聊。
小结
以上我们就聊了聊 Python 的 socket,以及多路复用是什么,它在 Python 里面该如何使用。
我们在工作中应该不会直接使用这些东西,但了解一下总归是好的,而且它也为我们后面学习协程打下基础。
本文参考自:
- 极客时间:《网络编程实战》
- 极客时间:《Redis 核心技术与实战》
- https://www.cnblogs.com/Anker/p/3265058.html
- 小林coding《图解网络》