上一篇《白话tornado源码之待请求阶段》中介绍了tornado框架在客户端请求之前所做的准备(下图1、2部分),本质上就是创建了一个socket服务端,并进行了IP和端口的绑定,但是未执行 socket的accept方法,也就是未获取客户端请求信息。
概述
本篇就来详细介绍tornado服务器(socket服务端)是如何接收用户请求数据以及如果根据用户请求的URL处理并返回数据,也就是上图的3系列所有步骤,如上图【start】是一个死循环,其中利用epoll监听服务端socket句柄,一旦客户端发送请求,则立即调用HttpServer对象的_handle_events方法来进行请求的处理。
对于整个3系列按照功能可以划分为四大部分:
- 获取用户请求数据(上图3.4)
- 根据用户请求URL进行路由匹配,从而使得某个方法处理具体的请求(上图3.5~3.19)
- 将处理后的数据返回给客户端(上图3.21~3.23)
- 关闭客户端socket(上图3.24~3.26)
3.1、HTTPServer对象的_handle_events方法
此处代码主要有三项任务:
1、 socket.accept() 接收了客户端请求。
2、创建封装了客户端socket对象和IOLoop对象的IOStream实例(用于之后获取或输出数据)。
3、创建HTTPConnection对象,其内容是实现整个功能的逻辑。
class HTTPServer(object): def _handle_events(self, fd, events): while True: try: #======== 获取客户端请求 =========# connection, address = self._socket.accept() except socket.error, e: if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN): return raise if self.ssl_options is not None: assert ssl, "Python 2.6+ and OpenSSL required for SSL" try: connection = ssl.wrap_socket(connection, server_side=True, do_handshake_on_connect=False, **self.ssl_options) except ssl.SSLError, err: if err.args[0] == ssl.SSL_ERROR_EOF: return connection.close() else: raise except socket.error, err: if err.args[0] == errno.ECONNABORTED: return connection.close() else: raise try: #这是的条件是选择https和http请求方式 if self.ssl_options is not None: stream = iostream.SSLIOStream(connection, io_loop=self.io_loop) else: #将客户端socket对象和IOLoop对象封装到IOStream对象中 #IOStream用于从客户端socket中读取请求信息 stream = iostream.IOStream(connection, io_loop=self.io_loop) #创建HTTPConnection对象 #address是客户端IPdizhi #self.request_callback是Application对象,其中包含了:url映射关系和配置文件等.. #so,HTTPConnection的构造函数就是下一步处理请求的位置了.. HTTPConnection(stream, address, self.request_callback,self.no_keep_alive, self.xheaders) except: logging.error("Error in connection callback", exc_info=True)
3.2、IOStream的__init__方法
此处代码主要两项目任务:
- 封装客户端socket和其他信息,以便之后执行该对象的其他方法获取客户端请求的数据和响应客户信息
- 将客户端socket对象添加到epoll,并且指定当客户端socket对象变化时,就去执行 IOStream的_handle_events方法(调用socket.send给用户响应数据)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
class
IOStream(
object
):
def
__init__(
self
, socket, io_loop
=
None
, max_buffer_size
=
104857600
,
read_chunk_size
=
4096
):
#客户端socket对象
self
.socket
=
socket
self
.socket.setblocking(
False
)
self
.io_loop
=
io_loop
or
ioloop.IOLoop.instance()
self
.max_buffer_size
=
max_buffer_size
self
.read_chunk_size
=
read_chunk_size
self
._read_buffer
=
collections.deque()
self
._write_buffer
=
collections.deque()
self
._write_buffer_frozen
=
False
self
._read_delimiter
=
None
self
._read_bytes
=
None
self
._read_callback
=
None
self
._write_callback
=
None
self
._close_callback
=
None
self
._connect_callback
=
None
self
._connecting
=
False
self
._state
=
self
.io_loop.ERROR
with stack_context.NullContext():
#将客户端socket句柄添加的epoll中,并将IOStream的_handle_events方法添加到 Start 的While循环中
#Start 的While循环中监听客户端socket句柄的状态,以便再最后调用IOStream的_handle_events方法把处理后的信息响应给用户
self
.io_loop.add_handler(
self
.socket.fileno(),
self
._handle_events,
self
._state)
|
3.3、HTTPConnections的__init__方法
此处代码主要两项任务:
- 获取请求数据
- 调用 _on_headers 继续处理请求
对于获取请求数据,其实就是执行IOStream的read_until函数来完成,其内部通过socket.recv(4096)方法获取客户端请求的数据,并以 【\r\n\r\n】作为请求信息结束符(http请求头和内容通过\r\n\r\n分割)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
class
HTTPConnection(
object
):
def
__init__(
self
, stream, address, request_callback, no_keep_alive
=
False
,xheaders
=
False
):
self
.stream
=
stream
#stream是封装了客户端socket和IOLoop实例的IOStream对象
self
.address
=
address
#address是客户端IP地址
self
.request_callback
=
request_callback
#request_callback是封装了URL映射和配置文件的Application对象。
self
.no_keep_alive
=
no_keep_alive
self
.xheaders
=
xheaders
self
._request
=
None
self
._request_finished
=
False
#获取请求信息(请求头和内容),然后执行 HTTPConnection的_on_headers方法继续处理请求
self
._header_callback
=
stack_context.wrap(
self
._on_headers)
self
.stream.read_until(
"\r\n\r\n"
,
self
._header_callback)
|
请求数据格式:
GET / HTTP/1.1
Host: localhost:8888
Connection: keep-alive
Cache-Control: max-age=0
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8
User-Agent: Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.118 Safari/537.36
Accept-Encoding: gzip, deflate, sdch
Accept-Language: zh-CN,zh;q=0.8,en;q=0.6,zh-TW;q=0.4
If-None-Match: "e02aa1b106d5c7c6a98def2b13005d5b84fd8dc8"
详细代码解析:



