Erlang 简介与样例分析

Erlang 简介

Erlang 是由爱立信开发的一个函数式编程语言,主要用于并发编程,服务器架构等用途,Erlang得名于丹麦数学家及统计学家Agner Krarup Erlang,同时Erlang还可以表示Ericsson Language。

由于Erlang是函数式编程语言,其中一个最明显的特征就是没有循环,所有的循环都得通过递归(recurssive)的方式来实现。 Erlang无法定义一个变量, 其变量赋值方式类似于python,但是一个变量无法赋值两次,虽然条件很苛刻, 但是对比其他函数式编程语言,比如Haskell、Prolog 等还是要友好许多的。

题目简介 (UCPH 2021 AP Re-exam)

你的目的就是建立一个builder,其作用是给定一个plan, 你可以build这个plan,同时返回结果。 要求build的过程可以并发,既可以同时build多个plan,同时也可以多个builder来同时工作。

具体的, 一个plan可以分为很多个子类型,每个子类型又可以包含多个plan (嵌套结构):

Erlang 简介与样例分析

同时这个module还要提供一下接口:

Erlang 简介与样例分析

Erlang 基本语法

Erlang用来表示代码所属层级关系的是标点符号,其中 , 表示同级, 。 表示这一层级的结束,; 表示在同样的层级关系中。

Erlang 的文件开始需要有一个与文件名字同样的标识符:

-module(builder).

同时将所有的函数API放置到export接口中:

-export(
[startup/0
  , register_action/3
  , build/2
  , on_completion/2
  , status/1
  , finish/1]).

Erlang保持高并发性的方法就是通过 PID = spawn_link() 的方法来创建一个独立的并发进程,其中PID就是创建的新线程的地址,譬如:

startup() ->
  PID = spawn_link(fun() -> buildLoop([]) end),
  {ok, PID}.

{ok, PID} 是这个函数的返回值,而 buildLoop( [ ] ) 则是开启线程的函数名字。

在一个函数中我们可以通过进程的 PID 来进行消息的传递和发送:

register_action(Builder, Act, Fun) ->
  Builder ! {self(), {registerAction, Act, Fun}},
  receive
    {_From, Msg} -> Msg
  end.

其中 !为发送符号,代表向Builder标识的进程发送后面tuple元素标记的消息。 receive 代码块为接收消息的代码块,负责接收发送给所处进程的消息。

在Erlang里面,所有小写字母开始的为atom, 为不可变量,同时atom可以作为函数名;而大写字母开始的是一般变量名; 对于那些定义过但是不会使用的变量,会在变量名字面前加下划线来特定标识。

对于一个服务器进程,一般的写法是:

buildLoop(ActFun) ->
  receive
    {From, {registerAction, Act, Fun}} ->
      case checkActExists(Act, ActFun) of
        true ->
          From ! {self(), {error, already_defined}},
          NewActFun = ActFun;
        false ->
          NewActFun = ActFun ++ [{Act, Fun}],
          From ! {self(), {ok}}
      end,
      buildLoop(NewActFun);
    {From, {startBuild, Plan}} ->
      ResPID = spawn_link(fun() -> resultPool(Plan, ActFun, [], [], 0) end),
      ResPID ! {self(), {run}},
      From ! {self(), {ok, ResPID}},
      buildLoop(ActFun)
  end.

其中不同的block末尾采用了尾递归的技术,将服务器(循环)的状态保存了下来作为下次循环(新状态)的开始。

Erlang也提供了一些常用的函数接口,例如,对于一个tuple列表,要查找在tuple指定位置上匹配的那个元素,可以通过下面的方式实现,其函数调用方式与python还是类似的:

checkActExists(Act, ActFun) ->
  case lists:keyfind(Act, 1, ActFun) of
    false -> false;
    _Others -> true
  end.

