cowboy是基于ranch的http服务器。特点是功能强打(支持完整的http协议websocket,spdy等),简洁,轻量级。
项目的连接再这里 https://github.com/extend/cowboy。
首先回顾一下Http 协议
在shell下 curl http://127.0.0.1:8080, 同时用tcpdump抓包可以看到一个最简单的Http的Get请求。
GET / HTTP1.1 r\n User-Agent: curl17.36.0 r\n Host: 127.0.0.1:8080 r\n Accept: text/plain r\n r\n
http request由3部分组成:请求行,消息报头,请求正文。
1) 请求行:就是第一行,由http的方法开始,由空格分割。后面是url路径,http的版本。
2) 后面的User-Agent,Host,Accept都是消息报头。
3) Accept用于RESTFul。
4) 请求正文为空。
post请求
post请求用于请求表单,表单的内容就是消息正文。 POST /reg.jsp HTTP/ (CRLF) Accept:image/gif,image/x-xbit,... (CRLF) ... HOST:www.guet.edu.cn (CRLF) Content-Length:22 (CRLF) Connection:Keep-Alive (CRLF) Cache-Control:no-cache (CRLF) (CRLF) //该CRLF表示消息报头已经结束,在此之前为消息报头 user=jeffrey&pwd=1234 //此行以下为提交的数据
cowboy的入口
%% hello_world_app.erl Dispatch = cowboy_router:compile([ {'_', [ {"/", toppage_handler, []} ]} ]), {ok, _} = cowboy:start_http(http, 100, [{port, 8080}], [ {env, [{dispatch, Dispatch}]} ]),
用户需要提供一个url转发规则Dispatch。
然后启动cowboy:start_http
cowboy是如何启动的?
start_https(Ref, NbAcceptors, TransOpts, ProtoOpts) when is_integer(NbAcceptors), NbAcceptors > 0 -> ranch:start_listener(Ref, NbAcceptors, ranch_ssl, TransOpts, cowboy_protocol, ProtoOpts).
可以看到cowboy是运行在ranch上,所以需要提供一个回调函数:cowboy_protocol。所有http request的请求都是这个模块处理。
一旦客户端的浏览器和cowboy的服务器建立连接,ranch的acceptor连接池会把这个连接转交给一个cowboy_protocol的子进程。
而用户设置的dispatch是放在ProtoOpts里。
cowboy_protocol是如何启动的?
%% cowboy_protocol.erl start_link(Ref, Socket, Transport, Opts) -> Pid = spawn_link(?MODULE, init, [Ref, Socket, Transport, Opts]), {ok, Pid}.
ranch启动子进程处理连接的时候,会调用模块的start_link函数,也就是cowboy_protocol:start_link。
进一步调用cowboy_protocol:init
init(Ref, Socket, Transport, Opts) -> Compress = get_value(compress, Opts, false), MaxEmptyLines = get_value(max_empty_lines, Opts, 5), MaxHeaderNameLength = get_value(max_header_name_length, Opts, 64), MaxHeaderValueLength = get_value(max_header_value_length, Opts, 4096), MaxHeaders = get_value(max_headers, Opts, 100), MaxKeepalive = get_value(max_keepalive, Opts, 100), MaxRequestLineLength = get_value(max_request_line_length, Opts, 4096), Middlewares = get_value(middlewares, Opts, [cowboy_router, cowboy_handler]), Env = [{listener, Ref}|get_value(env, Opts, [])], OnRequest = get_value(onrequest, Opts, undefined), OnResponse = get_value(onresponse, Opts, undefined), Timeout = get_value(timeout, Opts, 5000), ok = ranch:accept_ack(Ref), wait_request(<<>>, #state{socket=Socket, transport=Transport, middlewares=Middlewares, compress=Compress, env=Env, max_empty_lines=MaxEmptyLines, max_keepalive=MaxKeepalive, max_request_line_length=MaxRequestLineLength, max_header_name_length=MaxHeaderNameLength, max_header_value_length=MaxHeaderValueLength, max_headers=MaxHeaders, onrequest=OnRequest, onresponse=OnResponse, timeout=Timeout, until=until(Timeout)}, 0).
这个函数主要是获取一些限制的参数,比如:Header的最大长度。
Middlewares 是’中间层‘处理模块。这个模块的调用时机是解析完Http request。用户可以指定,默认值是cowboy_router和cowboy_handler。
MaxKeepalive 是保持连接的最长时间。
wait_request 一边接收数据一边解析。
注意,wait_request参数描述了它的状态。一个空的Buffer,一个State,和初始化为0的ReqEmpty。
开始解析Http Request
对于http服务器来说主要工作就是:解析http request,然后根据请求中的url,和用户设置的转发规则,调用相应的处理模块。最后,把处理结果返回。
接收数据
wait_request(Buffer, State=#state{socket=Socket, transport=Transport, until=Until}, ReqEmpty) -> case recv(Socket, Transport, Until) of {ok, Data} -> parse_request(<< Buffer/binary, Data/binary >>, State, ReqEmpty); {error, _} -> terminate(State) end.
wait_request 仅仅是接收数据。只要收到一点数据就调用parse_request进行解析。
并不是收到完整的Http Request Header才开始解析。这样收到一点数据就解析一点数据效率更好。
另外,注意:本次收到得数据Data是append在Buffer后面的。因为,流式分析可能一次收到的数据不是完整的一行,需要再次进入这个函数,接收数据。
状态机
整个Http Request Header的解析就是一个状态机。起点是parse_request,终点就是解析完毕调用用户的回调。
请求行的解析
开始解析http请求包的第一行,也就是请求行。
状态机起点:parse_request。
parse_request(Buffer, State=#state{max_request_line_length=MaxLength, max_empty_lines=MaxEmpty}, ReqEmpty) -> case match_eol(Buffer, 0) of nomatch when byte_size(Buffer) > MaxLength -> error_terminate(414, State); nomatch -> wait_request(Buffer, State, ReqEmpty); 1 when ReqEmpty =:= MaxEmpty -> error_terminate(400, State); 1 -> << _:16, Rest/binary >> = Buffer, parse_request(Rest, State, ReqEmpty + 1); _ -> parse_method(Buffer, State, <<>>) end.
Buffer是收到的数据,首先调用match_eol顺序遍历找换行$n的位置。
如果是nomatch,有两种情况:
1) nomatch when byte_size(Buffer) > MaxLength 长度大于最大长度,返回414.
2) nomatch, 这个时候需要再接收一点数据,再次进入wait_request。
如果返回1,说明是一个空行。
如果返回非nomach,非1,则说明目前Buffer里至少有完整一行的数据。
态机进入parse_method
这里的method就是http的get, put, delete, post。
parse_method(<< C, Rest/bits >>, State, SoFar) -> case C of $r -> error_terminate(400, State); $s -> parse_uri(Rest, State, SoFar); _ -> parse_method(Rest, State, << SoFar/binary, C >>) end.
method和后面的uri的分鬲符是空格$s。 parse_method解析的过程是:对参数进行'Head|Tail'方式绑定。然后判断Head是不是想要的。
1) 如果不是,则参数用Tail,把Header用最后一个参数保存起来。parse_method(Rest, State, << SoFar/binary, C >>)
2_ 如果匹配上了,则进入下一个阶段的解析。parse_uri(Rest, State, SoFar)。
状态机进入parse_uri
这里的uri就是请求资源的uri。
parse_uri(<< $r, _/bits >>, State, _) -> error_terminate(400, State); parse_uri(<< "# ", Rest/bits >>, State, Method) -> parse_version(Rest, State, Method, <<"#">>, <<>>); parse_uri(<< "http://", Rest/bits >>, State, Method) -> parse_uri_skip_host(Rest, State, Method); parse_uri(<< "https://", Rest/bits >>, State, Method) -> parse_uri_skip_host(Rest, State, Method); parse_uri(Buffer, State, Method) -> parse_uri_path(Buffer, State, Method, <<>>).
如果Http Request Header里面的uri部分是#,直接进入下一阶段的解析:parse_version ; 如果是以'http://'或'https://'开头的,要跳过主机名:parse_uri_skip_host ; 最后的情况就是正常的uri,调用parse_uri_path。 注意:parse_uri的第3参数Method,是上一个状态parse_method的返回结果。
parse_uri_path(<< C, Rest/bits >>, State, Method, SoFar) -> case C of $r -> error_terminate(400, State); $s -> parse_version(Rest, State, Method, SoFar, <<>>); $? -> parse_uri_query(Rest, State, Method, SoFar, <<>>); $# -> skip_uri_fragment(Rest, State, Method, SoFar, <<>>); _ -> parse_uri_path(Rest, State, Method, << SoFar/binary, C >>) end.
parse_uri_path的方式和parse_method的方式一样:取出头一个字节,判断是否匹配上了$s。 1) 如果没有匹配,把这个字节保存在最后一个参数SoFar后面。 2) 如果匹配上了,进入下一阶段的解析。同时最后一个参数SoFar就是这个阶段的解析战果。
状态机进入parse_version
这里的version就是http的版本。
parse_version(<< "HTTP/1.1r\n", Rest/bits >>, S, M, P, Q) -> parse_header(Rest, S, M, P, Q, 'HTTP/1.1', []); parse_version(<< "HTTP/1.0r\n", Rest/bits >>, S, M, P, Q) -> parse_header(Rest, S, M, P, Q, 'HTTP/1.0', []); parse_version(_, State, _, _, _) -> error_terminate(505, State).
parse_version的方式是直接进行字符串匹配了。因为version要么是"HTTP/1.1r\n", 要么是"HTTP/1.0\r\n"。 匹配上了后,进入下一阶段的解析,同时把本阶段的解析战果当作参数往后面传递。 至此,已经解析完了一个完整的一行。
请求头的解析
前面几个状态已经解析完了http的请求行。 下面开始解析http的请求头。
状态机进入parser_header
请求头的格式是分号分割的kv。
#+BEGIN_SRC erlang parse_header(Buffer, State=#state{max_header_name_length=MaxLength}, M, P, Q, V, H) -> case match_colon(Buffer, 0) of nomatch when byte_size(Buffer) > MaxLength -> error_terminate(400, State); nomatch -> wait_header(Buffer, State, M, P, Q, V, H); _ -> parse_hd_name(Buffer, State, M, P, Q, V, H, <<>>) end. #+END_SRC
首先match_colon试图匹配':',如果没有,则进入wait_header,继续接收数据。 如果匹配上了,则进入parse_hd_name。 注意:M, P, Q, V, H都是已经解析出来的战果。 看一下wait_header如何接收数据的。
#+BEGIN_SRC erlang wait_header(Buffer, State=#state{socket=Socket, transport=Transport, until=Until}, M, P, Q, V, H) -> case recv(Socket, Transport, Until) of {ok, Data} -> parse_header(<< Buffer/binary, Data/binary >>, State, M, P, Q, V, H); {error, timeout} -> error_terminate(408, State); {error, _} -> terminate(State) end. #+END_SRC
尝试接收数据,一旦接收到一点数据就调用parse_header继续解析请求头。 如之前所说,如果匹配上了':' 进入解析parse_hd_name
状态机进入parse_hd_name
解析http请求头里的一行中':'前面的name。
#+BEGIN_SRC erlang parse_hd_name(<< C, Rest/bits >>, S, M, P, Q, V, H, SoFar) -> case C of $: -> parse_hd_before_value(Rest, S, M, P, Q, V, H, SoFar); $s -> parse_hd_name_ws(Rest, S, M, P, Q, V, H, SoFar); $t -> parse_hd_name_ws(Rest, S, M, P, Q, V, H, SoFar); ?INLINE_LOWERCASE(parse_hd_name, Rest, S, M, P, Q, V, H, SoFar) end. #+END_SRC
同样的,在匹配上了':'之后,对进入parse_hd_before_value,同时把SoFar传入,作为本次解析的成果。 举个例子,解析'Accept: text/plain' 此时的SoFar就是Accept。
#+BEGIN_SRC erlang parse_hd_before_value(Buffer, State=#state{ max_header_value_length=MaxLength}, M, P, Q, V, H, N) -> case match_eol(Buffer, 0) of nomatch when byte_size(Buffer) > MaxLength -> error_terminate(400, State); nomatch -> wait_hd_before_value(Buffer, State, M, P, Q, V, H, N); _ -> parse_hd_value(Buffer, State, M, P, Q, V, H, N, <<>>) end. #+END_SRC
这个时候要匹配是有一个CRLF了。如果没有,就需要等待wait_hd_before_value。如果有了CRLF,就进入解析'text/plain'
状态机进入parse_hd_value
解析http 请求头每一行的value部分。
#+BEGIN_SRC erlang parse_hd_value(<< $r, Rest/bits >>, S, M, P, Q, V, Headers, Name, SoFar) -> case Rest of << $n >> -> wait_hd_value_nl(<<>>, S, M, P, Q, V, Headers, Name, SoFar); << $n, C, Rest2/bits >> when C =:= $\s; C =:= $\t -> parse_hd_value(Rest2, S, M, P, Q, V, Headers, Name, << SoFar/binary, C >>); << $n, Rest2/bits >> -> parse_header(Rest2, S, M, P, Q, V, [{Name, SoFar}|Headers]) end; #+END_SRC
匹配到了CRLF,就解析完了一行,此时的{Name, SoFar}就是key和value。 把这个元组放到最后一个参数。然后,再次进入parse_header,继续解析剩下的头。
状态机再次进入parse_hader
因为请求头的格式是一样的,解析方式也是一样的。 所以这个地方再次递归进来。只是最后一个参数多了一个上一次解析出来的元组。
#+BEGIN_SRC parse_header(<< $r, $\n, Rest/bits >>, S, M, P, Q, V, Headers) -> request(Rest, S, M, P, Q, V, lists:reverse(Headers)); parse_header(Buffer, State=#state{max_header_name_length=MaxLength}, M, P, Q, V, H) -> case match_colon(Buffer, 0) of nomatch when byte_size(Buffer) > MaxLength -> error_terminate(400, State); nomatch -> wait_header(Buffer, State, M, P, Q, V, H); _ -> parse_hd_name(Buffer, State, M, P, Q, V, H, <<>>) end. #+END_SRC
如果,当前Buffer的数据是$r, $\n说明,http的请求头已经解析完毕。 进入请求的处理。
状态机进入request
#+BEGIN_SRC erlang request(Buffer, State=#state{socket=Socket, transport=Transport, req_keepalive=ReqKeepalive, max_keepalive=MaxKeepalive, compress=Compress, onresponse=OnResponse}, Method, Path, Query, Version, Headers, Host, Port) -> case Transport:peername(Socket) of {ok, Peer} -> Req = cowboy_req:new(Socket, Transport, Peer, Method, Path, Query, Version, Headers, Host, Port, Buffer, ReqKeepalive < MaxKeepalive, Compress, OnResponse), onrequest(Req, State); {error, _} -> %% Couldn't read the peer address; connection is gone. terminate(State) end. #+END_SRC
首先检查:Socket的状态,生成一个cowboy_req的对象,然后进入onrequest。 onrequest可以执行用户定义的处理函数。如果没有则执行默认的。执行的函数是execute。
#+BEGIN_SRC execute(Req, State, Env, [Middleware|Tail]) -> case Middleware:execute(Req, Env) of {ok, Req2, Env2} -> execute(Req2, State, Env2, Tail); {suspend, Module, Function, Args} -> erlang:hibernate(?MODULE, resume, [State, Env, Tail, Module, Function, Args]); {halt, Req2} -> next_request(Req2, State, ok); {error, Code, Req2} -> error_terminate(Code, Req2, State) end. #+END_SRC
可以看到Middleware|Tail,就是在init中初始化的。 Middlewares = get_value(middlewares, Opts, [cowboy_router, cowboy_handler]), 所以,这个时候会执行cowboy_router:execute。
#+BEGIN_SRC execute(Req, Env) -> {_, Dispatch} = lists:keyfind(dispatch, 1, Env), [Host, Path] = cowboy_req:get([host, path], Req), case match(Dispatch, Host, Path) of {ok, Handler, HandlerOpts, Bindings, HostInfo, PathInfo} -> Req2 = cowboy_req:set_bindings(HostInfo, PathInfo, Bindings, Req), {ok, Req2, [{handler, Handler}, {handler_opts, HandlerOpts}|Env]}; {error, notfound, host} -> {error, 400, Req}; {error, badrequest, path} -> {error, 400, Req}; {error, notfound, path} -> {error, 404, Req} end. #+END_SRC
cowboy_router:execute中,调用match对用户设置的转发规则进行匹配(如何匹配的后续文章介绍),返回对应的Handler。 回到cowboy_protocol:execute,接下来执行cowboy_handler:execute。真正的开始调用用户的处理函数了。
#+BEGIN_SRC erlang handler_init(Req, State, Handler, HandlerOpts) -> Transport = cowboy_req:get(transport, Req), try Handler:init({Transport:name(), http}, Req, HandlerOpts) of {ok, Req2, HandlerState} -> handler_handle(Req2, State, Handler, HandlerState); {loop, Req2, HandlerState} -> handler_after_callback(Req2, State, Handler, HandlerState); {loop, Req2, HandlerState, hibernate} -> handler_after_callback(Req2, State#state{hibernate=true}, Handler, HandlerState); {loop, Req2, HandlerState, Timeout} -> State2 = handler_loop_timeout(State#state{loop_timeout=Timeout}), handler_after_callback(Req2, State2, Handler, HandlerState); {loop, Req2, HandlerState, Timeout, hibernate} -> State2 = handler_loop_timeout(State#state{ hibernate=true, loop_timeout=Timeout}), handler_after_callback(Req2, State2, Handler, HandlerState); {shutdown, Req2, HandlerState} -> terminate_request(Req2, State, Handler, HandlerState, {normal, shutdown}); %% @todo {upgrade, transport, Module} {upgrade, protocol, Module} -> upgrade_protocol(Req, State, Handler, HandlerOpts, Module); {upgrade, protocol, Module, Req2, HandlerOpts2} -> upgrade_protocol(Req2, State, Handler, HandlerOpts2, Module) catch Class:Reason -> cowboy_req:maybe_reply(500, Req), erlang:Class([ {reason, Reason}, {mfa, {Handler, init, 3}}, {stacktrace, erlang:get_stacktrace()}, {req, cowboy_req:to_list(Req)}, {opts, HandlerOpts} ]) end. #+END_SRC
先调用init初始化,即toppage_handler:init,然后根据返回值进行处理。这里是直接调用handler_handle。
#+BEGIN_SRC erlang handler_handle(Req, State, Handler, HandlerState) -> try Handler:handle(Req, HandlerState) of {ok, Req2, HandlerState2} -> terminate_request(Req2, State, Handler, HandlerState2, {normal, shutdown}) catch Class:Reason -> cowboy_req:maybe_reply(500, Req), handler_terminate(Req, Handler, HandlerState, Reason), erlang:Class([ {reason, Reason}, {mfa, {Handler, handle, 2}}, {stacktrace, erlang:get_stacktrace()}, {req, cowboy_req:to_list(Req)}, {state, HandlerState} ]) end. #+END_SRC
终于开始执行toppage_handler:handle。
#+BEGIN_SRC erlang handle(Req, State) -> {ok, Req2} = cowboy_req:reply(200, [ {<<"content-type">>, <<"text/plain">>} ], <<"Hello world!">>, Req), {ok, Req2, State}. #+END_SRC
总结
第一部先到这里结束。主要跟踪了一条http请求所需要经过的模块。有些细节有待进一步分析:比如请求的reply,规则是如何快速匹配上的。