Python异步非阻塞IO多路复用Select/Poll/Epoll使用

简介: 来源:http://www.haiyun.me/archives/1056.html 有许多封装好的异步非阻塞IO多路复用框架,底层在linux基于最新的epoll实现,为了更好的使用,了解其底层原理还是有必要的。 下面记录下分别基于Select/Poll/Epoll的echo server实现。 Python Select Server,可监控事件数量有限制: #!/us

来源:http://www.haiyun.me/archives/1056.html

有许多封装好的异步非阻塞IO多路复用框架,底层在linux基于最新的epoll实现,为了更好的使用,了解其底层原理还是有必要的。
下面记录下分别基于Select/Poll/Epoll的echo server实现。
Python Select Server,可监控事件数量有限制:

#!/usr/bin/python
# -*- coding: utf-8 -*-
import select
import socket</span>
import Queue
 
server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setblocking(False)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR  , 1)
server_address= ('192.168.1.5',8080)
server.bind(server_address)
server.listen(10)
 
#select轮询等待读socket集合
inputs = [server]
#select轮询等待写socket集合
outputs = []
message_queues = {}
#select超时时间
timeout = 20
 
while True:
    print "等待活动连接......"
    readable , writable , exceptional = select.select(inputs, outputs, inputs, timeout)
 
    if not (readable or writable or exceptional) :
        print "select超时无活动连接,重新select...... "
        continue;   
    #循环可读事件
    for s in readable :
        #如果是server监听的socket
        if s is server:
            #同意连接
            connection, client_address = s.accept()
            print "新连接: ", client_address
            connection.setblocking(0)
            #将连接加入到select可读事件队列
            inputs.append(connection)
            #新建连接为key的字典,写回读取到的消息
            message_queues[connection] = Queue.Queue()
        else:
            #不是本机监听就是客户端发来的消息
            data = s.recv(1024)
            if data :
                print "收到数据:" , data , "客户端:",s.getpeername()
                message_queues[s].put(data)
                if s not in outputs:
                    #将读取到的socket加入到可写事件队列
                    outputs.append(s)
            else:
                #空白消息,关闭连接
                print "关闭连接:", client_address
                if s in outputs :
                    outputs.remove(s)
                inputs.remove(s)
                s.close()
                del message_queues[s]
    for s in writable:
        try:
            msg = message_queues[s].get_nowait()
        except Queue.Empty:
            print "连接:" , s.getpeername() , '消息队列为空'
            outputs.remove(s)
        else:
            print "发送数据:" , msg , "到", s.getpeername()
            s.send(msg)
 
    for s in exceptional:
        print "异常连接:", s.getpeername()
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()
        del message_queues[s]
Python Poll Server,Select升级版,无可监控事件数量限制,还是要轮询所有事件:

#!/usr/bin/python
# -*- coding: utf-8 -*-
import socket
import select
import Queue
 
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_address = ("192.168.1.5", 8080)
server.bind(server_address)
server.listen(5)
print  "服务器启动成功,监听IP:" , server_address
message_queues = {}
#超时,毫秒
timeout = 5000
#监听哪些事件
READ_ONLY = ( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)
READ_WRITE = (READ_ONLY|select.POLLOUT)
#新建轮询事件对象
poller = select.poll()
#注册本机监听socket到等待可读事件事件集合
poller.register(server,READ_ONLY)
#文件描述符到socket映射
fd_to_socket = {server.fileno():server,}
while True:
    print "等待活动连接......"
    #轮询注册的事件集合
    events = poller.poll(timeout)
    if not events:
      print "poll超时,无活动连接,重新poll......"
      continue
    print "有" , len(events), "个新事件,开始处理......"
    for fd ,flag in events:
        s = fd_to_socket[fd]
        #可读事件
        if flag & (select.POLLIN | select.POLLPRI) :
            if s is server :
                #如果socket是监听的server代表有新连接
                connection , client_address = s.accept()
                print "新连接:" , client_address
                connection.setblocking(False)
 
                fd_to_socket[connection.fileno()] = connection
                #加入到等待读事件集合
                poller.register(connection,READ_ONLY)
                message_queues[connection]  = Queue.Queue()
            else :
                #接收客户端发送的数据
                data = s.recv(1024)
                if data:
                    print "收到数据:" , data , "客户端:" , s.getpeername()
                    message_queues[s].put(data)
                    #修改读取到消息的连接到等待写事件集合
                    poller.modify(s,READ_WRITE)
                else :
                    # Close the connection
                    print "  closing" , s.getpeername()
                    # Stop listening for input on the connection
                    poller.unregister(s)
                    s.close()
                    del message_queues[s]
        #连接关闭事件
        elif flag & select.POLLHUP :
            print " Closing ", s.getpeername() ,"(HUP)"
            poller.unregister(s)
            s.close()
        #可写事件
        elif flag & select.POLLOUT :
            try:
                msg = message_queues[s].get_nowait()
            except Queue.Empty:
                print s.getpeername() , " queue empty"
                poller.modify(s,READ_ONLY)
            else :
                print "发送数据:" , data , "客户端:" , s.getpeername()
                s.send(msg)
        #异常事件
        elif flag & select.POLLERR:
            print "  exception on" , s.getpeername()
            poller.unregister(s)
            s.close()
            del message_queues[s]</span>