Erlang有专门的代码结构框架叫做gen_server,其中所有的消息都是通过gen_server内置的函数来更新到云端,一个称之为状态(state) 的容器里。 gen_server的精髓就是状态的设计,gen_server有自己的报错系统, 可以很快速定位到自己的问题所在,当然这里不方便使用gen_server,因为我们要求同时并发跑好多的builder, 而gen_server无法支持同时跑很多个gen_sevrer, 所有我们之类选择最简单的spawn_link()。

题目解法

这道题目的难点在于处理嵌套plan的手段,由于嵌套plan是需要并发性的,那么就无法通过简单的值传递来处理汇总结果信息,我们需要并发的开启多个线程,同时用单独的监视线程来汇总结果。

置顶向下的看,首先我们开启一个buildLoop()之后, 当其收到开始build的消息就开启另一个resultPool(),这个循环专门负责处理关于plan执行结果的查询,同时开启plan的执行。

{_From, {run}} ->
  Me = self(),
  Res = spawnReceiveforBuildPlan(Me, ActFun, Plan),
  NewRes = runCompFunction(Res, CompletionFunc),
  io:format("Final Res is ~w\n", [NewRes]),
  resultPool(Plan, ActFun, CompletionFunc, NewRes, OngoingNum);

spawnReceiveforBuildPlan()为外调函数,本质上是可以写进上述block中的,代码如下:

spawnReceiveforBuildPlan(PrimarySid, ActFun, Plan) ->
  TopSupervisorPid = spawn_link(fun() ->
    topSupervisor(PrimarySid, [], 0, ActFun, Plan) end),
  TopSupervisorPid ! {self(), {start, backup}},
  receive
    {_From, {finishPlan, Response}} ->
      case Response of
        [_Val] -> Response;
        _Others -> Response
      end
  end.

其中TopSupervisor用以执行汇总最后的结果,将其单独拿出来以避免之后嵌套plan带来的进程消息混乱,代码如下:

topSupervisor(Sid, Res, GoingNum, ActFun, Plan) ->
  receive
    {From, {start, backup}} ->
      Me = self(),
      Pid = spawn_link(fun() -> spawnSubBuilds(Sid, Me, ActFun, Plan) end),
      %io:format("spawnSubbuilds Pid is ~w, supervisor pid is ~w; ", [Pid, self()]),
      topSupervisor(From, Res, GoingNum, ActFun, Plan);

    {_From, {updateResult, NewRes}} ->
      Me = self(),
      Sid ! {Me, {finishPlan, NewRes}},
      topSupervisor(Sid, NewRes, GoingNum, ActFun, Plan)
  end.

spawnSubBuilds()函数用以开始一个plan的build,每次收到结果,其都会向父节点发送结果消息,具体的,对于一些简单的plan,例如{act, Act, Arg}, 我们的实现方式为:

case Plan of
  {act, Act, Arg} ->
    Res = doSingleAct(PrimarySid, Act, Arg, ActFun),
    io:format("acr pid is ~w, father pid is ~w ;\n", [self(), FatherSid]),
    FatherSid ! {self(), {updateResult, Res}},
    Res;
end

doSingleAct(PrimarySid, Act, Arg, ActFun) ->
  PrimarySid ! {self(), {updateServerOngoingNum, 1}},
  case checkActExists(Act, ActFun) of
    false -> {fail, no_such_action};
    true ->
      {_ActName, Fun} = lists:keyfind(Act, 1, ActFun),
      try Fun(Arg) of
        Res ->
          PrimarySid ! {self(), {updateServerOngoingNum, -1}},
          Res
      catch
        _Other ->
          PrimarySid ! {self(), {updateServerOngoingNum, -1}},
          {fail, function_terminated}
      end
  end.

对于需要并发性的进程,例如{any,plans},我们首先开始执行plan、将所有的plan开启单独的线程, 同时建立一个监视线程(SpySid), 用以汇总所有的消息, 最后SpySid会将所有的结果统计,以 doAny 关键字返回到最开始的调用点,同时向上传递结果,代码如下:

