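Introduction to Erlang
Erlang is a functional programming language developed by Ericsson, used mainly for concurrent programming and server architectures. It is named after the Danish mathematician and statistician Agner Krarup Erlang; the name can also be read as "Ericsson Language".
Because Erlang is a functional language, one of its most visible traits is the absence of loop constructs: every loop has to be written as recursion. Erlang has no variable declarations either. A variable is bound the first time it is matched with =, much like an assignment in Python, but it can be bound only once. The restriction sounds strict, yet Erlang is still considerably friendlier than other declarative languages such as Haskell or Prolog.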
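To make the "recursion instead of loops" point concrete, here is a minimal sketch. The module name recursion_demo and both functions are made up for illustration and are not part of the exam solution.

```erlang
-module(recursion_demo).
-export([sum/1, count_down/1]).

%% Sum a list by recursion: the accumulator plays the role of a loop variable.
sum(List) -> sum(List, 0).

sum([], Acc) -> Acc;                    % empty list: the "loop" is finished
sum([H | T], Acc) -> sum(T, Acc + H).   % tail call with the updated accumulator

%% Build the list [N, N-1, ..., 1] by recursion.
count_down(0) -> [];
count_down(N) when is_integer(N), N > 0 -> [N | count_down(N - 1)].
```

Single assignment is easy to see in the shell: after X = 1. a second X = 2. fails with a badmatch error, because = is a pattern match against the value X already holds.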
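Problem overview (UCPH 2021 AP Re-exam)
The goal is to implement a builder: given a plan, it builds the plan and returns the result. Building must be concurrent: one builder can build several plans at the same time, and several builders can run side by side.
More precisely, a plan comes in several forms, and each form can itself contain further plans (a nested structure):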
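The plan forms can be read off the patterns matched by spawnSubBuilds in the full code at the end of this post. The sketch below writes them down as a type. The argument types (term(), non_neg_integer()) are my assumptions, the exam text may define them differently, and the module plan_shape with its helper kind/1 is purely hypothetical.

```erlang
-module(plan_shape).
-export([kind/1]).
-export_type([plan/0]).

%% Plan forms inferred from the clauses of spawnSubBuilds/4.
%% Argument types are assumptions, not taken from the exam text.
-type plan() :: {act, term(), term()}                     % {act, Act, Arg}
              | {seq, [plan()]}
              | {any, [plan()]}
              | {all, [plan()]}
              | {failure_is_success, plan()}
              | {and_then, plan(), fun((term()) -> plan())}
              | {within, non_neg_integer(), plan()}.      % {within, Limit, Plan}

%% Hypothetical helper: tell leaf plans apart from plans that nest other plans.
-spec kind(plan()) -> leaf | composite.
kind({act, _Act, _Arg}) -> leaf;
kind({seq, Plans}) when is_list(Plans) -> composite;
kind({any, Plans}) when is_list(Plans) -> composite;
kind({all, Plans}) when is_list(Plans) -> composite;
kind({failure_is_success, _Plan}) -> composite;
kind({and_then, _Plan, Fun}) when is_function(Fun, 1) -> composite;
kind({within, _Limit, _Plan}) -> composite.
```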
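The module must also provide the following interface:
Basic Erlang syntax
Erlang uses punctuation to mark how pieces of code relate to each other: a comma (,) separates expressions that run in sequence within the same body, a semicolon (;) separates alternative clauses of the same function, case, or receive, and a period (.) ends the whole function definition or attribute.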
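A small sketch showing all three punctuation marks in one function. The module punct_demo and the function classify/1 are made up for illustration.

```erlang
-module(punct_demo).
-export([classify/1]).

classify(N) when is_integer(N) ->
    Doubled = N * 2,                 % "," : the next expression in the same body
    case Doubled > 10 of
        true -> {big, Doubled};      % ";" : another case clause follows
        false -> {small, Doubled}
    end;                             % ";" : another function clause follows
classify(_Other) ->
    not_an_integer.                  % "." : end of the definition of classify/1
```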
An Erlang source file starts with a module attribute whose name matches the file name:
-module(builder).
All API functions are then listed in the export attribute:
-export( [startup/0 , register_action/3 , build/2 , on_completion/2 , status/1 , finish/1]).
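Erlang gets its high concurrency from lightweight processes. Calling PID = spawn_link(Fun) creates an independent concurrent process linked to the caller; PID is the identifier of the new process (an Erlang process, not an OS thread). For example: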
    startup() ->
        PID = spawn_link(fun() -> buildLoop([]) end),
        {ok, PID}.
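{ok, PID} is the return value of this function, and buildLoop([]) is the function that the new process runs.
Inside a function we can use a process's PID to send it messages and then wait for its reply: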
    register_action(Builder, Act, Fun) ->
        Builder ! {self(), {registerAction, Act, Fun}},
        receive
            {_From, Msg} -> Msg
        end.
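Here ! is the send operator: it sends the tuple on its right to the process identified by Builder. The receive block takes messages out of the mailbox of the process that executes it.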
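Note that the receive in register_action accepts any two-element tuple, so an unrelated message of that shape could be mistaken for the reply. A common remedy, shown here only as a sketch and not as what the solution does, is to tag each request with a fresh reference from make_ref() and wait for that reference. The module echo_demo and the one-second timeout are assumptions for the demo.

```erlang
-module(echo_demo).
-export([start/0, call/2]).

%% Start a process that answers every request with {echo, Request}.
start() ->
    spawn(fun loop/0).

loop() ->
    receive
        {From, Ref, Request} ->
            From ! {Ref, {echo, Request}},   % tag the reply with the caller's reference
            loop()
    end.

%% Send a request and wait only for the reply that carries our own reference.
call(Pid, Request) ->
    Ref = make_ref(),
    Pid ! {self(), Ref, Request},
    receive
        {Ref, Reply} -> Reply                % Ref is already bound: selective receive
    after 1000 ->
        timeout                              % assumption: 1 s is enough for the demo
    end.
```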
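In Erlang, names that start with a lowercase letter are atoms, which are constants; atoms are also used as function names. Names that start with an uppercase letter are variables. A variable that is bound but never used is written with a leading underscore, which tells the compiler not to warn about it.
A server process is typically written like this: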
    buildLoop(ActFun) ->
        receive
            {From, {registerAction, Act, Fun}} ->
                case checkActExists(Act, ActFun) of
                    true ->
                        From ! {self(), {error, already_defined}},
                        NewActFun = ActFun;
                    false ->
                        NewActFun = ActFun ++ [{Act, Fun}],
                        From ! {self(), {ok}}
                end,
                buildLoop(NewActFun);
            {From, {startBuild, Plan}} ->
                ResPID = spawn_link(fun() -> resultPool(Plan, ActFun, [], [], 0) end),
                ResPID ! {self(), {run}},
                From ! {self(), {ok, ResPID}},
                buildLoop(ActFun)
        end.
Each clause ends with a tail-recursive call, which carries the (possibly updated) server state into the next iteration of the loop.
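The same pattern in its smallest form is a counter whose only state is an integer. This is an illustrative sketch; the module counter_demo is not part of the solution.

```erlang
-module(counter_demo).
-export([start/0, increment/1, value/1]).

start() ->
    spawn(fun() -> loop(0) end).

%% Asynchronous: just send the message.
increment(Pid) ->
    Pid ! increment,
    ok.

%% Synchronous: send a request and wait for the answer from that process.
value(Pid) ->
    Pid ! {self(), value},
    receive
        {Pid, Count} -> Count
    end.

%% The server loop: the argument is the state, and every clause
%% ends with a tail call that passes the next state along.
loop(Count) ->
    receive
        increment ->
            loop(Count + 1);
        {From, value} ->
            From ! {self(), Count},
            loop(Count)
    end.
```

Because messages between one pair of processes arrive in the order they were sent, a value/1 call made after two increment/1 calls from the same process sees both increments.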
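Erlang also ships with the usual library functions. For example, to find the element of a list of tuples whose field at a given position matches a key, use lists:keyfind/3 as below; calling it feels much like calling a library function in Python: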
    checkActExists(Act, ActFun) ->
        case lists:keyfind(Act, 1, ActFun) of
            false -> false;
            _Others -> true
        end.
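For reference, a small sketch of what lists:keyfind/3 returns: the whole matching tuple, or false. The module keyfind_demo is made up; exists/2 uses the standard lists:keymember/3, which gives the same yes/no answer as checkActExists in one call.

```erlang
-module(keyfind_demo).
-export([lookup/2, exists/2]).

%% Look up Key in a list of {Key, Value} tuples (position 1 is the key).
lookup(Key, Pairs) ->
    case lists:keyfind(Key, 1, Pairs) of
        {Key, Value} -> {ok, Value};   % keyfind returns the whole tuple
        false -> not_found             % or the atom false when nothing matches
    end.

%% Same membership test as checkActExists/2, using lists:keymember/3.
exists(Key, Pairs) ->
    lists:keymember(Key, 1, Pairs).
```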
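Erlang/OTP also has a dedicated framework for this pattern called gen_server. All messages go through gen_server's callback functions, which update a value called the state that lives inside the server process. Designing that state is the essence of using gen_server. gen_server also has its own error reporting, which makes it quick to locate a problem. We did not use gen_server here. Running many builders at once is not in itself an obstacle, since any number of gen_server processes can run concurrently as long as they are not registered under the same name; we simply chose the simplest option, spawn_link().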
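As a rough illustration of the gen_server style, and of running several instances at once, here is a minimal counter. It is a sketch under my own assumptions (the module name counter_server and its API are invented) and is not how the exam solution is written.

```erlang
-module(counter_server).
-behaviour(gen_server).

%% Client API
-export([start_link/0, increment/1, value/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2]).

%% Start an unregistered instance, so any number can run side by side.
start_link() ->
    gen_server:start_link(?MODULE, 0, []).

increment(Pid) -> gen_server:cast(Pid, increment).   % asynchronous
value(Pid) -> gen_server:call(Pid, value).           % synchronous

%% The state is just the current count.
init(Initial) ->
    {ok, Initial}.

handle_call(value, _From, Count) ->
    {reply, Count, Count}.

handle_cast(increment, Count) ->
    {noreply, Count + 1}.
```

Each call to start_link/0 returns {ok, Pid} for a separate process with its own state, so incrementing one counter leaves the others untouched.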
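Solution
The hard part of this problem is handling nested plans. Because nested plans have to run concurrently, results cannot be gathered simply by passing return values around; we need to spawn several processes concurrently and use a separate monitoring process to collect their results.
Looking at it top-down: we first start a buildLoop(). When it receives a message to start a build, it spawns a resultPool() process. That loop answers queries about the result of the plan and also kicks off the execution of the plan.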
    {_From, {run}} ->
        Me = self(),
        Res = spawnReceiveforBuildPlan(Me, ActFun, Plan),
        NewRes = runCompFunction(Res, CompletionFunc),
        io:format("Final Res is ~w\n", [NewRes]),
        resultPool(Plan, ActFun, CompletionFunc, NewRes, OngoingNum);
spawnReceiveforBuildPlan() is a helper function; it could just as well be written inline in the clause above. Its code is:
    spawnReceiveforBuildPlan(PrimarySid, ActFun, Plan) ->
        TopSupervisorPid = spawn_link(fun() -> topSupervisor(PrimarySid, [], 0, ActFun, Plan) end),
        TopSupervisorPid ! {self(), {start, backup}},
        receive
            {_From, {finishPlan, Response}} ->
                case Response of
                    [_Val] -> Response;
                    _Others -> Response
                end
        end.
topSupervisor collects the final result. It is kept as a separate process so that the messages of nested plans do not get mixed up later. Its code is:
    topSupervisor(Sid, Res, GoingNum, ActFun, Plan) ->
        receive
            {From, {start, backup}} ->
                Me = self(),
                Pid = spawn_link(fun() -> spawnSubBuilds(Sid, Me, ActFun, Plan) end),
                %io:format("spawnSubbuilds Pid is ~w, supervisor pid is ~w; ", [Pid, self()]),
                topSupervisor(From, Res, GoingNum, ActFun, Plan);
            {_From, {updateResult, NewRes}} ->
                Me = self(),
                Sid ! {Me, {finishPlan, NewRes}},
                topSupervisor(Sid, NewRes, GoingNum, ActFun, Plan)
        end.
spawnSubBuilds() starts the build of one plan. Whenever it obtains a result, it sends a result message to its parent. For a simple plan such as {act, Act, Arg}, the implementation is:
    case Plan of
        {act, Act, Arg} ->
            Res = doSingleAct(PrimarySid, Act, Arg, ActFun),
            io:format("acr pid is ~w, father pid is ~w ;\n", [self(), FatherSid]),
            FatherSid ! {self(), {updateResult, Res}},
            Res;
        % (the clauses for the other plan forms are in the full code)
    end

    doSingleAct(PrimarySid, Act, Arg, ActFun) ->
        PrimarySid ! {self(), {updateServerOngoingNum, 1}},
        case checkActExists(Act, ActFun) of
            false -> {fail, no_such_action};
            true ->
                {_ActName, Fun} = lists:keyfind(Act, 1, ActFun),
                try Fun(Arg) of
                    Res ->
                        PrimarySid ! {self(), {updateServerOngoingNum, -1}},
                        Res
                catch
                    % match every exception class (throw, error, exit);
                    % a bare pattern here would only catch throw
                    _:_ ->
                        PrimarySid ! {self(), {updateServerOngoingNum, -1}},
                        {fail, function_terminated}
                end
        end.
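For plans that need concurrency, such as {any, Plans}, we start every sub-plan in its own process and also create a monitoring process (SpySid) that collects all their messages. Once SpySid has tallied the results it reports back to doAny, which returns the outcome to the original call site tagged with okAny; from there the result is passed further up. The code is: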
    {any, Plans} ->
        Me = self(),
        % zero length plan fails
        case length(Plans) == 0 of
            true -> FatherSid ! {Me, {updateResult, {fail, []}}};
            false ->
                io:format("initial do any pid is ~w;\n ", [Me]),
                doAny(PrimarySid, Me, ActFun, Plans),
                receive
                    {_From, {okAny, Res}} ->
                        FatherSid ! {Me, {updateResult, Res}},
                        Res
                end
        end;

    % doAny implementation
    doAny(PrimarySid, FatherSid, ActFun, Plans) ->
        Me = self(),
        MaximumStateNum = 100000,
        SpySid = spawn_link(fun() -> spyLoopAny(0, MaximumStateNum, Me, {1, 2}, PrimarySid) end),
        io:format("Spy pid is:~w \n", [SpySid]),
        State = spawnManyPlans(PrimarySid, SpySid, ActFun, Plans),
        SpySid ! {Me, {updatePlanSize, length(State)}},
        io:format("update planze, ~w\n", [length(State)]),
        receive
            {_From, {ok, Res}} ->
                io:format("receive in doAny Res ~w\n; ", [Res]),
                FatherSid ! {Me, {okAny, Res}}
        end.

    % spyloop implementation
    spyLoopAny(OngoingNum, StateNum, FatherSid, LastRes, PrimarySid) ->
        receive
            {_From, {updateResult, Res}} ->
                PrimarySid ! {self(), {updateServerOngoingNum, -1}},
                io:format("spyLoopAny Res ~w; \n", [Res]),
                NewOngoingNum = OngoingNum + 1,
                case Res of
                    {success, _Info} ->
                        % TODO: add complete status
                        FatherSid ! {self(), {ok, Res}};
                    {fail, _Info} ->
                        io:format("Newnumis~w,statenumis~w\n", [NewOngoingNum, StateNum]),
                        case NewOngoingNum >= StateNum of
                            true -> FatherSid ! {self(), {ok, {fail, []}}};
                            false -> spyLoopAny(NewOngoingNum, StateNum, FatherSid, Res, PrimarySid)
                        end
                end;
            {_From, {updatePlanSize, Num}} ->
                % avoid late update plan size
                case LastRes of
                    {success, _Info} -> FatherSid ! {self(), {ok, LastRes}};
                    {fail, _Info} ->
                        case OngoingNum >= Num of
                            true -> FatherSid ! {self(), {ok, LastRes}};
                            false -> spyLoopAny(OngoingNum, Num, FatherSid, LastRes, PrimarySid)
                        end;
                    _Others ->
                        io:format("update~w\n", [Num]),
                        spyLoopAny(OngoingNum, Num, FatherSid, LastRes, PrimarySid)
                end
        end.
Finally, here is the code that spawns the sub-plan processes. The function returns the PIDs of all sub-plan processes, in the order in which they were spawned:
    spawnManyPlans(PrimarySid, SpySid, ActFun, Plans) ->
        case Plans of
            [] -> [];
            [H | T] ->
                io:format("begin to spawn plans\n"),
                PrimarySid ! {self(), {updateServerOngoingNum, 1}},
                S = spawn_link(fun() -> spawnSubBuilds(PrimarySid, SpySid, ActFun, H) end),
                L = spawnManyPlans(PrimarySid, SpySid, ActFun, T),
                [S] ++ L
        end.
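Stripped of the bookkeeping, the idea behind the any plan is "spawn everything, then collect until the first success or until everyone has failed". The sketch below shows that idea in isolation. It is a simplification under my own assumptions, not the solution's code: the module any_demo is invented, sub-plans are plain zero-argument funs, and the collector is the calling process itself because the number of workers is known before collecting starts.

```erlang
-module(any_demo).
-export([first_success/1]).

%% Run every fun in its own process. Return the first {success, Value}
%% that arrives, or {fail, []} once all of them have failed.
first_success(Funs) ->
    Parent = self(),
    Ref = make_ref(),                 % tags the replies of this particular call
    lists:foreach(
      fun(F) -> spawn(fun() -> Parent ! {Ref, safe_run(F)} end) end,
      Funs),
    collect(Ref, length(Funs)).

%% Normalise whatever the fun does into {success, _} or {fail, _}.
safe_run(F) ->
    try F() of
        {success, Value} -> {success, Value};
        {fail, Reason} -> {fail, Reason};
        _Other -> {fail, bad_result}
    catch
        _Class:_Reason -> {fail, crashed}
    end.

%% Count down the outstanding workers; stop early on the first success.
collect(_Ref, 0) ->
    {fail, []};
collect(Ref, Remaining) ->
    receive
        {Ref, {success, Value}} -> {success, Value};
        {Ref, {fail, _Reason}} -> collect(Ref, Remaining - 1)
    end.
```

One thing this sketch leaves out: after an early success the remaining workers keep running, and their late replies stay in the caller's mailbox. A fuller version would stop them or flush those messages.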
Full code
    % finished independently by romaLzhih, 罗马字母3
    -module(builder).

    % You are allowed to split your Erlang code in as many files as you
    % find appropriate.
    % However, you MUST have a module (this file) called builder.

    % Export at least the API:
    -export([startup/0
            , register_action/3
            , build/2
            , on_completion/2
            , status/1
            , finish/1]).

    % You may have other exports as well

    %%%===================================================================
    %%% API
    %%%===================================================================

    startup() ->
        PID = spawn_link(fun() -> buildLoop([]) end),
        {ok, PID}.

    register_action(Builder, Act, Fun) ->
        Builder ! {self(), {registerAction, Act, Fun}},
        receive
            {_From, Msg} -> Msg
        end.

    build(Builder, Plan) ->
        Builder ! {self(), {startBuild, Plan}},
        receive
            % handle possible error here
            {_From, BuildRef} -> BuildRef
        end.

    on_completion(BuildRef, Fun) ->
        BuildRef ! {self(), {addCompFunc, Fun}}.

    status(BuildRef) ->
        BuildRef ! {self(), {queryStatus}},
        receive
            {_From, Msg} -> Msg
        end.

    % builder loop
    buildLoop(ActFun) ->
        receive
            {From, {registerAction, Act, Fun}} ->
                case checkActExists(Act, ActFun) of
                    true ->
                        From ! {self(), {error, already_defined}},
                        NewActFun = ActFun;
                    false ->
                        NewActFun = ActFun ++ [{Act, Fun}],
                        From ! {self(), {ok}}
                end,
                buildLoop(NewActFun);
            {From, {startBuild, Plan}} ->
                ResPID = spawn_link(fun() -> resultPool(Plan, ActFun, [], [], 0) end),
                ResPID ! {self(), {run}},
                From ! {self(), {ok, ResPID}},
                buildLoop(ActFun)
            % TODO stop all sub plans
        end.

    % plan loop
    resultPool(Plan, ActFun, CompletionFunc, BuildPlanRes, OngoingNum) ->
        receive
            % run the plan
            {_From, {run}} ->
                Me = self(),
                Res = spawnReceiveforBuildPlan(Me, ActFun, Plan),
                NewRes = runCompFunction(Res, CompletionFunc),
                io:format("Final Res is ~w\n", [NewRes]),
                resultPool(Plan, ActFun, CompletionFunc, NewRes, OngoingNum);
            % add comp Func
            {_From, {addCompFunc, Fun}} ->
                NewCompletionFunc = CompletionFunc ++ [Fun],
                NewRes = runCompFunction(BuildPlanRes, CompletionFunc),
                resultPool(Plan, ActFun, NewCompletionFunc, NewRes, OngoingNum);
            % how many ongoing sub-plan
            {_From, {updateServerOngoingNum, Num}} ->
                NewOngoingNum = OngoingNum + Num,
                resultPool(Plan, ActFun, CompletionFunc, BuildPlanRes, NewOngoingNum);
            %query state
            {From, {queryStatus}} ->
                case BuildPlanRes of
                    [] -> From ! {self(), {ongoing, OngoingNum}};
                    {fail, Val} -> From ! {self(), {failure, Val}};
                    {success, Val} -> From ! {self(), {success, Val}};
                    _Others -> From ! {self(), {abort, aborted}}
                end,
                % keep looping so the process survives a status query
                resultPool(Plan, ActFun, CompletionFunc, BuildPlanRes, OngoingNum)
        end.

    runCompFunction(Res, Fun) ->
        case Res of
            [] -> [];
            _ ->
                % use _ here: reusing one _Others variable in both cases would
                % compare Fun with the value already bound by the outer case
                case Fun of
                    [] -> Res;
                    _ -> runManyCompFunctions(Res, Fun)
                end
        end.

    runManyCompFunctions(Res, Fun) ->
        case Fun of
            [] -> Res;
            [H | T] ->
                try H(Res) of
                    NewRes -> runManyCompFunctions(NewRes, T)
                catch
                    % match every exception class, not only throw
                    _:_ -> {errorCompleteFunction, aborted}
                end
        end.

    % NOTE: an exit signal with reason normal is ignored by a process that
    % does not trap exits, so this call does not actually stop the builder.
    finish(Builder) ->
        exit(Builder, normal).

    spawnReceiveforBuildPlan(PrimarySid, ActFun, Plan) ->
        TopSupervisorPid = spawn_link(fun() -> topSupervisor(PrimarySid, [], 0, ActFun, Plan) end),
        TopSupervisorPid ! {self(), {start, backup}},
        receive
            {_From, {finishPlan, Response}} ->
                case Response of
                    [_Val] -> Response;
                    _Others -> Response
                end
        end.

    % top supervisor used to distribute build to sub supervisors
    topSupervisor(Sid, Res, GoingNum, ActFun, Plan) ->
        receive
            {From, {start, backup}} ->
                Me = self(),
                Pid = spawn_link(fun() -> spawnSubBuilds(Sid, Me, ActFun, Plan) end),
                %io:format("spawnSubbuilds Pid is ~w, supervisor pid is ~w; ", [Pid, self()]),
                topSupervisor(From, Res, GoingNum, ActFun, Plan);
            {_From, {updateResult, NewRes}} ->
                Me = self(),
                Sid ! {Me, {finishPlan, NewRes}},
                topSupervisor(Sid, NewRes, GoingNum, ActFun, Plan)
        end.

    % start semantics analysis
    spawnSubBuilds(PrimarySid, FatherSid, ActFun, Plan) ->
        case Plan of
            {act, Act, Arg} ->
                Res = doSingleAct(PrimarySid, Act, Arg, ActFun),
                io:format("acr pid is ~w, father pid is ~w ;\n", [self(), FatherSid]),
                FatherSid ! {self(), {updateResult, Res}},
                Res;
            {seq, Plans} ->
                Res = doSeq(PrimarySid, self(), ActFun, Plans),
                FatherSid ! {self(), {updateResult, Res}},
                %io:format("seq pid is ~w, father pid is ~w \n;", [self(), FatherSid]),
                Res;
            {any, Plans} ->
                Me = self(),
                % zero length plan fails
                case length(Plans) == 0 of
                    true -> FatherSid ! {Me, {updateResult, {fail, []}}};
                    false ->
                        io:format("initial do any pid is ~w;\n ", [Me]),
                        doAny(PrimarySid, Me, ActFun, Plans),
                        receive
                            {_From, {okAny, Res}} ->
                                FatherSid ! {Me, {updateResult, Res}},
                                Res
                        end
                end;
            {all, Plans} ->
                Me = self(),
                % by default there is no empty plans
                doAll(PrimarySid, Me, ActFun, Plans),
                receive
                    {_From, {okAll, Res}} ->
                        FatherSid ! {Me, {updateResult, Res}},
                        io:format("Res pass in spawn sub builds ~w\n", [Res]),
                        Res
                end;
            {failure_is_success, NewPlan} ->
                Me = self(),
                spawn_link(fun() -> doFailisSucs(PrimarySid, Me, ActFun, NewPlan) end),
                receive
                    {_From, {okFailIsSucs, Res}} ->
                        FatherSid ! {Me, {updateResult, Res}},
                        Res
                end;
            {and_then, NewPlan, FunBuild} ->
                Me = self(),
                spawn_link(fun() -> doAndThen(PrimarySid, Me, ActFun, NewPlan, FunBuild) end),
                receive
                    {_From, {okAndThen, Res}} ->
                        FatherSid ! {Me, {updateResult, Res}},
                        Res
                end;
            {within, Limit, NewPlan} ->
                Me = self(),
                io:format("time limit is ~w\n", [Limit]),
                spawn_link(fun() -> doTimeLimit(PrimarySid, Me, ActFun, NewPlan, 10 * Limit) end),
                receive
                    {_From, {okTime, Res}} ->
                        FatherSid ! {Me, {updateResult, Res}},
                        Res
                end
        end.

    checkActExists(Act, ActFun) ->
        case lists:keyfind(Act, 1, ActFun) of
            false -> false;
            _Others -> true
        end.

    % do act build, if failed/exception, throw terminated
    doSingleAct(PrimarySid, Act, Arg, ActFun) ->
        PrimarySid ! {self(), {updateServerOngoingNum, 1}},
        case checkActExists(Act, ActFun) of
            false -> {fail, no_such_action};
            true ->
                {_ActName, Fun} = lists:keyfind(Act, 1, ActFun),
                try Fun(Arg) of
                    Res ->
                        PrimarySid ! {self(), {updateServerOngoingNum, -1}},
                        Res
                catch
                    % match every exception class, not only throw
                    _:_ ->
                        PrimarySid ! {self(), {updateServerOngoingNum, -1}},
                        {fail, function_terminated}
                end
        end.

    doSeq(PrimarySid, FatherSid, ActFun, Plans) ->
        case Plans of
            [] -> [];
            [H | T] ->
                PrimarySid ! {self(), {updateServerOngoingNum, 1}},
                Res = spawnSubBuilds(PrimarySid, self(), ActFun, H),
                PrimarySid ! {self(), {updateServerOngoingNum, -1}},
                case Res of
                    {fail, Info} -> {fail, Info};
                    {success, Info} ->
                        case length(T) == 0 of
                            true -> {success, Info};
                            false -> doSeq(PrimarySid, self(), ActFun, T)
                        end
                end
        end.

    doAny(PrimarySid, FatherSid, ActFun, Plans) ->
        Me = self(),
        MaximumStateNum = 100000,
        SpySid = spawn_link(fun() -> spyLoopAny(0, MaximumStateNum, Me, {1, 2}, PrimarySid) end),
        io:format("Spy pid is:~w \n", [SpySid]),
        State = spawnManyPlans(PrimarySid, SpySid, ActFun, Plans),
        SpySid ! {Me, {updatePlanSize, length(State)}},
        io:format("update planze, ~w\n", [length(State)]),
        receive
            {_From, {ok, Res}} ->
                io:format("receive in doAny Res ~w\n; ", [Res]),
                FatherSid ! {Me, {okAny, Res}}
        end.
    % exit(SpySid, normal).

    %return plans' state list
    spawnManyPlans(PrimarySid, SpySid, ActFun, Plans) ->
        case Plans of
            [] -> [];
            [H | T] ->
                io:format("begin to spawn plans\n"),
                PrimarySid ! {self(), {updateServerOngoingNum, 1}},
                S = spawn_link(fun() -> spawnSubBuilds(PrimarySid, SpySid, ActFun, H) end),
                L = spawnManyPlans(PrimarySid, SpySid, ActFun, T),
                [S] ++ L
        end.

    spyLoopAny(OngoingNum, StateNum, FatherSid, LastRes, PrimarySid) ->
        receive
            {_From, {updateResult, Res}} ->
                PrimarySid ! {self(), {updateServerOngoingNum, -1}},
                io:format("spyLoopAny Res ~w; \n", [Res]),
                NewOngoingNum = OngoingNum + 1,
                case Res of
                    {success, _Info} ->
                        % TODO: add complete status
                        FatherSid ! {self(), {ok, Res}};
                    {fail, _Info} ->
                        io:format("Newnumis~w,statenumis~w\n", [NewOngoingNum, StateNum]),
                        case NewOngoingNum >= StateNum of
                            true -> FatherSid ! {self(), {ok, {fail, []}}};
                            false -> spyLoopAny(NewOngoingNum, StateNum, FatherSid, Res, PrimarySid)
                        end
                end;
            {_From, {updatePlanSize, Num}} ->
                % avoid late update plan size
                case LastRes of
                    {success, _Info} -> FatherSid ! {self(), {ok, LastRes}};
                    {fail, _Info} ->
                        case OngoingNum >= Num of
                            true -> FatherSid ! {self(), {ok, LastRes}};
                            false -> spyLoopAny(OngoingNum, Num, FatherSid, LastRes, PrimarySid)
                        end;
                    _Others ->
                        io:format("update~w\n", [Num]),
                        spyLoopAny(OngoingNum, Num, FatherSid, LastRes, PrimarySid)
                end
        end.

    doAll(PrimarySid, FatherSid, ActFun, Plans) ->
        Me = self(),
        MaximumStateNum = 100000,
        SpySid = spawn_link(fun() -> spyLoopAll(0, MaximumStateNum, Me, [], PrimarySid) end),
        State = spawnManyPlans(PrimarySid, SpySid, ActFun, Plans),
        SpySid ! {Me, {updatePlanSize, length(State)}},
        receive
            {_From, {ok, Res}} ->
                io:format("Res is ~w\n", [Res]),
                case Res of
                    [{_Pid, {fail, Info}}] -> FatherSid ! {Me, {okAll, {fail, Info}}};
                    _Others ->
                        DrawRes = mappingRes(State, Res),
                        FatherSid ! {Me, {okAll, {success, DrawRes}}}
                end
        end.

    mappingRes(State, Res) ->
        case State of
            [] -> [];
            [Pid | T] ->
                {_P, {_St, Val}} = lists:keyfind(Pid, 1, Res),
                [Val] ++ mappingRes(T, Res)
        end.

    spyLoopAll(OngoingNum, StateNum, FatherSid, LastRes, PrimarySid) ->
        receive
            {From, {updateResult, Res}} ->
                case Res of
                    {success, _Info} ->
                        PrimarySid ! {self(), {updateServerOngoingNum, -1}},
                        % may bug here, depends on program speed
                        NewLastRes = [{From, Res}] ++ LastRes,
                        NewOngoingNum = OngoingNum + 1,
                        case NewOngoingNum >= StateNum of
                            true -> FatherSid ! {self(), {ok, NewLastRes}};
                            false -> spyLoopAll(NewOngoingNum, StateNum, FatherSid, NewLastRes, PrimarySid)
                        end;
                    {fail, _Info} ->
                        PrimarySid ! {self(), {updateServerOngoingNum, -1}},
                        NewState = [{From, Res}],
                        FatherSid ! {self(), {ok, NewState}}
                end;
            {_From, {updatePlanSize, NewStateNum}} ->
                case NewStateNum > length(LastRes) of
                    true -> spyLoopAll(OngoingNum, NewStateNum, FatherSid, LastRes, PrimarySid);
                    false -> FatherSid ! {self(), {ok, LastRes}}
                end
        end.

    doFailisSucs(PrimarySid, FatherSid, ActFun, Plan) ->
        Me = self(),
        PrimarySid ! {self(), {updateServerOngoingNum, 1}},
        spawn_link(fun() -> spawnSubBuilds(PrimarySid, Me, ActFun, Plan) end),
        receive
            {_From, {updateResult, Res}} ->
                PrimarySid ! {self(), {updateServerOngoingNum, -1}},
                case Res of
                    {success, Val} -> FatherSid ! {Me, {okFailIsSucs, {fail, Val}}};
                    {fail, Val} -> FatherSid ! {Me, {okFailIsSucs, {success, Val}}}
                end
        end.

    doAndThen(PrimarySid, FatherSid, ActFun, Plan, FunBuild) ->
        Me = self(),
        PrimarySid ! {self(), {updateServerOngoingNum, 1}},
        spawn_link(fun() -> spawnSubBuilds(PrimarySid, Me, ActFun, Plan) end),
        receive
            {_From, {updateResult, Res}} ->
                case Res of
                    {success, Val} ->
                        io:format("run plan change\n"),
                        try FunBuild(Val) of
                            NewPlan ->
                                spawn_link(fun() -> spawnSubBuilds(PrimarySid, Me, ActFun, NewPlan) end),
                                receive
                                    {_From1, {updateResult, NewRes}} ->
                                        PrimarySid ! {self(), {updateServerOngoingNum, -1}},
                                        FatherSid ! {Me, {okAndThen, NewRes}}
                                end
                        catch
                            % match every exception class, not only throw
                            _:_ ->
                                io:format("catchexception\n"),
                                PrimarySid ! {self(), {updateServerOngoingNum, -1}},
                                FatherSid ! {Me, {okAndThen, {fail, not_a_fun_plan}}}
                        end;
                    {fail, Val} ->
                        PrimarySid ! {self(), {updateServerOngoingNum, -1}},
                        FatherSid ! {Me, {okAndThen, {fail, Val}}}
                end
        end.

    doTimeLimit(PrimarySid, FatherSid, ActFun, Plan, Limit) ->
        Me = self(),
        PrimarySid ! {self(), {updateServerOngoingNum, 1}},
        PID = spawn_link(fun() -> spawnSubBuilds(PrimarySid, Me, ActFun, Plan) end),
        receive
            {_From, {updateResult, Res}} ->
                PrimarySid ! {self(), {updateServerOngoingNum, -1}},
                FatherSid ! {Me, {okTime, Res}}
        after Limit ->
            io:format("hi\n"),
            % NOTE: as in finish/1, reason normal does not terminate PID
            % unless that process traps exits.
            exit(PID, normal),
            FatherSid ! {Me, {okTime, {fail, limit_exceeded}}}
        end.