《cowboy 源代码分析第一部 (Erlang实现的http服务器)》

cowboy是基于ranch的http服务器。特点是功能强打(支持完整的http协议websocket,spdy等),简洁,轻量级。

项目的连接再这里 https://github.com/extend/cowboy。

首先回顾一下Http 协议

在shell下 curl http://127.0.0.1:8080, 同时用tcpdump抓包可以看到一个最简单的Http的Get请求。

GET / HTTP1.1 r\n
User-Agent: curl17.36.0 r\n
Host: 127.0.0.1:8080 r\n
Accept: text/plain r\n
r\n

http request由3部分组成:请求行,消息报头,请求正文。

1) 请求行:就是第一行,由http的方法开始,由空格分割。后面是url路径,http的版本。

2) 后面的User-Agent,Host,Accept都是消息报头。

3) Accept用于RESTFul。

4) 请求正文为空。

post请求

post请求用于请求表单,表单的内容就是消息正文。
POST /reg.jsp HTTP/ (CRLF)
Accept:image/gif,image/x-xbit,... (CRLF)
...
HOST:www.guet.edu.cn (CRLF)
Content-Length:22 (CRLF)
Connection:Keep-Alive (CRLF)
Cache-Control:no-cache (CRLF)
(CRLF)         //该CRLF表示消息报头已经结束,在此之前为消息报头
user=jeffrey&pwd=1234  //此行以下为提交的数据

cowboy的入口

%% hello_world_app.erl
    Dispatch = cowboy_router:compile([
     {'_', [
         {"/", toppage_handler, []}
     ]}
 ]),
 {ok, _} = cowboy:start_http(http, 100, [{port, 8080}], [
     {env, [{dispatch, Dispatch}]}
 ]),

用户需要提供一个url转发规则Dispatch。

然后启动cowboy:start_http

cowboy是如何启动的?

start_https(Ref, NbAcceptors, TransOpts, ProtoOpts)
     when is_integer(NbAcceptors), NbAcceptors > 0 ->
 ranch:start_listener(Ref, NbAcceptors,
     ranch_ssl, TransOpts, cowboy_protocol, ProtoOpts).

可以看到cowboy是运行在ranch上,所以需要提供一个回调函数:cowboy_protocol。所有http request的请求都是这个模块处理。

一旦客户端的浏览器和cowboy的服务器建立连接,ranch的acceptor连接池会把这个连接转交给一个cowboy_protocol的子进程。

而用户设置的dispatch是放在ProtoOpts里。

cowboy_protocol是如何启动的?

%% cowboy_protocol.erl
start_link(Ref, Socket, Transport, Opts) ->
 Pid = spawn_link(?MODULE, init, [Ref, Socket, Transport, Opts]),
 {ok, Pid}.

ranch启动子进程处理连接的时候,会调用模块的start_link函数,也就是cowboy_protocol:start_link。

进一步调用cowboy_protocol:init

init(Ref, Socket, Transport, Opts) ->
 Compress = get_value(compress, Opts, false),
 MaxEmptyLines = get_value(max_empty_lines, Opts, 5),
 MaxHeaderNameLength = get_value(max_header_name_length, Opts, 64),
 MaxHeaderValueLength = get_value(max_header_value_length, Opts, 4096),
 MaxHeaders = get_value(max_headers, Opts, 100),
 MaxKeepalive = get_value(max_keepalive, Opts, 100),
 MaxRequestLineLength = get_value(max_request_line_length, Opts, 4096),
 Middlewares = get_value(middlewares, Opts, [cowboy_router, cowboy_handler]),
 Env = [{listener, Ref}|get_value(env, Opts, [])],
 OnRequest = get_value(onrequest, Opts, undefined),
 OnResponse = get_value(onresponse, Opts, undefined),
 Timeout = get_value(timeout, Opts, 5000),
 ok = ranch:accept_ack(Ref),
 wait_request(<<>>, #state{socket=Socket, transport=Transport,
     middlewares=Middlewares, compress=Compress, env=Env,
     max_empty_lines=MaxEmptyLines, max_keepalive=MaxKeepalive,
     max_request_line_length=MaxRequestLineLength,
     max_header_name_length=MaxHeaderNameLength,
     max_header_value_length=MaxHeaderValueLength, max_headers=MaxHeaders,
     onrequest=OnRequest, onresponse=OnResponse,
     timeout=Timeout, until=until(Timeout)}, 0).