{any, Plans} ->
  Me = self(),
  % zero length plan fails
  case length(Plans) == 0 of
    true ->
      FatherSid ! {Me, {updateResult, {fail, []}}};
    false ->
      io:format("initial do any pid is ~w;\n  ", [Me]),
      doAny(PrimarySid, Me, ActFun, Plans),
      receive
        {_From, {okAny, Res}} ->
          FatherSid ! {Me, {updateResult, Res}},
          Res
      end
  end;

% doAny implementation
doAny(PrimarySid, FatherSid, ActFun, Plans) ->
  Me = self(),
  MaximumStateNum = 100000,
  SpySid = spawn_link(fun() -> spyLoopAny(0, MaximumStateNum, Me, {1, 2}, PrimarySid) end),
  io:format("Spy pid is:~w \n", [SpySid]),
  State = spawnManyPlans(PrimarySid, SpySid, ActFun, Plans),
  SpySid ! {Me, {updatePlanSize, length(State)}},
  io:format("update planze, ~w\n", [length(State)]),
  receive
    {_From, {ok, Res}} ->
      io:format("receive in doAny Res ~w\n; ", [Res]),
      FatherSid ! {Me, {okAny, Res}}
  end.

% spyloop implementation
spyLoopAny(OngoingNum, StateNum, FatherSid, LastRes, PrimarySid) ->
  receive
    {_From, {updateResult, Res}} ->
      PrimarySid ! {self(), {updateServerOngoingNum, -1}},
      io:format("spyLoopAny Res ~w; \n", [Res]),
      NewOngoingNum = OngoingNum + 1,
      case Res of
        {success, _Info} ->
          % TODO: add complete status
          FatherSid ! {self(), {ok, Res}};
        {fail, _Info} ->
          io:format("Newnumis~w,statenumis~w\n", [NewOngoingNum, StateNum]),
          case NewOngoingNum >= StateNum of
            true -> FatherSid ! {self(), {ok, {fail, []}}};
            false -> spyLoopAny(NewOngoingNum, StateNum, FatherSid, Res, PrimarySid)
          end
      end;
    {_From, {updatePlanSize, Num}} ->
      % avoid late update plan size
      case LastRes of
        {success, _Info} ->
          FatherSid ! {self(), {ok, LastRes}};
        {fail, _Info} ->
          case OngoingNum >= Num of
            true -> FatherSid ! {self(), {ok, LastRes}};
            false ->
              spyLoopAny(OngoingNum, Num, FatherSid, LastRes, PrimarySid)
          end;
        _Others ->
          io:format("update~w\n", [Num]),
          spyLoopAny(OngoingNum, Num, FatherSid, LastRes, PrimarySid)
      end
  end.

最后,开启很多个sub-plan进程的代码如下,这里的函数的返回值是按照顺序开启的所有sub-plan的子进程的PID值:

spawnManyPlans(PrimarySid, SpySid, ActFun, Plans) ->
  case Plans of
    [] -> [];
    [H | T] ->
      io:format("begin to spawn plans\n"),
      PrimarySid ! {self(), {updateServerOngoingNum, 1}},
      S = spawn_link(fun() -> spawnSubBuilds(PrimarySid, SpySid, ActFun, H) end),
      L = spawnManyPlans(PrimarySid, SpySid, ActFun, T),
      [S] ++ L
  end.

全部代码

% finished independently by romaLzhih, 罗马字母3

-module(builder).

% You are allowed to split your Erlang code in as many files as you
% find appropriate.
% However, you MUST have a module (this file) called builder.

% Export at least the API:
-export(
[startup/0
  , register_action/3
  , build/2
  , on_completion/2
  , status/1
  , finish/1]).

% You may have other exports as well

%%%===================================================================
%%% API
%%%===================================================================

startup() ->
  PID = spawn_link(fun() -> buildLoop([]) end),
  {ok, PID}.

register_action(Builder, Act, Fun) ->
  Builder ! {self(), {registerAction, Act, Fun}},
  receive
    {_From, Msg} -> Msg
  end.

