《hotwheel 网络模型和进程模型》

简介

之前对比了cowboy的ranch网络模型和mochiweb的网络模型,最近需要优化重构aegis的网络模型,正好把霸爷推荐的hotwheel项目初步阅读一下。

hotwheel是就是一个消息中间件,类似于zeromq,但是仅仅是一个toy级别的项目。和zeromq的强大还是不能比的。但是zeromq和erlang的另一个开源项目RabbitMQ又相形见绌。

hotwheel进程模型

阅读OTP项目毫无疑问要从_app.erl文件开始。

janus_app启动4个进程。(什么?janus不是钢铁侠里面的超级智能的机器人吗!)

《hotwheel 网络模型和进程模型》

设计思路

hotwheel的一个sub连接在server上会spawn一个transport进程和client_proxy进程。
一个pub连接在server上会spawn一个pubsub进程。一个pubsub进程代表了一个topic。
pubsub进程的ets里存储了所有订阅该topic的client_proxy进程。
当一个topic频道上要发布一条消息的时候,该topic对应的pubsub进程会遍历ets里所有的client_proxy进程,发这条消息发送给client_proxy。
然后由client_proxy负责把消息通过自己的transport进程发送到网络上。
一个topic上进程数目 = N * 2 + 1
N是一个topic频道上是的publisher和subscriber数目之和(publisher和subscriber都有一个client_proxy, transport),1是一个pubsub进程。

janus_sup

实际上启动的是janus_acceptor.erl模块:
{janus_sup,
        {janus_acceptor, start_link, [self(), Port, Module]},
        permanent,
        2000,
        worker,
        [janus_acceptor]
       }

hotwheel的网络模型就这个模块里面。

janus_acceptor自己维护了网络相关的细节从listen到accept

gen_tcp:listen
   gen_tcp:accept

在收到新的连接使用了gen_tcp:controlling_process,标准的作法是:

a) 先启动一个网络层模块transport进程(负责收发数据)。

b) gen_tcp:controlling_process把新连接的socket转手给transport进程。

c) 给transport发消息,告诉transport进程“socket已经成功的交给你了,你可以开始干活了。”

b) 和 c)是在同步transport进程。transport进程流程继续往下走必须要收到c)的消息。

代码如下:

handle_connection(State, Socket) ->
      {ok, Pid} = janus_app:start_transport(State#state.port),
      ok = gen_tcp:controlling_process(Socket, Pid),
      (State#state.module):set_socket(Pid, Socket).

janus_transport_sup

实际上启动的是transport.erl模块,simple_one_for_one动态增加:
{janus_transport_sup,
        {supervisor, start_link, [{local, janus_transport_sup}, 
                                  ?MODULE, [Module]]},
        permanent,
        infinity,
        supervisor,
        []
    }

transport最重要的工作是接收tcp数据,发数据发到tcp协议上。

handle_info({tcp, Socket, <<"<regular-socket/>", 0, Bin/binary>>}, State)
     when Socket == State#state.socket ->
       inet:setopts(Socket, [{active, once}]),
       dispatch(Bin, janus_flash, State);

分析一下这段代码:

收到数据后把sock继续设置为once ( inet:setopts(Socket, [{active, once}]) )。

这段代码和上面的controlling_process构成了erlang中经典的处理网络连接的方式之一。

另一种经典的思想是接收进程即是服务进程。spawn一个新进程接收连接,老的接收链接进程变成了服务进程。在mochiweb有用到,另外Joe Armstrong的著作中也以这种方式举例。

优缺点:

使用controlling_process方式有一个同步点,在controlling_process成功前,如果开始收数据比如gen_tcp:recv,或者设置{active, once},数据会被传给接收进程,而不是服务进程。

使用老进程做为服务进程,不利于模块的抽象。原因是:新spawn出来的进程需要提供接收新连接的代码逻辑,而老的接收连接的进程变成了服务被成功accept了的连接。这样业务代码和网络代码冗杂在了一起。

另外,这段代码比较优雅的地方是,把对数据的处理放到了一个dispatch函数里,进而封装了Mod模块里,这样方便Mod模块的热升级,达到支持热升级协议处理模块的目的。

dispatch(Data, Mod, State = #state{transport = Mod}) ->
      {ok, Keep, TS} = Mod:process(Data, State#state.state),
      keep_alive_or_close(Keep, State#state{state = TS}).

janus_topman_sup

实际上启动的是topman.erl模块。
{janus_topman_sup,
        {topman, start, []},
        permanent,
        2000,
        worker,
        [topman]
       }
topman模块是hotwheel的核心,subscirbe 和 publish topic的时候都要经过topman模块。
handle_cast({publish, Msg, Topic}, State) ->
        {Srv, State1} = ensure_server(Topic, State),
    pubsub:publish(Srv, Msg),
    {noreply, State1};
topman的dict里面存储了所有pubsub进程和topic之间的映射关系。
向一个topic上publish消息:ensure_server函数在topman的dict里面找是否存在给这个topic服务的pusub进程。
    如果没有,则spawn一个pubsub进程,并纪录topic和这个pubsub进程之间的映射关系。
如果找到了对应的pubsub进程,则把这个消息直接推送给pubsub进程。
pubsub进程,遍历自己的ets表,把消息推给所有的client_proxy进程。
handle_cast({subscribe, Pid, Topic}, State) ->
        {Srv, State1} = ensure_server(Topic, State),
    pubsub:subscribe(Srv, Pid),
    {noreply, State1};
subscribe一个topic:首先ensure_server确保topic有对应的pubsub进程,然后把这个事件推给pubsub进程。pubsub进程会纪录topic到Pid的映射。

pubsub进程

当pub一个topic的时候,由topman里的ensure_server函数启动一个pubsub进程。
一个pubsub进程代表了一个topic。

subsribe

handle_cast({subscribe, Pid}, State) ->
         %% automatically unsubscribe when dead
        Ref = erlang:monitor(process, Pid),
    Pid ! ack,
    ets:insert(State#state.subs, {Pid, Ref}),
    {noreply, State};
Pid是订阅该topic的client_proxy进程的pid,把Pid存入ets表中。

publish

handle_cast({publish, Msg}, State) ->
         {struct, L} = Msg,
     TS = tuple_to_list(Start),
     JSON = {struct, [{<<"timestamp">>, TS}|L]},
     Msg1 = {message, iolist_to_binary(mochijson2:encode(JSON))},
     F = fun({Pid, _Ref}, _) -> Pid ! Msg1 end,
     erlang:process_flag(priority, high),
     ets:foldr(F, ignore, State#state.subs),
     erlang:process_flag(priority, normal),
     {noreply, State};
构造消息体,然后遍历ets表,把Msg以erlang消息的形式发送给所有的client_proxy进程。

总结

两个点需要总结:

a) hotwheel使用经典的网络模型:

一个acceptorloop进程+N个worker进程

controlling_process只是由于erlang的特殊性为这种模型服务而已。

b) hotwheel进程模型

client_proxy进程代理了一个subscribe客户端的所有细节。
       pubsub进程代理了一个topic所有细节。

一个erlang模块封装了完整的功能,不同模块间通过erlang消息互通有无。从这个角度看,谁说不是面向对象呢?

上一篇:内核代码阅读(2) - 内核中的C语言和汇编


下一篇:内核代码阅读(13) - sys_mmap