3.4、HTTPConnnection的 _on_headers 方法(含3.5)
上述代码主要有两个任务:
- 根据获取的请求信息生成响应的请求头键值对,并把信息封装到HttpRequest对象中
- 调用Application的__call__方法,继续处理请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
class
HTTPConnection(
object
):
def
_on_headers(
self
, data):
try
:
data
=
native_str(data.decode(
'latin1'
))
eol
=
data.find(
"\r\n"
)
#获取请求的起始行数据,例如:GET / HTTP/1.1
start_line
=
data[:eol]
try
:
#请求方式、请求地址、http版本号
method, uri, version
=
start_line.split(
" "
)
except
ValueError:
raise
_BadRequestException(
"Malformed HTTP request line"
)
if
not
version.startswith(
"HTTP/"
):
raise
_BadRequestException(
"Malformed HTTP version in HTTP Request-Line"
)
#把请求头信息包装到一个字典中。(不包括第一行)
headers
=
httputil.HTTPHeaders.parse(data[eol:])
#把请求信息封装到一个HTTPRequest对象中
#注意:self._request = HTTPRequest,
#HTTPRequest中封装了HTTPConnection
#HTTPConnection中封装了stream和application
self
._request
=
HTTPRequest(connection
=
self
, method
=
method, uri
=
uri, version
=
version,headers
=
headers, remote_ip
=
self
.address[
0
])
#从请求头中获取 Content-Length
content_length
=
headers.get(
"Content-Length"
)
if
content_length:
content_length
=
int
(content_length)
if
content_length >
self
.stream.max_buffer_size:
raise
_BadRequestException(
"Content-Length too long"
)
if
headers.get(
"Expect"
)
=
=
"100-continue"
:
self
.stream.write(
"HTTP/1.1 100 (Continue)\r\n\r\n"
)
self
.stream.read_bytes(content_length,
self
._on_request_body)
return
#**************** 执行Application对象的 __call__ 方法,也就是路由系统的入口 *******************
self
.request_callback(
self
._request)
except
_BadRequestException, e:
logging.info(
"Malformed HTTP request from %s: %s"
,
self
.address[
0
], e)
self
.stream.close()
return
|