build(Builder, Plan) ->
  Builder ! {self(), {startBuild, Plan}},
  receive
  % handle possible error here
    {_From, BuildRef} -> BuildRef
  end.

on_completion(BuildRef, Fun) ->
  BuildRef ! {self(), {addCompFunc, Fun}}.

status(BuildRef) ->
  BuildRef ! {self(), {queryStatus}},
  receive
    {_From, Mag} -> Mag
  end.

% builder loop
buildLoop(ActFun) ->
  receive
    {From, {registerAction, Act, Fun}} ->
      case checkActExists(Act, ActFun) of
        true ->
          From ! {self(), {error, already_defined}},
          NewActFun = ActFun;
        false ->
          NewActFun = ActFun ++ [{Act, Fun}],
          From ! {self(), {ok}}
      end,
      buildLoop(NewActFun);
    {From, {startBuild, Plan}} ->
      ResPID = spawn_link(fun() -> resultPool(Plan, ActFun, [], [], 0) end),
      ResPID ! {self(), {run}},
      From ! {self(), {ok, ResPID}},
      buildLoop(ActFun)
  % TODO stop all sub plans
  end.

% plan loop
resultPool(Plan, ActFun, CompletionFunc, BuildPlanRes, OngoingNum) ->
  receive
  % run the plan
    {_From, {run}} ->
      Me = self(),
      Res = spawnReceiveforBuildPlan(Me, ActFun, Plan),
      NewRes = runCompFunction(Res, CompletionFunc),
      io:format("Final Res is ~w\n", [NewRes]),
      resultPool(Plan, ActFun, CompletionFunc, NewRes, OngoingNum);
  % add comp Func
    {_From, {addCompFunc, Fun}} ->
      NewCompletionFunc = CompletionFunc ++ [Fun],
      NewRes = runCompFunction(BuildPlanRes, CompletionFunc),
      resultPool(Plan, ActFun, NewCompletionFunc, NewRes, OngoingNum);
  % how many ongoing sub-plan
    {_From, {updateServerOngoingNum, Num}} ->
      NewOngoingNum = OngoingNum + Num,
      resultPool(Plan, ActFun, CompletionFunc, BuildPlanRes, NewOngoingNum);
  %query state
    {From, {queryStatus}} ->
      case BuildPlanRes of
        [] -> From ! {self(), {ongoing, OngoingNum}};
        {fail, Val} -> From ! {self(), {failure, Val}};
        {success, Val} -> From ! {self(), {success, Val}};
        _Others -> From ! {self(), {abort, aborted}}
      end
  end.

runCompFunction(Res, Fun) ->
  case Res of
    [] -> [];
    _Others ->
      case Fun of
        [] -> Res;
        _Others -> runManyCompFunctions(Res, Fun)
      end
  end.

runManyCompFunctions(Res, Fun) ->
  case Fun of
    [] -> Res;
    [H | T] ->
      try H(Res) of
        NewRes -> runManyCompFunctions(NewRes, T)
      catch
        _e -> {errorCompleteFunction, aborted}
      end
  end.

finish(Builder) ->
  exit(Builder, normal).

spawnReceiveforBuildPlan(PrimarySid, ActFun, Plan) ->
  TopSupervisorPid = spawn_link(fun() ->
    topSupervisor(PrimarySid, [], 0, ActFun, Plan) end),
  TopSupervisorPid ! {self(), {start, backup}},
  receive
    {_From, {finishPlan, Response}} ->
      case Response of
        [_Val] -> Response;
        _Others -> Response
      end
  end.

% top supervisor used to distribute build to sub supervisors
topSupervisor(Sid, Res, GoingNum, ActFun, Plan) ->
  receive
    {From, {start, backup}} ->
      Me = self(),
      Pid = spawn_link(fun() -> spawnSubBuilds(Sid, Me, ActFun, Plan) end),
      %io:format("spawnSubbuilds Pid is ~w, supervisor pid is ~w; ", [Pid, self()]),
      topSupervisor(From, Res, GoingNum, ActFun, Plan);

    {_From, {updateResult, NewRes}} ->
      Me = self(),
      Sid ! {Me, {finishPlan, NewRes}},
      topSupervisor(Sid, NewRes, GoingNum, ActFun, Plan)
  end.

