Python异步非阻塞IO多路复用Select/Poll/Epoll使用

简介: 来源:http://www.haiyun.me/archives/1056.html 有许多封装好的异步非阻塞IO多路复用框架,底层在linux基于最新的epoll实现,为了更好的使用,了解其底层原理还是有必要的。 下面记录下分别基于Select/Poll/Epoll的echo server实现。 Python Select Server,可监控事件数量有限制: #!/us

来源:http://www.haiyun.me/archives/1056.html

有许多封装好的异步非阻塞IO多路复用框架,底层在linux基于最新的epoll实现,为了更好的使用,了解其底层原理还是有必要的。
下面记录下分别基于Select/Poll/Epoll的echo server实现。
Python Select Server,可监控事件数量有限制:

#!/usr/bin/python
# -*- coding: utf-8 -*-
import select
import socket</span>
import Queue
 
server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setblocking(False)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR  , 1)
server_address= ('192.168.1.5',8080)
server.bind(server_address)
server.listen(10)
 
#select轮询等待读socket集合
inputs = [server]
#select轮询等待写socket集合
outputs = []
message_queues = {}
#select超时时间
timeout = 20
 
while True:
    print "等待活动连接......"
    readable , writable , exceptional = select.select(inputs, outputs, inputs, timeout)
 
    if not (readable or writable or exceptional) :
        print "select超时无活动连接,重新select...... "
        continue;   
    #循环可读事件
    for s in readable :
        #如果是server监听的socket
        if s is server:
            #同意连接
            connection, client_address = s.accept()
            print "新连接: ", client_address
            connection.setblocking(0)
            #将连接加入到select可读事件队列
            inputs.append(connection)
            #新建连接为key的字典,写回读取到的消息
            message_queues[connection] = Queue.Queue()
        else:
            #不是本机监听就是客户端发来的消息
            data = s.recv(1024)
            if data :
                print "收到数据:" , data , "客户端:",s.getpeername()
                message_queues[s].put(data)
                if s not in outputs:
                    #将读取到的socket加入到可写事件队列
                    outputs.append(s)
            else:
                #空白消息,关闭连接
                print "关闭连接:", client_address
                if s in outputs :
                    outputs.remove(s)
                inputs.remove(s)
                s.close()
                del message_queues[s]
    for s in writable:
        try:
            msg = message_queues[s].get_nowait()
        except Queue.Empty:
            print "连接:" , s.getpeername() , '消息队列为空'
            outputs.remove(s)
        else:
            print "发送数据:" , msg , "到", s.getpeername()
            s.send(msg)
 
    for s in exceptional:
        print "异常连接:", s.getpeername()
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()
        del message_queues[s]
Python Poll Server,Select升级版,无可监控事件数量限制,还是要轮询所有事件:

#!/usr/bin/python
# -*- coding: utf-8 -*-
import socket
import select
import Queue
 
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_address = ("192.168.1.5", 8080)
server.bind(server_address)
server.listen(5)
print  "服务器启动成功,监听IP:" , server_address
message_queues = {}
#超时,毫秒
timeout = 5000
#监听哪些事件
READ_ONLY = ( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)
READ_WRITE = (READ_ONLY|select.POLLOUT)
#新建轮询事件对象
poller = select.poll()
#注册本机监听socket到等待可读事件事件集合
poller.register(server,READ_ONLY)
#文件描述符到socket映射
fd_to_socket = {server.fileno():server,}
while True:
    print "等待活动连接......"
    #轮询注册的事件集合
    events = poller.poll(timeout)
    if not events:
      print "poll超时,无活动连接,重新poll......"
      continue
    print "有" , len(events), "个新事件,开始处理......"
    for fd ,flag in events:
        s = fd_to_socket[fd]
        #可读事件
        if flag & (select.POLLIN | select.POLLPRI) :
            if s is server :
                #如果socket是监听的server代表有新连接
                connection , client_address = s.accept()
                print "新连接:" , client_address
                connection.setblocking(False)
 
                fd_to_socket[connection.fileno()] = connection
                #加入到等待读事件集合
                poller.register(connection,READ_ONLY)
                message_queues[connection]  = Queue.Queue()
            else :
                #接收客户端发送的数据
                data = s.recv(1024)
                if data:
                    print "收到数据:" , data , "客户端:" , s.getpeername()
                    message_queues[s].put(data)
                    #修改读取到消息的连接到等待写事件集合
                    poller.modify(s,READ_WRITE)
                else :
                    # Close the connection
                    print "  closing" , s.getpeername()
                    # Stop listening for input on the connection
                    poller.unregister(s)
                    s.close()
                    del message_queues[s]
        #连接关闭事件
        elif flag & select.POLLHUP :
            print " Closing ", s.getpeername() ,"(HUP)"
            poller.unregister(s)
            s.close()
        #可写事件
        elif flag & select.POLLOUT :
            try:
                msg = message_queues[s].get_nowait()
            except Queue.Empty:
                print s.getpeername() , " queue empty"
                poller.modify(s,READ_ONLY)
            else :
                print "发送数据:" , data , "客户端:" , s.getpeername()
                s.send(msg)
        #异常事件
        elif flag & select.POLLERR:
            print "  exception on" , s.getpeername()
            poller.unregister(s)
            s.close()
            del message_queues[s]</span>
Python Epoll Server,基于回调的事件通知模式,轻松管理大量连接:
#!/usr/bin/python
# -*- coding: utf-8 -*-
import socket, select
import Queue
 
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_address = ("192.168.1.5", 8080)
serversocket.bind(server_address)
serversocket.listen(1)
print  "服务器启动成功,监听IP:" , server_address
serversocket.setblocking(0)
timeout = 10
#新建epoll事件对象,后续要监控的事件添加到其中
epoll = select.epoll()
#添加服务器监听fd到等待读事件集合
epoll.register(serversocket.fileno(), select.EPOLLIN)
message_queues = {}
 
