开发者社区> 技术小胖子> 正文

tornado和subprocess实现程序的非堵塞异步处理

简介:
+关注继续查看

ornado     是由Facebook开源的一个服务器“套装",适合于做python的web或者使用其本身提供的可扩展的功能,完成了不完整的wsgi协议,可用于做快速的web开发,封装了epoll性能较好。文章主要以分析tornado的网络部分即异步事件处理与上层的IOstream类提供的异步IO,其他的模块如web的tornado.web 以后慢慢留作分析。


源码组织:

  |---__init__.py

   ---auth.py

   ---......

   ---epoll.c

   ---ioloop.py

   ---iostream.py

   ---...


  tornado网络部分最核心的两个模块就是ioloop.py与iostream.py,我们主要分析的就是这两个部分。

  ioloop.py 主要的是将底层的epoll或者说是其他的IO多路复用封装作异步事件来处理

  iostream.py主要是对于下层的异步事件的进一步封装,为其封装了更上一层的buffer(IO)事件.


这段时间一直在学习tornado的 异步的处理。主要是用ioloop实现多路复用。

下面是个例子,有tornado基础的朋友,一看就懂的~


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import subprocess
import tornado.ioloop
import time
import fcntl
import functools
import os
class GenericSubprocess (object):
    def __init__ ( self, timeout=-1, **popen_args ):
        self.args = dict()
        self.args["stdout"] = subprocess.PIPE
        self.args["stderr"] = subprocess.PIPE
        self.args["close_fds"] = True
        self.args.update(popen_args)
        self.ioloop = None
        self.expiration = None
        self.pipe = None
        self.timeout = timeout
        self.streams = []
        self.has_timed_out = False
    def start(self):
        """Spawn the task.
        Throws RuntimeError if the task was already started."""
        if not self.pipe is None:
            raise RuntimeError("Cannot start task twice")
        self.ioloop = tornado.ioloop.IOLoop.instance()
        if self.timeout > 0:
            self.expiration = self.ioloop.add_timeout( time.time() + self.timeout, self.on_timeout )
        self.pipe = subprocess.Popen(**self.args)
        self.streams = [ (self.pipe.stdout.fileno(), []),
                             (self.pipe.stderr.fileno(), []) ]
        for fd, d in self.streams:
            flags = fcntl.fcntl(fd, fcntl.F_GETFL)| os.O_NDELAY
            fcntl.fcntl( fd, fcntl.F_SETFL, flags)
            self.ioloop.add_handler( fd,
                                     self.stat,
                                     self.ioloop.READ|self.ioloop.ERROR)
    def on_timeout(self):
        self.has_timed_out = True
        self.cancel()
    def cancel (self ) :
        """Cancel task execution
        Sends SIGKILL to the child process."""
        try:
            self.pipe.kill()
        except:
            pass
    def stat( self, *args ):
        '''Check process completion and consume pending I/O data'''
        self.pipe.poll()
        if not self.pipe.returncode is None:
            '''cleanup handlers and timeouts'''
            if not self.expiration is None:
                self.ioloop.remove_timeout(self.expiration)
            for fd, dest in  self.streams:
                self.ioloop.remove_handler(fd)
            '''schedulle callback (first try to read all pending data)'''
            self.ioloop.add_callback(self.on_finish)
        for fd, dest in  self.streams:
            while True:
                try:
                    data = os.read(fd, 4096)
                    if len(data) == 0:
                        break
                    dest.extend([data])
                except:
                    break
    @property
    def stdout(self):
        return self.get_output(0)
    @property
    def stderr(self):
        return self.get_output(1)
    @property
    def status(self):
        return self.pipe.returncode
    def get_output(self, index ):
        return "".join(self.streams[index][1])
    def on_finish(self):
        raise NotImplemented()