3.6、Application的__call__方法(含3.7、3.8、3.9)
此处代码主要有三个项任务:
- 根据请求的url和封装在Application对象中的url映射做匹配,获取url所对应的Handler对象。ps:Handlers泛指继承RequestHandler的类
- 创建Handler对象,即:执行Handler的__init__方法
- 执行Handler对象的 _execute 方法
注意:
1、执行Application的 __call__ 方法时,其参数request是HTTPRequest对象(其中封装HTTPConnetion、Stream、Application对象、请求头信息)
2、Handler泛指就是我们定义的用于处理请求的类并且她还继承自RequestHandler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
class
Application(
object
):
def
__call__(
self
, request):
"""Called by HTTPServer to execute the request."""
transforms
=
[t(request)
for
t
in
self
.transforms]
handler
=
None
args
=
[]
kwargs
=
{}
#根据请求的目标主机,匹配主机模版对应的正则表达式和Handlers
handlers
=
self
._get_host_handlers(request)
if
not
handlers:
handler
=
RedirectHandler(
self
, request, url
=
"http://"
+
self
.default_host
+
"/"
)
else
:
for
spec
in
handlers:
match
=
spec.regex.match(request.path)
if
match:
# None-safe wrapper around url_unescape to handle
# unmatched optional groups correctly
def
unquote(s):
if
s
is
None
:
return
s
return
escape.url_unescape(s, encoding
=
None
)
handler
=
spec.handler_class(
self
, request,
*
*
spec.kwargs)
#创建RquestHandler对象
# Pass matched groups to the handler. Since
# match.groups() includes both named and unnamed groups,
# we want to use either groups or groupdict but not both.
# Note that args are passed as bytes so the handler can
# decide what encoding to use.
kwargs
=
dict
((k, unquote(v))
for
(k, v)
in
match.groupdict().iteritems())
if
kwargs:
args
=
[]
else
:
args
=
[unquote(s)
for
s
in
match.groups()]
break
if
not
handler:
handler
=
ErrorHandler(
self
, request, status_code
=
404
)
# In debug mode, re-compile templates and reload static files on every
# request so you don't need to restart to see changes
if
self
.settings.get(
"debug"
):
if
getattr
(RequestHandler,
"_templates"
,
None
):
for
loader
in
RequestHandler._templates.values():
loader.reset()
RequestHandler._static_hashes
=
{}
#==== 执行RequestHandler的_execute方法 ====
handler._execute(transforms,
*
args,
*
*
kwargs)
return
handler
|


上述过程中,首先根据请求的URL去路由规则中匹配,一旦匹配成功,则创建路由相对应的handler的实例。例如:如果请求 的url是【/index/11】则会创建IndexHandler实例,然后再执行该对象的 _execute 方法。由于所有的 xxxHandler 类是RequestHandler的派生类,所以会默认执行 RequestHandler的 _execute 方法。
3.10 RequestHandler的_execute方法 (含有3.11、3.12、3.13)
此处代码主要有三项任务:
- 扩展点,因为self.prepare默认是空方法,所有可以在这里被重写
- 通过反射执行Handler的get/post/put/delete等方法
- 完成请求处理后,执行finish方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
class
RequestHandler(
object
):
def
_execute(
self
, transforms,
*
args,
*
*
kwargs):
"""Executes this request with the given output transforms."""
self
._transforms
=
transforms
with stack_context.ExceptionStackContext(
self
._stack_context_handle_exception):
if
self
.request.method
not
in
self
.SUPPORTED_METHODS:
raise
HTTPError(
405
)
# If XSRF cookies are turned on, reject form submissions without
# the proper cookie
if
self
.request.method
not
in
(
"GET"
,
"HEAD"
)
and
\
self
.application.settings.get(
"xsrf_cookies"
):
self
.check_xsrf_cookie()
self
.prepare()
if
not
self
._finished:
#通过反射的方法,执行 RequestHandler 派生类的的 get、post、put方法
getattr
(
self
,
self
.request.method.lower())(
*
args,
*
*
kwargs)
if
self
._auto_finish
and
not
self
._finished:
self
.finish()
|
例:用户发送get请求