fd_to_socket = {serversocket.fileno():serversocket,}
while True:
  print "等待活动连接......"
  #轮询注册的事件集合
  events = epoll.poll(timeout)
  if not events:
     print "epoll超时无活动连接,重新轮询......"
     continue
  print "有" , len(events), "个新事件,开始处理......"
  for fd, event in events:
     socket = fd_to_socket[fd]
     #可读事件
     if event & select.EPOLLIN:
         #如果活动socket为服务器所监听,有新连接
         if socket == serversocket:
            connection, address = serversocket.accept()
            print "新连接:" , address
            connection.setblocking(0)
            #注册新连接fd到待读事件集合
            epoll.register(connection.fileno(), select.EPOLLIN)
            fd_to_socket[connection.fileno()] = connection
            message_queues[connection]  = Queue.Queue()
         #否则为客户端发送的数据
         else:
            data = socket.recv(1024)
            if data:
               print "收到数据:" , data , "客户端:" , socket.getpeername()
               message_queues[socket].put(data)
               #修改读取到消息的连接到等待写事件集合
               epoll.modify(fd, select.EPOLLOUT)
     #可写事件
     elif event & select.EPOLLOUT:
        try:
           msg = message_queues[socket].get_nowait()
        except Queue.Empty:
           print socket.getpeername() , " queue empty"
           epoll.modify(fd, select.EPOLLIN)
        else :
           print "发送数据:" , data , "客户端:" , socket.getpeername()
           socket.send(msg)
     #关闭事件
     elif event & select.EPOLLHUP:
        epoll.unregister(fd)
        fd_to_socket[fd].close()
        del fd_to_socket[fd]
epoll.unregister(serversocket.fileno())
epoll.close()
serversocket.close()

目录
相关文章
Python 高级编程与实战:深入理解网络编程与异步IO
在前几篇文章中,我们探讨了 Python 的基础语法、面向对象编程、函数式编程、元编程、性能优化、调试技巧、数据科学、机器学习、Web 开发和 API 设计。本文将深入探讨 Python 在网络编程和异步IO中的应用,并通过实战项目帮助你掌握这些技术。
Chainlit:一个开源的异步Python框架,快速构建生产级对话式 AI 应用
Chainlit 是一个开源的异步 Python 框架,帮助开发者在几分钟内构建可扩展的对话式 AI 或代理应用,支持多种工具和服务集成。
351 9
|
3月前
|
深入理解 Python 中的异步操作:async 和 await
Python 的异步编程通过 `async` 和 `await` 关键字处理 I/O 密集型任务,如网络请求和文件读写,显著提高性能。`async` 定义异步函数,返回 awaitable 对象;`await` 用于等待这些对象完成。本文介绍异步编程基础、`async` 和 `await` 的用法、常见模式(并发任务、异常处理、异步上下文管理器)及实战案例(如使用 aiohttp 进行异步网络请求),帮助你高效利用系统资源并提升程序性能。
161 7
Python异步: 什么时候使用异步?
Asyncio 是 Python 中用于异步编程的库,适用于协程、非阻塞 I/O 和异步任务。使用 Asyncio 的原因包括:1) 使用协程实现轻量级并发;2) 采用异步编程范式提高效率;3) 实现非阻塞 I/O 提升 I/O 密集型应用性能。然而,Asyncio 并不适合所有场景,特别是在 CPU 密集型任务或已有线程/进程方案的情况下。选择 Asyncio 应基于项目需求和技术优势。
Grequests,非常 Nice 的 Python 异步 HTTP 请求神器
在Python开发中,处理HTTP请求至关重要。`grequests`库基于`requests`,支持异步请求,通过`gevent`实现并发,提高性能。本文介绍了`grequests`的安装、基本与高级功能,如GET/POST请求、并发控制等,并探讨其在实际项目中的应用。
117 3
Python网络编程:Twisted框架的异步IO处理与实战
【10月更文挑战第26天】Python 是一门功能强大且易于学习的编程语言,Twisted 框架以其事件驱动和异步IO处理能力,在网络编程领域独树一帜。本文深入探讨 Twisted 的异步IO机制,并通过实战示例展示其强大功能。示例包括创建简单HTTP服务器,展示如何高效处理大量并发连接。
103 1
Python网络编程:Twisted框架的异步IO处理与实战
【10月更文挑战第27天】本文介绍了Python网络编程中的Twisted框架,重点讲解了其异步IO处理机制。通过反应器模式,Twisted能够在单线程中高效处理多个网络连接。文章提供了两个实战示例:一个简单的Echo服务器和一个HTTP服务器,展示了Twisted的强大功能和灵活性。
89 0
Python 异步: 在非阻塞子进程中运行命令(19)
Python 异步: 在非阻塞子进程中运行命令(19)
969 0
Python3的原生协程(Async/Await)和Tornado异步非阻塞
我们知道在程序在执行 IO 密集型任务的时候,程序会因为等待 IO 而阻塞,而协程作为一种用户态的轻量级线程,可以帮我们解决这个问题。协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存,在调度回来的时候,恢复先前保存的寄存器上下文和栈。因此协程能保留上一次调用时的状态,即所有局部状态的一个特定组合
Python3的原生协程(Async/Await)和Tornado异步非阻塞

热门文章

最新文章

下一篇
oss创建bucket
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等