tornado和subprocess实现程序的非堵塞异步处理

简介:

ornado     是由Facebook开源的一个服务器“套装",适合于做python的web或者使用其本身提供的可扩展的功能,完成了不完整的wsgi协议,可用于做快速的web开发,封装了epoll性能较好。文章主要以分析tornado的网络部分即异步事件处理与上层的IOstream类提供的异步IO,其他的模块如web的tornado.web 以后慢慢留作分析。


源码组织:

  |---__init__.py

   ---auth.py

   ---......

   ---epoll.c

   ---ioloop.py

   ---iostream.py

   ---...


  tornado网络部分最核心的两个模块就是ioloop.py与iostream.py,我们主要分析的就是这两个部分。

  ioloop.py 主要的是将底层的epoll或者说是其他的IO多路复用封装作异步事件来处理

  iostream.py主要是对于下层的异步事件的进一步封装,为其封装了更上一层的buffer(IO)事件.


这段时间一直在学习tornado的 异步的处理。主要是用ioloop实现多路复用。

下面是个例子,有tornado基础的朋友,一看就懂的~


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import  subprocess
import  tornado.ioloop
import  time
import  fcntl
import  functools
import  os
class  GenericSubprocess (object):
     def __init__ ( self, timeout=- 1 , **popen_args ):
         self.args = dict()
         self.args[ "stdout" ] = subprocess.PIPE
         self.args[ "stderr" ] = subprocess.PIPE
         self.args[ "close_fds" ] = True
         self.args.update(popen_args)
         self.ioloop = None
         self.expiration = None
         self.pipe = None
         self.timeout = timeout
         self.streams = []
         self.has_timed_out = False
     def start(self):
         "" "Spawn the task.
         Throws RuntimeError  if  the task was already started. "" "
         if  not self.pipe  is  None:
             raise RuntimeError( "Cannot start task twice" )
         self.ioloop = tornado.ioloop.IOLoop.instance()
         if  self.timeout >  0 :
             self.expiration = self.ioloop.add_timeout( time.time() + self.timeout, self.on_timeout )
         self.pipe = subprocess.Popen(**self.args)
         self.streams = [ (self.pipe.stdout.fileno(), []),
                              (self.pipe.stderr.fileno(), []) ]
         for  fd, d  in  self.streams:
             flags = fcntl.fcntl(fd, fcntl.F_GETFL)| os.O_NDELAY
             fcntl.fcntl( fd, fcntl.F_SETFL, flags)
             self.ioloop.add_handler( fd,
                                      self.stat,
                                      self.ioloop.READ|self.ioloop.ERROR)
     def on_timeout(self):
         self.has_timed_out = True
         self.cancel()
     def cancel (self ) :
         "" "Cancel task execution
         Sends SIGKILL to the child process. "" "
         try :
             self.pipe.kill()
         except:
             pass
     def stat( self, *args ):
         '' 'Check process completion and consume pending I/O data' ''
         self.pipe.poll()
         if  not self.pipe.returncode  is  None:
             '' 'cleanup handlers and timeouts' ''
             if  not self.expiration  is  None:
                 self.ioloop.remove_timeout(self.expiration)
             for  fd, dest  in   self.streams:
                 self.ioloop.remove_handler(fd)
             '' 'schedulle callback (first try to read all pending data)' ''
             self.ioloop.add_callback(self.on_finish)
         for  fd, dest  in   self.streams:
             while  True:
                 try :
                     data = os.read(fd,  4096 )
                     if  len(data) ==  0 :
                         break
                     dest.extend([data])
                 except:
                     break
     @property
     def stdout(self):
         return  self.get_output( 0 )
     @property
     def stderr(self):
         return  self.get_output( 1 )
     @property
     def status(self):
         return  self.pipe.returncode
     def get_output(self, index ):
         return  "" .join(self.streams[index][ 1 ])
     def on_finish(self):
         raise NotImplemented()
class  Subprocess (GenericSubprocess):
     def __init__ ( self, callback, *args, **kwargs):
         self.callback = callback
         self.done_callback = False
         GenericSubprocess.__init__(self, *args, **kwargs)
     def on_finish(self):
         if  not self.done_callback:
             self.done_callback = True
             '' 'prevent calling callback twice' ''
             self.ioloop.add_callback(functools.partial(self.callback, self.status, self.stdout, self.stderr, self.has_timed_out))