这个函数主要是获取一些限制的参数,比如:Header的最大长度。

Middlewares 是’中间层‘处理模块。这个模块的调用时机是解析完Http request。用户可以指定,默认值是cowboy_router和cowboy_handler。

MaxKeepalive 是保持连接的最长时间。

wait_request 一边接收数据一边解析。

注意,wait_request参数描述了它的状态。一个空的Buffer,一个State,和初始化为0的ReqEmpty。

开始解析Http Request

对于http服务器来说主要工作就是:解析http request,然后根据请求中的url,和用户设置的转发规则,调用相应的处理模块。最后,把处理结果返回。

接收数据

wait_request(Buffer, State=#state{socket=Socket, transport=Transport,
     until=Until}, ReqEmpty) ->
 case recv(Socket, Transport, Until) of
     {ok, Data} ->
         parse_request(<< Buffer/binary, Data/binary >>, State, ReqEmpty);
     {error, _} ->
         terminate(State)
 end.

wait_request 仅仅是接收数据。只要收到一点数据就调用parse_request进行解析。

并不是收到完整的Http Request Header才开始解析。这样收到一点数据就解析一点数据效率更好。

另外,注意:本次收到得数据Data是append在Buffer后面的。因为,流式分析可能一次收到的数据不是完整的一行,需要再次进入这个函数,接收数据。

状态机

整个Http Request Header的解析就是一个状态机。起点是parse_request,终点就是解析完毕调用用户的回调。

《cowboy 源代码分析第一部 (Erlang实现的http服务器)》

请求行的解析

开始解析http请求包的第一行,也就是请求行。

状态机起点:parse_request。

parse_request(Buffer, State=#state{max_request_line_length=MaxLength,
     max_empty_lines=MaxEmpty}, ReqEmpty) ->
 case match_eol(Buffer, 0) of
     nomatch when byte_size(Buffer) > MaxLength ->
         error_terminate(414, State);
     nomatch ->
         wait_request(Buffer, State, ReqEmpty);
     1 when ReqEmpty =:= MaxEmpty ->
         error_terminate(400, State);
     1 ->
         << _:16, Rest/binary >> = Buffer,
         parse_request(Rest, State, ReqEmpty + 1);
     _ ->
         parse_method(Buffer, State, <<>>)
 end.

Buffer是收到的数据,首先调用match_eol顺序遍历找换行$n的位置。

如果是nomatch,有两种情况:

1) nomatch when byte_size(Buffer) > MaxLength 长度大于最大长度,返回414.

2) nomatch, 这个时候需要再接收一点数据,再次进入wait_request。

如果返回1,说明是一个空行。

如果返回非nomach,非1,则说明目前Buffer里至少有完整一行的数据。

态机进入parse_method

这里的method就是http的get, put, delete, post。
parse_method(<< C, Rest/bits >>, State, SoFar) ->
             case C of
                     $r -> error_terminate(400, State);
                     $s -> parse_uri(Rest, State, SoFar);
                     _ -> parse_method(Rest, State, << SoFar/binary, C >>)
             end.
method和后面的uri的分鬲符是空格$s。
 parse_method解析的过程是:对参数进行'Head|Tail'方式绑定。然后判断Head是不是想要的。

1) 如果不是,则参数用Tail,把Header用最后一个参数保存起来。parse_method(Rest, State, << SoFar/binary, C >>)

2_ 如果匹配上了,则进入下一个阶段的解析。parse_uri(Rest, State, SoFar)。

状态机进入parse_uri

这里的uri就是请求资源的uri。
parse_uri(<< $r, _/bits >>, State, _) ->
            error_terminate(400, State);
    parse_uri(<< "# ", Rest/bits >>, State, Method) ->
            parse_version(Rest, State, Method, <<"#">>, <<>>);
    parse_uri(<< "http://", Rest/bits >>, State, Method) ->
            parse_uri_skip_host(Rest, State, Method);
    parse_uri(<< "https://", Rest/bits >>, State, Method) ->
            parse_uri_skip_host(Rest, State, Method);
    parse_uri(Buffer, State, Method) ->
            parse_uri_path(Buffer, State, Method, <<>>).