上述在执行RequestHandler的write方法时,讲数据保存在Handler对象的 _write_buffer 列表中,在之后执行finish时再讲数据写到IOStream对象的_write_buffer字段中,其类型是双向队列collections.deque()。
3.14、执行RequestHandler的finish
此段代码主要有两项任务:
- 将用户处理请求后返回的数据发送到IOStream的_write_buffer队列中
- 纪录操作日志
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
class
RequestHandler:
def
finish(
self
, chunk
=
None
):
"""Finishes this response, ending the HTTP request."""
assert
not
self
._finished
if
chunk
is
not
None
:
self
.write(chunk)
if
not
self
._headers_written:
if
(
self
._status_code
=
=
200
and
self
.request.method
in
(
"GET"
,
"HEAD"
)
and
"Etag"
not
in
self
._headers):
hasher
=
hashlib.sha1()
for
part
in
self
._write_buffer:
hasher.update(part)
etag
=
'"%s"'
%
hasher.hexdigest()
inm
=
self
.request.headers.get(
"If-None-Match"
)
if
inm
and
inm.find(etag) !
=
-
1
:
self
._write_buffer
=
[]
self
.set_status(
304
)
else
:
self
.set_header(
"Etag"
, etag)
if
"Content-Length"
not
in
self
._headers:
content_length
=
sum
(
len
(part)
for
part
in
self
._write_buffer)
self
.set_header(
"Content-Length"
, content_length)
if
hasattr
(
self
.request,
"connection"
):
self
.request.connection.stream.set_close_callback(
None
)
if
not
self
.application._wsgi:
#将处理请求返回的数据发送到IOStream的_write_buffer队列中
self
.flush(include_footers
=
True
)
self
.request.finish()
#纪录日志
self
._log()
self
._finished
=
True
|
3.15、执行RequestHandler的flush方法
此处代码主要有一项任务:
- 将处理请求返回的数据发送到IOStream的_write_buffer队列中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
def
flush(
self
, include_footers
=
False
):
"""Flushes the current output buffer to the network."""
if
self
.application._wsgi:
raise
Exception(
"WSGI applications do not support flush()"
)
chunk
=
"".join(
self
._write_buffer)
self
._write_buffer
=
[]
if
not
self
._headers_written:
self
._headers_written
=
True
for
transform
in
self
._transforms:
self
._headers, chunk
=
transform.transform_first_chunk(
self
._headers, chunk, include_footers)
headers
=
self
._generate_headers()
else
:
for
transform
in
self
._transforms:
chunk
=
transform.transform_chunk(chunk, include_footers)
headers
=
""
# Ignore the chunk and only write the headers for HEAD requests
if
self
.request.method
=
=
"HEAD"
:
if
headers:
self
.request.write(headers)
return
if
headers
or
chunk:
#执行HTTPReqeust的write方法
self
.request.write(headers
+
chunk)
|


以上代码执行完成之后,请求的处理基本上就完成了。下面就是等待监听客户端socket句柄的epoll触发,然后执行IOStream的_handle_event方法来将 响应数据发送给客户端。
3.20、执行RequestHandler的_log方法
此处代码主要有一项任务:
- 记录操作日志(利用logging模块)
1
2
3
4
|
class
RequestHandler:
def
_log(
self
):
self
.application.log_request(
self
)
|