if  __name__ ==  "__main__" :
     ioloop = tornado.ioloop.IOLoop.instance()
     def print_timeout( status, stdout, stderr, has_timed_out) :
         assert(status!= 0 )
         assert(has_timed_out)
         print  "OK status:" , repr(status),  "stdout:" , repr(stdout),  "stderr:" , repr(stderr),  "timeout:" , repr(has_timed_out)
     def print_ok( status, stdout, stderr, has_timed_out) :
         assert(status== 0 )
         assert(not has_timed_out)
         print  "OK status:" , repr(status),  "stdout:" , repr(stdout),  "stderr:" , repr(stderr),  "timeout:" , repr(has_timed_out)
     def print_error( status, stdout, stderr, has_timed_out):
         assert(status!= 0 )
         assert(not has_timed_out)
         print  "OK status:" , repr(status),  "stdout:" , repr(stdout),  "stderr:" , repr(stderr),  "timeout:" , repr(has_timed_out)
     def stop_test():
         ioloop.stop()
     t1 = Subprocess( print_timeout, timeout= 3 , args=[  "sleep" , "5" ] )
     t2 = Subprocess( print_ok, timeout= 3 , args=[  "ip" "a"  ] )
     t3 = Subprocess( print_ok, timeout= 3 , args=[  "sleepdsdasdas" "1"  ] )
     t4 = Subprocess( print_error, timeout= 3 , args=[  "cat" "/etc/sdfsdfsdfsdfsdfsdfsdf"  ] )
     t1.start()
     t2.start()
     try :
         t3.start()
         assert( false )
     except:
         print  "OK"
     t4.start()
     ioloop.add_timeout(time.time() +  10 , stop_test)
     ioloop.start()









 本文转自 rfyiamcool 51CTO博客,原文链接:http://blog.51cto.com/rfyiamcool/1236330,如需转载请自行联系原作者

相关文章
|
4月前
|
数据采集
爬虫之协程异步 asyncio和aiohttp
爬虫之协程异步 asyncio和aiohttp
|
8月前
|
存储 Python
python使用gevent库来创建协程,并通过协程实现并发执行不同的任务
```markdown 这段Python代码利用`gevent`库实现并发执行协程。定义了两个打印函数`f1`和`f2`,分别输出"csdn"和"yyds"。代码首先创建列表`t_l`,并启动5个`f1`协程,将其加入列表并等待所有协程完成。随后,同样方式启动5个`f2`协程,存入`t1_l`列表并等待执行完毕。整体展示了`gevent`的协程并发操作。 ```
64 1
|
调度 Python
16 Tornado - 认识异步
16 Tornado - 认识异步
61 1
|
JSON 前端开发 数据库
17 Tornado - Tornado异步
17 Tornado - Tornado异步
122 1
|
Python
165 python网络编程 - 单进程服务器(gevent版)
165 python网络编程 - 单进程服务器(gevent版)
58 0
|
Python
164 python网络编程 - 协程(gevent版)
164 python网络编程 - 协程(gevent版)
71 0
|
JSON 数据格式 Python
【Python】asyncio+aiohttp——使用协程异步paqu数据
【Python】asyncio+aiohttp——使用协程异步paqu数据
118 0
|
Web App开发 数据采集 Java
使用asyncio库和多线程实现高并发的异步IO操作的爬虫
使用asyncio库和多线程实现高并发的异步IO操作的爬虫
|
安全 Unix Shell
Python 异步: 在非阻塞子进程中运行命令(19)
Python 异步: 在非阻塞子进程中运行命令(19)
913 0
|
安全 调度 开发者
并发异步编程之争:协程(asyncio)到底需不需要加锁?(线程/协程安全/挂起/主动切换)Python3
协程与线程向来焦孟不离,但事实上是,线程更被我们所熟知,在Python编程领域,单核同时间内只能有一个线程运行,这并不是什么缺陷,这实际上是符合客观逻辑的,单核处理器本来就没法同时处理两件事情,要同时进行多件事情本来就需要正在运行的让出处理器,然后才能去处理另一件事情,左手画方右手画圆在现实中本来就不成立,只不过这个让出的过程是线程调度器主动抢占的。
并发异步编程之争:协程(asyncio)到底需不需要加锁?(线程/协程安全/挂起/主动切换)Python3