% start semantics analysis
spawnSubBuilds(PrimarySid, FatherSid, ActFun, Plan) ->
  case Plan of
    {act, Act, Arg} ->
      Res = doSingleAct(PrimarySid, Act, Arg, ActFun),
      io:format("acr pid is ~w, father pid is ~w ;\n", [self(), FatherSid]),
      FatherSid ! {self(), {updateResult, Res}},
      Res;
    {seq, Plans} ->
      Res = doSeq(PrimarySid, self(), ActFun, Plans),
      FatherSid ! {self(), {updateResult, Res}},
      %io:format("seq pid is ~w, father pid is ~w \n;", [self(), FatherSid]),
      Res;
    {any, Plans} ->
      Me = self(),
      % zero length plan fails
      case length(Plans) == 0 of
        true ->
          FatherSid ! {Me, {updateResult, {fail, []}}};
        false ->
          io:format("initial do any pid is ~w;\n  ", [Me]),
          doAny(PrimarySid, Me, ActFun, Plans),
          receive
            {_From, {okAny, Res}} ->
              FatherSid ! {Me, {updateResult, Res}},
              Res
          end
      end;
    {all, Plans} ->
      Me = self(),
      % by default there is no empty plans
      doAll(PrimarySid, Me, ActFun, Plans),
      receive
        {_From, {okAll, Res}} ->
          FatherSid ! {Me, {updateResult, Res}},
          io:format("Res pass in spawn sub builds ~w\n", [Res]),
          Res
      end;
    {failure_is_success, NewPlan} ->
      Me = self(),
      spawn_link(fun() -> doFailisSucs(PrimarySid, Me, ActFun, NewPlan) end),
      receive
        {_From, {okFailIsSucs, Res}} ->
          FatherSid ! {Me, {updateResult, Res}},
          Res
      end;
    {and_then, NewPlan, FunBuild} ->
      Me = self(),
      spawn_link(fun() -> doAndThen(PrimarySid, Me, ActFun, NewPlan, FunBuild) end),
      receive
        {_From, {okAndThen, Res}} ->
          FatherSid ! {Me, {updateResult, Res}},
          Res
      end;
    {within, Limit, NewPlan} ->
      Me = self(),
      io:format("time limit is ~w\n", [Limit]),
      spawn_link(fun() -> doTimeLimit(PrimarySid, Me, ActFun, NewPlan, 10 * Limit) end),
      receive
        {_From, {okTime, Res}} ->
          FatherSid ! {Me, {updateResult, Res}},
          Res
      end
  end.

checkActExists(Act, ActFun) ->
  case lists:keyfind(Act, 1, ActFun) of
    false -> false;
    _Others -> true
  end.

% do act build, if failed/exception, throw terminated
doSingleAct(PrimarySid, Act, Arg, ActFun) ->
  PrimarySid ! {self(), {updateServerOngoingNum, 1}},
  case checkActExists(Act, ActFun) of
    false -> {fail, no_such_action};
    true ->
      {_ActName, Fun} = lists:keyfind(Act, 1, ActFun),
      try Fun(Arg) of
        Res ->
          PrimarySid ! {self(), {updateServerOngoingNum, -1}},
          Res
      catch
        _Other ->
          PrimarySid ! {self(), {updateServerOngoingNum, -1}},
          {fail, function_terminated}
      end
  end.

doSeq(PrimarySid, FatherSid, ActFun, Plans) ->
  case Plans of
    [] -> [];
    [H | T] ->
      PrimarySid ! {self(), {updateServerOngoingNum, 1}},
      Res = spawnSubBuilds(PrimarySid, self(), ActFun, H),
      PrimarySid ! {self(), {updateServerOngoingNum, -1}},
      case Res of
        {fail, Info} -> {fail, Info};
        {success, Info} ->
          case length(T) == 0 of
            true -> {success, Info};
            false -> doSeq(PrimarySid, self(), ActFun, T)
          end
      end
  end.