如果Http Request Header里面的uri部分是#,直接进入下一阶段的解析:parse_version ;
 如果是以'http://'或'https://'开头的,要跳过主机名:parse_uri_skip_host ;
 最后的情况就是正常的uri,调用parse_uri_path。
 注意:parse_uri的第3参数Method,是上一个状态parse_method的返回结果。
parse_uri_path(<< C, Rest/bits >>, State, Method, SoFar) ->
            case C of
                    $r -> error_terminate(400, State);
                    $s -> parse_version(Rest, State, Method, SoFar, <<>>);
                    $? -> parse_uri_query(Rest, State, Method, SoFar, <<>>);
                    $# -> skip_uri_fragment(Rest, State, Method, SoFar, <<>>);
                    _ -> parse_uri_path(Rest, State, Method, << SoFar/binary, C >>)
            end.
parse_uri_path的方式和parse_method的方式一样:取出头一个字节,判断是否匹配上了$s。
 1) 如果没有匹配,把这个字节保存在最后一个参数SoFar后面。
 2) 如果匹配上了,进入下一阶段的解析。同时最后一个参数SoFar就是这个阶段的解析战果。

状态机进入parse_version

这里的version就是http的版本。
parse_version(<< "HTTP/1.1r\n", Rest/bits >>, S, M, P, Q) ->
             parse_header(Rest, S, M, P, Q, 'HTTP/1.1', []);
     parse_version(<< "HTTP/1.0r\n", Rest/bits >>, S, M, P, Q) ->
             parse_header(Rest, S, M, P, Q, 'HTTP/1.0', []);
     parse_version(_, State, _, _, _) ->
             error_terminate(505, State).
parse_version的方式是直接进行字符串匹配了。因为version要么是"HTTP/1.1r\n", 要么是"HTTP/1.0\r\n"。
 匹配上了后,进入下一阶段的解析,同时把本阶段的解析战果当作参数往后面传递。
 至此,已经解析完了一个完整的一行。

请求头的解析

前面几个状态已经解析完了http的请求行。
下面开始解析http的请求头。

状态机进入parser_header

请求头的格式是分号分割的kv。
#+BEGIN_SRC erlang
     parse_header(Buffer, State=#state{max_header_name_length=MaxLength},
        M, P, Q, V, H) ->
    case match_colon(Buffer, 0) of
        nomatch when byte_size(Buffer) > MaxLength ->
            error_terminate(400, State);
        nomatch ->
            wait_header(Buffer, State, M, P, Q, V, H);
        _ ->
            parse_hd_name(Buffer, State, M, P, Q, V, H, <<>>)
    end.
     #+END_SRC
首先match_colon试图匹配':',如果没有,则进入wait_header,继续接收数据。
 如果匹配上了,则进入parse_hd_name。
 注意:M, P, Q, V, H都是已经解析出来的战果。
 
 看一下wait_header如何接收数据的。
#+BEGIN_SRC erlang
     wait_header(Buffer, State=#state{socket=Socket, transport=Transport,
        until=Until}, M, P, Q, V, H) ->
    case recv(Socket, Transport, Until) of
        {ok, Data} ->
            parse_header(<< Buffer/binary, Data/binary >>,
                State, M, P, Q, V, H);
        {error, timeout} ->
            error_terminate(408, State);
        {error, _} ->
            terminate(State)
    end.
     #+END_SRC
尝试接收数据,一旦接收到一点数据就调用parse_header继续解析请求头。
 如之前所说,如果匹配上了':' 进入解析parse_hd_name

状态机进入parse_hd_name

解析http请求头里的一行中':'前面的name。
#+BEGIN_SRC erlang
     parse_hd_name(<< C, Rest/bits >>, S, M, P, Q, V, H, SoFar) ->
    case C of
        $: -> parse_hd_before_value(Rest, S, M, P, Q, V, H, SoFar);
        $s -> parse_hd_name_ws(Rest, S, M, P, Q, V, H, SoFar);
        $t -> parse_hd_name_ws(Rest, S, M, P, Q, V, H, SoFar);
        ?INLINE_LOWERCASE(parse_hd_name, Rest, S, M, P, Q, V, H, SoFar)
    end.
     #+END_SRC
同样的,在匹配上了':'之后,对进入parse_hd_before_value,同时把SoFar传入,作为本次解析的成果。
 举个例子,解析'Accept: text/plain'
 此时的SoFar就是Accept。