Python Epoll Server,基于回调的事件通知模式,轻松管理大量连接:
#!/usr/bin/python
# -*- coding: utf-8 -*-
import socket, select
import Queue
 
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_address = ("192.168.1.5", 8080)
serversocket.bind(server_address)
serversocket.listen(1)
print  "服务器启动成功,监听IP:" , server_address
serversocket.setblocking(0)
timeout = 10
#新建epoll事件对象,后续要监控的事件添加到其中
epoll = select.epoll()
#添加服务器监听fd到等待读事件集合
epoll.register(serversocket.fileno(), select.EPOLLIN)
message_queues = {}
 
fd_to_socket = {serversocket.fileno():serversocket,}
while True:
  print "等待活动连接......"
  #轮询注册的事件集合
  events = epoll.poll(timeout)
  if not events:
     print "epoll超时无活动连接,重新轮询......"
     continue
  print "有" , len(events), "个新事件,开始处理......"
  for fd, event in events:
     socket = fd_to_socket[fd]
     #可读事件
     if event & select.EPOLLIN:
         #如果活动socket为服务器所监听,有新连接
         if socket == serversocket:
            connection, address = serversocket.accept()
            print "新连接:" , address
            connection.setblocking(0)
            #注册新连接fd到待读事件集合
            epoll.register(connection.fileno(), select.EPOLLIN)
            fd_to_socket[connection.fileno()] = connection
            message_queues[connection]  = Queue.Queue()
         #否则为客户端发送的数据
         else:
            data = socket.recv(1024)
            if data:
               print "收到数据:" , data , "客户端:" , socket.getpeername()
               message_queues[socket].put(data)
               #修改读取到消息的连接到等待写事件集合
               epoll.modify(fd, select.EPOLLOUT)
     #可写事件
     elif event & select.EPOLLOUT:
        try:
           msg = message_queues[socket].get_nowait()
        except Queue.Empty:
           print socket.getpeername() , " queue empty"
           epoll.modify(fd, select.EPOLLIN)
        else :
           print "发送数据:" , data , "客户端:" , socket.getpeername()
           socket.send(msg)
     #关闭事件
     elif event & select.EPOLLHUP:
        epoll.unregister(fd)
        fd_to_socket[fd].close()
        del fd_to_socket[fd]
epoll.unregister(serversocket.fileno())
epoll.close()
serversocket.close()

目录
相关文章
|
2月前
|
网络协议 安全 Linux
Linux C/C++之IO多路复用(select)
这篇文章主要介绍了TCP的三次握手和四次挥手过程,TCP与UDP的区别,以及如何使用select函数实现IO多路复用,包括服务器监听多个客户端连接和简单聊天室场景的应用示例。
95 0
|
29天前
|
数据采集 数据安全/隐私保护 开发者
非阻塞 I/O:异步编程提升 Python 应用速度
非阻塞 I/O:异步编程提升 Python 应用速度
|
2月前
|
存储 网络协议 Linux
聊一聊 Python 的 socket,以及 select、poll、epoll 又是怎么一回事?
聊一聊 Python 的 socket,以及 select、poll、epoll 又是怎么一回事?
138 2
|
3月前
|
网络协议 Java Linux
高并发编程必备知识IO多路复用技术select,poll讲解
高并发编程必备知识IO多路复用技术select,poll讲解
|
5月前
|
安全 Java Linux
(七)Java网络编程-IO模型篇之从BIO、NIO、AIO到内核select、epoll剖析!
IO(Input/Output)方面的基本知识,相信大家都不陌生,毕竟这也是在学习编程基础时就已经接触过的内容,但最初的IO教学大多数是停留在最基本的BIO,而并未对于NIO、AIO、多路复用等的高级内容进行详细讲述,但这些却是大部分高性能技术的底层核心,因此本文则准备围绕着IO知识进行展开。
173 1
|
5月前
|
缓存 网络协议 算法
【Linux系统编程】深入剖析:四大IO模型机制与应用(阻塞、非阻塞、多路复用、信号驱动IO 全解读)
在Linux环境下,主要存在四种IO模型,它们分别是阻塞IO(Blocking IO)、非阻塞IO(Non-blocking IO)、IO多路复用(I/O Multiplexing)和异步IO(Asynchronous IO)。下面我将逐一介绍这些模型的定义:
231 2
|
5月前
|
存储 Java Unix
(八)Java网络编程之IO模型篇-内核Select、Poll、Epoll多路复用函数源码深度历险!
select/poll、epoll这些词汇相信诸位都不陌生,因为在Redis/Nginx/Netty等一些高性能技术栈的底层原理中,大家应该都见过它们的身影,接下来重点讲解这块内容。
|
6月前
|
监控 网络协议 Unix
Python Select全面剖析
Python Select全面剖析
67 0
|
4月前
|
存储 Java
【IO面试题 四】、介绍一下Java的序列化与反序列化
Java的序列化与反序列化允许对象通过实现Serializable接口转换成字节序列并存储或传输,之后可以通过ObjectInputStream和ObjectOutputStream的方法将这些字节序列恢复成对象。
|
5月前
|
Java 大数据
解析Java中的NIO与传统IO的区别与应用
解析Java中的NIO与传统IO的区别与应用