简介
之前对比了cowboy的ranch网络模型和mochiweb的网络模型,最近需要优化重构aegis的网络模型,正好把霸爷推荐的hotwheel项目初步阅读一下。
hotwheel是就是一个消息中间件,类似于zeromq,但是仅仅是一个toy级别的项目。和zeromq的强大还是不能比的。但是zeromq和erlang的另一个开源项目RabbitMQ又相形见绌。
hotwheel进程模型
阅读OTP项目毫无疑问要从_app.erl文件开始。
janus_app启动4个进程。(什么?janus不是钢铁侠里面的超级智能的机器人吗!)
设计思路
hotwheel的一个sub连接在server上会spawn一个transport进程和client_proxy进程。 一个pub连接在server上会spawn一个pubsub进程。一个pubsub进程代表了一个topic。 pubsub进程的ets里存储了所有订阅该topic的client_proxy进程。 当一个topic频道上要发布一条消息的时候,该topic对应的pubsub进程会遍历ets里所有的client_proxy进程,发这条消息发送给client_proxy。 然后由client_proxy负责把消息通过自己的transport进程发送到网络上。 一个topic上进程数目 = N * 2 + 1 N是一个topic频道上是的publisher和subscriber数目之和(publisher和subscriber都有一个client_proxy, transport),1是一个pubsub进程。
janus_sup
实际上启动的是janus_acceptor.erl模块:
{janus_sup, {janus_acceptor, start_link, [self(), Port, Module]}, permanent, 2000, worker, [janus_acceptor] }
hotwheel的网络模型就这个模块里面。
janus_acceptor自己维护了网络相关的细节从listen到accept
gen_tcp:listen gen_tcp:accept
在收到新的连接使用了gen_tcp:controlling_process,标准的作法是:
a) 先启动一个网络层模块transport进程(负责收发数据)。
b) gen_tcp:controlling_process把新连接的socket转手给transport进程。
c) 给transport发消息,告诉transport进程“socket已经成功的交给你了,你可以开始干活了。”
b) 和 c)是在同步transport进程。transport进程流程继续往下走必须要收到c)的消息。
代码如下:
handle_connection(State, Socket) -> {ok, Pid} = janus_app:start_transport(State#state.port), ok = gen_tcp:controlling_process(Socket, Pid), (State#state.module):set_socket(Pid, Socket).
janus_transport_sup
实际上启动的是transport.erl模块,simple_one_for_one动态增加:
{janus_transport_sup, {supervisor, start_link, [{local, janus_transport_sup}, ?MODULE, [Module]]}, permanent, infinity, supervisor, [] }
transport最重要的工作是接收tcp数据,发数据发到tcp协议上。
handle_info({tcp, Socket, <<"<regular-socket/>", 0, Bin/binary>>}, State) when Socket == State#state.socket -> inet:setopts(Socket, [{active, once}]), dispatch(Bin, janus_flash, State);
分析一下这段代码:
收到数据后把sock继续设置为once ( inet:setopts(Socket, [{active, once}]) )。
这段代码和上面的controlling_process构成了erlang中经典的处理网络连接的方式之一。
另一种经典的思想是接收进程即是服务进程。spawn一个新进程接收连接,老的接收链接进程变成了服务进程。在mochiweb有用到,另外Joe Armstrong的著作中也以这种方式举例。
优缺点:
使用controlling_process方式有一个同步点,在controlling_process成功前,如果开始收数据比如gen_tcp:recv,或者设置{active, once},数据会被传给接收进程,而不是服务进程。
使用老进程做为服务进程,不利于模块的抽象。原因是:新spawn出来的进程需要提供接收新连接的代码逻辑,而老的接收连接的进程变成了服务被成功accept了的连接。这样业务代码和网络代码冗杂在了一起。
另外,这段代码比较优雅的地方是,把对数据的处理放到了一个dispatch函数里,进而封装了Mod模块里,这样方便Mod模块的热升级,达到支持热升级协议处理模块的目的。
dispatch(Data, Mod, State = #state{transport = Mod}) -> {ok, Keep, TS} = Mod:process(Data, State#state.state), keep_alive_or_close(Keep, State#state{state = TS}).
janus_topman_sup
实际上启动的是topman.erl模块。
{janus_topman_sup, {topman, start, []}, permanent, 2000, worker, [topman] }
topman模块是hotwheel的核心,subscirbe 和 publish topic的时候都要经过topman模块。
handle_cast({publish, Msg, Topic}, State) -> {Srv, State1} = ensure_server(Topic, State), pubsub:publish(Srv, Msg), {noreply, State1};
topman的dict里面存储了所有pubsub进程和topic之间的映射关系。 向一个topic上publish消息:ensure_server函数在topman的dict里面找是否存在给这个topic服务的pusub进程。 如果没有,则spawn一个pubsub进程,并纪录topic和这个pubsub进程之间的映射关系。 如果找到了对应的pubsub进程,则把这个消息直接推送给pubsub进程。 pubsub进程,遍历自己的ets表,把消息推给所有的client_proxy进程。
handle_cast({subscribe, Pid, Topic}, State) -> {Srv, State1} = ensure_server(Topic, State), pubsub:subscribe(Srv, Pid), {noreply, State1};
subscribe一个topic:首先ensure_server确保topic有对应的pubsub进程,然后把这个事件推给pubsub进程。pubsub进程会纪录topic到Pid的映射。
pubsub进程
当pub一个topic的时候,由topman里的ensure_server函数启动一个pubsub进程。 一个pubsub进程代表了一个topic。
subsribe
handle_cast({subscribe, Pid}, State) -> %% automatically unsubscribe when dead Ref = erlang:monitor(process, Pid), Pid ! ack, ets:insert(State#state.subs, {Pid, Ref}), {noreply, State};
Pid是订阅该topic的client_proxy进程的pid,把Pid存入ets表中。
publish
handle_cast({publish, Msg}, State) -> {struct, L} = Msg, TS = tuple_to_list(Start), JSON = {struct, [{<<"timestamp">>, TS}|L]}, Msg1 = {message, iolist_to_binary(mochijson2:encode(JSON))}, F = fun({Pid, _Ref}, _) -> Pid ! Msg1 end, erlang:process_flag(priority, high), ets:foldr(F, ignore, State#state.subs), erlang:process_flag(priority, normal), {noreply, State};
构造消息体,然后遍历ets表,把Msg以erlang消息的形式发送给所有的client_proxy进程。
总结
两个点需要总结:
a) hotwheel使用经典的网络模型:
一个acceptorloop进程+N个worker进程
controlling_process只是由于erlang的特殊性为这种模型服务而已。
b) hotwheel进程模型
client_proxy进程代理了一个subscribe客户端的所有细节。 pubsub进程代理了一个topic所有细节。
一个erlang模块封装了完整的功能,不同模块间通过erlang消息互通有无。从这个角度看,谁说不是面向对象呢?