class Subprocess (GenericSubprocess):
    def __init__ ( self, callback, *args, **kwargs):
        self.callback = callback
        self.done_callback = False
        GenericSubprocess.__init__(self, *args, **kwargs)
    def on_finish(self):
        if not self.done_callback:
            self.done_callback = True
            '''prevent calling callback twice'''
            self.ioloop.add_callback(functools.partial(self.callback, self.status, self.stdout, self.stderr, self.has_timed_out))
if __name__ == "__main__":
    ioloop = tornado.ioloop.IOLoop.instance()
    def print_timeout( status, stdout, stderr, has_timed_out) :
        assert(status!=0)
        assert(has_timed_out)
        print "OK status:", repr(status), "stdout:", repr(stdout), "stderr:", repr(stderr), "timeout:", repr(has_timed_out)
    def print_ok( status, stdout, stderr, has_timed_out) :
        assert(status==0)
        assert(not has_timed_out)
        print "OK status:", repr(status), "stdout:", repr(stdout), "stderr:", repr(stderr), "timeout:", repr(has_timed_out)
    def print_error( status, stdout, stderr, has_timed_out):
        assert(status!=0)
        assert(not has_timed_out)
        print "OK status:", repr(status), "stdout:", repr(stdout), "stderr:", repr(stderr), "timeout:", repr(has_timed_out)
    def stop_test():
        ioloop.stop()
    t1 = Subprocess( print_timeout, timeout=3, args=[ "sleep","5"] )
    t2 = Subprocess( print_ok, timeout=3, args=[ "ip""a" ] )
    t3 = Subprocess( print_ok, timeout=3, args=[ "sleepdsdasdas""1" ] )
    t4 = Subprocess( print_error, timeout=3, args=[ "cat""/etc/sdfsdfsdfsdfsdfsdfsdf" ] )
    t1.start()
    t2.start()
    try:
        t3.start()
        assert(false)
    except:
        print "OK"
    t4.start()
    ioloop.add_timeout(time.time() + 10, stop_test)
    ioloop.start()









 本文转自 rfyiamcool 51CTO博客,原文链接:http://blog.51cto.com/rfyiamcool/1236330,如需转载请自行联系原作者

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
Python MySQL数据库4:MySQL与Python交互(上)
Python MySQL数据库4:MySQL与Python交互
5 0
两个使用 Pandas 读取异常数据结构 Excel 的方法,拿走不谢!
两个使用 Pandas 读取异常数据结构 Excel 的方法,拿走不谢!
5 0
Python MySQL数据库4:MySQL与Python交互(下)
Python MySQL数据库4:MySQL与Python交互(下)
4 0
Python MySQL数据库5:MySQL高级知识&账户管理&主从同步配置(上)
Python MySQL数据库5:MySQL高级知识&账户管理&主从同步配置(上)
4 0
肝了3天,整理了90个Pandas案例,强烈建议收藏!(上)
肝了3天,整理了90个Pandas案例,强烈建议收藏!(上)
4 0
肝了3天,整理了90个Pandas案例,强烈建议收藏!(下)
肝了3天,整理了90个Pandas案例,强烈建议收藏!(下)
4 0
Python MySQL数据库5:MySQL高级知识&账户管理&主从同步配置(下)
Python MySQL数据库5:MySQL高级知识&账户管理&主从同步配置
7 0
用 Python 制作子弹图也这么简单,爱了~
众所周知,Python 的应用是非常广泛的,今天我们就通过 matplotlib 库学习下如何制作精美的子弹图
5 0
Python mini-web框架1:WSGI-mini-web框架
Python mini-web框架1:WSGI-mini-web框架
6 0
又肝了3天,整理了80个Python DateTime 例子,必须收藏!(上)
又肝了3天,整理了80个Python DateTime 例子,必须收藏!(上)
3 0
21117
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
OceanBase 入门到实战教程
立即下载
阿里云图数据库GDB,加速开启“图智”未来.ppt
立即下载
实时数仓Hologres技术实战一本通2.0版(下)
立即下载