轻重分离
第二种优化的方法,可以只用下图来解释。即,使用轻量级的线程池(PreProcess)对所有请求进行预处理,所有不需要 I/O 执行时间很短的请求直接执行,如果是需要磁盘 I/O 的则放入下一级阻塞队列,有单独的线程池来处理这些请求。详见下图:
第一级请求使用自己已有的线程池,不再多说。I/O 请求+二级线程池可以使用 twisted 提供的 ThreadPool 机制来实现。而我所说的优化正是使用此方法,代码很简单,如下:
deferred = threads.deferToThread(data_loader.get, sn) deferred.addCallback(self.loader_callback, (req, other_data))
解释一下:
threads.deferToThread 将会将 data_loader.get 放入 reactor 线程池的队列,并返回一个 defer 对象。data_loader.get 由 reactor 的线程池进行执行,执行完成后放入 reactor 的队列,然后由 reactor 主线程来调用 deferred.addCallback 中注册的回调函数。所以回调函数是不会跨线程调用的,如果在回调函数中调用一些不可跨线程的应用(如,memcached 客户端)也可放心使用,这也正是选择 reactor 的线程池作为二级线程池的原因之一。
选择 reactor 的线程池作为二级线程池的原因二:回调函数。因为 Read Thread 将自己负责恢复请求,所以回调函数必不可少。
接下来深入 twisted 源码探究此方法的原理,以下代码均是节选自 twisted2.0.0 源码,其他版本大致相同:
# threads.py def deferToThread(f, *args, **kwargs): d = defer.Deferred() from twisted.internet import reactor reactor.callInThread(_putResultInDeferred, d, f, args, kwargs) return d def _putResultInDeferred(deferred, f, args, kwargs): from twisted.internet import reactor try: result = f(*args, **kwargs) except: f = failure.Failure() reactor.callFromThread(deferred.errback, f) else: reactor.callFromThread(deferred.callback, result)
# base.py def callInThread(self, _callable, *args, **kwargs): if not self.threadpool: self._initThreadPool() self.threadpool.callInThread(_callable, *args, **kwargs) //由线程池执行具体的读取操作 def callFromThread(self, f, *args, **kw): ... self.threadCallQueue.append((f, args, kw)) //放入主线程队列,由主线程执行回调函数 self.wakeUp() ...
注:callInThread / allFromThread,前者是放入线程池执行,后者是 reactor 的队列里,由 reactor 的主线程来执行。
至于 threadpool 的代码在 twisted / python / threadpool 是一个线程池
异步磁盘 I/O
第三种方法是使用经典的服务器模型的 select(epoll)异步 I/O。使用 twisted 框架中的 reactor(epoll/select)+reader,将磁盘 I/O 封装为 reader,交给 reactor 来管理,磁盘 I/O 完成后调用回调函数将数据返回发送改请求的客户端。这样既不会因为 I/O 阻塞请求处理线程也不会如方法二一样因为 I/O 阻塞读取线程,详见下图:
reactor(epoll / select)+ reader 的方法需要继承 abstract.FileDescriptor 并且实现其几个方法,而 twisted 框架中的网络(TCP / UDP)、标准 I/O、进程都有类似的实现。使用时传入文件描述符,如下:
fileReader = FileReader(fd, loader_callback, other_data) reactor.addReader(fileReader)
FileReader 类的实现如下:
class FileReader(abstract.FileDescriptor): def __init__(self, fd, result_callback, args): ... self.fd = fd self.setNonBlocking(self.fd) self.dataRecieved=result_callback self.args=args self.all_data="" def setNonBlocking(self, fd): ... def fileno(self): return self.fd def connectionLost(self, reason): sys.close(self.fd) def doRead(self)://fdesc.readFromFD(self.fd, self.dataReceived) data = os.read(self.fd, 10240) //每次读取1M self.all_data += data if not data: self.dataRecieved(self.all_data , self.args) return CONNECTION_LOST
自己实现的 reader 并没有使用类似其他标准实现中的 fdesc.readFromFD(self.fd, self.dataReceived)来读取数据,因为该函数中提供的回调函数不允许传参,所以自己将 fdesc 实现在了 FileReader 内。
下面是此方法的理论依据:
# pollreactor.py def addReader(self, reader): fd = reader.fileno() if not reads.has_key(fd): selectables[fd] = reader reads[fd] = 1 self._updateRegistration(fd) def _updateRegistration(self, fd): ... mask = 0 if reads.has_key(fd): mask = mask | select.POLLIN poller.register(fd, mask) def _doReadOrWrite(self, selectable, fd, event, POLLIN, POLLOUT, log, faildict={ error.ConnectionDone: failure.Failure(error.ConnectionDone()), error.ConnectionLost: failure.Failure(error.ConnectionLost()) }): ... if event & POLLIN: why = selectable.doRead() inRead = True ... if why: self._disconnectSelectable(selectable, why, inRead)
# posixbase.py def _disconnectSelectable(self, selectable, why, isRead, faildict={ error.ConnectionDone: failure.Failure(error.ConnectionDone()), error.ConnectionLost: failure.Failure(error.ConnectionLost()) }): ... selectable.connectionLost(f)
本文作者 : cyningsun
本文地址 : https://www.cyningsun.com/08-27-2012/cookie-server-performance-optimization.html
版权声明 :本博客所有文章除特别声明外,均采用 CC BY-NC-ND 3.0 CN 许可协议。转载请注明出处!