3.21、IOStream的Handle_event方法
由于epoll中不但监听了服务器socket句柄还监听了客户端sokcet句柄,所以当客户端socket对象变化时,就会去调用之前指定的IOStream的_handler_events方法。
此段代码主要有一项任务:
- 将处理之后的响应数据发送给客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
class
IOStream(
object
):
def
_handle_events(
self
, fd, events):
if
not
self
.socket:
logging.warning(
"Got events for closed stream %d"
, fd)
return
try
:
if
events &
self
.io_loop.READ:
self
._handle_read()
if
not
self
.socket:
return
if
events &
self
.io_loop.WRITE:
if
self
._connecting:
self
._handle_connect()
#执行_handle_write方法,内部调用socket.send将数据响应给客户端
self
._handle_write()
if
not
self
.socket:
return
if
events &
self
.io_loop.ERROR:
self
.close()
return
state
=
self
.io_loop.ERROR
if
self
.reading():
state |
=
self
.io_loop.READ
if
self
.writing():
state |
=
self
.io_loop.WRITE
if
state !
=
self
._state:
self
._state
=
state
self
.io_loop.update_handler(
self
.socket.fileno(),
self
._state)
except
:
logging.error(
"Uncaught exception, closing connection."
,
exc_info
=
True
)
self
.close()
raise
|
3.22、IOStream的_handle_write方法
此段代码主要有两项任务:
- 调用socket.send给客户端发送响应数据
- 执行回调函数HTTPConnection的_on_write_complete方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
class
IOStream(
object
):
def
_handle_write(
self
):
while
self
._write_buffer:
try
:
if
not
self
._write_buffer_frozen:
_merge_prefix(
self
._write_buffer,
128
*
1024
)
#调用客户端socket对象的send方法发送数据
num_bytes
=
self
.socket.send(
self
._write_buffer[
0
])
self
._write_buffer_frozen
=
False
_merge_prefix(
self
._write_buffer, num_bytes)
self
._write_buffer.popleft()
except
socket.error, e:
if
e.args[
0
]
in
(errno.EWOULDBLOCK, errno.EAGAIN):
self
._write_buffer_frozen
=
True
break
else
:
logging.warning(
"Write error on %d: %s"
,
self
.socket.fileno(), e)
self
.close()
return
if
not
self
._write_buffer
and
self
._write_callback:
callback
=
self
._write_callback
self
._write_callback
=
None
#执行回调函数关闭客户端socket连接(HTTPConnection的_on_write_complete方法)
self
._run_callback(callback)
|

注:IOStream的_run_callback方法内部调用了HTTPConnection的_on_write_complete方法
3.23、执行HTTPConnection的_on_write_complete方法
此处代码主要有一项任务:
- 更新客户端socket所在epoll中的状态为【READ】,以便之后执行3.24时关闭socket客户端。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
class
HTTPConnection(
object
):
def
_on_write_complete(
self
):
if
self
._request_finished:
self
._finish_request()
def
_finish_request(
self
):
if
self
.no_keep_alive:
disconnect
=
True
else
:
connection_header
=
self
._request.headers.get(
"Connection"
)
if
self
._request.supports_http_1_1():
disconnect
=
connection_header
=
=
"close"
elif
(
"Content-Length"
in
self
._request.headers
or
self
._request.method
in
(
"HEAD"
,
"GET"
)):
disconnect
=
connection_header !
=
"Keep-Alive"
else
:
disconnect
=
True
self
._request
=
None
self
._request_finished
=
False
if
disconnect:
self
.stream.close()
return
self
.stream.read_until(
"\r\n\r\n"
,
self
._header_callback)
|



3.24、IOStream的_handle_write方法(含3.25、3.26)
此段代码主要有一项任务:
- 关闭客户端socket
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
class
IOStream(
object
):
def
_handle_events(
self
, fd, events):
if
not
self
.socket:
logging.warning(
"Got events for closed stream %d"
, fd)
return
try
:
#由于在 2.23 步骤中已经将epoll的状态更新为READ,所以这次会执行_handle_read方法
if
events &
self
.io_loop.READ:
self
._handle_read()
#执行完_handle_read后,客户端socket被关闭且置空,所有此处就会执行return
if
not
self
.socket:
return
#===============================终止===========================
if
events &
self
.io_loop.WRITE:
if
self
._connecting:
self
._handle_connect()
self
._handle_write()
if
not
self
.socket:
return
if
events &
self
.io_loop.ERROR:
self
.close()
return
state
=
self
.io_loop.ERROR
if
self
.reading():
state |
=
self
.io_loop.READ
if
self
.writing():
state |
=
self
.io_loop.WRITE
if
state !
=
self
._state:
self
._state
=
state
self
.io_loop.update_handler(
self
.socket.fileno(),
self
._state)
except
:
logging.error(
"Uncaught exception, closing connection."
,
exc_info
=
True
)
self
.close()
raise
|




结束语
以上就是tornado源码针对请求的主要内容,另外,大家可能注意到我们返回给用户的只是一个简单的“hello world”,tornado返回复杂的内容时又需要使用模板语言,至于如何生成复杂的页面,我们会在下一篇再会剖析。
读者如果觉得那里错误或不适,请与我联系!!!
本文转自武沛齐博客园博客,原文链接:http://www.cnblogs.com/wupeiqi/p/4540398.html,如需转载请自行联系原作者