#+BEGIN_SRC erlang
     parse_hd_before_value(Buffer, State=#state{
        max_header_value_length=MaxLength}, M, P, Q, V, H, N) ->
    case match_eol(Buffer, 0) of
        nomatch when byte_size(Buffer) > MaxLength ->
            error_terminate(400, State);
        nomatch ->
            wait_hd_before_value(Buffer, State, M, P, Q, V, H, N);
        _ ->
            parse_hd_value(Buffer, State, M, P, Q, V, H, N, <<>>)
    end.
     #+END_SRC
这个时候要匹配是有一个CRLF了。如果没有,就需要等待wait_hd_before_value。如果有了CRLF,就进入解析'text/plain'

状态机进入parse_hd_value

解析http 请求头每一行的value部分。
#+BEGIN_SRC erlang
     parse_hd_value(<< $r, Rest/bits >>, S, M, P, Q, V, Headers, Name, SoFar) ->
    case Rest of
        << $n >> ->
            wait_hd_value_nl(<<>>, S, M, P, Q, V, Headers, Name, SoFar);
        << $n, C, Rest2/bits >> when C =:= $\s; C =:= $\t ->
            parse_hd_value(Rest2, S, M, P, Q, V, Headers, Name,
                << SoFar/binary, C >>);
        << $n, Rest2/bits >> ->
            parse_header(Rest2, S, M, P, Q, V, [{Name, SoFar}|Headers])
    end;
     #+END_SRC
匹配到了CRLF,就解析完了一行,此时的{Name, SoFar}就是key和value。
 把这个元组放到最后一个参数。然后,再次进入parse_header,继续解析剩下的头。

状态机再次进入parse_hader

因为请求头的格式是一样的,解析方式也是一样的。 所以这个地方再次递归进来。只是最后一个参数多了一个上一次解析出来的元组。
#+BEGIN_SRC
     parse_header(<< $r, $\n, Rest/bits >>, S, M, P, Q, V, Headers) ->
    request(Rest, S, M, P, Q, V, lists:reverse(Headers));
     parse_header(Buffer, State=#state{max_header_name_length=MaxLength},
        M, P, Q, V, H) ->
    case match_colon(Buffer, 0) of
        nomatch when byte_size(Buffer) > MaxLength ->
            error_terminate(400, State);
        nomatch ->
            wait_header(Buffer, State, M, P, Q, V, H);
        _ ->
            parse_hd_name(Buffer, State, M, P, Q, V, H, <<>>)
    end.
     #+END_SRC
如果,当前Buffer的数据是$r, $\n说明,http的请求头已经解析完毕。
 进入请求的处理。

状态机进入request

#+BEGIN_SRC erlang
     request(Buffer, State=#state{socket=Socket, transport=Transport,
        req_keepalive=ReqKeepalive, max_keepalive=MaxKeepalive,
        compress=Compress, onresponse=OnResponse},
        Method, Path, Query, Version, Headers, Host, Port) ->
    case Transport:peername(Socket) of
        {ok, Peer} ->
            Req = cowboy_req:new(Socket, Transport, Peer, Method, Path,
                Query, Version, Headers, Host, Port, Buffer,
                ReqKeepalive < MaxKeepalive, Compress, OnResponse),
            onrequest(Req, State);
        {error, _} -> %% Couldn't read the peer address; connection is gone.
            terminate(State)
    end.
     #+END_SRC
首先检查:Socket的状态,生成一个cowboy_req的对象,然后进入onrequest。
    onrequest可以执行用户定义的处理函数。如果没有则执行默认的。执行的函数是execute。
#+BEGIN_SRC
     execute(Req, State, Env, [Middleware|Tail]) ->
    case Middleware:execute(Req, Env) of
        {ok, Req2, Env2} ->
            execute(Req2, State, Env2, Tail);
        {suspend, Module, Function, Args} ->
            erlang:hibernate(?MODULE, resume,
                [State, Env, Tail, Module, Function, Args]);
        {halt, Req2} ->
            next_request(Req2, State, ok);
        {error, Code, Req2} ->
            error_terminate(Code, Req2, State)
    end.
     #+END_SRC
可以看到Middleware|Tail,就是在init中初始化的。
     Middlewares = get_value(middlewares, Opts, [cowboy_router, cowboy_handler]),
     所以,这个时候会执行cowboy_router:execute。