doAny(PrimarySid, FatherSid, ActFun, Plans) ->
  Me = self(),
  MaximumStateNum = 100000,
  SpySid = spawn_link(fun() -> spyLoopAny(0, MaximumStateNum, Me, {1, 2}, PrimarySid) end),
  io:format("Spy pid is:~w \n", [SpySid]),
  State = spawnManyPlans(PrimarySid, SpySid, ActFun, Plans),
  SpySid ! {Me, {updatePlanSize, length(State)}},
  io:format("update planze, ~w\n", [length(State)]),
  receive
    {_From, {ok, Res}} ->
      io:format("receive in doAny Res ~w\n; ", [Res]),
      FatherSid ! {Me, {okAny, Res}}
  end.
% exit(SpySid, normal).

%return plans' state list
spawnManyPlans(PrimarySid, SpySid, ActFun, Plans) ->
  case Plans of
    [] -> [];
    [H | T] ->
      io:format("begin to spawn plans\n"),
      PrimarySid ! {self(), {updateServerOngoingNum, 1}},
      S = spawn_link(fun() -> spawnSubBuilds(PrimarySid, SpySid, ActFun, H) end),
      L = spawnManyPlans(PrimarySid, SpySid, ActFun, T),
      [S] ++ L
  end.

spyLoopAny(OngoingNum, StateNum, FatherSid, LastRes, PrimarySid) ->
  receive
    {_From, {updateResult, Res}} ->
      PrimarySid ! {self(), {updateServerOngoingNum, -1}},
      io:format("spyLoopAny Res ~w; \n", [Res]),
      NewOngoingNum = OngoingNum + 1,
      case Res of
        {success, _Info} ->
          % TODO: add complete status
          FatherSid ! {self(), {ok, Res}};
        {fail, _Info} ->
          io:format("Newnumis~w,statenumis~w\n", [NewOngoingNum, StateNum]),
          case NewOngoingNum >= StateNum of
            true -> FatherSid ! {self(), {ok, {fail, []}}};
            false -> spyLoopAny(NewOngoingNum, StateNum, FatherSid, Res, PrimarySid)
          end
      end;
    {_From, {updatePlanSize, Num}} ->
      % avoid late update plan size
      case LastRes of
        {success, _Info} ->
          FatherSid ! {self(), {ok, LastRes}};
        {fail, _Info} ->
          case OngoingNum >= Num of
            true -> FatherSid ! {self(), {ok, LastRes}};
            false ->
              spyLoopAny(OngoingNum, Num, FatherSid, LastRes, PrimarySid)
          end;
        _Others ->
          io:format("update~w\n", [Num]),
          spyLoopAny(OngoingNum, Num, FatherSid, LastRes, PrimarySid)
      end
  end.

doAll(PrimarySid, FatherSid, ActFun, Plans) ->
  Me = self(),

  MaximumStateNum = 100000,
  SpySid = spawn_link(fun() -> spyLoopAll(0, MaximumStateNum, Me, [], PrimarySid) end),
  State = spawnManyPlans(PrimarySid, SpySid, ActFun, Plans),
  SpySid ! {Me, {updatePlanSize, length(State)}},
  receive
    {_From, {ok, Res}} ->
      io:format("Res is ~w\n", [Res]),
      case Res of
        [{_Pid, {fail, Info}}] -> FatherSid ! {Me, {okAll, {fail, Info}}};
        _Others ->
          DrawRes = mappingRes(State, Res),
          FatherSid ! {Me, {okAll, {success, DrawRes}}}
      end
  end.

mappingRes(State, Res) ->
  case State of
    [] -> [];
    [Pid | T] ->
      {_P, {_St, Val}} = lists:keyfind(Pid, 1, Res),
      [Val] ++ mappingRes(T, Res)
  end.

