ornado 是由Facebook开源的一个服务器“套装",适合于做python的web或者使用其本身提供的可扩展的功能,完成了不完整的wsgi协议,可用于做快速的web开发,封装了epoll性能较好。文章主要以分析tornado的网络部分即异步事件处理与上层的IOstream类提供的异步IO,其他的模块如web的tornado.web 以后慢慢留作分析。
源码组织:
|---__init__.py
---auth.py
---......
---epoll.c
---ioloop.py
---iostream.py
---...
tornado网络部分最核心的两个模块就是ioloop.py与iostream.py,我们主要分析的就是这两个部分。
ioloop.py 主要的是将底层的epoll或者说是其他的IO多路复用封装作异步事件来处理
iostream.py主要是对于下层的异步事件的进一步封装,为其封装了更上一层的buffer(IO)事件.
这段时间一直在学习tornado的 异步的处理。主要是用ioloop实现多路复用。
下面是个例子,有tornado基础的朋友,一看就懂的~
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
import
subprocess
import
tornado.ioloop
import
time
import
fcntl
import
functools
import
os
class
GenericSubprocess (object):
def __init__ ( self, timeout=-
1
, **popen_args ):
self.args = dict()
self.args[
"stdout"
] = subprocess.PIPE
self.args[
"stderr"
] = subprocess.PIPE
self.args[
"close_fds"
] = True
self.args.update(popen_args)
self.ioloop = None
self.expiration = None
self.pipe = None
self.timeout = timeout
self.streams = []
self.has_timed_out = False
def start(self):
""
"Spawn the task.
Throws RuntimeError
if
the task was already started.
""
"
if
not self.pipe
is
None:
raise RuntimeError(
"Cannot start task twice"
)
self.ioloop = tornado.ioloop.IOLoop.instance()
if
self.timeout >
0
:
self.expiration = self.ioloop.add_timeout( time.time() + self.timeout, self.on_timeout )
self.pipe = subprocess.Popen(**self.args)
self.streams = [ (self.pipe.stdout.fileno(), []),
(self.pipe.stderr.fileno(), []) ]
for
fd, d
in
self.streams:
flags = fcntl.fcntl(fd, fcntl.F_GETFL)| os.O_NDELAY
fcntl.fcntl( fd, fcntl.F_SETFL, flags)
self.ioloop.add_handler( fd,
self.stat,
self.ioloop.READ|self.ioloop.ERROR)
def on_timeout(self):
self.has_timed_out = True
self.cancel()
def cancel (self ) :
""
"Cancel task execution
Sends SIGKILL to the child process.
""
"
try
:
self.pipe.kill()
except:
pass
def stat( self, *args ):
''
'Check process completion and consume pending I/O data'
''
self.pipe.poll()
if
not self.pipe.returncode
is
None:
''
'cleanup handlers and timeouts'
''
if
not self.expiration
is
None:
self.ioloop.remove_timeout(self.expiration)
for
fd, dest
in
self.streams:
self.ioloop.remove_handler(fd)
''
'schedulle callback (first try to read all pending data)'
''
self.ioloop.add_callback(self.on_finish)
for
fd, dest
in
self.streams:
while
True:
try
:
data = os.read(fd,
4096
)
if
len(data) ==
0
:
break
dest.extend([data])
except:
break
@property
def stdout(self):
return
self.get_output(
0
)
@property
def stderr(self):
return
self.get_output(
1
)
@property
def status(self):
return
self.pipe.returncode
def get_output(self, index ):
return
""
.join(self.streams[index][
1
])
def on_finish(self):
raise NotImplemented()
class
Subprocess (GenericSubprocess):
def __init__ ( self, callback, *args, **kwargs):
self.callback = callback
self.done_callback = False
GenericSubprocess.__init__(self, *args, **kwargs)
def on_finish(self):
if
not self.done_callback:
self.done_callback = True
''
'prevent calling callback twice'
''
self.ioloop.add_callback(functools.partial(self.callback, self.status, self.stdout, self.stderr, self.has_timed_out))
if
__name__ ==
"__main__"
:
ioloop = tornado.ioloop.IOLoop.instance()
def print_timeout( status, stdout, stderr, has_timed_out) :
assert(status!=
0
)
assert(has_timed_out)
print
"OK status:"
, repr(status),
"stdout:"
, repr(stdout),
"stderr:"
, repr(stderr),
"timeout:"
, repr(has_timed_out)
def print_ok( status, stdout, stderr, has_timed_out) :
assert(status==
0
)
assert(not has_timed_out)
print
"OK status:"
, repr(status),
"stdout:"
, repr(stdout),
"stderr:"
, repr(stderr),
"timeout:"
, repr(has_timed_out)
def print_error( status, stdout, stderr, has_timed_out):
assert(status!=
0
)
assert(not has_timed_out)
print
"OK status:"
, repr(status),
"stdout:"
, repr(stdout),
"stderr:"
, repr(stderr),
"timeout:"
, repr(has_timed_out)
def stop_test():
ioloop.stop()
t1 = Subprocess( print_timeout, timeout=
3
, args=[
"sleep"
,
"5"
] )
t2 = Subprocess( print_ok, timeout=
3
, args=[
"ip"
,
"a"
] )
t3 = Subprocess( print_ok, timeout=
3
, args=[
"sleepdsdasdas"
,
"1"
] )
t4 = Subprocess( print_error, timeout=
3
, args=[
"cat"
,
"/etc/sdfsdfsdfsdfsdfsdfsdf"
] )
t1.start()
t2.start()
try
:
t3.start()
assert(
false
)
except:
print
"OK"
t4.start()
ioloop.add_timeout(time.time() +
10
, stop_test)
ioloop.start()
|
本文转自 rfyiamcool 51CTO博客,原文链接:http://blog.51cto.com/rfyiamcool/1236330,如需转载请自行联系原作者