#+BEGIN_SRC
     execute(Req, Env) ->
    {_, Dispatch} = lists:keyfind(dispatch, 1, Env),
    [Host, Path] = cowboy_req:get([host, path], Req),
    case match(Dispatch, Host, Path) of
        {ok, Handler, HandlerOpts, Bindings, HostInfo, PathInfo} ->
            Req2 = cowboy_req:set_bindings(HostInfo, PathInfo, Bindings, Req),
            {ok, Req2, [{handler, Handler}, {handler_opts, HandlerOpts}|Env]};
        {error, notfound, host} ->
            {error, 400, Req};
        {error, badrequest, path} ->
            {error, 400, Req};
        {error, notfound, path} ->
            {error, 404, Req}
    end.
     #+END_SRC
cowboy_router:execute中,调用match对用户设置的转发规则进行匹配(如何匹配的后续文章介绍),返回对应的Handler。
    回到cowboy_protocol:execute,接下来执行cowboy_handler:execute。真正的开始调用用户的处理函数了。
#+BEGIN_SRC erlang
     handler_init(Req, State, Handler, HandlerOpts) ->
    Transport = cowboy_req:get(transport, Req),
    try Handler:init({Transport:name(), http}, Req, HandlerOpts) of
        {ok, Req2, HandlerState} ->
            handler_handle(Req2, State, Handler, HandlerState);
        {loop, Req2, HandlerState} ->
            handler_after_callback(Req2, State, Handler, HandlerState);
        {loop, Req2, HandlerState, hibernate} ->
            handler_after_callback(Req2, State#state{hibernate=true},
                Handler, HandlerState);
        {loop, Req2, HandlerState, Timeout} ->
            State2 = handler_loop_timeout(State#state{loop_timeout=Timeout}),
            handler_after_callback(Req2, State2, Handler, HandlerState);
        {loop, Req2, HandlerState, Timeout, hibernate} ->
            State2 = handler_loop_timeout(State#state{
                hibernate=true, loop_timeout=Timeout}),
            handler_after_callback(Req2, State2, Handler, HandlerState);
        {shutdown, Req2, HandlerState} ->
            terminate_request(Req2, State, Handler, HandlerState,
                {normal, shutdown});
        %% @todo {upgrade, transport, Module}
        {upgrade, protocol, Module} ->
            upgrade_protocol(Req, State, Handler, HandlerOpts, Module);
        {upgrade, protocol, Module, Req2, HandlerOpts2} ->
            upgrade_protocol(Req2, State, Handler, HandlerOpts2, Module)
    catch Class:Reason ->
        cowboy_req:maybe_reply(500, Req),
        erlang:Class([
            {reason, Reason},
            {mfa, {Handler, init, 3}},
            {stacktrace, erlang:get_stacktrace()},
            {req, cowboy_req:to_list(Req)},
            {opts, HandlerOpts}
        ])
    end.
     #+END_SRC
先调用init初始化,即toppage_handler:init,然后根据返回值进行处理。这里是直接调用handler_handle。
#+BEGIN_SRC erlang
     handler_handle(Req, State, Handler, HandlerState) ->
    try Handler:handle(Req, HandlerState) of
        {ok, Req2, HandlerState2} ->
            terminate_request(Req2, State, Handler, HandlerState2,
                {normal, shutdown})
    catch Class:Reason ->
        cowboy_req:maybe_reply(500, Req),
        handler_terminate(Req, Handler, HandlerState, Reason),
        erlang:Class([
            {reason, Reason},
            {mfa, {Handler, handle, 2}},
            {stacktrace, erlang:get_stacktrace()},
            {req, cowboy_req:to_list(Req)},
            {state, HandlerState}
        ])
    end.
     #+END_SRC
终于开始执行toppage_handler:handle。
#+BEGIN_SRC erlang
     handle(Req, State) ->
    {ok, Req2} = cowboy_req:reply(200, [
        {<<"content-type">>, <<"text/plain">>}
    ], <<"Hello world!">>, Req),
    {ok, Req2, State}.
     #+END_SRC

总结

第一部先到这里结束。主要跟踪了一条http请求所需要经过的模块。有些细节有待进一步分析:比如请求的reply,规则是如何快速匹配上的。
上一篇:【我的Android进阶之旅】Android 源代码中的Java代码中//$NON-NLS-1$ 注释是什么意思?


下一篇:云开发初探 —— 更简便的小程序开发模式