最新版本的cowboy已经把和网络相关的逻辑放到了ranch的代码里。ranch是一个类似ANET的网络库. 它的设计目标是高并发,低延迟,模块化。
之前一直在使用C++,最近学习了erlang,对比一下这两门不同思路的语言
1) 语言底层:最显着的不同是erlang是解释型动态语言,C++是静态的。 2) 语言功能:Erlang是函数式和轻量级进程(要彻底理解函数式最直接的就是通读SICP)。变量不可变。所以,函数是没有状态的,状态体现在函数的参数上。而C++可以修改一切内存. 另外轻量级进程:进程是用户态级别的结构体。所以进程的创建和切换代价很小。而C++的pthread对应系统的一个线程. 3) 设计范式:个人理解,从代码上erlang中的一个模块对应C++的一个类,都是封装了对一个实体的操作. 不同的是C++的操作是和对象绑定的(编译器自动把对象的指针作为成员函数的第一个参数),而erlang是手动把要操作的实体作为第一个参数。 erlang中的进程对应到C++的一份对象; erlang中的一个消息对应c++的一次对象的函数调用; 由于erlang中的进程是廉价的,所以有着和C++完全不同的面向进程的编程范式。因此OTP抽象出了’server模式‘,’监督模式‘,’状态机模式‘等模式。 而C++的23种设计模式本身就是基于单线程的思路。 举个例子,在C++中调用成员函数 `A a; a.f();' 被编译器翻译成 `A::f(&a);'。 而erlang中调用模块的函数修改对象的状态,要这样做 `A:f(Ref)',Ref是要修改的对象,A:f是修改对象的方法。
一个例子 hello,world!
ranch的源码自带了一个tcp_echo的例子
1) 要使用ranch首先要启动ranch,application:start(ranch)。 2) 提供一个回调模块echo_protocol。 3) 启动ranch监听ranch:start_ranch_acceptors_suplistener。
深入代码
从application:start(ranch)开始
ranch_sup监督者会启动一个ranch_server,负责维护配置信息,低层存储用ets。ranch_server是gen_server模式,接收各种消息。 仔细察看ranch_server.erl。这个模块提供了操作ranch_server的各种函数。 这个模块文件中所有被export出来的函数,就是对外部的接口,即外部模块可以通过调用这些模块改变’对象‘的状态。对象就是ranch_server进程。对象的指针就是每个函数的第一个参数。 举个例子 函数`set_port(Ref, Port)'修改Ref指向对象的port。
用户的代码是从 ranch:start_listener开始
用户需要指定网络参数
{ok, _} = ranch:start_listener(tcp_echo, 1, ranch_tcp, [{port, 5555}], echo_protocol, []),
注意:整个ranch对外面的接口全部在ranch.erl中。这里通过调用ranch:start_listener启动监听,同时提供一个数据到达后的回调函数。 tcp_echo:标志ranch之上的应用。所有的操作都要把tcp_echo带上。 ranch_tcp:ranch提供tcp和ssl两种协议。实现多态. [{port, 5555}]:tcp的参数。 echo_protocol:回调函数。
ranch启动服务
Res = supervisor:start_child(ranch_sup, child_spec(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts)),
注意:此时代码还是运行在tcp_echo的进程中。 ranch:start_listener在ranch_sup的监督树中添加一个ranch_listener_sup的监督进程。进程的描述由函数 `child_spec` 产生。
{{ranch_listener_sup, Ref}, {ranch_listener_sup, start_link, [ Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts ]}, permanent, 5000, supervisor, [ranch_listener_sup]}.
到这里tcp_echo_app返回。下面的工作交给ranch内部的相关模块。
ranch_listener_sup 监督者
这个模块是ranch中最重要的模块了。acceptors 池的管理,conns的管理都是该模块负责.
ranch_listener_sup 会启动两个监督者:ranch_conns_sup 和 ranch_acceptors_sup。
init({Ref, NbAcceptors, Transport, TransOpts, Protocol}) -> ChildSpecs = [ %% conns_sup {ranch_conns_sup, {ranch_conns_sup, start_link, [Ref, Transport, Protocol]}, permanent, infinity, supervisor, [ranch_conns_sup]}, %% acceptors_sup {ranch_acceptors_sup, {ranch_acceptors_sup, start_link, [Ref, NbAcceptors, Transport, TransOpts] }, permanent, infinity, supervisor, [ranch_acceptors_sup]} ], {ok, {{rest_for_one, 10, 10}, ChildSpecs}}.
注意:一定要把ranch_conns_sup放在前面。监督者行为模式会根据顺序启动.
ranch_conns_sup监督者
起的是监督者的作用,监控处理连接的进程 (回调函数运行的进程)。但并没有实现 supervisor 模式,因为这个监督者做的事情并不是简单的启动子进程。还要做检查子进程的状态,处理最大连接数等。
start_link(Ref, Transport, Protocol) -> proc_lib:start_link(?MODULE, init, [self(), Ref, Transport, Protocol]).
启动一个子进程,执行init函数。init函数初始化loop的状态。
%% CurConns -> 当前连接数目 %% MaxConns -> 最大连接数 %% conns sup 进程一直循环使用 receive 接收消息 loop(State=#state{parent=Parent, ref=Ref, transport=Transport, protocol=Protocol, opts=Opts, max_conns=MaxConns}, CurConns, NbChildren, Sleepers) -> receive %% ranch acceptor 获取 client socket 后,发送消息给 conns sup 进程 %% 此时 client socket 控制进程为 conns sup {?MODULE, start_protocol, To, Socket} -> %% 启动新进程,子进程就是最开始的地方使用者指定的回调 echo_protocol:start_link case Protocol:start_link(Ref, Socket, Transport, Opts) of {ok, Pid} -> %% 把socket的控制权从当前进程转给echo_protocol子进程,只有这样它才能收到网络数据。 %% 注意:这个时候子进程是阻塞住的,这是必须的!因为这个时候子进程还没有控制权,收不到数据。 Transport:controlling_process(Socket, Pid), %% 给子进程发送消息, 唤醒子进程。 Pid ! {shoot, Ref}, put(Pid, true), CurConns2 = CurConns + 1, %% 如果还没有到达最大的连接数目 if CurConns2 < MaxConns -> %% 给 acceptor 进程发送消息,此时acceptor进程阻塞在start_protocol函数的receive To ! self(), loop(State, CurConns2, NbChildren + 1, Sleepers); true -> %% 如果大于最大连接数,当前 acceptor 进入休眠队列(acceptor进程一直在等待消息) %% 注意:Sleeper多了一个acceptor进程。Erlang就是通过不停的更改参数,来改变进程的状态。 loop(State, CurConns2, NbChildren + 1, [To|Sleepers]) end; end; %% echo_protocol子进程exit时,发送 EXIT 新号给连接的进程,也就是 conns sup %% 通过设置trap_exit,来捕获退出消息。 %% 从Sleeper的头部取出一个acceptor,给它发送消息,让它唤醒,继续接受新的连接。 {'EXIT', Pid, _} -> erase(Pid), [To|Sleepers2] = Sleepers, To ! self(), loop(State, CurConns - 1, NbChildren - 1, Sleepers2); end.
一个简化版的loop函数。列出了两个最重要的消息类型。 1) start_protocol消息。这个消息的发送由调用本模块的start_protocol函数产生。在ranch_acceptor模块中,当有一个新的连接进来后调用该函数.
start_protocol(SupPid, Socket) -> SupPid ! {?MODULE, start_protocol, self(), Socket}, %% 这个地方ranch_acceptor会同步的等在这个地方 !!! receive SupPid -> ok end.
这个消息会使ranch_conns_sup启动一个子进程 ‘Protocol:start_link(Ref, Socket, Transport, Opts)’。Protocol就是使用者最初提供的一个回调。 2) ‘EXIT’消息, 通过设置 process_flag(trap_exit, true) 可以捕获到子进程的退出消息。并且唤醒一个阻塞的acceptor进程。
ranch_acceptors_sup监督者
执行accept进程的监督者,accept进程的个数是由NbAcceptors控制。 首先使用模块Transport:listen监听端口。 注意:Transport是一个变量,也就是说Erlang可以通过这种方式实现多态。 然后把conns_sup监督者的pid和listen返回的socket传给每一个acceptor子进程。
%% 参数:(tcp_echo, 1, ranch_tcp, [{port, 5555}]) init([Ref, NbAcceptors, Transport, TransOpts]) -> {ok, Socket} = Transport:listen(TransOpts), %% acceptor 池配置的多大,就有多少 ranch_acceptor Procs = [ {{acceptor, self(), N}, {ranch_acceptor, start_link, [ LSocket, Transport, ConnsSup ]}, permanent, brutal_kill, worker, []} || N <- lists:seq(1, NbAcceptors)], {ok, {{one_for_one, 10, 10}, Procs}}. ``` #### ranch_acceptor进程 ranch_acceptor的个数和Nbacceptors一样。 Erlang中可以多个进程同时accept一个socket。 ``` loop(LSocket, Transport, ConnsSup) -> _ = case Transport:accept(LSocket, infinity) of {ok, CSocket} -> %% 获取的连接的socket控制权转交给conns_sup进程,conns_sup再把控制权转交给它的子进程。 Transport:controlling_process(CSocket, ConnsSup), %% 同步的调用,需要等待 conns sup 进程 生成echo_protocol子进程后,返回 self() 为止。 ranch_conns_sup:start_protocol(ConnsSup, CSocket); {error, Reason} when Reason =/= closed -> ok end, ?MODULE:loop(LSocket, Transport, ConnsSup)
比较一下ranch和anet
1) ranch和anet都是高并发的网络库; ranch可以处理海量链接, anet可以处理高吞吐量, 低延迟;
2) 使用的时候都要提供一个回调函数;
3) anet封装了packet, 而ranch提供的还是流服务. 这是应用场景决定的: anet面向的内网, 内网一般是基于特定的业务, 而业务的数据是有结构的. ranch既可以面向内网,也可以面向外网. 而有的应用场景本身就是流式的, 比如http协议. ranch可以不需要等待一个http请求头部到达后再处理, 可以一边接收http请求头一遍解析http的状态机. 当然, 可以等到全部的http请求头到达后在一次性解析. 但是这样处理对于Erlang来说效率是慢的. 对于anet来说效率是快的. 使用者可以在回调中实现自己的协议, 使得ranch支持有结构的数据包.
4) anet中的超时是使用了一个timeout 线程, 每隔一段时间扫面一遍所有的链接最后一次收到数据的时间. 而ranch的超时可以在回调中receive设置超时,很自然的支持.
5) anet支持同一个链接复用不用的请求,通过channelID实现. client端每次发送包的时候都要动态生成一个包的channelID, 并且把<channelID, handler>插入到hashtable中. 发送给server的数据包头部带上这个channeldID. server端返回的时候把channelID 原值返回给client端. client端通过channelID 找到对应的handler. 而Erlang中处理并发的思路就是直接一个进程. 一个请求对应一个进程.