spyLoopAll(OngoingNum, StateNum, FatherSid, LastRes, PrimarySid) ->
  receive
    {From, {updateResult, Res}} ->
      case Res of
        {success, _Info} ->
          PrimarySid ! {self(), {updateServerOngoingNum, -1}},
          % may bug here, depends on program speed
          NewLastRes = [{From, Res}] ++ LastRes,
          NewOngoingNum = OngoingNum + 1,
          case NewOngoingNum >= StateNum of
            true ->
              FatherSid ! {self(), {ok, NewLastRes}};
            false ->
              spyLoopAll(NewOngoingNum, StateNum, FatherSid, NewLastRes, PrimarySid)
          end;
        {fail, _Info} ->
          PrimarySid ! {self(), {updateServerOngoingNum, -1}},
          NewState = [{From, Res}],
          FatherSid ! {self(), {ok, NewState}}
      end;
    {_From, {updatePlanSize, NewStateNum}} ->
      case NewStateNum > length(LastRes) of
        true ->
          spyLoopAll(OngoingNum, NewStateNum, FatherSid, LastRes, PrimarySid);
        false ->
          FatherSid ! {self(), {ok, LastRes}}
      end
  end.

doFailisSucs(PrimarySid, FatherSid, ActFun, Plan) ->
  Me = self(),
  PrimarySid ! {self(), {updateServerOngoingNum, 1}},
  spawn_link(fun() -> spawnSubBuilds(PrimarySid, Me, ActFun, Plan) end),
  receive
    {_From, {updateResult, Res}} ->
      PrimarySid ! {self(), {updateServerOngoingNum, -1}},
      case Res of
        {success, Val} ->
          FatherSid ! {Me, {okFailIsSucs, {fail, Val}}};
        {fail, Val} ->
          FatherSid ! {Me, {okFailIsSucs, {success, Val}}}
      end
  end.


doAndThen(PrimarySid, FatherSid, ActFun, Plan, FunBuild) ->
  Me = self(),
  PrimarySid ! {self(), {updateServerOngoingNum, 1}},
  spawn_link(fun() -> spawnSubBuilds(PrimarySid, Me, ActFun, Plan) end),
  receive
    {_From, {updateResult, Res}} ->
      case Res of
        {success, Val} ->
          io:format("run plan change\n"),
          try FunBuild(Val) of
            NewPlan ->
              spawn_link(fun() -> spawnSubBuilds(PrimarySid, Me, ActFun, NewPlan) end),
              receive
                {_From1, {updateResult, NewRes}} ->
                  PrimarySid ! {self(), {updateServerOngoingNum, -1}},
                  FatherSid ! {Me, {okAndThen, NewRes}}
              end
          catch
            _ ->
              io:format("catchexception\n"),
              PrimarySid ! {self(), {updateServerOngoingNum, -1}},
              FatherSid ! {Me, {okAndThen, {fail, not_a_fun_plan}}}
          end;
        {fail, Val} ->
          PrimarySid ! {self(), {updateServerOngoingNum, -1}},
          FatherSid ! {Me, {okAndThen, {fail, Val}}}
      end
  end.


doTimeLimit(PrimarySid, FatherSid, ActFun, Plan, Limit) ->
  Me = self(),
  PrimarySid ! {self(), {updateServerOngoingNum, 1}},
  PID = spawn_link(fun() -> spawnSubBuilds(PrimarySid, Me, ActFun, Plan) end),
  receive
    {_From, {updateResult, Res}} ->
      PrimarySid ! {self(), {updateServerOngoingNum, -1}},
      FatherSid ! {Me, {okTime, Res}}
  after Limit ->
    io:format("hi\n"),
    exit(PID, normal),
    FatherSid ! {Me, {okTime, {fail, limit_exceeded}}}
  end.
上一篇:thingsboard rpc应用程序控制传感器模拟


下一篇:TED地道用语学习 - <脆弱的力量> - 1